app.routes.tasks
Background task runners for parsing, analysis, and chat.
This module contains the long-running functions that execute on background
threading.Thread instances:
- run_parse -- Parse forensic artifacts via Dissect.
- run_analysis -- AI-powered analysis of parsed CSV artifacts.
- run_chat -- Follow-up chat with the AI about analysis results.
Each runner emits SSE progress events through the shared progress stores
defined in routes_state and uses a case-log-context wrapper to
ensure log messages are tagged with the case ID.
Attributes:
- _COMPRESS_FINDINGS_FALLBACK_PROMPT: Fallback prompt for findings compression when the prompt file is missing.
"""Background task runners for parsing, analysis, and chat.

This module contains the long-running functions that execute on background
``threading.Thread`` instances:

* ``run_parse`` -- Parse forensic artifacts via Dissect.
* ``run_analysis`` -- AI-powered analysis of parsed CSV artifacts.
* ``run_chat`` -- Follow-up chat with the AI about analysis results.

Each runner emits SSE progress events through the shared progress stores
defined in :mod:`routes_state` and uses a case-log-context wrapper to
ensure log messages are tagged with the case ID.

Attributes:
    _COMPRESS_FINDINGS_FALLBACK_PROMPT: Fallback prompt for findings
        compression when the prompt file is missing.
"""

from __future__ import annotations

import copy
import json
import logging
import time
from collections.abc import Mapping
from pathlib import Path
from typing import Any

from ..ai_providers import AIProviderError, create_provider
from ..analyzer import ForensicAnalyzer
from ..analyzer.core import AnalysisCancelledError
from ..case_logging import case_log_context
from ..chat import ChatManager
from ..parser import ForensicParser
from .state import (
    ANALYSIS_PROGRESS,
    CHAT_HISTORY_MAX_PAIRS,
    CHAT_PROGRESS,
    DEFAULT_FORENSIC_SYSTEM_PROMPT,
    PARSE_PROGRESS,
    PROJECT_ROOT,
    STATE_LOCK,
    emit_progress,
    get_cancel_event,
    get_case,
    mark_case_status,
    safe_int,
    set_progress_status,
)
from .artifacts import (
    extract_parse_progress,
    sanitize_prompt,
)
from .evidence import (
    build_csv_map,
    collect_case_csv_paths,
    generate_case_report,
    resolve_case_csv_output_dir,
)

__all__ = [
    "run_task_with_case_log_context",
    "run_parse",
    "run_analysis",
    "run_chat",
    "load_case_analysis_results",
    "resolve_case_investigation_context",
    "resolve_case_parsed_dir",
]

LOGGER = logging.getLogger(__name__)

_COMPRESS_FINDINGS_FALLBACK_PROMPT = (
    "You are a forensic analysis assistant. Compress per-artifact findings "
    "while preserving all critical forensic details. Return only the "
    "compressed text in bullet-point format, no preamble."
)


# ---------------------------------------------------------------------------
# Prompt / context helpers
# ---------------------------------------------------------------------------

def _load_forensic_system_prompt() -> str:
    """Load the forensic AI system prompt from the ``prompts/`` directory.

    Returns:
        The system prompt string, or the default fallback when the prompt
        file is missing, unreadable, or empty.
    """
    prompt_path = PROJECT_ROOT / "prompts" / "system_prompt.md"
    try:
        prompt_text = prompt_path.read_text(encoding="utf-8").strip()
    except OSError:
        LOGGER.warning("Failed to read system prompt from %s; using fallback prompt.", prompt_path)
        return DEFAULT_FORENSIC_SYSTEM_PROMPT
    return prompt_text or DEFAULT_FORENSIC_SYSTEM_PROMPT


def load_case_analysis_results(case: dict[str, Any]) -> dict[str, Any] | None:
    """Load analysis results for a case from memory or disk.

    Prefers the non-empty in-memory copy; otherwise reads
    ``analysis_results.json`` from the case directory. Any disk failure
    falls back to whatever (possibly empty) in-memory dict exists.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        Analysis results dict, or ``None`` if nothing is available.
    """
    in_memory = case.get("analysis_results")
    if isinstance(in_memory, dict) and in_memory:
        return dict(in_memory)

    # Single fallback value instead of repeating the conditional three times.
    fallback = dict(in_memory) if isinstance(in_memory, dict) else None

    results_path = Path(case["case_dir"]) / "analysis_results.json"
    if not results_path.exists():
        return fallback

    try:
        parsed = json.loads(results_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        LOGGER.warning("Failed to load analysis results from %s", results_path, exc_info=True)
        return fallback

    if isinstance(parsed, dict):
        return parsed
    return fallback


def resolve_case_investigation_context(case: dict[str, Any]) -> str:
    """Resolve the investigation context prompt for a case.

    Prefers the in-memory ``investigation_context`` value; falls back to
    ``prompt.txt`` in the case directory.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        The investigation context string, or empty string.
    """
    context = str(case.get("investigation_context", "")).strip()
    if context:
        return context

    prompt_path = Path(case["case_dir"]) / "prompt.txt"
    if not prompt_path.exists():
        return ""

    try:
        return prompt_path.read_text(encoding="utf-8")
    except OSError:
        LOGGER.warning("Failed to read investigation context prompt at %s", prompt_path, exc_info=True)
        return ""


def resolve_case_parsed_dir(case: dict[str, Any]) -> Path:
    """Resolve the directory containing parsed CSV files for a case.

    Resolution order: explicit ``csv_output_dir`` on the case, then the
    parent of the first collected CSV path, then ``<case_dir>/parsed``.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        Path to the parsed CSV directory.
    """
    csv_output_dir = str(case.get("csv_output_dir", "")).strip()
    if csv_output_dir:
        return Path(csv_output_dir)

    csv_paths = collect_case_csv_paths(case)
    if csv_paths:
        return csv_paths[0].parent

    return Path(case["case_dir"]) / "parsed"


def _render_chat_messages_for_provider(messages: list[dict[str, str]]) -> str:
    """Render chat messages into a single prompt string for the AI.

    The first user message is labelled ``Context Block`` and the last user
    message is labelled ``New User Question``; system messages and empty
    contents are skipped.

    Args:
        messages: Ordered list of message dicts with ``role`` and ``content``.

    Returns:
        Formatted multi-section prompt string.
    """
    rendered_sections: list[str] = []
    first_user_rendered = False
    last_user_index = -1
    # First pass: find the index of the final non-empty user message.
    for index, message in enumerate(messages):
        role = str(message.get("role", "")).strip().lower()
        content = str(message.get("content", "")).strip()
        if role == "user" and content:
            last_user_index = index

    for index, message in enumerate(messages):
        role = str(message.get("role", "")).strip().lower()
        content = str(message.get("content", "")).strip()
        if not content or role == "system":
            continue

        if role == "user" and not first_user_rendered:
            rendered_sections.append(f"Context Block:\n{content}")
            first_user_rendered = True
            continue

        if role == "user" and index == last_user_index:
            rendered_sections.append(f"New User Question:\n{content}")
            continue

        label = "User" if role == "user" else "Assistant" if role == "assistant" else role.title()
        rendered_sections.append(f"{label}:\n{content}")

    return "\n\n".join(rendered_sections).strip()


def _load_compress_findings_prompt() -> str:
    """Load the prompt used to compress per-artifact findings with AI.

    Returns:
        The compression system prompt string, or the module-level fallback
        when the prompt file is missing, unreadable, or empty.
    """
    prompt_path = PROJECT_ROOT / "prompts" / "compress_findings.md"
    try:
        prompt_text = prompt_path.read_text(encoding="utf-8").strip()
    except OSError:
        LOGGER.warning("Failed to read compress findings prompt from %s; using fallback.", prompt_path)
        return _COMPRESS_FINDINGS_FALLBACK_PROMPT
    return prompt_text or _COMPRESS_FINDINGS_FALLBACK_PROMPT


def _compress_findings_with_ai(
    provider: Any,
    findings_text: str,
    max_tokens: int,
) -> str | None:
    """Use the AI provider to compress per-artifact findings.

    Args:
        provider: AI provider instance with an ``analyze`` method.
        findings_text: Full per-artifact findings text.
        max_tokens: Configured max token budget.

    Returns:
        Compressed findings text, or ``None`` on failure (best-effort:
        the caller falls back to the uncompressed context).
    """
    if not findings_text or not findings_text.strip():
        return None

    target_tokens = max(200, int(max_tokens * 0.25))
    try:
        compressed = provider.analyze(
            system_prompt=_load_compress_findings_prompt(),
            user_prompt=(
                f"Compress the following per-artifact forensic findings to "
                f"roughly {target_tokens} tokens. Keep the bullet-point "
                f"format (\"- artifact: summary\"). Preserve every "
                f"suspicious indicator, timestamp, path, and conclusion.\n\n"
                f"{findings_text}"
            ),
            max_tokens=target_tokens,
        )
        result = str(compressed).strip()
        return result if result else None
    except Exception:
        # Fix: was `except (AIProviderError, Exception):` -- `Exception`
        # already subsumes `AIProviderError`, so the tuple was redundant.
        LOGGER.warning(
            "AI-powered findings compression failed; falling back to full context.",
            exc_info=True,
        )
        return None


def _resolve_chat_max_tokens(config: dict[str, Any]) -> int:
    """Resolve the maximum token count for chat from config.

    Args:
        config: Full application configuration dict.

    Returns:
        Positive integer token limit.

    Raises:
        ValueError: If the setting is missing or invalid.
    """
    analysis_config = config.get("analysis", {})
    if not isinstance(analysis_config, dict):
        raise ValueError(
            "Chat max tokens are not configured. Set `analysis.ai_max_tokens` in Settings."
        )

    if "ai_max_tokens" not in analysis_config:
        raise ValueError(
            "Chat max tokens are not configured. Set `analysis.ai_max_tokens` in Settings."
        )

    try:
        resolved = int(analysis_config.get("ai_max_tokens"))
    except (TypeError, ValueError):
        raise ValueError(
            "Invalid `analysis.ai_max_tokens` value in Settings. Provide a positive integer."
        ) from None

    if resolved <= 0:
        raise ValueError(
            "Invalid `analysis.ai_max_tokens` value in Settings. Provide a positive integer."
        )
    return resolved


# ---------------------------------------------------------------------------
# Case-log-context wrapper (replaces three duplicate wrappers)
# ---------------------------------------------------------------------------

def run_task_with_case_log_context(
    case_id: str,
    task_fn: Any,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Run a background task function within case-scoped logging context.

    This replaces the three near-identical ``_run_*_with_case_log_context``
    wrappers with a single generic version.

    Args:
        case_id: UUID of the case (used for log tagging).
        task_fn: The callable to invoke.
        *args: Positional arguments forwarded to *task_fn*.
        **kwargs: Keyword arguments forwarded to *task_fn*.
    """
    with case_log_context(case_id):
        task_fn(*args, **kwargs)


# ---------------------------------------------------------------------------
# Background task: parse
# ---------------------------------------------------------------------------

def run_parse(
    case_id: str,
    parse_artifacts: list[str],
    analysis_artifacts: list[str],
    artifact_options: list[dict[str, str]],
    config_snapshot: dict[str, Any],
) -> None:
    """Execute background parsing of selected forensic artifacts.

    Args:
        case_id: UUID of the case.
        parse_artifacts: Artifact keys to parse.
        analysis_artifacts: Subset for AI analysis.
        artifact_options: Canonical artifact option dicts.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(PARSE_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "Case not found."})
        return

    with STATE_LOCK:
        evidence_path = str(case.get("evidence_path", "")).strip()
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        case_snapshot = dict(case)

    if not evidence_path:
        mark_case_status(case_id, "failed")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "No evidence available for parsing.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "No evidence available for parsing."})
        return

    try:
        csv_output_dir = resolve_case_csv_output_dir(case_snapshot, config_snapshot=config_snapshot)
        with ForensicParser(
            evidence_path=evidence_path,
            case_dir=case_dir,
            audit_logger=audit_logger,
            parsed_dir=csv_output_dir,
        ) as parser:
            results: list[dict[str, Any]] = []
            total = len(parse_artifacts)

            for index, artifact in enumerate(parse_artifacts, start=1):
                # Honour cancellation between artifacts.
                if cancel_event is not None and cancel_event.is_set():
                    LOGGER.info("Parsing cancelled for case %s before artifact %s", case_id, artifact)
                    return
                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {"type": "artifact_started", "artifact_key": artifact, "index": index, "total": total},
                )

                def _progress_callback(*args: Any, **_kwargs: Any) -> None:
                    """Emit per-artifact parse progress events."""
                    artifact_key, record_count = extract_parse_progress(artifact, args)
                    emit_progress(
                        PARSE_PROGRESS, case_id,
                        {"type": "artifact_progress", "artifact_key": artifact_key, "record_count": record_count},
                    )

                result = parser.parse_artifact(artifact, progress_callback=_progress_callback)
                result_entry = {"artifact_key": artifact, **result}
                results.append(result_entry)

                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {
                        "type": "artifact_completed" if result.get("success") else "artifact_failed",
                        "artifact_key": artifact,
                        "record_count": safe_int(result.get("record_count", 0)),
                        "duration_seconds": float(result.get("duration_seconds", 0.0)),
                        "csv_path": str(result.get("csv_path", "")),
                        "error": result.get("error"),
                    },
                )

        csv_map = build_csv_map(results)
        with STATE_LOCK:
            case["selected_artifacts"] = list(parse_artifacts)
            case["analysis_artifacts"] = list(analysis_artifacts)
            case["artifact_options"] = list(artifact_options)
            case["parse_results"] = results
            case["artifact_csv_paths"] = csv_map
            case["csv_output_dir"] = str(csv_output_dir)

        completed = sum(1 for item in results if item.get("success"))
        failed = len(results) - completed
        set_progress_status(PARSE_PROGRESS, case_id, "completed")
        emit_progress(
            PARSE_PROGRESS, case_id,
            {
                "type": "parse_completed",
                "total_artifacts": len(results),
                "successful_artifacts": completed,
                "failed_artifacts": failed,
            },
        )
        mark_case_status(case_id, "parsed")
    except Exception:
        LOGGER.exception("Background parse failed for case %s", case_id)
        user_message = (
            "Parsing failed due to an internal error. "
            "Check logs and retry after confirming the evidence file is readable."
        )
        mark_case_status(case_id, "error")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", user_message)
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": user_message})


# ---------------------------------------------------------------------------
# Background task: analysis
# ---------------------------------------------------------------------------

def _purge_stale_analysis(case: dict[str, Any], case_dir: str) -> None:
    """Clear in-memory and on-disk analysis results after a failed run.

    This prevents stale findings from a prior successful analysis from
    being served via chat, report, or download routes after a re-analysis
    fails or is cancelled.

    Args:
        case: The in-memory case state dictionary.
        case_dir: Path string to the case directory.
    """
    with STATE_LOCK:
        case["analysis_results"] = {}
    # missing_ok=True already tolerates a missing file, so no exists() check
    # is needed (and skipping it avoids a TOCTOU race).
    (Path(case_dir) / "analysis_results.json").unlink(missing_ok=True)


def run_analysis(case_id: str, prompt: str, config_snapshot: dict[str, Any]) -> None:
    """Execute background AI-powered forensic analysis.

    Args:
        case_id: UUID of the case.
        prompt: Investigation context / user prompt.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(ANALYSIS_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": "Case not found."})
        return

    with STATE_LOCK:
        csv_map = dict(case.get("artifact_csv_paths", {}))
        parse_results_snapshot = list(case.get("parse_results", []))
        analysis_artifacts_state = case.get("analysis_artifacts")
        selected_artifacts_snapshot = list(case.get("selected_artifacts", []))
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        image_metadata_snapshot = dict(case.get("image_metadata", {}))
        os_type_snapshot = str(case.get("os_type") or "unknown")
        artifact_options_snapshot = list(case.get("artifact_options", []))
        analysis_date_range = case.get("analysis_date_range")

    if not csv_map:
        csv_map = build_csv_map(parse_results_snapshot)
    if isinstance(analysis_artifacts_state, list):
        artifacts = [str(item) for item in analysis_artifacts_state if str(item) in csv_map]
    else:
        artifacts = [item for item in selected_artifacts_snapshot if item in csv_map]
    if not artifacts and not isinstance(analysis_artifacts_state, list):
        artifacts = sorted(csv_map.keys())
    if not artifacts:
        message = (
            "No parsed CSV artifacts are marked `Parse and use in AI`."
            if isinstance(analysis_artifacts_state, list)
            else "No parsed CSV artifacts available."
        )
        mark_case_status(case_id, "failed")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": message})
        return

    try:
        analyzer = ForensicAnalyzer(
            case_dir=case_dir,
            config=config_snapshot,
            audit_logger=audit_logger,
            artifact_csv_paths=csv_map,
            os_type=os_type_snapshot,
        )
        metadata = dict(image_metadata_snapshot)
        metadata["os_type"] = os_type_snapshot
        metadata["artifact_csv_paths"] = csv_map
        metadata["parse_results"] = parse_results_snapshot
        metadata["analysis_artifacts"] = list(artifacts)
        metadata["artifact_options"] = artifact_options_snapshot
        if isinstance(analysis_date_range, dict):
            metadata["analysis_date_range"] = {
                "start_date": str(analysis_date_range.get("start_date", "")).strip(),
                "end_date": str(analysis_date_range.get("end_date", "")).strip(),
            }

        def _analysis_progress(*args: Any) -> None:
            """Emit per-artifact analysis progress events.

            Accepts either the positional ``(artifact_key, status, result)``
            form or a single payload dict; anything else is ignored.
            """
            artifact_key = ""
            status = ""
            result: dict[str, Any] = {}

            if len(args) >= 3:
                artifact_key = str(args[0])
                status = str(args[1])
                result_payload = args[2]
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            elif len(args) == 1 and isinstance(args[0], dict):
                payload = args[0]
                artifact_key = str(payload.get("artifact_key", ""))
                status = str(payload.get("status", ""))
                result_payload = payload.get("result")
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            else:
                return

            if status == "started":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_started", "artifact_key": artifact_key, "result": result,
                })
                return

            if status == "thinking":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_thinking", "artifact_key": artifact_key, "result": result,
                })
                return

            emit_progress(ANALYSIS_PROGRESS, case_id, {
                "type": "artifact_analysis_completed",
                "artifact_key": artifact_key,
                "status": status or "complete",
                "result": result,
            })

        output = analyzer.run_full_analysis(
            artifact_keys=artifacts,
            investigation_context=prompt,
            metadata=metadata,
            progress_callback=_analysis_progress,
            cancel_check=(lambda: cancel_event.is_set()) if cancel_event is not None else None,
        )
        analysis_results_path = Path(case_dir) / "analysis_results.json"
        with analysis_results_path.open("w", encoding="utf-8") as analysis_results_file:
            json.dump(output, analysis_results_file, indent=2, ensure_ascii=True)
            analysis_results_file.write("\n")
        with STATE_LOCK:
            case["investigation_context"] = prompt
            case["analysis_results"] = output

        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_summary",
            "summary": str(output.get("summary", "")),
            "model_info": output.get("model_info", {}),
        })
        set_progress_status(ANALYSIS_PROGRESS, case_id, "completed")
        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_completed",
            "artifact_count": len(output.get("per_artifact", [])),
            "per_artifact": list(output.get("per_artifact", [])),
        })
        mark_case_status(case_id, "completed")

        # Auto-generate the HTML report so it's ready for download.
        try:
            report_result = generate_case_report(case_id)
            if report_result.get("success"):
                LOGGER.info(
                    "Auto-generated report for case %s: %s",
                    case_id, report_result["report_path"].name,
                )
            else:
                LOGGER.warning(
                    "Auto-report generation failed for case %s: %s",
                    case_id, report_result.get("error", "unknown error"),
                )
        except Exception:
            # Report generation is best-effort; analysis already succeeded.
            LOGGER.warning(
                "Auto-report generation raised an exception for case %s",
                case_id, exc_info=True,
            )
    except AnalysisCancelledError:
        LOGGER.info("Analysis cancelled for case %s", case_id)
    except Exception:
        LOGGER.exception("Background analysis failed for case %s", case_id)
        _purge_stale_analysis(case, case_dir)
        user_message = (
            "Analysis failed due to an internal error. "
            "Verify provider settings and retry."
        )
        mark_case_status(case_id, "error")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", user_message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": user_message})


# ---------------------------------------------------------------------------
# Background task: chat
# ---------------------------------------------------------------------------

def run_chat(case_id: str, message: str, config_snapshot: dict[str, Any]) -> None:
    """Execute a background chat interaction about analysis results.

    Args:
        case_id: UUID of the case.
        message: The user's chat message.
        config_snapshot: Deep copy of application config.
    """
    case = get_case(case_id)
    if case is None:
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Case not found."})
        return

    with STATE_LOCK:
        case_snapshot = dict(case)
        audit_logger = case["audit"]

    analysis_results = load_case_analysis_results(case_snapshot)
    if not analysis_results:
        message_text = "No analysis results available for this case. Run analysis first."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    if not isinstance(config_snapshot, dict):
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Invalid in-memory configuration state.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Invalid in-memory configuration state."})
        return

    try:
        chat_max_tokens = _resolve_chat_max_tokens(config_snapshot)
    except ValueError as error:
        message_text = str(error)
        LOGGER.warning("Chat configuration rejected for case %s: %s", case_id, message_text)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    case_dir = case_snapshot["case_dir"]
    chat_manager = ChatManager(case_dir, max_context_tokens=chat_max_tokens)
    history_snapshot = chat_manager.get_history()
    # 1-based index of this user message within the conversation.
    message_index = (
        sum(1 for entry in history_snapshot if str(entry.get("role", "")).strip().lower() == "user") + 1
    )
    audit_logger.log(
        "chat_message_sent",
        {
            "message_index": message_index,
            "message": sanitize_prompt(message, max_chars=8000),
        },
    )

    try:
        # Reserve ~20% of the budget for the model's response.
        prompt_budget = int(chat_max_tokens * 0.8)
        provider = create_provider(copy.deepcopy(config_snapshot))

        investigation_context = resolve_case_investigation_context(case_snapshot)
        image_metadata = dict(case_snapshot.get("image_metadata", {}))

        context_block = chat_manager.build_chat_context(
            analysis_results=analysis_results,
            investigation_context=investigation_context,
            metadata=image_metadata,
        )

        if chat_manager.context_needs_compression(context_block, prompt_budget):
            # NOTE(review): reaches into a ChatManager private method --
            # consider promoting _format_per_artifact_findings to public API.
            per_artifact_text = chat_manager._format_per_artifact_findings(
                analysis_results if isinstance(analysis_results, Mapping) else {},
            )
            compressed = _compress_findings_with_ai(provider, per_artifact_text, chat_max_tokens)
            if compressed:
                context_block = chat_manager.rebuild_context_with_compressed_findings(
                    analysis_results=analysis_results,
                    investigation_context=investigation_context,
                    metadata=image_metadata,
                    compressed_findings=compressed,
                )

        retrieved_payload = chat_manager.retrieve_csv_data(
            question=message,
            parsed_dir=resolve_case_parsed_dir(case_snapshot),
        )
        retrieved_artifacts: list[str] = []
        if isinstance(retrieved_payload.get("artifacts"), list):
            retrieved_artifacts = [
                str(item).strip()
                for item in retrieved_payload.get("artifacts", [])
                if str(item).strip()
            ]

        message_for_ai = message
        retrieved_data = str(retrieved_payload.get("data", "")).strip()
        if bool(retrieved_payload.get("retrieved")) and retrieved_data:
            message_for_ai = (
                "Retrieved CSV data for this question:\n"
                f"{retrieved_data}\n\n"
                "User question:\n"
                f"{message}"
            )
            audit_logger.log(
                "chat_data_retrieval",
                {
                    "message_index": message_index,
                    "artifacts": list(retrieved_artifacts),
                    "rows_returned": retrieved_data.count("\n"),
                },
            )

        system_prompt = _load_forensic_system_prompt()

        fixed_tokens = (
            chat_manager.estimate_token_count(system_prompt)
            + chat_manager.estimate_token_count(context_block)
            + chat_manager.estimate_token_count(message_for_ai)
        )
        history_budget = max(0, prompt_budget - fixed_tokens)

        recent_history = chat_manager.get_recent_history(max_pairs=CHAT_HISTORY_MAX_PAIRS)
        fitted_history = chat_manager.fit_history(recent_history, history_budget)

        ai_messages: list[dict[str, str]] = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": context_block},
        ]
        for history_message in fitted_history:
            role = str(history_message.get("role", "")).strip().lower()
            content = str(history_message.get("content", "")).strip()
            if role in {"user", "assistant"} and content:
                ai_messages.append({"role": role, "content": content})
        ai_messages.append({"role": "user", "content": message_for_ai})
        chat_user_prompt = _render_chat_messages_for_provider(ai_messages)
        if not chat_user_prompt:
            chat_user_prompt = (
                f"Context Block:\n{context_block}\n\n"
                f"New User Question:\n{message_for_ai}"
            )
        started_at = time.perf_counter()
        chunks: list[str] = []
        chat_response_max_tokens = max(1, int(chat_max_tokens * 0.2))
        for chunk in provider.analyze_stream(
            system_prompt=system_prompt,
            user_prompt=chat_user_prompt,
            max_tokens=chat_response_max_tokens,
        ):
            chunk_text = str(chunk)
            if not chunk_text:
                continue
            chunks.append(chunk_text)
            emit_progress(CHAT_PROGRESS, case_id, {"type": "token", "content": chunk_text})

        response_text = "".join(chunks).strip()
        duration_ms = int((time.perf_counter() - started_at) * 1000)
        if not response_text:
            raise AIProviderError("Provider returned an empty response.")

        chat_manager.add_message("user", message, metadata={"message_index": message_index})
        assistant_metadata: dict[str, Any] = {"message_index": message_index}
        if retrieved_artifacts:
            assistant_metadata["data_retrieved"] = list(retrieved_artifacts)
        chat_manager.add_message("assistant", response_text, metadata=assistant_metadata)

        audit_logger.log(
            "chat_response_received",
            {
                "message_index": message_index,
                "duration_ms": duration_ms,
                "response_tokens_estimate": chat_manager.estimate_token_count(response_text),
                "data_retrieved": bool(retrieved_artifacts),
                "retrieved_artifacts": list(retrieved_artifacts),
            },
        )

        set_progress_status(CHAT_PROGRESS, case_id, "completed")
        emit_progress(CHAT_PROGRESS, case_id, {
            "type": "done",
            "data_retrieved": list(retrieved_artifacts),
        })
    except ValueError as error:
        LOGGER.warning("Chat request rejected for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except AIProviderError as error:
        LOGGER.warning("Chat provider request failed for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except Exception:
        LOGGER.exception("Unexpected failure during chat for case %s", case_id)
        error_message = "Unexpected error while generating chat response."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", error_message)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": error_message})
def run_task_with_case_log_context(
    case_id: str,
    task_fn: Any,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Invoke *task_fn* inside a case-scoped logging context.

    A single generic wrapper that supersedes the three near-identical
    ``_run_*_with_case_log_context`` helpers, ensuring every log record
    emitted by the task is tagged with the case ID.

    Args:
        case_id: UUID of the case (used for log tagging).
        task_fn: The callable to invoke.
        *args: Positional arguments forwarded to *task_fn*.
        **kwargs: Keyword arguments forwarded to *task_fn*.
    """
    with case_log_context(case_id):
        task_fn(*args, **kwargs)
Run a background task function within case-scoped logging context.
This replaces the three near-identical _run_*_with_case_log_context
wrappers with a single generic version.
Arguments:
- case_id: UUID of the case (used for log tagging).
- task_fn: The callable to invoke.
- **`*args`:** Positional arguments forwarded to `task_fn`.
- **`**kwargs`:** Keyword arguments forwarded to `task_fn`.
def run_parse(
    case_id: str,
    parse_artifacts: list[str],
    analysis_artifacts: list[str],
    artifact_options: list[dict[str, str]],
    config_snapshot: dict[str, Any],
) -> None:
    """Execute background parsing of selected forensic artifacts.

    Emits SSE progress events through ``PARSE_PROGRESS`` for each artifact
    (started / progress / completed / failed) plus a final ``parse_completed``
    summary, then persists the results onto the in-memory case state.

    Args:
        case_id: UUID of the case.
        parse_artifacts: Artifact keys to parse.
        analysis_artifacts: Subset for AI analysis.
        artifact_options: Canonical artifact option dicts.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(PARSE_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "Case not found."})
        return

    # Snapshot mutable case state under the lock so the (slow) parse loop
    # below can run without holding STATE_LOCK.
    with STATE_LOCK:
        evidence_path = str(case.get("evidence_path", "")).strip()
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        case_snapshot = dict(case)

    if not evidence_path:
        mark_case_status(case_id, "failed")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "No evidence available for parsing.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "No evidence available for parsing."})
        return

    try:
        csv_output_dir = resolve_case_csv_output_dir(case_snapshot, config_snapshot=config_snapshot)
        with ForensicParser(
            evidence_path=evidence_path,
            case_dir=case_dir,
            audit_logger=audit_logger,
            parsed_dir=csv_output_dir,
        ) as parser:
            results: list[dict[str, Any]] = []
            total = len(parse_artifacts)

            for index, artifact in enumerate(parse_artifacts, start=1):
                # Cooperative cancellation between artifacts.
                # NOTE(review): no "cancelled" progress event is emitted on
                # this path — presumably the cancelling endpoint updates the
                # progress store itself; confirm against the cancel route.
                if cancel_event is not None and cancel_event.is_set():
                    LOGGER.info("Parsing cancelled for case %s before artifact %s", case_id, artifact)
                    return
                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {"type": "artifact_started", "artifact_key": artifact, "index": index, "total": total},
                )

                def _progress_callback(*args: Any, **_kwargs: Any) -> None:
                    """Emit per-artifact parse progress events."""
                    # Reads the loop variable ``artifact`` via closure; safe
                    # here because ``parse_artifact`` only invokes the
                    # callback synchronously within the same iteration.
                    artifact_key, record_count = extract_parse_progress(artifact, args)
                    emit_progress(
                        PARSE_PROGRESS, case_id,
                        {"type": "artifact_progress", "artifact_key": artifact_key, "record_count": record_count},
                    )

                result = parser.parse_artifact(artifact, progress_callback=_progress_callback)
                result_entry = {"artifact_key": artifact, **result}
                results.append(result_entry)

                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {
                        "type": "artifact_completed" if result.get("success") else "artifact_failed",
                        "artifact_key": artifact,
                        "record_count": safe_int(result.get("record_count", 0)),
                        "duration_seconds": float(result.get("duration_seconds", 0.0)),
                        "csv_path": str(result.get("csv_path", "")),
                        "error": result.get("error"),
                    },
                )

        # Persist parse outcomes onto the shared case state under the lock.
        csv_map = build_csv_map(results)
        with STATE_LOCK:
            case["selected_artifacts"] = list(parse_artifacts)
            case["analysis_artifacts"] = list(analysis_artifacts)
            case["artifact_options"] = list(artifact_options)
            case["parse_results"] = results
            case["artifact_csv_paths"] = csv_map
            case["csv_output_dir"] = str(csv_output_dir)

        completed = sum(1 for item in results if item.get("success"))
        failed = len(results) - completed
        set_progress_status(PARSE_PROGRESS, case_id, "completed")
        emit_progress(
            PARSE_PROGRESS, case_id,
            {
                "type": "parse_completed",
                "total_artifacts": len(results),
                "successful_artifacts": completed,
                "failed_artifacts": failed,
            },
        )
        mark_case_status(case_id, "parsed")
    except Exception:
        # Top-level boundary: surface a generic message to the client and
        # keep the detailed traceback in the server log.
        LOGGER.exception("Background parse failed for case %s", case_id)
        user_message = (
            "Parsing failed due to an internal error. "
            "Check logs and retry after confirming the evidence file is readable."
        )
        mark_case_status(case_id, "error")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", user_message)
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": user_message})
Execute background parsing of selected forensic artifacts.
Arguments:
- case_id: UUID of the case.
- parse_artifacts: Artifact keys to parse.
- analysis_artifacts: Subset for AI analysis.
- artifact_options: Canonical artifact option dicts.
- config_snapshot: Deep copy of application config.
def run_analysis(case_id: str, prompt: str, config_snapshot: dict[str, Any]) -> None:
    """Execute background AI-powered forensic analysis.

    Selects which parsed CSV artifacts to analyze, runs the full analysis
    pipeline, persists results to ``analysis_results.json``, emits SSE
    progress events through ``ANALYSIS_PROGRESS``, and finally auto-generates
    the HTML report on a best-effort basis.

    Args:
        case_id: UUID of the case.
        prompt: Investigation context / user prompt.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(ANALYSIS_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": "Case not found."})
        return

    # Snapshot everything needed from case state under the lock; the analysis
    # itself runs without holding STATE_LOCK.
    with STATE_LOCK:
        csv_map = dict(case.get("artifact_csv_paths", {}))
        parse_results_snapshot = list(case.get("parse_results", []))
        analysis_artifacts_state = case.get("analysis_artifacts")
        selected_artifacts_snapshot = list(case.get("selected_artifacts", []))
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        image_metadata_snapshot = dict(case.get("image_metadata", {}))
        os_type_snapshot = str(case.get("os_type") or "unknown")
        artifact_options_snapshot = list(case.get("artifact_options", []))
        analysis_date_range = case.get("analysis_date_range")

    if not csv_map:
        csv_map = build_csv_map(parse_results_snapshot)
    # A list-valued ``analysis_artifacts`` is an explicit user selection;
    # any other value means "fall back to the parsed/selected artifacts".
    if isinstance(analysis_artifacts_state, list):
        artifacts = [str(item) for item in analysis_artifacts_state if str(item) in csv_map]
    else:
        artifacts = [item for item in selected_artifacts_snapshot if item in csv_map]
    if not artifacts and not isinstance(analysis_artifacts_state, list):
        artifacts = sorted(csv_map.keys())
    if not artifacts:
        message = (
            "No parsed CSV artifacts are marked `Parse and use in AI`."
            if isinstance(analysis_artifacts_state, list)
            else "No parsed CSV artifacts available."
        )
        mark_case_status(case_id, "failed")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": message})
        return

    try:
        analyzer = ForensicAnalyzer(
            case_dir=case_dir,
            config=config_snapshot,
            audit_logger=audit_logger,
            artifact_csv_paths=csv_map,
            os_type=os_type_snapshot,
        )
        metadata = dict(image_metadata_snapshot)
        metadata["os_type"] = os_type_snapshot
        metadata["artifact_csv_paths"] = csv_map
        metadata["parse_results"] = parse_results_snapshot
        metadata["analysis_artifacts"] = list(artifacts)
        metadata["artifact_options"] = artifact_options_snapshot
        if isinstance(analysis_date_range, dict):
            metadata["analysis_date_range"] = {
                "start_date": str(analysis_date_range.get("start_date", "")).strip(),
                "end_date": str(analysis_date_range.get("end_date", "")).strip(),
            }

        def _analysis_progress(*args: Any) -> None:
            """Emit per-artifact analysis progress events.

            Accepts either the positional form ``(artifact_key, status,
            result)`` or a single dict payload; any other shape is ignored.
            """
            artifact_key = ""
            status = ""
            result: dict[str, Any] = {}

            if len(args) >= 3:
                artifact_key = str(args[0])
                status = str(args[1])
                result_payload = args[2]
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            elif len(args) == 1 and isinstance(args[0], dict):
                payload = args[0]
                artifact_key = str(payload.get("artifact_key", ""))
                status = str(payload.get("status", ""))
                result_payload = payload.get("result")
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            else:
                return

            if status == "started":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_started", "artifact_key": artifact_key, "result": result,
                })
                return

            if status == "thinking":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_thinking", "artifact_key": artifact_key, "result": result,
                })
                return

            # Any other status is treated as completion for this artifact.
            emit_progress(ANALYSIS_PROGRESS, case_id, {
                "type": "artifact_analysis_completed",
                "artifact_key": artifact_key,
                "status": status or "complete",
                "result": result,
            })

        output = analyzer.run_full_analysis(
            artifact_keys=artifacts,
            investigation_context=prompt,
            metadata=metadata,
            progress_callback=_analysis_progress,
            cancel_check=(lambda: cancel_event.is_set()) if cancel_event is not None else None,
        )
        # Persist results to disk so they survive a process restart.
        analysis_results_path = Path(case_dir) / "analysis_results.json"
        with analysis_results_path.open("w", encoding="utf-8") as analysis_results_file:
            json.dump(output, analysis_results_file, indent=2, ensure_ascii=True)
            analysis_results_file.write("\n")
        with STATE_LOCK:
            case["investigation_context"] = prompt
            case["analysis_results"] = output

        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_summary",
            "summary": str(output.get("summary", "")),
            "model_info": output.get("model_info", {}),
        })
        set_progress_status(ANALYSIS_PROGRESS, case_id, "completed")
        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_completed",
            "artifact_count": len(output.get("per_artifact", [])),
            "per_artifact": list(output.get("per_artifact", [])),
        })
        mark_case_status(case_id, "completed")

        # Auto-generate the HTML report so it's ready for download.
        # Best-effort: failures here do not affect the completed analysis.
        try:
            report_result = generate_case_report(case_id)
            if report_result.get("success"):
                LOGGER.info(
                    "Auto-generated report for case %s: %s",
                    case_id, report_result["report_path"].name,
                )
            else:
                LOGGER.warning(
                    "Auto-report generation failed for case %s: %s",
                    case_id, report_result.get("error", "unknown error"),
                )
        except Exception:
            LOGGER.warning(
                "Auto-report generation raised an exception for case %s",
                case_id, exc_info=True,
            )
    except AnalysisCancelledError:
        # User-initiated cancellation is expected, not an error.
        LOGGER.info("Analysis cancelled for case %s", case_id)
    except Exception:
        LOGGER.exception("Background analysis failed for case %s", case_id)
        # _purge_stale_analysis is defined elsewhere in this module;
        # presumably it discards partially-written results — TODO confirm.
        _purge_stale_analysis(case, case_dir)
        user_message = (
            "Analysis failed due to an internal error. "
            "Verify provider settings and retry."
        )
        mark_case_status(case_id, "error")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", user_message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": user_message})
Execute background AI-powered forensic analysis.
Arguments:
- case_id: UUID of the case.
- prompt: Investigation context / user prompt.
- config_snapshot: Deep copy of application config.
def run_chat(case_id: str, message: str, config_snapshot: dict[str, Any]) -> None:
    """Execute a background chat interaction about analysis results.

    Builds a token-budgeted prompt (system prompt + analysis context +
    fitted history + optional retrieved CSV data), streams the provider's
    response as ``token`` SSE events through ``CHAT_PROGRESS``, and records
    both sides of the exchange in chat history and the audit log.

    Args:
        case_id: UUID of the case.
        message: The user's chat message.
        config_snapshot: Deep copy of application config.
    """
    case = get_case(case_id)
    if case is None:
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Case not found."})
        return

    with STATE_LOCK:
        case_snapshot = dict(case)
        audit_logger = case["audit"]

    analysis_results = load_case_analysis_results(case_snapshot)
    if not analysis_results:
        message_text = "No analysis results available for this case. Run analysis first."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    if not isinstance(config_snapshot, dict):
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Invalid in-memory configuration state.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Invalid in-memory configuration state."})
        return

    try:
        chat_max_tokens = _resolve_chat_max_tokens(config_snapshot)
    except ValueError as error:
        message_text = str(error)
        LOGGER.warning("Chat configuration rejected for case %s: %s", case_id, message_text)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    case_dir = case_snapshot["case_dir"]
    chat_manager = ChatManager(case_dir, max_context_tokens=chat_max_tokens)
    history_snapshot = chat_manager.get_history()
    # 1-based index of this user message among all user messages so far.
    message_index = (
        sum(1 for entry in history_snapshot if str(entry.get("role", "")).strip().lower() == "user") + 1
    )
    audit_logger.log(
        "chat_message_sent",
        {
            "message_index": message_index,
            "message": sanitize_prompt(message, max_chars=8000),
        },
    )

    try:
        # Reserve 80% of the window for the prompt; 20% for the response
        # (see chat_response_max_tokens below).
        prompt_budget = int(chat_max_tokens * 0.8)
        provider = create_provider(copy.deepcopy(config_snapshot))

        investigation_context = resolve_case_investigation_context(case_snapshot)
        image_metadata = dict(case_snapshot.get("image_metadata", {}))

        context_block = chat_manager.build_chat_context(
            analysis_results=analysis_results,
            investigation_context=investigation_context,
            metadata=image_metadata,
        )

        # If the context block alone would blow the budget, ask the AI to
        # compress the per-artifact findings and rebuild the context.
        if chat_manager.context_needs_compression(context_block, prompt_budget):
            # NOTE(review): reaches into a private ChatManager helper;
            # consider promoting _format_per_artifact_findings to public API.
            per_artifact_text = chat_manager._format_per_artifact_findings(
                analysis_results if isinstance(analysis_results, Mapping) else {},
            )
            compressed = _compress_findings_with_ai(provider, per_artifact_text, chat_max_tokens)
            if compressed:
                context_block = chat_manager.rebuild_context_with_compressed_findings(
                    analysis_results=analysis_results,
                    investigation_context=investigation_context,
                    metadata=image_metadata,
                    compressed_findings=compressed,
                )

        retrieved_payload = chat_manager.retrieve_csv_data(
            question=message,
            parsed_dir=resolve_case_parsed_dir(case_snapshot),
        )
        retrieved_artifacts: list[str] = []
        if isinstance(retrieved_payload.get("artifacts"), list):
            retrieved_artifacts = [
                str(item).strip()
                for item in retrieved_payload.get("artifacts", [])
                if str(item).strip()
            ]

        message_for_ai = message
        retrieved_data = str(retrieved_payload.get("data", "")).strip()
        if bool(retrieved_payload.get("retrieved")) and retrieved_data:
            # Prepend the retrieved CSV rows so the model can ground its
            # answer in the actual parsed data.
            message_for_ai = (
                "Retrieved CSV data for this question:\n"
                f"{retrieved_data}\n\n"
                "User question:\n"
                f"{message}"
            )
            audit_logger.log(
                "chat_data_retrieval",
                {
                    "message_index": message_index,
                    "artifacts": list(retrieved_artifacts),
                    "rows_returned": retrieved_data.count("\n"),
                },
            )

        system_prompt = _load_forensic_system_prompt()

        # Whatever budget remains after the fixed parts goes to history.
        fixed_tokens = (
            chat_manager.estimate_token_count(system_prompt)
            + chat_manager.estimate_token_count(context_block)
            + chat_manager.estimate_token_count(message_for_ai)
        )
        history_budget = max(0, prompt_budget - fixed_tokens)

        recent_history = chat_manager.get_recent_history(max_pairs=CHAT_HISTORY_MAX_PAIRS)
        fitted_history = chat_manager.fit_history(recent_history, history_budget)

        ai_messages: list[dict[str, str]] = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": context_block},
        ]
        for history_message in fitted_history:
            role = str(history_message.get("role", "")).strip().lower()
            content = str(history_message.get("content", "")).strip()
            if role in {"user", "assistant"} and content:
                ai_messages.append({"role": role, "content": content})
        ai_messages.append({"role": "user", "content": message_for_ai})
        chat_user_prompt = _render_chat_messages_for_provider(ai_messages)
        if not chat_user_prompt:
            # Fallback rendering when the provider-specific renderer yields
            # nothing.
            chat_user_prompt = (
                f"Context Block:\n{context_block}\n\n"
                f"New User Question:\n{message_for_ai}"
            )
        started_at = time.perf_counter()
        chunks: list[str] = []
        chat_response_max_tokens = max(1, int(chat_max_tokens * 0.2))
        # Stream tokens to the client as they arrive.
        for chunk in provider.analyze_stream(
            system_prompt=system_prompt,
            user_prompt=chat_user_prompt,
            max_tokens=chat_response_max_tokens,
        ):
            chunk_text = str(chunk)
            if not chunk_text:
                continue
            chunks.append(chunk_text)
            emit_progress(CHAT_PROGRESS, case_id, {"type": "token", "content": chunk_text})

        response_text = "".join(chunks).strip()
        duration_ms = int((time.perf_counter() - started_at) * 1000)
        if not response_text:
            raise AIProviderError("Provider returned an empty response.")

        # Record both sides of the exchange only after a successful response.
        chat_manager.add_message("user", message, metadata={"message_index": message_index})
        assistant_metadata: dict[str, Any] = {"message_index": message_index}
        if retrieved_artifacts:
            assistant_metadata["data_retrieved"] = list(retrieved_artifacts)
        chat_manager.add_message("assistant", response_text, metadata=assistant_metadata)

        audit_logger.log(
            "chat_response_received",
            {
                "message_index": message_index,
                "duration_ms": duration_ms,
                "response_tokens_estimate": chat_manager.estimate_token_count(response_text),
                "data_retrieved": bool(retrieved_artifacts),
                "retrieved_artifacts": list(retrieved_artifacts),
            },
        )

        set_progress_status(CHAT_PROGRESS, case_id, "completed")
        emit_progress(CHAT_PROGRESS, case_id, {
            "type": "done",
            "data_retrieved": list(retrieved_artifacts),
        })
    except ValueError as error:
        LOGGER.warning("Chat request rejected for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except AIProviderError as error:
        LOGGER.warning("Chat provider request failed for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except Exception:
        LOGGER.exception("Unexpected failure during chat for case %s", case_id)
        error_message = "Unexpected error while generating chat response."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", error_message)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": error_message})
Execute a background chat interaction about analysis results.
Arguments:
- case_id: UUID of the case.
- message: The user's chat message.
- config_snapshot: Deep copy of application config.
100def load_case_analysis_results(case: dict[str, Any]) -> dict[str, Any] | None: 101 """Load analysis results for a case from memory or disk. 102 103 Args: 104 case: The in-memory case state dictionary. 105 106 Returns: 107 Analysis results dict, or ``None``. 108 """ 109 in_memory = case.get("analysis_results") 110 if isinstance(in_memory, dict) and in_memory: 111 return dict(in_memory) 112 113 results_path = Path(case["case_dir"]) / "analysis_results.json" 114 if not results_path.exists(): 115 return dict(in_memory) if isinstance(in_memory, dict) else None 116 117 try: 118 parsed = json.loads(results_path.read_text(encoding="utf-8")) 119 except (OSError, json.JSONDecodeError): 120 LOGGER.warning("Failed to load analysis results from %s", results_path, exc_info=True) 121 return dict(in_memory) if isinstance(in_memory, dict) else None 122 123 if isinstance(parsed, dict): 124 return parsed 125 return dict(in_memory) if isinstance(in_memory, dict) else None
Load analysis results for a case from memory or disk.
Arguments:
- case: The in-memory case state dictionary.
Returns:
Analysis results dict, or `None` if no results are available in memory or on disk.
def resolve_case_investigation_context(case: dict[str, Any]) -> str:
    """Resolve the investigation context prompt for a case.

    Uses the trimmed in-memory value when present, otherwise falls back to
    the ``prompt.txt`` file saved in the case directory.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        The investigation context string, or empty string.
    """
    in_memory = str(case.get("investigation_context", "")).strip()
    if in_memory:
        return in_memory

    prompt_file = Path(case["case_dir"]) / "prompt.txt"
    if prompt_file.exists():
        try:
            return prompt_file.read_text(encoding="utf-8")
        except OSError:
            LOGGER.warning("Failed to read investigation context prompt at %s", prompt_file, exc_info=True)
    return ""
Resolve the investigation context prompt for a case.
Arguments:
- case: The in-memory case state dictionary.
Returns:
The investigation context string, or empty string.
def resolve_case_parsed_dir(case: dict[str, Any]) -> Path:
    """Resolve the directory containing parsed CSV files for a case.

    Resolution order: the explicit ``csv_output_dir`` recorded on the case,
    then the parent directory of the first known parsed CSV, and finally the
    default ``<case_dir>/parsed`` location.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        Path to the parsed CSV directory.
    """
    configured = str(case.get("csv_output_dir", "")).strip()
    if configured:
        return Path(configured)

    known_csvs = collect_case_csv_paths(case)
    if known_csvs:
        return known_csvs[0].parent
    return Path(case["case_dir"]) / "parsed"
Resolve the directory containing parsed CSV files for a case.
Arguments:
- case: The in-memory case state dictionary.
Returns:
Path to the parsed CSV directory.