app.chat
Chat history storage and CSV retrieval for post-analysis Q&A.
This package provides the ChatManager class for persisting
per-case chat conversations and assembling AI prompt context, as well
as CSV retrieval utilities for injecting artifact data into chat prompts.
Modules:
manager: Core ChatManager class (history, context, token budgeting).
csv_retrieval: Heuristic CSV matching and row formatting.
"""Chat history storage and CSV retrieval for post-analysis Q&A.

This package provides the :class:`ChatManager` class for persisting
per-case chat conversations and assembling AI prompt context, as well
as CSV retrieval utilities for injecting artifact data into chat prompts.

Modules:
    manager: Core ChatManager class (history, context, token budgeting).
    csv_retrieval: Heuristic CSV matching and row formatting.
"""

from .manager import ChatManager

# Listing the submodule names in __all__ makes ``from app.chat import *``
# import ``csv_retrieval`` and ``manager`` as modules alongside ChatManager.
__all__ = ["ChatManager", "csv_retrieval", "manager"]
58class ChatManager: 59 """Persist and retrieve case-scoped chat history records. 60 61 Each instance is bound to a single case directory and manages a 62 ``chat_history.jsonl`` file containing timestamped user/assistant 63 message pairs. The manager also assembles context blocks for AI 64 prompts by combining analysis results, investigation context, and 65 system metadata. 66 67 Attributes: 68 MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly. 69 case_dir: Resolved path to the case directory. 70 chat_file: Path to the ``chat_history.jsonl`` file. 71 """ 72 73 MAX_CONTEXT_TOKENS = 100000 74 75 def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None: 76 """Initialise the chat manager for a case directory. 77 78 Args: 79 case_dir: Path to the case directory. Created if it does 80 not exist when messages are first written. 81 max_context_tokens: Optional override for the maximum token 82 budget. Falls back to :attr:`MAX_CONTEXT_TOKENS` when 83 *None* or invalid. 84 """ 85 self.case_dir = Path(case_dir) 86 self.chat_file = self.case_dir / "chat_history.jsonl" 87 self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens) 88 89 # ------------------------------------------------------------------ 90 # Message persistence 91 # ------------------------------------------------------------------ 92 93 def add_message( 94 self, 95 role: str, 96 content: str, 97 metadata: dict[str, Any] | None = None, 98 ) -> None: 99 """Append one message entry to the case chat JSONL history. 100 101 The message is written as a single JSON line with a UTC ISO 8601 102 timestamp. The file is opened, written, and flushed for each call 103 to minimise data loss on unexpected termination. 104 105 Args: 106 role: Message role -- must be ``"user"`` or ``"assistant"``. 107 content: The message text. 108 metadata: Optional dictionary of extra metadata to attach to 109 the record (e.g. token counts, retrieval info). 
110 111 Raises: 112 ValueError: If *role* is not in :data:`VALID_ROLES`. 113 TypeError: If *content* is not a string or *metadata* is not a 114 dict when provided. 115 """ 116 normalized_role = str(role).strip().lower() 117 if normalized_role not in VALID_ROLES: 118 allowed = ", ".join(sorted(VALID_ROLES)) 119 raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.") 120 if not isinstance(content, str): 121 raise TypeError("content must be a string.") 122 if metadata is not None and not isinstance(metadata, dict): 123 raise TypeError("metadata must be a dictionary when provided.") 124 125 message: dict[str, Any] = { 126 "timestamp": _utc_now_iso8601_ms(), 127 "role": normalized_role, 128 "content": content, 129 } 130 if metadata is not None: 131 message["metadata"] = metadata 132 133 line = json.dumps(message, separators=(",", ":")) + "\n" 134 self.chat_file.parent.mkdir(parents=True, exist_ok=True) 135 with self.chat_file.open("ab", buffering=0) as chat_stream: 136 chat_stream.write(line.encode("utf-8")) 137 chat_stream.flush() 138 139 def get_history(self) -> list[dict[str, Any]]: 140 """Load the full chat history in insertion order. 141 142 Reads every line from ``chat_history.jsonl``, skipping blank lines 143 and malformed JSON entries (which are logged as warnings). 144 145 Returns: 146 A list of message dictionaries, each containing at least 147 ``timestamp``, ``role``, and ``content`` keys. 
148 """ 149 if not self.chat_file.exists(): 150 return [] 151 152 history: list[dict[str, Any]] = [] 153 with self.chat_file.open("r", encoding="utf-8") as chat_stream: 154 for line_no, raw_line in enumerate(chat_stream, 1): 155 line = raw_line.strip() 156 if not line: 157 continue 158 try: 159 record = json.loads(line) 160 except json.JSONDecodeError: 161 log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file) 162 continue 163 if isinstance(record, dict): 164 history.append(record) 165 return history 166 167 def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]: 168 """Return the most recent complete user/assistant message pairs. 169 170 Messages are paired in order: a ``user`` message followed by the 171 next ``assistant`` message forms a pair. Only the last 172 *max_pairs* complete pairs are returned. 173 174 Args: 175 max_pairs: Maximum number of user/assistant pairs to return. 176 177 Returns: 178 A flat list of message dictionaries alternating 179 ``[user, assistant, user, assistant, ...]``. 180 """ 181 if max_pairs <= 0: 182 return [] 183 184 history = self.get_history() 185 paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = [] 186 pending_user: dict[str, Any] | None = None 187 188 for message in history: 189 role = message.get("role") 190 if role == "user": 191 pending_user = message 192 continue 193 if role == "assistant" and pending_user is not None: 194 paired_messages.append((pending_user, message)) 195 pending_user = None 196 197 recent_pairs = paired_messages[-max_pairs:] 198 recent_history: list[dict[str, Any]] = [] 199 for user_message, assistant_message in recent_pairs: 200 recent_history.append(user_message) 201 recent_history.append(assistant_message) 202 return recent_history 203 204 def clear(self) -> None: 205 """Delete the chat history file when present. 206 207 This is a destructive operation -- all chat messages for this 208 case are permanently removed. 
209 """ 210 if self.chat_file.exists(): 211 self.chat_file.unlink() 212 213 # ------------------------------------------------------------------ 214 # Context assembly 215 # ------------------------------------------------------------------ 216 217 def build_chat_context( 218 self, 219 analysis_results: Mapping[str, Any] | None, 220 investigation_context: str, 221 metadata: Mapping[str, Any] | None, 222 ) -> str: 223 """Build a compact, complete context block for chat prompts. 224 225 Assembles investigation context, system metadata (hostname, OS, 226 domain), executive summary, and per-artifact findings into a 227 single multi-section text string suitable for injection into an 228 AI system prompt. 229 230 Args: 231 analysis_results: The full analysis results mapping (may 232 contain ``summary`` and ``per_artifact`` keys). 233 investigation_context: Free-text investigation context 234 provided by the analyst. 235 metadata: Evidence metadata mapping (hostname, os_version, 236 domain, etc.). 237 238 Returns: 239 A formatted multi-section context string. 240 """ 241 analysis = analysis_results if isinstance(analysis_results, Mapping) else {} 242 per_artifact_lines = self._format_per_artifact_findings(analysis) 243 findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}" 244 return self._assemble_context( 245 analysis_results, investigation_context, metadata, findings_section, 246 ) 247 248 def rebuild_context_with_compressed_findings( 249 self, 250 analysis_results: Mapping[str, Any] | None, 251 investigation_context: str, 252 metadata: Mapping[str, Any] | None, 253 compressed_findings: str, 254 ) -> str: 255 """Rebuild the context block using pre-compressed per-artifact findings. 256 257 Identical to :meth:`build_chat_context` except that the 258 per-artifact section is replaced with an externally compressed 259 version of the findings, used when the full context exceeds the 260 token budget. 
261 262 Args: 263 analysis_results: The full analysis results mapping. 264 investigation_context: Free-text investigation context. 265 metadata: Evidence metadata mapping. 266 compressed_findings: Pre-compressed per-artifact findings 267 text to substitute into the context block. 268 269 Returns: 270 A formatted multi-section context string with compressed 271 findings. 272 """ 273 findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}" 274 return self._assemble_context( 275 analysis_results, investigation_context, metadata, findings_section, 276 ) 277 278 def context_needs_compression(self, context_block: str, token_budget: int) -> bool: 279 """Return *True* when the context block exceeds 80 % of the token budget. 280 281 Args: 282 context_block: The assembled context text to measure. 283 token_budget: Maximum token allowance for the context window. 284 285 Returns: 286 *True* if the estimated token count of *context_block* exceeds 287 80 % of *token_budget*, *False* otherwise. 288 """ 289 if token_budget <= 0: 290 return False 291 return self.estimate_token_count(context_block) > int(token_budget * 0.8) 292 293 # ------------------------------------------------------------------ 294 # CSV data retrieval (delegates to csv_retrieval module) 295 # ------------------------------------------------------------------ 296 297 def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]: 298 """Best-effort retrieval of raw CSV rows for data-centric chat questions. 299 300 Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`. 301 302 Args: 303 question: The user's chat question text. 304 parsed_dir: Path to the directory containing parsed artifact 305 CSV files. 306 307 Returns: 308 A dictionary with a ``retrieved`` boolean. When *True*, also 309 includes ``artifacts`` (list of matched CSV filenames) and 310 ``data`` (formatted row text). 
311 """ 312 return _retrieve_csv_data(question, parsed_dir) 313 314 # ------------------------------------------------------------------ 315 # Token budgeting 316 # ------------------------------------------------------------------ 317 318 def estimate_token_count(self, text: str) -> int: 319 """Estimate token count using a rough 4-characters-per-token ratio. 320 321 Args: 322 text: The string to estimate tokens for. 323 324 Returns: 325 Approximate token count (integer). 326 """ 327 if not text: 328 return 0 329 return int(len(text) / 4) 330 331 def fit_history( 332 self, 333 history: list[dict[str, Any]], 334 max_tokens: int, 335 ) -> list[dict[str, Any]]: 336 """Trim conversation history to fit within *max_tokens*. 337 338 Pairs up user/assistant messages and drops the oldest complete 339 pairs first until the estimated total token count fits within 340 the budget. 341 342 Args: 343 history: Flat list of message dictionaries to trim. 344 max_tokens: Maximum token budget for the returned history. 345 346 Returns: 347 A (possibly shorter) flat list of message dictionaries that 348 fits within *max_tokens*. 349 """ 350 if max_tokens <= 0: 351 return [] 352 if not history: 353 return [] 354 355 # Pair up messages so we can drop oldest pairs. 356 pairs: list[tuple[dict[str, Any], dict[str, Any]]] = [] 357 pending_user: dict[str, Any] | None = None 358 for msg in history: 359 role = msg.get("role") 360 if role == "user": 361 pending_user = msg 362 elif role == "assistant" and pending_user is not None: 363 pairs.append((pending_user, msg)) 364 pending_user = None 365 366 # Drop oldest pairs until total fits. 
367 while pairs: 368 total = sum( 369 self.estimate_token_count(str(u.get("content", ""))) 370 + self.estimate_token_count(str(a.get("content", ""))) 371 for u, a in pairs 372 ) 373 if total <= max_tokens: 374 break 375 pairs.pop(0) 376 377 result: list[dict[str, Any]] = [] 378 for user_msg, assistant_msg in pairs: 379 result.append(user_msg) 380 result.append(assistant_msg) 381 return result 382 383 # ------------------------------------------------------------------ 384 # Private helpers 385 # ------------------------------------------------------------------ 386 387 @classmethod 388 def _resolve_max_context_tokens(cls, value: Any) -> int: 389 """Coerce *value* to a positive integer token limit. 390 391 Falls back to :attr:`MAX_CONTEXT_TOKENS` when *value* is *None* 392 or cannot be converted to an integer. 393 394 Args: 395 value: Candidate token limit value. 396 397 Returns: 398 A positive integer (minimum 1). 399 """ 400 try: 401 resolved = int(value) if value is not None else int(cls.MAX_CONTEXT_TOKENS) 402 except (TypeError, ValueError): 403 resolved = int(cls.MAX_CONTEXT_TOKENS) 404 return max(1, resolved) 405 406 def _assemble_context( 407 self, 408 analysis_results: Mapping[str, Any] | None, 409 investigation_context: str, 410 metadata: Mapping[str, Any] | None, 411 findings_section: str, 412 ) -> str: 413 """Assemble context sections shared by build and rebuild methods. 414 415 Extracts metadata fields, formats the standard sections, and 416 appends the caller-provided findings section. 417 418 Args: 419 analysis_results: The full analysis results mapping. 420 investigation_context: Free-text investigation context. 421 metadata: Evidence metadata mapping. 422 findings_section: Pre-formatted findings section string 423 (including its header line). 424 425 Returns: 426 A formatted multi-section context string. 
427 """ 428 analysis = analysis_results if isinstance(analysis_results, Mapping) else {} 429 metadata_map = metadata if isinstance(metadata, Mapping) else {} 430 431 hostname = _stringify(metadata_map.get("hostname"), default="Unknown") 432 os_value = _stringify( 433 metadata_map.get("os_version") or metadata_map.get("os"), 434 default="Unknown", 435 ) 436 domain = _stringify(metadata_map.get("domain"), default="Unknown") 437 summary = _stringify(analysis.get("summary"), default="No executive summary available.") 438 context_text = _stringify( 439 investigation_context, 440 default="No investigation context provided.", 441 ) 442 443 sections = [ 444 f"Investigation Context:\n{context_text}", 445 ( 446 "System Under Analysis:\n" 447 f"- Hostname: {hostname}\n" 448 f"- OS: {os_value}\n" 449 f"- Domain: {domain}" 450 ), 451 f"Executive Summary:\n{summary}", 452 findings_section, 453 ] 454 return "\n\n".join(sections) 455 456 def _format_per_artifact_findings(self, analysis_results: Mapping[str, Any]) -> str: 457 """Format per-artifact findings as a bulleted text block. 458 459 Handles multiple input shapes (dict keyed by artifact name, list 460 of finding dicts, or list of raw strings) and normalises them 461 into ``- artifact_name: analysis_text`` lines. 462 463 Args: 464 analysis_results: The full analysis results mapping. 465 466 Returns: 467 A newline-joined string of bullet-pointed findings, or a 468 placeholder message when no findings are available. 
469 """ 470 raw_findings = analysis_results.get("per_artifact") 471 if raw_findings is None: 472 raw_findings = analysis_results.get("per_artifact_findings") 473 474 findings: list[tuple[str, str]] = [] 475 if isinstance(raw_findings, Mapping): 476 items: list[Any] = [] 477 for artifact_name, value in raw_findings.items(): 478 if isinstance(value, Mapping): 479 merged = dict(value) 480 merged.setdefault("artifact_name", artifact_name) 481 items.append(merged) 482 else: 483 items.append({"artifact_name": artifact_name, "analysis": value}) 484 elif isinstance(raw_findings, list): 485 items = list(raw_findings) 486 else: 487 items = [] 488 489 for item in items: 490 if isinstance(item, Mapping): 491 artifact_name = _stringify( 492 item.get("artifact_name") or item.get("name") or item.get("artifact_key"), 493 default="Unknown Artifact", 494 ) 495 analysis_text = _stringify( 496 item.get("analysis") 497 or item.get("finding") 498 or item.get("summary") 499 or item.get("text"), 500 ) 501 else: 502 artifact_name = "Unknown Artifact" 503 analysis_text = _stringify(item) 504 505 if analysis_text: 506 findings.append((artifact_name, analysis_text)) 507 508 if not findings: 509 return "- No per-artifact findings available." 510 511 return "\n".join( 512 f"- {artifact_name}: {analysis_text}" 513 for artifact_name, analysis_text in findings 514 )
Persist and retrieve case-scoped chat history records.
Each instance is bound to a single case directory and manages a
chat_history.jsonl file containing timestamped user/assistant
message pairs. The manager also assembles context blocks for AI
prompts by combining analysis results, investigation context, and
system metadata.
Attributes:
- MAX_CONTEXT_TOKENS: Maximum token budget for chat context assembly.
- case_dir: Resolved path to the case directory.
- chat_file: Path to the `chat_history.jsonl` file.
75 def __init__(self, case_dir: str | Path, max_context_tokens: int | None = None) -> None: 76 """Initialise the chat manager for a case directory. 77 78 Args: 79 case_dir: Path to the case directory. Created if it does 80 not exist when messages are first written. 81 max_context_tokens: Optional override for the maximum token 82 budget. Falls back to :attr:`MAX_CONTEXT_TOKENS` when 83 *None* or invalid. 84 """ 85 self.case_dir = Path(case_dir) 86 self.chat_file = self.case_dir / "chat_history.jsonl" 87 self.MAX_CONTEXT_TOKENS = self._resolve_max_context_tokens(max_context_tokens)
Initialise the chat manager for a case directory.
Arguments:
- case_dir: Path to the case directory. Created if it does not exist when messages are first written.
- max_context_tokens: Optional override for the maximum token budget. Falls back to `MAX_CONTEXT_TOKENS` when None or invalid.
    def add_message(
        self,
        role: str,
        content: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Append one message entry to the case chat JSONL history.

        The message is written as a single JSON line with a UTC ISO 8601
        timestamp. The file is opened, written, and flushed for each call
        to minimise data loss on unexpected termination.

        Args:
            role: Message role -- must be ``"user"`` or ``"assistant"``.
            content: The message text.
            metadata: Optional dictionary of extra metadata to attach to
                the record (e.g. token counts, retrieval info).

        Raises:
            ValueError: If *role* is not in :data:`VALID_ROLES`.
            TypeError: If *content* is not a string or *metadata* is not a
                dict when provided.
        """
        # Normalise case/whitespace first so inputs like "User " validate.
        normalized_role = str(role).strip().lower()
        if normalized_role not in VALID_ROLES:
            allowed = ", ".join(sorted(VALID_ROLES))
            raise ValueError(f"Unsupported role '{role}'. Allowed values: {allowed}.")
        if not isinstance(content, str):
            raise TypeError("content must be a string.")
        if metadata is not None and not isinstance(metadata, dict):
            raise TypeError("metadata must be a dictionary when provided.")

        message: dict[str, Any] = {
            "timestamp": _utc_now_iso8601_ms(),
            "role": normalized_role,
            "content": content,
        }
        # Key omitted entirely (rather than null) when no metadata given.
        if metadata is not None:
            message["metadata"] = metadata

        # Compact separators keep each JSONL record on one short line.
        line = json.dumps(message, separators=(",", ":")) + "\n"
        self.chat_file.parent.mkdir(parents=True, exist_ok=True)
        # buffering=0 opens an unbuffered binary stream, so the write
        # reaches the OS immediately; flush() is belt-and-braces.
        with self.chat_file.open("ab", buffering=0) as chat_stream:
            chat_stream.write(line.encode("utf-8"))
            chat_stream.flush()
Append one message entry to the case chat JSONL history.
The message is written as a single JSON line with a UTC ISO 8601 timestamp. The file is opened, written, and flushed for each call to minimise data loss on unexpected termination.
Arguments:
- role: Message role -- must be `"user"` or `"assistant"`.
- content: The message text.
- metadata: Optional dictionary of extra metadata to attach to the record (e.g. token counts, retrieval info).
Raises:
- ValueError: If role is not in `VALID_ROLES`.
- TypeError: If content is not a string or metadata is not a dict when provided.
139 def get_history(self) -> list[dict[str, Any]]: 140 """Load the full chat history in insertion order. 141 142 Reads every line from ``chat_history.jsonl``, skipping blank lines 143 and malformed JSON entries (which are logged as warnings). 144 145 Returns: 146 A list of message dictionaries, each containing at least 147 ``timestamp``, ``role``, and ``content`` keys. 148 """ 149 if not self.chat_file.exists(): 150 return [] 151 152 history: list[dict[str, Any]] = [] 153 with self.chat_file.open("r", encoding="utf-8") as chat_stream: 154 for line_no, raw_line in enumerate(chat_stream, 1): 155 line = raw_line.strip() 156 if not line: 157 continue 158 try: 159 record = json.loads(line) 160 except json.JSONDecodeError: 161 log.warning("Skipping malformed JSON on line %d of %s", line_no, self.chat_file) 162 continue 163 if isinstance(record, dict): 164 history.append(record) 165 return history
Load the full chat history in insertion order.
Reads every line from chat_history.jsonl, skipping blank lines
and malformed JSON entries (which are logged as warnings).
Returns:
A list of message dictionaries, each containing at least
`timestamp`, `role`, and `content` keys.
167 def get_recent_history(self, max_pairs: int = 20) -> list[dict[str, Any]]: 168 """Return the most recent complete user/assistant message pairs. 169 170 Messages are paired in order: a ``user`` message followed by the 171 next ``assistant`` message forms a pair. Only the last 172 *max_pairs* complete pairs are returned. 173 174 Args: 175 max_pairs: Maximum number of user/assistant pairs to return. 176 177 Returns: 178 A flat list of message dictionaries alternating 179 ``[user, assistant, user, assistant, ...]``. 180 """ 181 if max_pairs <= 0: 182 return [] 183 184 history = self.get_history() 185 paired_messages: list[tuple[dict[str, Any], dict[str, Any]]] = [] 186 pending_user: dict[str, Any] | None = None 187 188 for message in history: 189 role = message.get("role") 190 if role == "user": 191 pending_user = message 192 continue 193 if role == "assistant" and pending_user is not None: 194 paired_messages.append((pending_user, message)) 195 pending_user = None 196 197 recent_pairs = paired_messages[-max_pairs:] 198 recent_history: list[dict[str, Any]] = [] 199 for user_message, assistant_message in recent_pairs: 200 recent_history.append(user_message) 201 recent_history.append(assistant_message) 202 return recent_history
Return the most recent complete user/assistant message pairs.
Messages are paired in order: a user message followed by the
next assistant message forms a pair. Only the last
max_pairs complete pairs are returned.
Arguments:
- max_pairs: Maximum number of user/assistant pairs to return.
Returns:
A flat list of message dictionaries alternating
[user, assistant, user, assistant, ...].
204 def clear(self) -> None: 205 """Delete the chat history file when present. 206 207 This is a destructive operation -- all chat messages for this 208 case are permanently removed. 209 """ 210 if self.chat_file.exists(): 211 self.chat_file.unlink()
Delete the chat history file when present.
This is a destructive operation -- all chat messages for this case are permanently removed.
217 def build_chat_context( 218 self, 219 analysis_results: Mapping[str, Any] | None, 220 investigation_context: str, 221 metadata: Mapping[str, Any] | None, 222 ) -> str: 223 """Build a compact, complete context block for chat prompts. 224 225 Assembles investigation context, system metadata (hostname, OS, 226 domain), executive summary, and per-artifact findings into a 227 single multi-section text string suitable for injection into an 228 AI system prompt. 229 230 Args: 231 analysis_results: The full analysis results mapping (may 232 contain ``summary`` and ``per_artifact`` keys). 233 investigation_context: Free-text investigation context 234 provided by the analyst. 235 metadata: Evidence metadata mapping (hostname, os_version, 236 domain, etc.). 237 238 Returns: 239 A formatted multi-section context string. 240 """ 241 analysis = analysis_results if isinstance(analysis_results, Mapping) else {} 242 per_artifact_lines = self._format_per_artifact_findings(analysis) 243 findings_section = f"Per-Artifact Findings:\n{per_artifact_lines}" 244 return self._assemble_context( 245 analysis_results, investigation_context, metadata, findings_section, 246 )
Build a compact, complete context block for chat prompts.
Assembles investigation context, system metadata (hostname, OS, domain), executive summary, and per-artifact findings into a single multi-section text string suitable for injection into an AI system prompt.
Arguments:
- analysis_results: The full analysis results mapping (may
contain `summary` and `per_artifact` keys).
- investigation_context: Free-text investigation context provided by the analyst.
- metadata: Evidence metadata mapping (hostname, os_version, domain, etc.).
Returns:
A formatted multi-section context string.
248 def rebuild_context_with_compressed_findings( 249 self, 250 analysis_results: Mapping[str, Any] | None, 251 investigation_context: str, 252 metadata: Mapping[str, Any] | None, 253 compressed_findings: str, 254 ) -> str: 255 """Rebuild the context block using pre-compressed per-artifact findings. 256 257 Identical to :meth:`build_chat_context` except that the 258 per-artifact section is replaced with an externally compressed 259 version of the findings, used when the full context exceeds the 260 token budget. 261 262 Args: 263 analysis_results: The full analysis results mapping. 264 investigation_context: Free-text investigation context. 265 metadata: Evidence metadata mapping. 266 compressed_findings: Pre-compressed per-artifact findings 267 text to substitute into the context block. 268 269 Returns: 270 A formatted multi-section context string with compressed 271 findings. 272 """ 273 findings_section = f"Per-Artifact Findings (compressed):\n{compressed_findings}" 274 return self._assemble_context( 275 analysis_results, investigation_context, metadata, findings_section, 276 )
Rebuild the context block using pre-compressed per-artifact findings.
Identical to build_chat_context() except that the
per-artifact section is replaced with an externally compressed
version of the findings, used when the full context exceeds the
token budget.
Arguments:
- analysis_results: The full analysis results mapping.
- investigation_context: Free-text investigation context.
- metadata: Evidence metadata mapping.
- compressed_findings: Pre-compressed per-artifact findings text to substitute into the context block.
Returns:
A formatted multi-section context string with compressed findings.
278 def context_needs_compression(self, context_block: str, token_budget: int) -> bool: 279 """Return *True* when the context block exceeds 80 % of the token budget. 280 281 Args: 282 context_block: The assembled context text to measure. 283 token_budget: Maximum token allowance for the context window. 284 285 Returns: 286 *True* if the estimated token count of *context_block* exceeds 287 80 % of *token_budget*, *False* otherwise. 288 """ 289 if token_budget <= 0: 290 return False 291 return self.estimate_token_count(context_block) > int(token_budget * 0.8)
Return True when the context block exceeds 80 % of the token budget.
Arguments:
- context_block: The assembled context text to measure.
- token_budget: Maximum token allowance for the context window.
Returns:
True if the estimated token count of context_block exceeds 80 % of token_budget, False otherwise.
    def retrieve_csv_data(self, question: str, parsed_dir: str | Path) -> dict[str, Any]:
        """Best-effort retrieval of raw CSV rows for data-centric chat questions.

        Delegates to :func:`~app.chat.csv_retrieval.retrieve_csv_data`.

        Args:
            question: The user's chat question text.
            parsed_dir: Path to the directory containing parsed artifact
                CSV files.

        Returns:
            A dictionary with a ``retrieved`` boolean. When *True*, also
            includes ``artifacts`` (list of matched CSV filenames) and
            ``data`` (formatted row text).
        """
        # Thin wrapper kept on the class so callers only need a
        # ChatManager; all matching/formatting logic lives in
        # app.chat.csv_retrieval.
        return _retrieve_csv_data(question, parsed_dir)
Best-effort retrieval of raw CSV rows for data-centric chat questions.
Delegates to ~app.chat.csv_retrieval.retrieve_csv_data().
Arguments:
- question: The user's chat question text.
- parsed_dir: Path to the directory containing parsed artifact CSV files.
Returns:
A dictionary with a `retrieved` boolean. When True, also includes `artifacts` (list of matched CSV filenames) and `data` (formatted row text).
318 def estimate_token_count(self, text: str) -> int: 319 """Estimate token count using a rough 4-characters-per-token ratio. 320 321 Args: 322 text: The string to estimate tokens for. 323 324 Returns: 325 Approximate token count (integer). 326 """ 327 if not text: 328 return 0 329 return int(len(text) / 4)
Estimate token count using a rough 4-characters-per-token ratio.
Arguments:
- text: The string to estimate tokens for.
Returns:
Approximate token count (integer).
331 def fit_history( 332 self, 333 history: list[dict[str, Any]], 334 max_tokens: int, 335 ) -> list[dict[str, Any]]: 336 """Trim conversation history to fit within *max_tokens*. 337 338 Pairs up user/assistant messages and drops the oldest complete 339 pairs first until the estimated total token count fits within 340 the budget. 341 342 Args: 343 history: Flat list of message dictionaries to trim. 344 max_tokens: Maximum token budget for the returned history. 345 346 Returns: 347 A (possibly shorter) flat list of message dictionaries that 348 fits within *max_tokens*. 349 """ 350 if max_tokens <= 0: 351 return [] 352 if not history: 353 return [] 354 355 # Pair up messages so we can drop oldest pairs. 356 pairs: list[tuple[dict[str, Any], dict[str, Any]]] = [] 357 pending_user: dict[str, Any] | None = None 358 for msg in history: 359 role = msg.get("role") 360 if role == "user": 361 pending_user = msg 362 elif role == "assistant" and pending_user is not None: 363 pairs.append((pending_user, msg)) 364 pending_user = None 365 366 # Drop oldest pairs until total fits. 367 while pairs: 368 total = sum( 369 self.estimate_token_count(str(u.get("content", ""))) 370 + self.estimate_token_count(str(a.get("content", ""))) 371 for u, a in pairs 372 ) 373 if total <= max_tokens: 374 break 375 pairs.pop(0) 376 377 result: list[dict[str, Any]] = [] 378 for user_msg, assistant_msg in pairs: 379 result.append(user_msg) 380 result.append(assistant_msg) 381 return result
Trim conversation history to fit within max_tokens.
Pairs up user/assistant messages and drops the oldest complete pairs first until the estimated total token count fits within the budget.
Arguments:
- history: Flat list of message dictionaries to trim.
- max_tokens: Maximum token budget for the returned history.
Returns:
A (possibly shorter) flat list of message dictionaries that fits within max_tokens.