app.chat

Chat history storage and CSV retrieval for post-analysis Q&A.

This package provides the ChatManager class for persisting per-case chat conversations and assembling AI prompt context, as well as CSV retrieval utilities for injecting artifact data into chat prompts.

Modules:

manager: Core ChatManager class (history, context, token budgeting).
csv_retrieval: Heuristic CSV matching and row formatting.

 1"""Chat history storage and CSV retrieval for post-analysis Q&A.
 2
 3This package provides the :class:`ChatManager` class for persisting
 4per-case chat conversations and assembling AI prompt context, as well
 5as CSV retrieval utilities for injecting artifact data into chat prompts.
 6
 7Modules:
 8    manager: Core ChatManager class (history, context, token budgeting).
 9    csv_retrieval: Heuristic CSV matching and row formatting.
10"""
11
12from .manager import ChatManager
13
14__all__ = ["ChatManager", "csv_retrieval", "manager"]
class ChatManager:
 58class ChatManager:
 59    """Persist and retrieve case-scoped chat history records.
 60
 61    Each instance is bound to a single case directory and manages a
 62    ``chat_history.jsonl`` file containing timestamped user/assistant
 63    message pairs.  The manager also assembles context blocks for AI
 64    prompts by combining analysis results, investigation context, and
 65    system metadata.
 66
 67    Attributes:
 68        MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly.
 69        case_dir: Resolved path to the case directory.
 70        chat_file: Path to the ``chat_history.jsonl`` file.
 71    """
 72
 73    MAX_CONTEXT_TOKENS = 100000
 74
 75    def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None:
 76        """Initialise the chat manager for a case directory.
 77
 78        Args:
 79            case_dir: Path to the case directory.  Created if it does
 80                not exist when messages are first written.
 81            max_context_tokens: Optional override for the maximum token
 82                budget.  Falls back to :attr:`MAX_CONTEXT_TOKENS` when
 83                *None* or invalid.
 84        """
 85        self.case_dir = Path(case_dir)
 86        self.chat_file = self.case_dir / "chat_history.jsonl"
 87        self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens)
 88
 89    # ------------------------------------------------------------------
 90    # Message persistence
 91    # ------------------------------------------------------------------
 92
 93    def add_message(
 94        self,
 95        role: str,
 96        content: str,
 97        metadata: dict[str, Any] | None = None,
 98    ) -> None:
 99        """Append one message entry to the case chat JSONL history.
100
101        The message is written as a single JSON line with a UTC ISO 8601
102        timestamp.  The file is opened, written, and flushed for each call
103        to minimise data loss on unexpected termination.
104
105        Args:
106            role: Message role -- must be ``"user"`` or ``"assistant"``.
107            content: The message text.
108            metadata: Optional dictionary of extra metadata to attach to
109                the record (e.g. token counts, retrieval info).
110
111        Raises:
112            ValueError: If *role* is not in :data:`VALID_ROLES`.
113            TypeError: If *content* is not a string or *metadata* is not a
114                dict when provided.
115        """
116        normalized_role = str(role).strip().lower()
117        if normalized_role not in VALID_ROLES:
118            allowed = ", ".join(sorted(VALID_ROLES))
119            raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.")
120        if not isinstance(content, str):
121            raise TypeError("content must be a string.")
122        if metadata is not None and not isinstance(metadata, dict):
123            raise TypeError("metadata must be a dictionary when provided.")
124
125        message: dict[str, Any] = {
126            "timestamp": _utc_now_iso8601_ms(),
127            "role": normalized_role,
128            "content": content,
129        }
130        if metadata is not None:
131            message["metadata"] = metadata
132
133        line = json.dumps(message, separators=(",", ":")) + "\n"
134        self.chat_file.parent.mkdir(parents=True, exist_ok=True)
135        with self.chat_file.open("ab", buffering=0) as chat_stream:
136            chat_stream.write(line.encode("utf-8"))
137            chat_stream.flush()
138
139    def get_history(self) -> list[dict[str, Any]]:
140        """Load the full chat history in insertion order.
141
142        Reads every line from ``chat_history.jsonl``, skipping blank lines
143        and malformed JSON entries (which are logged as warnings).
144
145        Returns:
146            A list of message dictionaries, each containing at least
147            ``timestamp``, ``role``, and ``content`` keys.
148        """
149        if not self.chat_file.exists():
150            return []
151
152        history: list[dict[str, Any]] = []
153        with self.chat_file.open("r", encoding="utf-8") as chat_stream:
154            for line_no, raw_line in enumerate(chat_stream, 1):
155                line = raw_line.strip()
156                if not line:
157                    continue
158                try:
159                    record = json.loads(line)
160                except json.JSONDecodeError:
161                    log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file)
162                    continue
163                if isinstance(record, dict):
164                    history.append(record)
165        return history
166
167    def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]:
168        """Return the most recent complete user/assistant message pairs.
169
170        Messages are paired in order: a ``user`` message followed by the
171        next ``assistant`` message forms a pair.  Only the last
172        *max_pairs* complete pairs are returned.
173
174        Args:
175            max_pairs: Maximum number of user/assistant pairs to return.
176
177        Returns:
178            A flat list of message dictionaries alternating
179            ``[user, assistant, user, assistant, ...]``.
180        """
181        if max_pairs <= 0:
182            return []
183
184        history = self.get_history()
185        paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = []
186        pending_user: dict[str, Any] | None = None
187
188        for message in history:
189            role = message.get("role")
190            if role == "user":
191                pending_user = message
192                continue
193            if role == "assistant" and pending_user is not None:
194                paired_messages.append((pending_user, message))
195                pending_user = None
196
197        recent_pairs = paired_messages[-max_pairs:]
198        recent_history: list[dict[str, Any]] = []
199        for user_message, assistant_message in recent_pairs:
200            recent_history.append(user_message)
201            recent_history.append(assistant_message)
202        return recent_history
203
204    def clear(self) -> None:
205        """Delete the chat history file when present.
206
207        This is a destructive operation -- all chat messages for this
208        case are permanently removed.
209        """
210        if self.chat_file.exists():
211            self.chat_file.unlink()
212
213    # ------------------------------------------------------------------
214    # Context assembly
215    # ------------------------------------------------------------------
216
217    def build_chat_context(
218        self,
219        analysis_results: Mapping[str, Any] | None,
220        investigation_context: str,
221        metadata: Mapping[str, Any] | None,
222    ) -> str:
223        """Build a compact, complete context block for chat prompts.
224
225        Assembles investigation context, system metadata (hostname, OS,
226        domain), executive summary, and per-artifact findings into a
227        single multi-section text string suitable for injection into an
228        AI system prompt.
229
230        Args:
231            analysis_results: The full analysis results mapping (may
232                contain ``summary`` and ``per_artifact`` keys).
233            investigation_context: Free-text investigation context
234                provided by the analyst.
235            metadata: Evidence metadata mapping (hostname, os_version,
236                domain, etc.).
237
238        Returns:
239            A formatted multi-section context string.
240        """
241        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
242        per_artifact_lines = self._format_per_artifact_findings(analysis)
243        findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}"
244        return self._assemble_context(
245            analysis_results, investigation_context, metadata, findings_section,
246        )
247
248    def rebuild_context_with_compressed_findings(
249        self,
250        analysis_results: Mapping[str, Any] | None,
251        investigation_context: str,
252        metadata: Mapping[str, Any] | None,
253        compressed_findings: str,
254    ) -> str:
255        """Rebuild the context block using pre-compressed per-artifact findings.
256
257        Identical to :meth:`build_chat_context` except that the
258        per-artifact section is replaced with an externally compressed
259        version of the findings, used when the full context exceeds the
260        token budget.
261
262        Args:
263            analysis_results: The full analysis results mapping.
264            investigation_context: Free-text investigation context.
265            metadata: Evidence metadata mapping.
266            compressed_findings: Pre-compressed per-artifact findings
267                text to substitute into the context block.
268
269        Returns:
270            A formatted multi-section context string with compressed
271            findings.
272        """
273        findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}"
274        return self._assemble_context(
275            analysis_results, investigation_context, metadata, findings_section,
276        )
277
278    def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
279        """Return *True* when the context block exceeds 80 % of the token budget.
280
281        Args:
282            context_block: The assembled context text to measure.
283            token_budget: Maximum token allowance for the context window.
284
285        Returns:
286            *True* if the estimated token count of *context_block* exceeds
287            80 % of *token_budget*, *False* otherwise.
288        """
289        if token_budget <= 0:
290            return False
291        return self.estimate_token_count(context_block) > int(token_budget * 0.8)
292
293    # ------------------------------------------------------------------
294    # CSV data retrieval (delegates to csv_retrieval module)
295    # ------------------------------------------------------------------
296
297    def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]:
298        """Best-effort retrieval of raw CSV rows for data-centric chat questions.
299
300        Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`.
301
302        Args:
303            question: The user's chat question text.
304            parsed_dir: Path to the directory containing parsed artifact
305                CSV files.
306
307        Returns:
308            A dictionary with a ``retrieved`` boolean.  When *True*, also
309            includes ``artifacts`` (list of matched CSV filenames) and
310            ``data`` (formatted row text).
311        """
312        return _retrieve_csv_data(question, parsed_dir)
313
314    # ------------------------------------------------------------------
315    # Token budgeting
316    # ------------------------------------------------------------------
317
318    def estimate_token_count(self, text: str) -> int:
319        """Estimate token count using a rough 4-characters-per-token ratio.
320
321        Args:
322            text: The string to estimate tokens for.
323
324        Returns:
325            Approximate token count (integer).
326        """
327        if not text:
328            return 0
329        return int(len(text) / 4)
330
331    def fit_history(
332        self,
333        history: list[dict[str, Any]],
334        max_tokens: int,
335    ) -> list[dict[str, Any]]:
336        """Trim conversation history to fit within *max_tokens*.
337
338        Pairs up user/assistant messages and drops the oldest complete
339        pairs first until the estimated total token count fits within
340        the budget.
341
342        Args:
343            history: Flat list of message dictionaries to trim.
344            max_tokens: Maximum token budget for the returned history.
345
346        Returns:
347            A (possibly shorter) flat list of message dictionaries that
348            fits within *max_tokens*.
349        """
350        if max_tokens <= 0:
351            return []
352        if not history:
353            return []
354
355        # Pair up messages so we can drop oldest pairs.
356        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
357        pending_user: dict[str, Any] | None = None
358        for msg in history:
359            role = msg.get("role")
360            if role == "user":
361                pending_user = msg
362            elif role == "assistant" and pending_user is not None:
363                pairs.append((pending_user, msg))
364                pending_user = None
365
366        # Drop oldest pairs until total fits.
367        while pairs:
368            total = sum(
369                self.estimate_token_count(str(u.get("content", "")))
370                + self.estimate_token_count(str(a.get("content", "")))
371                for u, a in pairs
372            )
373            if total <= max_tokens:
374                break
375            pairs.pop(0)
376
377        result: list[dict[str, Any]] = []
378        for user_msg, assistant_msg in pairs:
379            result.append(user_msg)
380            result.append(assistant_msg)
381        return result
382
383    # ------------------------------------------------------------------
384    # Private helpers
385    # ------------------------------------------------------------------
386
387    @classmethod
388    def _resolve_max_context_tokens(cls, value: Any) -> int:
389        """Coerce *value* to a positive integer token limit.
390
391        Falls back to :attr:`MAX_CONTEXT_TOKENS` when *value* is *None*
392        or cannot be converted to an integer.
393
394        Args:
395            value: Candidate token limit value.
396
397        Returns:
398            A positive integer (minimum 1).
399        """
400        try:
401            resolved = int(value) if value is not None else int(cls.MAX_CONTEXT_TOKENS)
402        except (TypeError, ValueError):
403            resolved = int(cls.MAX_CONTEXT_TOKENS)
404        return max(1, resolved)
405
406    def _assemble_context(
407        self,
408        analysis_results: Mapping[str, Any] | None,
409        investigation_context: str,
410        metadata: Mapping[str, Any] | None,
411        findings_section: str,
412    ) -> str:
413        """Assemble context sections shared by build and rebuild methods.
414
415        Extracts metadata fields, formats the standard sections, and
416        appends the caller-provided findings section.
417
418        Args:
419            analysis_results: The full analysis results mapping.
420            investigation_context: Free-text investigation context.
421            metadata: Evidence metadata mapping.
422            findings_section: Pre-formatted findings section string
423                (including its header line).
424
425        Returns:
426            A formatted multi-section context string.
427        """
428        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
429        metadata_map = metadata if isinstance(metadata, Mapping) else {}
430
431        hostname = _stringify(metadata_map.get("hostname"), default="Unknown")
432        os_value = _stringify(
433            metadata_map.get("os_version") or metadata_map.get("os"),
434            default="Unknown",
435        )
436        domain = _stringify(metadata_map.get("domain"), default="Unknown")
437        summary = _stringify(analysis.get("summary"), default="No executive summary available.")
438        context_text = _stringify(
439            investigation_context,
440            default="No investigation context provided.",
441        )
442
443        sections = [
444            f"Investigation Context:\n{context_text}",
445            (
446                "System Under Analysis:\n"
447                f"- Hostname: {hostname}\n"
448                f"- OS: {os_value}\n"
449                f"- Domain: {domain}"
450            ),
451            f"Executive Summary:\n{summary}",
452            findings_section,
453        ]
454        return "\n\n".join(sections)
455
456    def _format_per_artifact_findings(self, analysis_results: Mapping[str, Any]) -> str:
457        """Format per-artifact findings as a bulleted text block.
458
459        Handles multiple input shapes (dict keyed by artifact name, list
460        of finding dicts, or list of raw strings) and normalises them
461        into ``- artifact_name: analysis_text`` lines.
462
463        Args:
464            analysis_results: The full analysis results mapping.
465
466        Returns:
467            A newline-joined string of bullet-pointed findings, or a
468            placeholder message when no findings are available.
469        """
470        raw_findings = analysis_results.get("per_artifact")
471        if raw_findings is None:
472            raw_findings = analysis_results.get("per_artifact_findings")
473
474        findings: list[tuple[str, str]] = []
475        if isinstance(raw_findings, Mapping):
476            items: list[Any] = []
477            for artifact_name, value in raw_findings.items():
478                if isinstance(value, Mapping):
479                    merged = dict(value)
480                    merged.setdefault("artifact_name", artifact_name)
481                    items.append(merged)
482                else:
483                    items.append({"artifact_name": artifact_name, "analysis": value})
484        elif isinstance(raw_findings, list):
485            items = list(raw_findings)
486        else:
487            items = []
488
489        for item in items:
490            if isinstance(item, Mapping):
491                artifact_name = _stringify(
492                    item.get("artifact_name") or item.get("name") or item.get("artifact_key"),
493                    default="Unknown Artifact",
494                )
495                analysis_text = _stringify(
496                    item.get("analysis")
497                    or item.get("finding")
498                    or item.get("summary")
499                    or item.get("text"),
500                )
501            else:
502                artifact_name = "Unknown Artifact"
503                analysis_text = _stringify(item)
504
505            if analysis_text:
506                findings.append((artifact_name, analysis_text))
507
508        if not findings:
509            return "- No per-artifact findings available."
510
511        return "\n".join(
512            f"- {artifact_name}: {analysis_text}"
513            for artifact_name, analysis_text in findings
514        )

Persist and retrieve case-scoped chat history records.

Each instance is bound to a single case directory and manages a chat_history.jsonl file containing timestamped user/assistant message pairs. The manager also assembles context blocks for AI prompts by combining analysis results, investigation context, and system metadata.

Attributes:
  • MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly.
  • case_dir: Resolved path to the case directory.
  • chat_file: Path to the chat_history.jsonl file.
ChatManager(case_dir: str | pathlib.Path, max_context_tokens: int | None = None)
75    def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None:
76        """Initialise the chat manager for a case directory.
77
78        Args:
79            case_dir: Path to the case directory.  Created if it does
80                not exist when messages are first written.
81            max_context_tokens: Optional override for the maximum token
82                budget.  Falls back to :attr:`MAX_CONTEXT_TOKENS` when
83                *None* or invalid.
84        """
85        self.case_dir = Path(case_dir)
86        self.chat_file = self.case_dir / "chat_history.jsonl"
87        self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens)

Initialise the chat manager for a case directory.

Arguments:
  • case_dir: Path to the case directory. Created if it does not exist when messages are first written.
  • max_context_tokens: Optional override for the maximum token budget. Falls back to MAX_CONTEXT_TOKENS when None or invalid.
MAX_CONTEXT_TOKENS = 100000
case_dir
chat_file
def add_message( self, role: str, content: str, metadata: dict[str, typing.Any] | None = None) -> None:
 93    def add_message(
 94        self,
 95        role: str,
 96        content: str,
 97        metadata: dict[str, Any] | None = None,
 98    ) -> None:
 99        """Append one message entry to the case chat JSONL history.
100
101        The message is written as a single JSON line with a UTC ISO 8601
102        timestamp.  The file is opened, written, and flushed for each call
103        to minimise data loss on unexpected termination.
104
105        Args:
106            role: Message role -- must be ``"user"`` or ``"assistant"``.
107            content: The message text.
108            metadata: Optional dictionary of extra metadata to attach to
109                the record (e.g. token counts, retrieval info).
110
111        Raises:
112            ValueError: If *role* is not in :data:`VALID_ROLES`.
113            TypeError: If *content* is not a string or *metadata* is not a
114                dict when provided.
115        """
116        normalized_role = str(role).strip().lower()
117        if normalized_role not in VALID_ROLES:
118            allowed = ", ".join(sorted(VALID_ROLES))
119            raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.")
120        if not isinstance(content, str):
121            raise TypeError("content must be a string.")
122        if metadata is not None and not isinstance(metadata, dict):
123            raise TypeError("metadata must be a dictionary when provided.")
124
125        message: dict[str, Any] = {
126            "timestamp": _utc_now_iso8601_ms(),
127            "role": normalized_role,
128            "content": content,
129        }
130        if metadata is not None:
131            message["metadata"] = metadata
132
133        line = json.dumps(message, separators=(",", ":")) + "\n"
134        self.chat_file.parent.mkdir(parents=True, exist_ok=True)
135        with self.chat_file.open("ab", buffering=0) as chat_stream:
136            chat_stream.write(line.encode("utf-8"))
137            chat_stream.flush()

Append one message entry to the case chat JSONL history.

The message is written as a single JSON line with a UTC ISO 8601 timestamp. The file is opened, written, and flushed for each call to minimise data loss on unexpected termination.

Arguments:
  • role: Message role -- must be "user" or "assistant".
  • content: The message text.
  • metadata: Optional dictionary of extra metadata to attach to the record (e.g. token counts, retrieval info).
Raises:
  • ValueError: If role is not in VALID_ROLES.
  • TypeError: If content is not a string or metadata is not a dict when provided.
def get_history(self) -> list[dict[str, typing.Any]]:
139    def get_history(self) -> list[dict[str, Any]]:
140        """Load the full chat history in insertion order.
141
142        Reads every line from ``chat_history.jsonl``, skipping blank lines
143        and malformed JSON entries (which are logged as warnings).
144
145        Returns:
146            A list of message dictionaries, each containing at least
147            ``timestamp``, ``role``, and ``content`` keys.
148        """
149        if not self.chat_file.exists():
150            return []
151
152        history: list[dict[str, Any]] = []
153        with self.chat_file.open("r", encoding="utf-8") as chat_stream:
154            for line_no, raw_line in enumerate(chat_stream, 1):
155                line = raw_line.strip()
156                if not line:
157                    continue
158                try:
159                    record = json.loads(line)
160                except json.JSONDecodeError:
161                    log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file)
162                    continue
163                if isinstance(record, dict):
164                    history.append(record)
165        return history

Load the full chat history in insertion order.

Reads every line from chat_history.jsonl, skipping blank lines and malformed JSON entries (which are logged as warnings).

Returns:

A list of message dictionaries, each containing at least timestamp, role, and content keys.

def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, typing.Any]]:
167    def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]:
168        """Return the most recent complete user/assistant message pairs.
169
170        Messages are paired in order: a ``user`` message followed by the
171        next ``assistant`` message forms a pair.  Only the last
172        *max_pairs* complete pairs are returned.
173
174        Args:
175            max_pairs: Maximum number of user/assistant pairs to return.
176
177        Returns:
178            A flat list of message dictionaries alternating
179            ``[user, assistant, user, assistant, ...]``.
180        """
181        if max_pairs <= 0:
182            return []
183
184        history = self.get_history()
185        paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = []
186        pending_user: dict[str, Any] | None = None
187
188        for message in history:
189            role = message.get("role")
190            if role == "user":
191                pending_user = message
192                continue
193            if role == "assistant" and pending_user is not None:
194                paired_messages.append((pending_user, message))
195                pending_user = None
196
197        recent_pairs = paired_messages[-max_pairs:]
198        recent_history: list[dict[str, Any]] = []
199        for user_message, assistant_message in recent_pairs:
200            recent_history.append(user_message)
201            recent_history.append(assistant_message)
202        return recent_history

Return the most recent complete user/assistant message pairs.

Messages are paired in order: a user message followed by the next assistant message forms a pair. Only the last max_pairs complete pairs are returned.

Arguments:
  • max_pairs: Maximum number of user/assistant pairs to return.
Returns:

A flat list of message dictionaries alternating [user, assistant, user, assistant, ...].

def clear(self) -> None:
204    def clear(self) -> None:
205        """Delete the chat history file when present.
206
207        This is a destructive operation -- all chat messages for this
208        case are permanently removed.
209        """
210        if self.chat_file.exists():
211            self.chat_file.unlink()

Delete the chat history file when present.

This is a destructive operation -- all chat messages for this case are permanently removed.

def build_chat_context( self, analysis_results: Optional[Mapping[str, Any]], investigation_context: str, metadata: Optional[Mapping[str, Any]]) -> str:
217    def build_chat_context(
218        self,
219        analysis_results: Mapping[str, Any] | None,
220        investigation_context: str,
221        metadata: Mapping[str, Any] | None,
222    ) -> str:
223        """Build a compact, complete context block for chat prompts.
224
225        Assembles investigation context, system metadata (hostname, OS,
226        domain), executive summary, and per-artifact findings into a
227        single multi-section text string suitable for injection into an
228        AI system prompt.
229
230        Args:
231            analysis_results: The full analysis results mapping (may
232                contain ``summary`` and ``per_artifact`` keys).
233            investigation_context: Free-text investigation context
234                provided by the analyst.
235            metadata: Evidence metadata mapping (hostname, os_version,
236                domain, etc.).
237
238        Returns:
239            A formatted multi-section context string.
240        """
241        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
242        per_artifact_lines = self._format_per_artifact_findings(analysis)
243        findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}"
244        return self._assemble_context(
245            analysis_results, investigation_context, metadata, findings_section,
246        )

Build a compact, complete context block for chat prompts.

Assembles investigation context, system metadata (hostname, OS, domain), executive summary, and per-artifact findings into a single multi-section text string suitable for injection into an AI system prompt.

Arguments:
  • analysis_results: The full analysis results mapping (may contain summary and per_artifact keys).
  • investigation_context: Free-text investigation context provided by the analyst.
  • metadata: Evidence metadata mapping (hostname, os_version, domain, etc.).
Returns:

A formatted multi-section context string.

def rebuild_context_with_compressed_findings( self, analysis_results: Optional[Mapping[str, Any]], investigation_context: str, metadata: Optional[Mapping[str, Any]], compressed_findings: str) -> str:
248    def rebuild_context_with_compressed_findings(
249        self,
250        analysis_results: Mapping[str, Any] | None,
251        investigation_context: str,
252        metadata: Mapping[str, Any] | None,
253        compressed_findings: str,
254    ) -> str:
255        """Rebuild the context block using pre-compressed per-artifact findings.
256
257        Identical to :meth:`build_chat_context` except that the
258        per-artifact section is replaced with an externally compressed
259        version of the findings, used when the full context exceeds the
260        token budget.
261
262        Args:
263            analysis_results: The full analysis results mapping.
264            investigation_context: Free-text investigation context.
265            metadata: Evidence metadata mapping.
266            compressed_findings: Pre-compressed per-artifact findings
267                text to substitute into the context block.
268
269        Returns:
270            A formatted multi-section context string with compressed
271            findings.
272        """
273        findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}"
274        return self._assemble_context(
275            analysis_results, investigation_context, metadata, findings_section,
276        )

Rebuild the context block using pre-compressed per-artifact findings.

Identical to build_chat_context() except that the per-artifact section is replaced with an externally compressed version of the findings, used when the full context exceeds the token budget.

Arguments:
  • analysis_results: The full analysis results mapping.
  • investigation_context: Free-text investigation context.
  • metadata: Evidence metadata mapping.
  • compressed_findings: Pre-compressed per-artifact findings text to substitute into the context block.
Returns:

A formatted multi-section context string with compressed findings.

def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
278    def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
279        """Return *True* when the context block exceeds 80 % of the token budget.
280
281        Args:
282            context_block: The assembled context text to measure.
283            token_budget: Maximum token allowance for the context window.
284
285        Returns:
286            *True* if the estimated token count of *context_block* exceeds
287            80 % of *token_budget*, *False* otherwise.
288        """
289        if token_budget <= 0:
290            return False
291        return self.estimate_token_count(context_block) > int(token_budget * 0.8)

Return True when the context block exceeds 80% of the token budget.

Arguments:
  • context_block: The assembled context text to measure.
  • token_budget: Maximum token allowance for the context window.
Returns:
  • True if the estimated token count of context_block exceeds 80% of token_budget, False otherwise.

def retrieve_csv_data( self, question: str, parsed_dir: str | pathlib.Path) -> dict[str, typing.Any]:
297    def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]:
298        """Best-effort retrieval of raw CSV rows for data-centric chat questions.
299
300        Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`.
301
302        Args:
303            question: The user's chat question text.
304            parsed_dir: Path to the directory containing parsed artifact
305                CSV files.
306
307        Returns:
308            A dictionary with a ``retrieved`` boolean.  When *True*, also
309            includes ``artifacts`` (list of matched CSV filenames) and
310            ``data`` (formatted row text).
311        """
312        return _retrieve_csv_data(question, parsed_dir)

Best-effort retrieval of raw CSV rows for data-centric chat questions.

Delegates to ~app.chat.csv_retrieval.retrieve_csv_data().

Arguments:
  • question: The user's chat question text.
  • parsed_dir: Path to the directory containing parsed artifact CSV files.
Returns:
  • A dictionary with a retrieved boolean. When True, also includes artifacts (list of matched CSV filenames) and data (formatted row text).

def estimate_token_count(self, text: str) -> int:
318    def estimate_token_count(self, text: str) -> int:
319        """Estimate token count using a rough 4-characters-per-token ratio.
320
321        Args:
322            text: The string to estimate tokens for.
323
324        Returns:
325            Approximate token count (integer).
326        """
327        if not text:
328            return 0
329        return int(len(text) / 4)

Estimate token count using a rough 4-characters-per-token ratio.

Arguments:
  • text: The string to estimate tokens for.
Returns:
  • Approximate token count (integer).

def fit_history( self, history: list[dict[str, typing.Any]], max_tokens: int) -> list[dict[str, typing.Any]]:
331    def fit_history(
332        self,
333        history: list[dict[str, Any]],
334        max_tokens: int,
335    ) -> list[dict[str, Any]]:
336        """Trim conversation history to fit within *max_tokens*.
337
338        Pairs up user/assistant messages and drops the oldest complete
339        pairs first until the estimated total token count fits within
340        the budget.
341
342        Args:
343            history: Flat list of message dictionaries to trim.
344            max_tokens: Maximum token budget for the returned history.
345
346        Returns:
347            A (possibly shorter) flat list of message dictionaries that
348            fits within *max_tokens*.
349        """
350        if max_tokens <= 0:
351            return []
352        if not history:
353            return []
354
355        # Pair up messages so we can drop oldest pairs.
356        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
357        pending_user: dict[str, Any] | None = None
358        for msg in history:
359            role = msg.get("role")
360            if role == "user":
361                pending_user = msg
362            elif role == "assistant" and pending_user is not None:
363                pairs.append((pending_user, msg))
364                pending_user = None
365
366        # Drop oldest pairs until total fits.
367        while pairs:
368            total = sum(
369                self.estimate_token_count(str(u.get("content", "")))
370                + self.estimate_token_count(str(a.get("content", "")))
371                for u, a in pairs
372            )
373            if total <= max_tokens:
374                break
375            pairs.pop(0)
376
377        result: list[dict[str, Any]] = []
378        for user_msg, assistant_msg in pairs:
379            result.append(user_msg)
380            result.append(assistant_msg)
381        return result

Trim conversation history to fit within max_tokens.

Pairs up user/assistant messages and drops the oldest complete pairs first until the estimated total token count fits within the budget.

Arguments:
  • history: Flat list of message dictionaries to trim.
  • max_tokens: Maximum token budget for the returned history.
Returns:
  • A (possibly shorter) flat list of message dictionaries that fits within max_tokens.