app.chat.manager

Chat history storage and context management for post-analysis Q&A.

Provides the ChatManager class that persists per-case chat conversations as JSONL files and builds context blocks for AI follow-up questions after an analysis is complete.

Key responsibilities:

  • Message persistence -- Append-only JSONL storage of user/assistant message pairs with UTC timestamps, analogous to the audit trail but scoped to interactive chat.
  • Context assembly -- Combines investigation context, system metadata, executive summary, and per-artifact findings into a single text block suitable for injection into an AI system prompt.
  • Token budgeting -- Estimates token counts and trims conversation history to fit within a configurable context window, dropping the oldest pairs first.
  • CSV data retrieval -- Delegates to app.chat.csv_retrieval for heuristic matching of user questions to parsed artifact CSV files.

Attributes:
  • VALID_ROLES: Frozenset of accepted message role strings ("user" and "assistant").
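Before the source listing, the persistence model is easiest to see in isolation. The sketch below is a minimal, self-contained reproduction of the append-one-JSON-line-per-message pattern that `ChatManager.add_message` and `get_history` implement; the timestamp helper is a stand-in for `_utc_now_iso8601_ms` (imported from the audit module in the real code, whose exact output format may differ), and all names here are illustrative.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def utc_now_iso8601_ms() -> str:
    # Stand-in for the audit module's helper: UTC, millisecond precision.
    now = datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"


def append_message(chat_file: Path, role: str, content: str) -> None:
    # One compact JSON object per line, appended in binary unbuffered mode.
    record = {"timestamp": utc_now_iso8601_ms(), "role": role, "content": content}
    line = json.dumps(record, separators=(",", ":")) + "\n"
    chat_file.parent.mkdir(parents=True, exist_ok=True)
    with chat_file.open("ab", buffering=0) as stream:
        stream.write(line.encode("utf-8"))


def load_history(chat_file: Path) -> list[dict]:
    # Read back in insertion order, skipping blank or malformed lines,
    # as get_history does.
    if not chat_file.exists():
        return []
    history = []
    for raw in chat_file.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line:
            continue
        try:
            history.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return history


with tempfile.TemporaryDirectory() as tmp:
    chat_file = Path(tmp) / "chat_history.jsonl"
    append_message(chat_file, "user", "What ran at boot?")
    append_message(chat_file, "assistant", "Three autorun entries were found.")
    history = load_history(chat_file)
    print([m["role"] for m in history])  # → ['user', 'assistant']
```

The append-per-call, no-buffering design trades throughput for durability: each message survives even if the process dies immediately afterwards.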
"""Chat history storage and context management for post-analysis Q&A.

Provides the :class:`ChatManager` class that persists per-case chat
conversations as JSONL files and builds context blocks for AI follow-up
questions after an analysis is complete.

Key responsibilities:

* **Message persistence** -- Append-only JSONL storage of user/assistant
  message pairs with UTC timestamps, analogous to the audit trail but
  scoped to interactive chat.
* **Context assembly** -- Combines investigation context, system metadata,
  executive summary, and per-artifact findings into a single text block
  suitable for injection into an AI system prompt.
* **Token budgeting** -- Estimates token counts and trims conversation
  history to fit within a configurable context window, dropping the oldest
  pairs first.
* **CSV data retrieval** -- Delegates to :mod:`~app.chat.csv_retrieval`
  for heuristic matching of user questions to parsed artifact CSV files.

Attributes:
    VALID_ROLES: Frozenset of accepted message role strings
        (``"user"`` and ``"assistant"``).
"""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Mapping

from ..audit import _utc_now_iso8601_ms
from .csv_retrieval import retrieve_csv_data as _retrieve_csv_data

__all__ = ["ChatManager"]

log = logging.getLogger(__name__)

VALID_ROLES = frozenset({"user", "assistant"})


def _stringify(value: Any, default: str = "") -> str:
    """Convert *value* to a stripped string, returning *default* when empty.

    Args:
        value: Arbitrary value to stringify.
        default: Fallback string when *value* is *None* or blank.

    Returns:
        The stripped string representation or *default*.
    """
    text = str(value).strip() if value is not None else ""
    return text or default


class ChatManager:
    """Persist and retrieve case-scoped chat history records.

    Each instance is bound to a single case directory and manages a
    ``chat_history.jsonl`` file containing timestamped user/assistant
    message pairs.  The manager also assembles context blocks for AI
    prompts by combining analysis results, investigation context, and
    system metadata.

    Attributes:
        MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly.
        case_dir: Path to the case directory.
        chat_file: Path to the ``chat_history.jsonl`` file.
    """

    MAX_CONTEXT_TOKENS = 100000

    def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None:
        """Initialise the chat manager for a case directory.

        Args:
            case_dir: Path to the case directory.  Created if it does
                not exist when messages are first written.
            max_context_tokens: Optional override for the maximum token
                budget.  Falls back to :attr:`MAX_CONTEXT_TOKENS` when
                *None* or invalid.
        """
        self.case_dir = Path(case_dir)
        self.chat_file = self.case_dir / "chat_history.jsonl"
        self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens)

    # ------------------------------------------------------------------
    # Message persistence
    # ------------------------------------------------------------------

    def add_message(
        self,
        role: str,
        content: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Append one message entry to the case chat JSONL history.

        The message is written as a single JSON line with a UTC ISO 8601
        timestamp.  The file is opened, written, and flushed for each call
        to minimise data loss on unexpected termination.

        Args:
            role: Message role -- must be ``"user"`` or ``"assistant"``.
            content: The message text.
            metadata: Optional dictionary of extra metadata to attach to
                the record (e.g. token counts, retrieval info).

        Raises:
            ValueError: If *role* is not in :data:`VALID_ROLES`.
            TypeError: If *content* is not a string or *metadata* is not a
                dict when provided.
        """
        normalized_role = str(role).strip().lower()
        if normalized_role not in VALID_ROLES:
            allowed = ", ".join(sorted(VALID_ROLES))
            raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.")
        if not isinstance(content, str):
            raise TypeError("content must be a string.")
        if metadata is not None and not isinstance(metadata, dict):
            raise TypeError("metadata must be a dictionary when provided.")

        message: dict[str, Any] = {
            "timestamp": _utc_now_iso8601_ms(),
            "role": normalized_role,
            "content": content,
        }
        if metadata is not None:
            message["metadata"] = metadata

        line = json.dumps(message, separators=(",", ":")) + "\n"
        self.chat_file.parent.mkdir(parents=True, exist_ok=True)
        with self.chat_file.open("ab", buffering=0) as chat_stream:
            chat_stream.write(line.encode("utf-8"))
            chat_stream.flush()

    def get_history(self) -> list[dict[str, Any]]:
        """Load the full chat history in insertion order.

        Reads every line from ``chat_history.jsonl``, skipping blank lines
        and malformed JSON entries (which are logged as warnings).

        Returns:
            A list of message dictionaries, each containing at least
            ``timestamp``, ``role``, and ``content`` keys.
        """
        if not self.chat_file.exists():
            return []

        history: list[dict[str, Any]] = []
        with self.chat_file.open("r", encoding="utf-8") as chat_stream:
            for line_no, raw_line in enumerate(chat_stream, 1):
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file)
                    continue
                if isinstance(record, dict):
                    history.append(record)
        return history

    def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]:
        """Return the most recent complete user/assistant message pairs.

        Messages are paired in order: a ``user`` message followed by the
        next ``assistant`` message forms a pair.  Only the last
        *max_pairs* complete pairs are returned.

        Args:
            max_pairs: Maximum number of user/assistant pairs to return.

        Returns:
            A flat list of message dictionaries alternating
            ``[user, assistant, user, assistant, ...]``.
        """
        if max_pairs <= 0:
            return []

        history = self.get_history()
        paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = []
        pending_user: dict[str, Any] | None = None

        for message in history:
            role = message.get("role")
            if role == "user":
                pending_user = message
                continue
            if role == "assistant" and pending_user is not None:
                paired_messages.append((pending_user, message))
                pending_user = None

        recent_pairs = paired_messages[-max_pairs:]
        recent_history: list[dict[str, Any]] = []
        for user_message, assistant_message in recent_pairs:
            recent_history.append(user_message)
            recent_history.append(assistant_message)
        return recent_history

    def clear(self) -> None:
        """Delete the chat history file when present.

        This is a destructive operation -- all chat messages for this
        case are permanently removed.
        """
        if self.chat_file.exists():
            self.chat_file.unlink()

    # ------------------------------------------------------------------
    # Context assembly
    # ------------------------------------------------------------------

    def build_chat_context(
        self,
        analysis_results: Mapping[str, Any] | None,
        investigation_context: str,
        metadata: Mapping[str, Any] | None,
    ) -> str:
        """Build a compact, complete context block for chat prompts.

        Assembles investigation context, system metadata (hostname, OS,
        domain), executive summary, and per-artifact findings into a
        single multi-section text string suitable for injection into an
        AI system prompt.

        Args:
            analysis_results: The full analysis results mapping (may
                contain ``summary`` and ``per_artifact`` keys).
            investigation_context: Free-text investigation context
                provided by the analyst.
            metadata: Evidence metadata mapping (hostname, os_version,
                domain, etc.).

        Returns:
            A formatted multi-section context string.
        """
        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
        per_artifact_lines = self._format_per_artifact_findings(analysis)
        findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}"
        return self._assemble_context(
            analysis_results, investigation_context, metadata, findings_section,
        )

    def rebuild_context_with_compressed_findings(
        self,
        analysis_results: Mapping[str, Any] | None,
        investigation_context: str,
        metadata: Mapping[str, Any] | None,
        compressed_findings: str,
    ) -> str:
        """Rebuild the context block using pre-compressed per-artifact findings.

        Identical to :meth:`build_chat_context` except that the
        per-artifact section is replaced with an externally compressed
        version of the findings, used when the full context exceeds the
        token budget.

        Args:
            analysis_results: The full analysis results mapping.
            investigation_context: Free-text investigation context.
            metadata: Evidence metadata mapping.
            compressed_findings: Pre-compressed per-artifact findings
                text to substitute into the context block.

        Returns:
            A formatted multi-section context string with compressed
            findings.
        """
        findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}"
        return self._assemble_context(
            analysis_results, investigation_context, metadata, findings_section,
        )

    def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
        """Return *True* when the context block exceeds 80% of the token budget.

        Args:
            context_block: The assembled context text to measure.
            token_budget: Maximum token allowance for the context window.

        Returns:
            *True* if the estimated token count of *context_block* exceeds
            80% of *token_budget*, *False* otherwise.
        """
        if token_budget <= 0:
            return False
        return self.estimate_token_count(context_block) > int(token_budget * 0.8)

    # ------------------------------------------------------------------
    # CSV data retrieval (delegates to csv_retrieval module)
    # ------------------------------------------------------------------

    def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]:
        """Best-effort retrieval of raw CSV rows for data-centric chat questions.

        Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`.

        Args:
            question: The user's chat question text.
            parsed_dir: Path to the directory containing parsed artifact
                CSV files.

        Returns:
            A dictionary with a ``retrieved`` boolean.  When *True*, also
            includes ``artifacts`` (list of matched CSV filenames) and
            ``data`` (formatted row text).
        """
        return _retrieve_csv_data(question, parsed_dir)

    # ------------------------------------------------------------------
    # Token budgeting
    # ------------------------------------------------------------------

    def estimate_token_count(self, text: str) -> int:
        """Estimate token count using a rough 4-characters-per-token ratio.

        Args:
            text: The string to estimate tokens for.

        Returns:
            Approximate token count (integer).
        """
        if not text:
            return 0
        return len(text) // 4

    def fit_history(
        self,
        history: list[dict[str, Any]],
        max_tokens: int,
    ) -> list[dict[str, Any]]:
        """Trim conversation history to fit within *max_tokens*.

        Pairs up user/assistant messages and drops the oldest complete
        pairs first until the estimated total token count fits within
        the budget.

        Args:
            history: Flat list of message dictionaries to trim.
            max_tokens: Maximum token budget for the returned history.

        Returns:
            A (possibly shorter) flat list of message dictionaries that
            fits within *max_tokens*.
        """
        if max_tokens <= 0:
            return []
        if not history:
            return []

        # Pair up messages so we can drop oldest pairs.
        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
        pending_user: dict[str, Any] | None = None
        for msg in history:
            role = msg.get("role")
            if role == "user":
                pending_user = msg
            elif role == "assistant" and pending_user is not None:
                pairs.append((pending_user, msg))
                pending_user = None

        # Drop oldest pairs until total fits.
        while pairs:
            total = sum(
                self.estimate_token_count(str(u.get("content", "")))
                + self.estimate_token_count(str(a.get("content", "")))
                for u, a in pairs
            )
            if total <= max_tokens:
                break
            pairs.pop(0)

        result: list[dict[str, Any]] = []
        for user_msg, assistant_msg in pairs:
            result.append(user_msg)
            result.append(assistant_msg)
        return result

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _resolve_max_context_tokens(cls, value: Any) -> int:
        """Coerce *value* to a positive integer token limit.

        Falls back to :attr:`MAX_CONTEXT_TOKENS` when *value* is *None*
        or cannot be converted to an integer.

        Args:
            value: Candidate token limit value.

        Returns:
            A positive integer (minimum 1).
        """
        try:
            resolved = int(value) if value is not None else int(cls.MAX_CONTEXT_TOKENS)
        except (TypeError, ValueError):
            resolved = int(cls.MAX_CONTEXT_TOKENS)
        return max(1, resolved)

    def _assemble_context(
        self,
        analysis_results: Mapping[str, Any] | None,
        investigation_context: str,
        metadata: Mapping[str, Any] | None,
        findings_section: str,
    ) -> str:
        """Assemble context sections shared by build and rebuild methods.

        Extracts metadata fields, formats the standard sections, and
        appends the caller-provided findings section.

        Args:
            analysis_results: The full analysis results mapping.
            investigation_context: Free-text investigation context.
            metadata: Evidence metadata mapping.
            findings_section: Pre-formatted findings section string
                (including its header line).

        Returns:
            A formatted multi-section context string.
        """
        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
        metadata_map = metadata if isinstance(metadata, Mapping) else {}

        hostname = _stringify(metadata_map.get("hostname"), default="Unknown")
        os_value = _stringify(
            metadata_map.get("os_version") or metadata_map.get("os"),
            default="Unknown",
        )
        domain = _stringify(metadata_map.get("domain"), default="Unknown")
        summary = _stringify(analysis.get("summary"), default="No executive summary available.")
        context_text = _stringify(
            investigation_context,
            default="No investigation context provided.",
        )

        sections = [
            f"Investigation Context:\n{context_text}",
            (
                "System Under Analysis:\n"
                f"- Hostname: {hostname}\n"
                f"- OS: {os_value}\n"
                f"- Domain: {domain}"
            ),
            f"Executive Summary:\n{summary}",
            findings_section,
        ]
        return "\n\n".join(sections)

    def _format_per_artifact_findings(self, analysis_results: Mapping[str, Any]) -> str:
        """Format per-artifact findings as a bulleted text block.

        Handles multiple input shapes (dict keyed by artifact name, list
        of finding dicts, or list of raw strings) and normalises them
        into ``- artifact_name: analysis_text`` lines.

        Args:
            analysis_results: The full analysis results mapping.

        Returns:
            A newline-joined string of bullet-pointed findings, or a
            placeholder message when no findings are available.
        """
        raw_findings = analysis_results.get("per_artifact")
        if raw_findings is None:
            raw_findings = analysis_results.get("per_artifact_findings")

        findings: list[tuple[str, str]] = []
        if isinstance(raw_findings, Mapping):
            items: list[Any] = []
            for artifact_name, value in raw_findings.items():
                if isinstance(value, Mapping):
                    merged = dict(value)
                    merged.setdefault("artifact_name", artifact_name)
                    items.append(merged)
                else:
                    items.append({"artifact_name": artifact_name, "analysis": value})
        elif isinstance(raw_findings, list):
            items = list(raw_findings)
        else:
            items = []

        for item in items:
            if isinstance(item, Mapping):
                artifact_name = _stringify(
                    item.get("artifact_name") or item.get("name") or item.get("artifact_key"),
                    default="Unknown Artifact",
                )
                analysis_text = _stringify(
                    item.get("analysis")
                    or item.get("finding")
                    or item.get("summary")
                    or item.get("text"),
                )
            else:
                artifact_name = "Unknown Artifact"
                analysis_text = _stringify(item)

            if analysis_text:
                findings.append((artifact_name, analysis_text))

        if not findings:
            return "- No per-artifact findings available."

        return "\n".join(
            f"- {artifact_name}: {analysis_text}"
            for artifact_name, analysis_text in findings
        )
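The token-budgeting behaviour in the listing above can be exercised in isolation. The following standalone sketch mirrors the `estimate_token_count` heuristic and the `fit_history` drop-oldest-pairs loop on plain dictionaries, with no class or file I/O required; it is an illustration of the same logic, not the module's API.

```python
def estimate_tokens(text: str) -> int:
    # Rough heuristic used by the manager: ~4 characters per token.
    return len(text) // 4


def fit_history(history: list[dict], max_tokens: int) -> list[dict]:
    if max_tokens <= 0 or not history:
        return []
    # Pair each user message with the next assistant message.
    pairs: list[tuple[dict, dict]] = []
    pending_user = None
    for msg in history:
        if msg.get("role") == "user":
            pending_user = msg
        elif msg.get("role") == "assistant" and pending_user is not None:
            pairs.append((pending_user, msg))
            pending_user = None
    # Drop the oldest pairs until the estimated total fits the budget.
    while pairs:
        total = sum(
            estimate_tokens(str(u.get("content", "")))
            + estimate_tokens(str(a.get("content", "")))
            for u, a in pairs
        )
        if total <= max_tokens:
            break
        pairs.pop(0)
    # Flatten back to [user, assistant, user, assistant, ...].
    return [m for pair in pairs for m in pair]


history = [
    {"role": "user", "content": "x" * 400},       # ~100 tokens
    {"role": "assistant", "content": "y" * 400},  # ~100 tokens
    {"role": "user", "content": "a" * 40},        # ~10 tokens
    {"role": "assistant", "content": "b" * 40},   # ~10 tokens
]
kept = fit_history(history, max_tokens=50)
print([len(m["content"]) for m in kept])  # → [40, 40]
```

With a 50-token budget, the first (200-token) pair is dropped and only the newest pair survives, which matches the "dropping the oldest pairs first" behaviour described in the module docstring.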
class ChatManager:
 58class ChatManager:
 59    """Persist and retrieve case-scoped chat history records.
 60
 61    Each instance is bound to a single case directory and manages a
 62    ``chat_history.jsonl`` file containing timestamped user/assistant
 63    message pairs.  The manager also assembles context blocks for AI
 64    prompts by combining analysis results, investigation context, and
 65    system metadata.
 66
 67    Attributes:
 68        MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly.
 69        case_dir: Resolved path to the case directory.
 70        chat_file: Path to the ``chat_history.jsonl`` file.
 71    """
 72
 73    MAX_CONTEXT_TOKENS = 100000
 74
 75    def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None:
 76        """Initialise the chat manager for a case directory.
 77
 78        Args:
 79            case_dir: Path to the case directory.  Created if it does
 80                not exist when messages are first written.
 81            max_context_tokens: Optional override for the maximum token
 82                budget.  Falls back to :attr:`MAX_CONTEXT_TOKENS` when
 83                *None* or invalid.
 84        """
 85        self.case_dir = Path(case_dir)
 86        self.chat_file = self.case_dir / "chat_history.jsonl"
 87        self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens)
 88
 89    # ------------------------------------------------------------------
 90    # Message persistence
 91    # ------------------------------------------------------------------
 92
 93    def add_message(
 94        self,
 95        role: str,
 96        content: str,
 97        metadata: dict[str, Any] | None = None,
 98    ) -> None:
 99        """Append one message entry to the case chat JSONL history.
100
101        The message is written as a single JSON line with a UTC ISO 8601
102        timestamp.  The file is opened, written, and flushed for each call
103        to minimise data loss on unexpected termination.
104
105        Args:
106            role: Message role -- must be ``"user"`` or ``"assistant"``.
107            content: The message text.
108            metadata: Optional dictionary of extra metadata to attach to
109                the record (e.g. token counts, retrieval info).
110
111        Raises:
112            ValueError: If *role* is not in :data:`VALID_ROLES`.
113            TypeError: If *content* is not a string or *metadata* is not a
114                dict when provided.
115        """
116        normalized_role = str(role).strip().lower()
117        if normalized_role not in VALID_ROLES:
118            allowed = ", ".join(sorted(VALID_ROLES))
119            raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.")
120        if not isinstance(content, str):
121            raise TypeError("content must be a string.")
122        if metadata is not None and not isinstance(metadata, dict):
123            raise TypeError("metadata must be a dictionary when provided.")
124
125        message: dict[str, Any] = {
126            "timestamp": _utc_now_iso8601_ms(),
127            "role": normalized_role,
128            "content": content,
129        }
130        if metadata is not None:
131            message["metadata"] = metadata
132
133        line = json.dumps(message, separators=(",", ":")) + "\n"
134        self.chat_file.parent.mkdir(parents=True, exist_ok=True)
135        with self.chat_file.open("ab", buffering=0) as chat_stream:
136            chat_stream.write(line.encode("utf-8"))
137            chat_stream.flush()
138
139    def get_history(self) -> list[dict[str, Any]]:
140        """Load the full chat history in insertion order.
141
142        Reads every line from ``chat_history.jsonl``, skipping blank lines
143        and malformed JSON entries (which are logged as warnings).
144
145        Returns:
146            A list of message dictionaries, each containing at least
147            ``timestamp``, ``role``, and ``content`` keys.
148        """
149        if not self.chat_file.exists():
150            return []
151
152        history: list[dict[str, Any]] = []
153        with self.chat_file.open("r", encoding="utf-8") as chat_stream:
154            for line_no, raw_line in enumerate(chat_stream, 1):
155                line = raw_line.strip()
156                if not line:
157                    continue
158                try:
159                    record = json.loads(line)
160                except json.JSONDecodeError:
161                    log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file)
162                    continue
163                if isinstance(record, dict):
164                    history.append(record)
165        return history
166
167    def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]:
168        """Return the most recent complete user/assistant message pairs.
169
170        Messages are paired in order: a ``user`` message followed by the
171        next ``assistant`` message forms a pair.  Only the last
172        *max_pairs* complete pairs are returned.
173
174        Args:
175            max_pairs: Maximum number of user/assistant pairs to return.
176
177        Returns:
178            A flat list of message dictionaries alternating
179            ``[user, assistant, user, assistant, ...]``.
180        """
181        if max_pairs <= 0:
182            return []
183
184        history = self.get_history()
185        paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = []
186        pending_user: dict[str, Any] | None = None
187
188        for message in history:
189            role = message.get("role")
190            if role == "user":
191                pending_user = message
192                continue
193            if role == "assistant" and pending_user is not None:
194                paired_messages.append((pending_user, message))
195                pending_user = None
196
197        recent_pairs = paired_messages[-max_pairs:]
198        recent_history: list[dict[str, Any]] = []
199        for user_message, assistant_message in recent_pairs:
200            recent_history.append(user_message)
201            recent_history.append(assistant_message)
202        return recent_history
203
204    def clear(self) -> None:
205        """Delete the chat history file when present.
206
207        This is a destructive operation -- all chat messages for this
208        case are permanently removed.
209        """
210        if self.chat_file.exists():
211            self.chat_file.unlink()
212
213    # ------------------------------------------------------------------
214    # Context assembly
215    # ------------------------------------------------------------------
216
217    def build_chat_context(
218        self,
219        analysis_results: Mapping[str, Any] | None,
220        investigation_context: str,
221        metadata: Mapping[str, Any] | None,
222    ) -> str:
223        """Build a compact, complete context block for chat prompts.
224
225        Assembles investigation context, system metadata (hostname, OS,
226        domain), executive summary, and per-artifact findings into a
227        single multi-section text string suitable for injection into an
228        AI system prompt.
229
230        Args:
231            analysis_results: The full analysis results mapping (may
232                contain ``summary`` and ``per_artifact`` keys).
233            investigation_context: Free-text investigation context
234                provided by the analyst.
235            metadata: Evidence metadata mapping (hostname, os_version,
236                domain, etc.).
237
238        Returns:
239            A formatted multi-section context string.
240        """
241        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
242        per_artifact_lines = self._format_per_artifact_findings(analysis)
243        findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}"
244        return self._assemble_context(
245            analysis_results, investigation_context, metadata, findings_section,
246        )
247
248    def rebuild_context_with_compressed_findings(
249        self,
250        analysis_results: Mapping[str, Any] | None,
251        investigation_context: str,
252        metadata: Mapping[str, Any] | None,
253        compressed_findings: str,
254    ) -> str:
255        """Rebuild the context block using pre-compressed per-artifact findings.
256
257        Identical to :meth:`build_chat_context` except that the
258        per-artifact section is replaced with an externally compressed
259        version of the findings, used when the full context exceeds the
260        token budget.
261
262        Args:
263            analysis_results: The full analysis results mapping.
264            investigation_context: Free-text investigation context.
265            metadata: Evidence metadata mapping.
266            compressed_findings: Pre-compressed per-artifact findings
267                text to substitute into the context block.
268
269        Returns:
270            A formatted multi-section context string with compressed
271            findings.
272        """
273        findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}"
274        return self._assemble_context(
275            analysis_results, investigation_context, metadata, findings_section,
276        )
277
278    def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
279        """Return *True* when the context block exceeds 80 % of the token budget.
280
281        Args:
282            context_block: The assembled context text to measure.
283            token_budget: Maximum token allowance for the context window.
284
285        Returns:
286            *True* if the estimated token count of *context_block* exceeds
287            80% of *token_budget*, *False* otherwise.
288        """
289        if token_budget <= 0:
290            return False
291        return self.estimate_token_count(context_block) > int(token_budget * 0.8)
292
293    # ------------------------------------------------------------------
294    # CSV data retrieval (delegates to csv_retrieval module)
295    # ------------------------------------------------------------------
296
297    def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]:
298        """Best-effort retrieval of raw CSV rows for data-centric chat questions.
299
300        Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`.
301
302        Args:
303            question: The user's chat question text.
304            parsed_dir: Path to the directory containing parsed artifact
305                CSV files.
306
307        Returns:
308            A dictionary with a ``retrieved`` boolean.  When *True*, also
309            includes ``artifacts`` (list of matched CSV filenames) and
310            ``data`` (formatted row text).
311        """
312        return _retrieve_csv_data(question, parsed_dir)
313
314    # ------------------------------------------------------------------
315    # Token budgeting
316    # ------------------------------------------------------------------
317
318    def estimate_token_count(self, text: str) -> int:
319        """Estimate token count using a rough 4-characters-per-token ratio.
320
321        Args:
322            text: The string to estimate tokens for.
323
324        Returns:
325            Approximate token count (integer).
326        """
327        if not text:
328            return 0
329        return int(len(text) / 4)
330
331    def fit_history(
332        self,
333        history: list[dict[str, Any]],
334        max_tokens: int,
335    ) -> list[dict[str, Any]]:
336        """Trim conversation history to fit within *max_tokens*.
337
338        Pairs up user/assistant messages and drops the oldest complete
339        pairs first until the estimated total token count fits within
340        the budget.
341
342        Args:
343            history: Flat list of message dictionaries to trim.
344            max_tokens: Maximum token budget for the returned history.
345
346        Returns:
347            A (possibly shorter) flat list of message dictionaries that
348            fits within *max_tokens*.
349        """
350        if max_tokens <= 0:
351            return []
352        if not history:
353            return []
354
355        # Pair up messages so we can drop oldest pairs.
356        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
357        pending_user: dict[str, Any] | None = None
358        for msg in history:
359            role = msg.get("role")
360            if role == "user":
361                pending_user = msg
362            elif role == "assistant" and pending_user is not None:
363                pairs.append((pending_user, msg))
364                pending_user = None
365
366        # Drop oldest pairs until total fits.
367        while pairs:
368            total = sum(
369                self.estimate_token_count(str(u.get("content", "")))
370                + self.estimate_token_count(str(a.get("content", "")))
371                for u, a in pairs
372            )
373            if total <= max_tokens:
374                break
375            pairs.pop(0)
376
377        result: list[dict[str, Any]] = []
378        for user_msg, assistant_msg in pairs:
379            result.append(user_msg)
380            result.append(assistant_msg)
381        return result
382
383    # ------------------------------------------------------------------
384    # Private helpers
385    # ------------------------------------------------------------------
386
387    @classmethod
388    def _resolve_max_context_tokens(cls, value: Any) -> int:
389        """Coerce *value* to a positive integer token limit.
390
391        Falls back to :attr:`MAX_CONTEXT_TOKENS` when *value* is *None*
392        or cannot be converted to an integer.
393
394        Args:
395            value: Candidate token limit value.
396
397        Returns:
398            A positive integer (minimum 1).
399        """
400        try:
401            resolved = int(value) if value is not None else int(cls.MAX_CONTEXT_TOKENS)
402        except (TypeError, ValueError):
403            resolved = int(cls.MAX_CONTEXT_TOKENS)
404        return max(1, resolved)
405
406    def _assemble_context(
407        self,
408        analysis_results: Mapping[str, Any] | None,
409        investigation_context: str,
410        metadata: Mapping[str, Any] | None,
411        findings_section: str,
412    ) -> str:
413        """Assemble context sections shared by build and rebuild methods.
414
415        Extracts metadata fields, formats the standard sections, and
416        appends the caller-provided findings section.
417
418        Args:
419            analysis_results: The full analysis results mapping.
420            investigation_context: Free-text investigation context.
421            metadata: Evidence metadata mapping.
422            findings_section: Pre-formatted findings section string
423                (including its header line).
424
425        Returns:
426            A formatted multi-section context string.
427        """
428        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
429        metadata_map = metadata if isinstance(metadata, Mapping) else {}
430
431        hostname = _stringify(metadata_map.get("hostname"), default="Unknown")
432        os_value = _stringify(
433            metadata_map.get("os_version") or metadata_map.get("os"),
434            default="Unknown",
435        )
436        domain = _stringify(metadata_map.get("domain"), default="Unknown")
437        summary = _stringify(analysis.get("summary"), default="No executive summary available.")
438        context_text = _stringify(
439            investigation_context,
440            default="No investigation context provided.",
441        )
442
443        sections = [
444            f"Investigation Context:\n{context_text}",
445            (
446                "System Under Analysis:\n"
447                f"- Hostname: {hostname}\n"
448                f"- OS: {os_value}\n"
449                f"- Domain: {domain}"
450            ),
451            f"Executive Summary:\n{summary}",
452            findings_section,
453        ]
454        return "\n\n".join(sections)
455
456    def _format_per_artifact_findings(self, analysis_results: Mapping[str, Any]) -> str:
457        """Format per-artifact findings as a bulleted text block.
458
459        Handles multiple input shapes (dict keyed by artifact name, list
460        of finding dicts, or list of raw strings) and normalises them
461        into ``- artifact_name: analysis_text`` lines.
462
463        Args:
464            analysis_results: The full analysis results mapping.
465
466        Returns:
467            A newline-joined string of bullet-pointed findings, or a
468            placeholder message when no findings are available.
469        """
470        raw_findings = analysis_results.get("per_artifact")
471        if raw_findings is None:
472            raw_findings = analysis_results.get("per_artifact_findings")
473
474        findings: list[tuple[str, str]] = []
475        if isinstance(raw_findings, Mapping):
476            items: list[Any] = []
477            for artifact_name, value in raw_findings.items():
478                if isinstance(value, Mapping):
479                    merged = dict(value)
480                    merged.setdefault("artifact_name", artifact_name)
481                    items.append(merged)
482                else:
483                    items.append({"artifact_name": artifact_name, "analysis": value})
484        elif isinstance(raw_findings, list):
485            items = list(raw_findings)
486        else:
487            items = []
488
489        for item in items:
490            if isinstance(item, Mapping):
491                artifact_name = _stringify(
492                    item.get("artifact_name") or item.get("name") or item.get("artifact_key"),
493                    default="Unknown Artifact",
494                )
495                analysis_text = _stringify(
496                    item.get("analysis")
497                    or item.get("finding")
498                    or item.get("summary")
499                    or item.get("text"),
500                )
501            else:
502                artifact_name = "Unknown Artifact"
503                analysis_text = _stringify(item)
504
505            if analysis_text:
506                findings.append((artifact_name, analysis_text))
507
508        if not findings:
509            return "- No per-artifact findings available."
510
511        return "\n".join(
512            f"- {artifact_name}: {analysis_text}"
513            for artifact_name, analysis_text in findings
514        )
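
The shape normalisation above can be sketched standalone. The artifact names and findings text below are invented for illustration; only the dict-or-string handling mirrors the documented behaviour.

```python
# Standalone sketch of the normalisation in _format_per_artifact_findings:
# a mapping keyed by artifact name may hold either a finding dict or a
# plain string, and both collapse to "- name: text" bullet lines.
raw_findings = {
    "prefetch": {"analysis": "mimikatz.exe executed once."},
    "amcache": "Unsigned binary observed in a temp directory.",
}

lines = []
for name, value in raw_findings.items():
    text = value.get("analysis", "") if isinstance(value, dict) else str(value)
    if text:
        lines.append(f"- {name}: {text}")

block = "\n".join(lines) if lines else "- No per-artifact findings available."
print(block)
```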

Persist and retrieve case-scoped chat history records.

Each instance is bound to a single case directory and manages a chat_history.jsonl file containing timestamped user/assistant message pairs. The manager also assembles context blocks for AI prompts by combining analysis results, investigation context, and system metadata.

Attributes:
  • MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly.
  • case_dir: Resolved path to the case directory.
  • chat_file: Path to the chat_history.jsonl file.
ChatManager(case_dir: str | pathlib.Path, max_context_tokens: int | None = None)
75    def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None:
76        """Initialise the chat manager for a case directory.
77
78        Args:
79            case_dir: Path to the case directory.  Created if it does
80                not exist when messages are first written.
81            max_context_tokens: Optional override for the maximum token
82                budget.  Falls back to :attr:`MAX_CONTEXT_TOKENS` when
83                *None* or invalid.
84        """
85        self.case_dir = Path(case_dir)
86        self.chat_file = self.case_dir / "chat_history.jsonl"
87        self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens)

Initialise the chat manager for a case directory.

Arguments:
  • case_dir: Path to the case directory. Created if it does not exist when messages are first written.
  • max_context_tokens: Optional override for the maximum token budget. Falls back to MAX_CONTEXT_TOKENS when None or invalid.
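
A minimal usage sketch of the constructor behaviour. `ChatManagerSketch` is a stand-in mirroring the documented `__init__`, not the real class; it shows that paths are derived eagerly and that an invalid `max_context_tokens` falls back to the class default.

```python
from pathlib import Path

class ChatManagerSketch:
    """Stand-in mirroring the documented constructor behaviour."""

    MAX_CONTEXT_TOKENS = 100000

    def __init__(self, case_dir, max_context_tokens=None):
        # Paths are computed immediately; the directory itself is only
        # created when the first message is written.
        self.case_dir = Path(case_dir)
        self.chat_file = self.case_dir / "chat_history.jsonl"
        try:
            resolved = (
                int(max_context_tokens)
                if max_context_tokens is not None
                else type(self).MAX_CONTEXT_TOKENS
            )
        except (TypeError, ValueError):
            # Invalid override: fall back to the class default.
            resolved = type(self).MAX_CONTEXT_TOKENS
        self.MAX_CONTEXT_TOKENS = max(1, resolved)

manager = ChatManagerSketch("/cases/case-001", max_context_tokens="oops")
print(manager.chat_file.name)      # chat_history.jsonl
print(manager.MAX_CONTEXT_TOKENS)  # 100000
```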
MAX_CONTEXT_TOKENS = 100000
case_dir
chat_file
def add_message(self, role: str, content: str, metadata: dict[str, typing.Any] | None = None) -> None:
 93    def add_message(
 94        self,
 95        role: str,
 96        content: str,
 97        metadata: dict[str, Any] | None = None,
 98    ) -> None:
 99        """Append one message entry to the case chat JSONL history.
100
101        The message is written as a single JSON line with a UTC ISO 8601
102        timestamp.  The file is opened, written, and flushed for each call
103        to minimise data loss on unexpected termination.
104
105        Args:
106            role: Message role -- must be ``"user"`` or ``"assistant"``.
107            content: The message text.
108            metadata: Optional dictionary of extra metadata to attach to
109                the record (e.g. token counts, retrieval info).
110
111        Raises:
112            ValueError: If *role* is not in :data:`VALID_ROLES`.
113            TypeError: If *content* is not a string or *metadata* is not a
114                dict when provided.
115        """
116        normalized_role = str(role).strip().lower()
117        if normalized_role not in VALID_ROLES:
118            allowed = ", ".join(sorted(VALID_ROLES))
119            raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.")
120        if not isinstance(content, str):
121            raise TypeError("content must be a string.")
122        if metadata is not None and not isinstance(metadata, dict):
123            raise TypeError("metadata must be a dictionary when provided.")
124
125        message: dict[str, Any] = {
126            "timestamp": _utc_now_iso8601_ms(),
127            "role": normalized_role,
128            "content": content,
129        }
130        if metadata is not None:
131            message["metadata"] = metadata
132
133        line = json.dumps(message, separators=(",", ":")) + "\n"
134        self.chat_file.parent.mkdir(parents=True, exist_ok=True)
135        with self.chat_file.open("ab", buffering=0) as chat_stream:
136            chat_stream.write(line.encode("utf-8"))
137            chat_stream.flush()

Append one message entry to the case chat JSONL history.

The message is written as a single JSON line with a UTC ISO 8601 timestamp. The file is opened, written, and flushed for each call to minimise data loss on unexpected termination.

Arguments:
  • role: Message role -- must be "user" or "assistant".
  • content: The message text.
  • metadata: Optional dictionary of extra metadata to attach to the record (e.g. token counts, retrieval info).
Raises:
  • ValueError: If role is not in VALID_ROLES.
  • TypeError: If content is not a string or metadata is not a dict when provided.
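
The write path can be sketched standalone (a throwaway temp directory stands in for the case directory; the question text is invented):

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path

# Standalone sketch of the append-only write: one compact JSON line per
# message, with a UTC ISO 8601 timestamp, flushed immediately.
chat_file = Path(tempfile.mkdtemp()) / "chat_history.jsonl"
message = {
    "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
    "role": "user",
    "content": "Which accounts logged in after 02:00?",
}
line = json.dumps(message, separators=(",", ":")) + "\n"
chat_file.parent.mkdir(parents=True, exist_ok=True)
with chat_file.open("ab", buffering=0) as stream:  # unbuffered binary append
    stream.write(line.encode("utf-8"))

print(chat_file.read_text(encoding="utf-8").count("\n"))  # 1
```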
def get_history(self) -> list[dict[str, typing.Any]]:
139    def get_history(self) -> list[dict[str, Any]]:
140        """Load the full chat history in insertion order.
141
142        Reads every line from ``chat_history.jsonl``, skipping blank lines
143        and malformed JSON entries (which are logged as warnings).
144
145        Returns:
146            A list of message dictionaries, each containing at least
147            ``timestamp``, ``role``, and ``content`` keys.
148        """
149        if not self.chat_file.exists():
150            return []
151
152        history: list[dict[str, Any]] = []
153        with self.chat_file.open("r", encoding="utf-8") as chat_stream:
154            for line_no, raw_line in enumerate(chat_stream, 1):
155                line = raw_line.strip()
156                if not line:
157                    continue
158                try:
159                    record = json.loads(line)
160                except json.JSONDecodeError:
161                    log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file)
162                    continue
163                if isinstance(record, dict):
164                    history.append(record)
165        return history

Load the full chat history in insertion order.

Reads every line from chat_history.jsonl, skipping blank lines and malformed JSON entries (which are logged as warnings).

Returns:
  A list of message dictionaries, each containing at least timestamp, role, and content keys.
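
The tolerant read can be sketched standalone (the sample lines are invented):

```python
import json

# Standalone sketch of the tolerant read: blank lines and malformed JSON
# are skipped (the real method also logs a warning per bad line).
raw_lines = [
    '{"timestamp":"2024-01-01T00:00:00Z","role":"user","content":"hi"}',
    "",                 # blank line: skipped
    "{not valid json",  # malformed: skipped
    '{"timestamp":"2024-01-01T00:00:01Z","role":"assistant","content":"hello"}',
]

history = []
for raw in raw_lines:
    line = raw.strip()
    if not line:
        continue
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        continue
    if isinstance(record, dict):
        history.append(record)

print(len(history))  # 2
```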

def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, typing.Any]]:
167    def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]:
168        """Return the most recent complete user/assistant message pairs.
169
170        Messages are paired in order: a ``user`` message followed by the
171        next ``assistant`` message forms a pair.  Only the last
172        *max_pairs* complete pairs are returned.
173
174        Args:
175            max_pairs: Maximum number of user/assistant pairs to return.
176
177        Returns:
178            A flat list of message dictionaries alternating
179            ``[user, assistant, user, assistant, ...]``.
180        """
181        if max_pairs <= 0:
182            return []
183
184        history = self.get_history()
185        paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = []
186        pending_user: dict[str, Any] | None = None
187
188        for message in history:
189            role = message.get("role")
190            if role == "user":
191                pending_user = message
192                continue
193            if role == "assistant" and pending_user is not None:
194                paired_messages.append((pending_user, message))
195                pending_user = None
196
197        recent_pairs = paired_messages[-max_pairs:]
198        recent_history: list[dict[str, Any]] = []
199        for user_message, assistant_message in recent_pairs:
200            recent_history.append(user_message)
201            recent_history.append(assistant_message)
202        return recent_history

Return the most recent complete user/assistant message pairs.

Messages are paired in order: a user message followed by the next assistant message forms a pair. Only the last max_pairs complete pairs are returned.

Arguments:
  • max_pairs: Maximum number of user/assistant pairs to return.
Returns:
  A flat list of message dictionaries alternating [user, assistant, user, assistant, ...].
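
The pairing rule can be sketched standalone (the message contents are placeholders):

```python
# Each user message pairs with the next assistant message; a trailing
# unanswered user message is dropped, and only the last max_pairs
# complete pairs survive.
history = [
    {"role": "user", "content": "q1"},
    {"role": "assistant", "content": "a1"},
    {"role": "user", "content": "q2"},
    {"role": "assistant", "content": "a2"},
    {"role": "user", "content": "q3"},  # no reply yet: excluded
]

pairs, pending_user = [], None
for message in history:
    if message["role"] == "user":
        pending_user = message
    elif message["role"] == "assistant" and pending_user is not None:
        pairs.append((pending_user, message))
        pending_user = None

max_pairs = 1
recent = [m for pair in pairs[-max_pairs:] for m in pair]
print([m["content"] for m in recent])  # ['q2', 'a2']
```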

def clear(self) -> None:
204    def clear(self) -> None:
205        """Delete the chat history file when present.
206
207        This is a destructive operation -- all chat messages for this
208        case are permanently removed.
209        """
210        if self.chat_file.exists():
211            self.chat_file.unlink()

Delete the chat history file when present.

This is a destructive operation -- all chat messages for this case are permanently removed.

def build_chat_context(self, analysis_results: Mapping[str, Any] | None, investigation_context: str, metadata: Mapping[str, Any] | None) -> str:
217    def build_chat_context(
218        self,
219        analysis_results: Mapping[str, Any] | None,
220        investigation_context: str,
221        metadata: Mapping[str, Any] | None,
222    ) -> str:
223        """Build a compact, complete context block for chat prompts.
224
225        Assembles investigation context, system metadata (hostname, OS,
226        domain), executive summary, and per-artifact findings into a
227        single multi-section text string suitable for injection into an
228        AI system prompt.
229
230        Args:
231            analysis_results: The full analysis results mapping (may
232                contain ``summary`` and ``per_artifact`` keys).
233            investigation_context: Free-text investigation context
234                provided by the analyst.
235            metadata: Evidence metadata mapping (hostname, os_version,
236                domain, etc.).
237
238        Returns:
239            A formatted multi-section context string.
240        """
241        analysis = analysis_results if isinstance(analysis_results, Mapping) else {}
242        per_artifact_lines = self._format_per_artifact_findings(analysis)
243        findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}"
244        return self._assemble_context(
245            analysis_results, investigation_context, metadata, findings_section,
246        )

Build a compact, complete context block for chat prompts.

Assembles investigation context, system metadata (hostname, OS, domain), executive summary, and per-artifact findings into a single multi-section text string suitable for injection into an AI system prompt.

Arguments:
  • analysis_results: The full analysis results mapping (may contain summary and per_artifact keys).
  • investigation_context: Free-text investigation context provided by the analyst.
  • metadata: Evidence metadata mapping (hostname, os_version, domain, etc.).
Returns:
  A formatted multi-section context string.
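
An illustrative sketch of the resulting section layout; the hostname, OS, summary, and findings values below are invented, but the four sections joined by blank lines mirror the documented structure:

```python
# Sections are assembled in order and joined with blank lines.
sections = [
    "Investigation Context:\nSuspected lateral movement from HOST-A.",
    (
        "System Under Analysis:\n"
        "- Hostname: WS-042\n"
        "- OS: Windows 10 Pro\n"
        "- Domain: CORP"
    ),
    "Executive Summary:\nEvidence of credential dumping at 02:14 UTC.",
    "Per-Artifact Findings:\n- prefetch: mimikatz.exe executed once.",
]
context_block = "\n\n".join(sections)
print(context_block.startswith("Investigation Context:"))  # True
```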

def rebuild_context_with_compressed_findings(self, analysis_results: Mapping[str, Any] | None, investigation_context: str, metadata: Mapping[str, Any] | None, compressed_findings: str) -> str:
248    def rebuild_context_with_compressed_findings(
249        self,
250        analysis_results: Mapping[str, Any] | None,
251        investigation_context: str,
252        metadata: Mapping[str, Any] | None,
253        compressed_findings: str,
254    ) -> str:
255        """Rebuild the context block using pre-compressed per-artifact findings.
256
257        Identical to :meth:`build_chat_context` except that the
258        per-artifact section is replaced with an externally compressed
259        version of the findings, used when the full context exceeds the
260        token budget.
261
262        Args:
263            analysis_results: The full analysis results mapping.
264            investigation_context: Free-text investigation context.
265            metadata: Evidence metadata mapping.
266            compressed_findings: Pre-compressed per-artifact findings
267                text to substitute into the context block.
268
269        Returns:
270            A formatted multi-section context string with compressed
271            findings.
272        """
273        findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}"
274        return self._assemble_context(
275            analysis_results, investigation_context, metadata, findings_section,
276        )

Rebuild the context block using pre-compressed per-artifact findings.

Identical to build_chat_context() except that the per-artifact section is replaced with an externally compressed version of the findings, used when the full context exceeds the token budget.

Arguments:
  • analysis_results: The full analysis results mapping.
  • investigation_context: Free-text investigation context.
  • metadata: Evidence metadata mapping.
  • compressed_findings: Pre-compressed per-artifact findings text to substitute into the context block.
Returns:
  A formatted multi-section context string with compressed findings.

def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
278    def context_needs_compression(self, context_block: str, token_budget: int) -> bool:
279        """Return *True* when the context block exceeds 80% of the token budget.
280
281        Args:
282            context_block: The assembled context text to measure.
283            token_budget: Maximum token allowance for the context window.
284
285        Returns:
286            *True* if the estimated token count of *context_block* exceeds
287            80% of *token_budget*, *False* otherwise.
288        """
289        if token_budget <= 0:
290            return False
291        return self.estimate_token_count(context_block) > int(token_budget * 0.8)

Return True when the context block exceeds 80% of the token budget.

Arguments:
  • context_block: The assembled context text to measure.
  • token_budget: Maximum token allowance for the context window.
Returns:
  True if the estimated token count of context_block exceeds 80% of token_budget, False otherwise.
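
A worked example of the 80% rule, using the documented 4-characters-per-token estimate (the string lengths are chosen to land on either side of the threshold):

```python
# With a budget of 1000 tokens, the compression threshold is
# int(1000 * 0.8) = 800 estimated tokens.
def estimate_token_count(text):
    return int(len(text) / 4) if text else 0

def needs_compression(block, budget):
    if budget <= 0:
        return False
    return estimate_token_count(block) > int(budget * 0.8)

token_budget = 1000
over = "x" * 3300   # 3300 / 4 = 825 tokens -> over the 800 threshold
under = "x" * 3100  # 3100 / 4 = 775 tokens -> under the threshold

print(needs_compression(over, token_budget))   # True
print(needs_compression(under, token_budget))  # False
```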

def retrieve_csv_data(self, question: str, parsed_dir: str | pathlib.Path) -> dict[str, typing.Any]:
297    def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]:
298        """Best-effort retrieval of raw CSV rows for data-centric chat questions.
299
300        Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`.
301
302        Args:
303            question: The user's chat question text.
304            parsed_dir: Path to the directory containing parsed artifact
305                CSV files.
306
307        Returns:
308            A dictionary with a ``retrieved`` boolean.  When *True*, also
309            includes ``artifacts`` (list of matched CSV filenames) and
310            ``data`` (formatted row text).
311        """
312        return _retrieve_csv_data(question, parsed_dir)

Best-effort retrieval of raw CSV rows for data-centric chat questions.

Delegates to app.chat.csv_retrieval.retrieve_csv_data().

Arguments:
  • question: The user's chat question text.
  • parsed_dir: Path to the directory containing parsed artifact CSV files.
Returns:
  A dictionary with a retrieved boolean. When True, also includes artifacts (list of matched CSV filenames) and data (formatted row text).
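
A hedged sketch of consuming the documented return shape. The retrieval itself is stubbed out here with invented values (`stub_retrieve_csv_data` is not the real function), so only the dictionary contract is illustrated:

```python
# Stub standing in for the real retrieval; filenames and row text are made up.
def stub_retrieve_csv_data(question, parsed_dir):
    return {
        "retrieved": True,
        "artifacts": ["Amcache.csv"],
        "data": "Amcache.csv row 12: unsigned binary in a temp directory",
    }

result = stub_retrieve_csv_data("what programs ran on the host?", "parsed/")
if result["retrieved"]:
    matched = result["artifacts"]  # only present when retrieved is True
else:
    matched = []

print(matched)  # ['Amcache.csv']
```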

def estimate_token_count(self, text: str) -> int:
318    def estimate_token_count(self, text: str) -> int:
319        """Estimate token count using a rough 4-characters-per-token ratio.
320
321        Args:
322            text: The string to estimate tokens for.
323
324        Returns:
325            Approximate token count (integer).
326        """
327        if not text:
328            return 0
329        return int(len(text) / 4)

Estimate token count using a rough 4-characters-per-token ratio.

Arguments:
  • text: The string to estimate tokens for.
Returns:
  Approximate token count (integer).
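
The heuristic in two lines, with the fractional result floored by the `int` conversion:

```python
# Roughly 4 characters per token; empty input short-circuits to 0.
def estimate_token_count(text):
    if not text:
        return 0
    return int(len(text) / 4)

print(estimate_token_count("a" * 10))  # 10 / 4 = 2.5 -> 2
print(estimate_token_count(""))        # 0
```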

def fit_history(self, history: list[dict[str, typing.Any]], max_tokens: int) -> list[dict[str, typing.Any]]:
331    def fit_history(
332        self,
333        history: list[dict[str, Any]],
334        max_tokens: int,
335    ) -> list[dict[str, Any]]:
336        """Trim conversation history to fit within *max_tokens*.
337
338        Pairs up user/assistant messages and drops the oldest complete
339        pairs first until the estimated total token count fits within
340        the budget.
341
342        Args:
343            history: Flat list of message dictionaries to trim.
344            max_tokens: Maximum token budget for the returned history.
345
346        Returns:
347            A (possibly shorter) flat list of message dictionaries that
348            fits within *max_tokens*.
349        """
350        if max_tokens <= 0:
351            return []
352        if not history:
353            return []
354
355        # Pair up messages so we can drop oldest pairs.
356        pairs: list[tuple[dict[str, Any], dict[str, Any]]] = []
357        pending_user: dict[str, Any] | None = None
358        for msg in history:
359            role = msg.get("role")
360            if role == "user":
361                pending_user = msg
362            elif role == "assistant" and pending_user is not None:
363                pairs.append((pending_user, msg))
364                pending_user = None
365
366        # Drop oldest pairs until total fits.
367        while pairs:
368            total = sum(
369                self.estimate_token_count(str(u.get("content", "")))
370                + self.estimate_token_count(str(a.get("content", "")))
371                for u, a in pairs
372            )
373            if total <= max_tokens:
374                break
375            pairs.pop(0)
376
377        result: list[dict[str, Any]] = []
378        for user_msg, assistant_msg in pairs:
379            result.append(user_msg)
380            result.append(assistant_msg)
381        return result

Trim conversation history to fit within max_tokens.

Pairs up user/assistant messages and drops the oldest complete pairs first until the estimated total token count fits within the budget.

Arguments:
  • history: Flat list of message dictionaries to trim.
  • max_tokens: Maximum token budget for the returned history.
Returns:
  A (possibly shorter) flat list of message dictionaries that fits within max_tokens.
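
The oldest-first trimming can be sketched standalone. Each message below is 40 characters, so under the documented 4-characters-per-token estimate each pair costs about 20 tokens:

```python
# Standalone sketch of the trimming loop: drop the oldest complete pair
# until the estimated total fits within the budget.
def estimate(text):
    return int(len(text) / 4) if text else 0

pairs = [
    ({"role": "user", "content": "u" * 40}, {"role": "assistant", "content": "a" * 40}),
    ({"role": "user", "content": "u" * 40}, {"role": "assistant", "content": "a" * 40}),
]

max_tokens = 25  # two pairs cost ~40 tokens, so one must be dropped
while pairs:
    total = sum(estimate(u["content"]) + estimate(a["content"]) for u, a in pairs)
    if total <= max_tokens:
        break
    pairs.pop(0)  # drop the oldest pair first

trimmed = [m for pair in pairs for m in pair]
print(len(trimmed))  # 2
```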