app.routes.tasks

Background task runners for parsing, analysis, and chat.

This module contains the long-running functions that execute on background threading.Thread instances:

  • run_parse -- Parse forensic artifacts via Dissect.
  • run_analysis -- AI-powered analysis of parsed CSV artifacts.
  • run_chat -- Follow-up chat with the AI about analysis results.

Each runner emits SSE progress events through the shared progress stores defined in routes_state and uses a case-log-context wrapper to ensure log messages are tagged with the case ID.

Attributes:
  • _COMPRESS_FINDINGS_FALLBACK_PROMPT: Fallback prompt for findings compression when the prompt file is missing.
  1"""Background task runners for parsing, analysis, and chat.
  2
  3This module contains the long-running functions that execute on background
  4``threading.Thread`` instances:
  5
  6* ``run_parse`` -- Parse forensic artifacts via Dissect.
  7* ``run_analysis`` -- AI-powered analysis of parsed CSV artifacts.
  8* ``run_chat`` -- Follow-up chat with the AI about analysis results.
  9
 10Each runner emits SSE progress events through the shared progress stores
 11defined in :mod:`routes_state` and uses a case-log-context wrapper to
 12ensure log messages are tagged with the case ID.
 13
 14Attributes:
 15    _COMPRESS_FINDINGS_FALLBACK_PROMPT: Fallback prompt for findings
 16        compression when the prompt file is missing.
 17"""
 18
 19from __future__ import annotations
 20
 21import copy
 22import json
 23import logging
 24import time
 25from collections.abc import Mapping
 26from pathlib import Path
 27from typing import Any
 28
 29from ..ai_providers import AIProviderError, create_provider
 30from ..analyzer import ForensicAnalyzer
 31from ..analyzer.core import AnalysisCancelledError
 32from ..case_logging import case_log_context
 33from ..chat import ChatManager
 34from ..parser import ForensicParser
 35from .state import (
 36    PROJECT_ROOT,
 37    CHAT_HISTORY_MAX_PAIRS,
 38    DEFAULT_FORENSIC_SYSTEM_PROMPT,
 39    ANALYSIS_PROGRESS,
 40    CHAT_PROGRESS,
 41    PARSE_PROGRESS,
 42    STATE_LOCK,
 43    emit_progress,
 44    get_cancel_event,
 45    get_case,
 46    mark_case_status,
 47    safe_int,
 48    set_progress_status,
 49)
 50from .artifacts import (
 51    extract_parse_progress,
 52    sanitize_prompt,
 53)
 54from .evidence import (
 55    build_csv_map,
 56    collect_case_csv_paths,
 57    generate_case_report,
 58    resolve_case_csv_output_dir,
 59)
 60
# Explicit public API of this module; everything else is a private helper.
__all__ = [
    "run_task_with_case_log_context",
    "run_parse",
    "run_analysis",
    "run_chat",
    "load_case_analysis_results",
    "resolve_case_investigation_context",
    "resolve_case_parsed_dir",
]

# Module-level logger; the background runners below log through it.
LOGGER = logging.getLogger(__name__)

# Used by _load_compress_findings_prompt() when prompts/compress_findings.md
# is missing or empty.
_COMPRESS_FINDINGS_FALLBACK_PROMPT = (
    "You are a forensic analysis assistant. Compress per-artifact findings "
    "while preserving all critical forensic details. Return only the "
    "compressed text in bullet-point format, no preamble."
)
 78
 79
 80# ---------------------------------------------------------------------------
 81# Prompt / context helpers
 82# ---------------------------------------------------------------------------
 83
 84def _load_forensic_system_prompt() -> str:
 85    """Load the forensic AI system prompt from the ``prompts/`` directory.
 86
 87    Returns:
 88        The system prompt string, or the default fallback.
 89    """
 90    prompt_path = PROJECT_ROOT / "prompts" / "system_prompt.md"
 91    try:
 92        prompt_text = prompt_path.read_text(encoding="utf-8").strip()
 93    except OSError:
 94        LOGGER.warning("Failed to read system prompt from %s; using fallback prompt.", prompt_path)
 95        return DEFAULT_FORENSIC_SYSTEM_PROMPT
 96    return prompt_text or DEFAULT_FORENSIC_SYSTEM_PROMPT
 97
 98
 99def load_case_analysis_results(case: dict[str, Any]) -> dict[str, Any] | None:
100    """Load analysis results for a case from memory or disk.
101
102    Args:
103        case: The in-memory case state dictionary.
104
105    Returns:
106        Analysis results dict, or ``None``.
107    """
108    in_memory = case.get("analysis_results")
109    if isinstance(in_memory, dict) and in_memory:
110        return dict(in_memory)
111
112    results_path = Path(case["case_dir"]) / "analysis_results.json"
113    if not results_path.exists():
114        return dict(in_memory) if isinstance(in_memory, dict) else None
115
116    try:
117        parsed = json.loads(results_path.read_text(encoding="utf-8"))
118    except (OSError, json.JSONDecodeError):
119        LOGGER.warning("Failed to load analysis results from %s", results_path, exc_info=True)
120        return dict(in_memory) if isinstance(in_memory, dict) else None
121
122    if isinstance(parsed, dict):
123        return parsed
124    return dict(in_memory) if isinstance(in_memory, dict) else None
125
126
def resolve_case_investigation_context(case: dict[str, Any]) -> str:
    """Resolve the investigation context prompt for a case.

    Prefers the in-memory state; falls back to the ``prompt.txt`` file
    persisted in the case directory.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        The investigation context string, or empty string.
    """
    in_memory_context = str(case.get("investigation_context", "")).strip()
    if in_memory_context:
        return in_memory_context

    prompt_file = Path(case["case_dir"]) / "prompt.txt"
    if prompt_file.exists():
        try:
            # File contents are returned verbatim (not stripped), matching
            # how the prompt was persisted.
            return prompt_file.read_text(encoding="utf-8")
        except OSError:
            LOGGER.warning("Failed to read investigation context prompt at %s", prompt_file, exc_info=True)
    return ""
149
150
def resolve_case_parsed_dir(case: dict[str, Any]) -> Path:
    """Resolve the directory containing parsed CSV files for a case.

    Resolution order: the recorded ``csv_output_dir``, then the parent
    directory of the first known CSV file, then the default ``parsed/``
    subdirectory of the case directory.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        Path to the parsed CSV directory.
    """
    recorded_dir = str(case.get("csv_output_dir", "")).strip()
    if recorded_dir:
        return Path(recorded_dir)

    known_csv_paths = collect_case_csv_paths(case)
    if known_csv_paths:
        return known_csv_paths[0].parent

    return Path(case["case_dir"]) / "parsed"
169
170
171def _render_chat_messages_for_provider(messages: list[dict[str, str]]) -> str:
172    """Render chat messages into a single prompt string for the AI.
173
174    Args:
175        messages: Ordered list of message dicts with ``role`` and ``content``.
176
177    Returns:
178        Formatted multi-section prompt string.
179    """
180    rendered_sections: list[str] = []
181    first_user_rendered = False
182    last_user_index = -1
183    for index, message in enumerate(messages):
184        role = str(message.get("role", "")).strip().lower()
185        content = str(message.get("content", "")).strip()
186        if role == "user" and content:
187            last_user_index = index
188
189    for index, message in enumerate(messages):
190        role = str(message.get("role", "")).strip().lower()
191        content = str(message.get("content", "")).strip()
192        if not content or role == "system":
193            continue
194
195        if role == "user" and not first_user_rendered:
196            rendered_sections.append(f"Context Block:\n{content}")
197            first_user_rendered = True
198            continue
199
200        if role == "user" and index == last_user_index:
201            rendered_sections.append(f"New User Question:\n{content}")
202            continue
203
204        label = "User" if role == "user" else "Assistant" if role == "assistant" else role.title()
205        rendered_sections.append(f"{label}:\n{content}")
206
207    return "\n\n".join(rendered_sections).strip()
208
209
def _load_compress_findings_prompt() -> str:
    """Load the prompt used to compress per-artifact findings with AI.

    Falls back to ``_COMPRESS_FINDINGS_FALLBACK_PROMPT`` when the prompt
    file is unreadable or empty.

    Returns:
        The compression system prompt string.
    """
    prompt_path = PROJECT_ROOT / "prompts" / "compress_findings.md"
    try:
        loaded = prompt_path.read_text(encoding="utf-8")
    except OSError:
        LOGGER.warning("Failed to read compress findings prompt from %s; using fallback.", prompt_path)
        return _COMPRESS_FINDINGS_FALLBACK_PROMPT
    loaded = loaded.strip()
    return loaded if loaded else _COMPRESS_FINDINGS_FALLBACK_PROMPT
223
224
225def _compress_findings_with_ai(
226    provider: Any,
227    findings_text: str,
228    max_tokens: int,
229) -> str | None:
230    """Use the AI provider to compress per-artifact findings.
231
232    Args:
233        provider: AI provider instance with an ``analyze`` method.
234        findings_text: Full per-artifact findings text.
235        max_tokens: Configured max token budget.
236
237    Returns:
238        Compressed findings text, or ``None`` on failure.
239    """
240    if not findings_text or not findings_text.strip():
241        return None
242
243    target_tokens = max(200, int(max_tokens * 0.25))
244    try:
245        compressed = provider.analyze(
246            system_prompt=_load_compress_findings_prompt(),
247            user_prompt=(
248                f"Compress the following per-artifact forensic findings to "
249                f"roughly {target_tokens} tokens. Keep the bullet-point "
250                f"format (\"- artifact: summary\"). Preserve every "
251                f"suspicious indicator, timestamp, path, and conclusion.\n\n"
252                f"{findings_text}"
253            ),
254            max_tokens=target_tokens,
255        )
256        result = str(compressed).strip()
257        return result if result else None
258    except (AIProviderError, Exception):
259        LOGGER.warning(
260            "AI-powered findings compression failed; falling back to full context.",
261            exc_info=True,
262        )
263        return None
264
265
266def _resolve_chat_max_tokens(config: dict[str, Any]) -> int:
267    """Resolve the maximum token count for chat from config.
268
269    Args:
270        config: Full application configuration dict.
271
272    Returns:
273        Positive integer token limit.
274
275    Raises:
276        ValueError: If the setting is missing or invalid.
277    """
278    analysis_config = config.get("analysis", {})
279    if not isinstance(analysis_config, dict):
280        raise ValueError(
281            "Chat max tokens are not configured. Set `analysis.ai_max_tokens` in Settings."
282        )
283
284    if "ai_max_tokens" not in analysis_config:
285        raise ValueError(
286            "Chat max tokens are not configured. Set `analysis.ai_max_tokens` in Settings."
287        )
288
289    try:
290        resolved = int(analysis_config.get("ai_max_tokens"))
291    except (TypeError, ValueError):
292        raise ValueError(
293            "Invalid `analysis.ai_max_tokens` value in Settings. Provide a positive integer."
294        ) from None
295
296    if resolved <= 0:
297        raise ValueError(
298            "Invalid `analysis.ai_max_tokens` value in Settings. Provide a positive integer."
299        )
300    return resolved
301
302
303# ---------------------------------------------------------------------------
304# Case-log-context wrapper (replaces three duplicate wrappers)
305# ---------------------------------------------------------------------------
306
def run_task_with_case_log_context(
    case_id: str,
    task_fn: Any,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Run a background task function within case-scoped logging context.

    This replaces the three near-identical ``_run_*_with_case_log_context``
    wrappers with a single generic version.

    Args:
        case_id: UUID of the case (used for log tagging).
        task_fn: The callable to invoke.
        *args: Positional arguments forwarded to *task_fn*.
        **kwargs: Keyword arguments forwarded to *task_fn*.
    """
    # Every log record emitted by the task is tagged with this case's ID.
    log_scope = case_log_context(case_id)
    with log_scope:
        task_fn(*args, **kwargs)
326
327
328# ---------------------------------------------------------------------------
329# Background task: parse
330# ---------------------------------------------------------------------------
331
def run_parse(
    case_id: str,
    parse_artifacts: list[str],
    analysis_artifacts: list[str],
    artifact_options: list[dict[str, str]],
    config_snapshot: dict[str, Any],
) -> None:
    """Execute background parsing of selected forensic artifacts.

    Emits SSE events through ``PARSE_PROGRESS`` for each artifact
    (``artifact_started`` / ``artifact_progress`` / ``artifact_completed``
    or ``artifact_failed``) followed by a terminal ``parse_completed`` or
    ``parse_failed`` event.

    Args:
        case_id: UUID of the case.
        parse_artifacts: Artifact keys to parse.
        analysis_artifacts: Subset for AI analysis.
        artifact_options: Canonical artifact option dicts.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(PARSE_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "Case not found."})
        return

    # Snapshot mutable case state under the lock so the parse loop itself
    # can run without holding it.
    with STATE_LOCK:
        evidence_path = str(case.get("evidence_path", "")).strip()
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        case_snapshot = dict(case)

    if not evidence_path:
        mark_case_status(case_id, "failed")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "No evidence available for parsing.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "No evidence available for parsing."})
        return

    try:
        csv_output_dir = resolve_case_csv_output_dir(case_snapshot, config_snapshot=config_snapshot)
        with ForensicParser(
            evidence_path=evidence_path,
            case_dir=case_dir,
            audit_logger=audit_logger,
            parsed_dir=csv_output_dir,
        ) as parser:
            results: list[dict[str, Any]] = []
            total = len(parse_artifacts)

            for index, artifact in enumerate(parse_artifacts, start=1):
                # Cancellation is only checked between artifacts; an
                # in-flight artifact parse runs to completion.
                if cancel_event is not None and cancel_event.is_set():
                    LOGGER.info("Parsing cancelled for case %s before artifact %s", case_id, artifact)
                    return
                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {"type": "artifact_started", "artifact_key": artifact, "index": index, "total": total},
                )

                def _progress_callback(*args: Any, **_kwargs: Any) -> None:
                    """Emit per-artifact parse progress events."""
                    # `artifact` is the loop variable; this late binding is
                    # safe because parse_artifact invokes the callback
                    # synchronously within the same iteration.
                    artifact_key, record_count = extract_parse_progress(artifact, args)
                    emit_progress(
                        PARSE_PROGRESS, case_id,
                        {"type": "artifact_progress", "artifact_key": artifact_key, "record_count": record_count},
                    )

                result = parser.parse_artifact(artifact, progress_callback=_progress_callback)
                result_entry = {"artifact_key": artifact, **result}
                results.append(result_entry)

                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {
                        "type": "artifact_completed" if result.get("success") else "artifact_failed",
                        "artifact_key": artifact,
                        "record_count": safe_int(result.get("record_count", 0)),
                        "duration_seconds": float(result.get("duration_seconds", 0.0)),
                        "csv_path": str(result.get("csv_path", "")),
                        "error": result.get("error"),
                    },
                )

            # Persist parse outcomes back into the shared case state.
            csv_map = build_csv_map(results)
            with STATE_LOCK:
                case["selected_artifacts"] = list(parse_artifacts)
                case["analysis_artifacts"] = list(analysis_artifacts)
                case["artifact_options"] = list(artifact_options)
                case["parse_results"] = results
                case["artifact_csv_paths"] = csv_map
                case["csv_output_dir"] = str(csv_output_dir)

            completed = sum(1 for item in results if item.get("success"))
            failed = len(results) - completed
            set_progress_status(PARSE_PROGRESS, case_id, "completed")
            emit_progress(
                PARSE_PROGRESS, case_id,
                {
                    "type": "parse_completed",
                    "total_artifacts": len(results),
                    "successful_artifacts": completed,
                    "failed_artifacts": failed,
                },
            )
            mark_case_status(case_id, "parsed")
    except Exception:
        # Broad catch is deliberate: this runs on a background thread, so an
        # unhandled exception would otherwise vanish without notifying the UI.
        LOGGER.exception("Background parse failed for case %s", case_id)
        user_message = (
            "Parsing failed due to an internal error. "
            "Check logs and retry after confirming the evidence file is readable."
        )
        mark_case_status(case_id, "error")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", user_message)
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": user_message})
442
443
444# ---------------------------------------------------------------------------
445# Background task: analysis
446# ---------------------------------------------------------------------------
447
def _purge_stale_analysis(case: dict[str, Any], case_dir: str) -> None:
    """Clear in-memory and on-disk analysis results after a failed run.

    This prevents stale findings from a prior successful analysis from
    being served via chat, report, or download routes after a re-analysis
    fails or is cancelled.

    Args:
        case: The in-memory case state dictionary.
        case_dir: Path string to the case directory.
    """
    with STATE_LOCK:
        case["analysis_results"] = {}
    # Fix: `unlink(missing_ok=True)` already tolerates a missing file, so
    # the previous exists()-then-unlink sequence was a redundant TOCTOU
    # pattern; delete unconditionally instead.
    (Path(case_dir) / "analysis_results.json").unlink(missing_ok=True)
464
465
def run_analysis(case_id: str, prompt: str, config_snapshot: dict[str, Any]) -> None:
    """Execute background AI-powered forensic analysis.

    Streams per-artifact progress via ``ANALYSIS_PROGRESS`` SSE events,
    persists results to ``analysis_results.json``, and then attempts to
    auto-generate the HTML report (best-effort).

    Args:
        case_id: UUID of the case.
        prompt: Investigation context / user prompt.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(ANALYSIS_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": "Case not found."})
        return

    # Snapshot everything the analysis needs while holding the lock once.
    with STATE_LOCK:
        csv_map = dict(case.get("artifact_csv_paths", {}))
        parse_results_snapshot = list(case.get("parse_results", []))
        analysis_artifacts_state = case.get("analysis_artifacts")
        selected_artifacts_snapshot = list(case.get("selected_artifacts", []))
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        image_metadata_snapshot = dict(case.get("image_metadata", {}))
        os_type_snapshot = str(case.get("os_type") or "unknown")
        artifact_options_snapshot = list(case.get("artifact_options", []))
        analysis_date_range = case.get("analysis_date_range")

    if not csv_map:
        csv_map = build_csv_map(parse_results_snapshot)
    # A list in `analysis_artifacts` means the user made an explicit
    # selection (even an empty one); otherwise fall back to the parse-time
    # selection, and finally to every artifact that produced a CSV.
    if isinstance(analysis_artifacts_state, list):
        artifacts = [str(item) for item in analysis_artifacts_state if str(item) in csv_map]
    else:
        artifacts = [item for item in selected_artifacts_snapshot if item in csv_map]
    if not artifacts and not isinstance(analysis_artifacts_state, list):
        artifacts = sorted(csv_map.keys())
    if not artifacts:
        message = (
            "No parsed CSV artifacts are marked `Parse and use in AI`."
            if isinstance(analysis_artifacts_state, list)
            else "No parsed CSV artifacts available."
        )
        mark_case_status(case_id, "failed")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": message})
        return

    try:
        analyzer = ForensicAnalyzer(
            case_dir=case_dir,
            config=config_snapshot,
            audit_logger=audit_logger,
            artifact_csv_paths=csv_map,
            os_type=os_type_snapshot,
        )
        metadata = dict(image_metadata_snapshot)
        metadata["os_type"] = os_type_snapshot
        metadata["artifact_csv_paths"] = csv_map
        metadata["parse_results"] = parse_results_snapshot
        metadata["analysis_artifacts"] = list(artifacts)
        metadata["artifact_options"] = artifact_options_snapshot
        if isinstance(analysis_date_range, dict):
            metadata["analysis_date_range"] = {
                "start_date": str(analysis_date_range.get("start_date", "")).strip(),
                "end_date": str(analysis_date_range.get("end_date", "")).strip(),
            }

        def _analysis_progress(*args: Any) -> None:
            """Emit per-artifact analysis progress events.

            Accepts either the positional form ``(artifact_key, status,
            result)`` or a single payload dict; any other call shape is
            silently ignored.
            """
            artifact_key = ""
            status = ""
            result: dict[str, Any] = {}

            if len(args) >= 3:
                artifact_key = str(args[0])
                status = str(args[1])
                result_payload = args[2]
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            elif len(args) == 1 and isinstance(args[0], dict):
                payload = args[0]
                artifact_key = str(payload.get("artifact_key", ""))
                status = str(payload.get("status", ""))
                result_payload = payload.get("result")
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            else:
                return

            if status == "started":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_started", "artifact_key": artifact_key, "result": result,
                })
                return

            if status == "thinking":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_thinking", "artifact_key": artifact_key, "result": result,
                })
                return

            # Any other status is treated as completion of the artifact.
            emit_progress(ANALYSIS_PROGRESS, case_id, {
                "type": "artifact_analysis_completed",
                "artifact_key": artifact_key,
                "status": status or "complete",
                "result": result,
            })

        output = analyzer.run_full_analysis(
            artifact_keys=artifacts,
            investigation_context=prompt,
            metadata=metadata,
            progress_callback=_analysis_progress,
            cancel_check=(lambda: cancel_event.is_set()) if cancel_event is not None else None,
        )
        # Persist results to disk first, then update the shared state.
        analysis_results_path = Path(case_dir) / "analysis_results.json"
        with analysis_results_path.open("w", encoding="utf-8") as analysis_results_file:
            json.dump(output, analysis_results_file, indent=2, ensure_ascii=True)
            analysis_results_file.write("\n")
        with STATE_LOCK:
            case["investigation_context"] = prompt
            case["analysis_results"] = output

        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_summary",
            "summary": str(output.get("summary", "")),
            "model_info": output.get("model_info", {}),
        })
        set_progress_status(ANALYSIS_PROGRESS, case_id, "completed")
        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_completed",
            "artifact_count": len(output.get("per_artifact", [])),
            "per_artifact": list(output.get("per_artifact", [])),
        })
        mark_case_status(case_id, "completed")

        # Auto-generate the HTML report so it's ready for download.
        try:
            report_result = generate_case_report(case_id)
            if report_result.get("success"):
                LOGGER.info(
                    "Auto-generated report for case %s: %s",
                    case_id, report_result["report_path"].name,
                )
            else:
                LOGGER.warning(
                    "Auto-report generation failed for case %s: %s",
                    case_id, report_result.get("error", "unknown error"),
                )
        except Exception:
            # Report generation is best-effort; the analysis itself already
            # succeeded, so only log the failure.
            LOGGER.warning(
                "Auto-report generation raised an exception for case %s",
                case_id, exc_info=True,
            )
    except AnalysisCancelledError:
        # User-initiated cancellation is not an error; no state purge.
        LOGGER.info("Analysis cancelled for case %s", case_id)
    except Exception:
        LOGGER.exception("Background analysis failed for case %s", case_id)
        # Drop stale results so chat/report routes cannot serve old findings.
        _purge_stale_analysis(case, case_dir)
        user_message = (
            "Analysis failed due to an internal error. "
            "Verify provider settings and retry."
        )
        mark_case_status(case_id, "error")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", user_message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": user_message})
631
632
633# ---------------------------------------------------------------------------
634# Background task: chat
635# ---------------------------------------------------------------------------
636
def run_chat(case_id: str, message: str, config_snapshot: dict[str, Any]) -> None:
    """Execute a background chat interaction about analysis results.

    Streams response tokens to the UI via ``CHAT_PROGRESS`` SSE ``token``
    events, then persists the exchange through ``ChatManager`` and emits a
    terminal ``done`` (or ``error``) event.

    Args:
        case_id: UUID of the case.
        message: The user's chat message.
        config_snapshot: Deep copy of application config.
    """
    case = get_case(case_id)
    if case is None:
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Case not found."})
        return

    with STATE_LOCK:
        case_snapshot = dict(case)
        audit_logger = case["audit"]

    analysis_results = load_case_analysis_results(case_snapshot)
    if not analysis_results:
        message_text = "No analysis results available for this case. Run analysis first."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    if not isinstance(config_snapshot, dict):
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Invalid in-memory configuration state.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Invalid in-memory configuration state."})
        return

    try:
        chat_max_tokens = _resolve_chat_max_tokens(config_snapshot)
    except ValueError as error:
        message_text = str(error)
        LOGGER.warning("Chat configuration rejected for case %s: %s", case_id, message_text)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    case_dir = case_snapshot["case_dir"]
    chat_manager = ChatManager(case_dir, max_context_tokens=chat_max_tokens)
    history_snapshot = chat_manager.get_history()
    # 1-based index of this user message within the whole conversation.
    message_index = (
        sum(1 for entry in history_snapshot if str(entry.get("role", "")).strip().lower() == "user") + 1
    )
    audit_logger.log(
        "chat_message_sent",
        {
            "message_index": message_index,
            "message": sanitize_prompt(message, max_chars=8000),
        },
    )

    try:
        # Reserve ~80% of the token budget for the prompt; the remainder is
        # left for the model's reply (see chat_response_max_tokens below).
        prompt_budget = int(chat_max_tokens * 0.8)
        provider = create_provider(copy.deepcopy(config_snapshot))

        investigation_context = resolve_case_investigation_context(case_snapshot)
        image_metadata = dict(case_snapshot.get("image_metadata", {}))

        context_block = chat_manager.build_chat_context(
            analysis_results=analysis_results,
            investigation_context=investigation_context,
            metadata=image_metadata,
        )

        if chat_manager.context_needs_compression(context_block, prompt_budget):
            # NOTE(review): reaches into a ChatManager private method;
            # consider promoting _format_per_artifact_findings to public API.
            per_artifact_text = chat_manager._format_per_artifact_findings(
                analysis_results if isinstance(analysis_results, Mapping) else {},
            )
            compressed = _compress_findings_with_ai(provider, per_artifact_text, chat_max_tokens)
            if compressed:
                context_block = chat_manager.rebuild_context_with_compressed_findings(
                    analysis_results=analysis_results,
                    investigation_context=investigation_context,
                    metadata=image_metadata,
                    compressed_findings=compressed,
                )

        retrieved_payload = chat_manager.retrieve_csv_data(
            question=message,
            parsed_dir=resolve_case_parsed_dir(case_snapshot),
        )
        retrieved_artifacts: list[str] = []
        if isinstance(retrieved_payload.get("artifacts"), list):
            retrieved_artifacts = [
                str(item).strip()
                for item in retrieved_payload.get("artifacts", [])
                if str(item).strip()
            ]

        message_for_ai = message
        retrieved_data = str(retrieved_payload.get("data", "")).strip()
        if bool(retrieved_payload.get("retrieved")) and retrieved_data:
            # Prepend retrieved CSV rows so the model can ground its answer.
            message_for_ai = (
                "Retrieved CSV data for this question:\n"
                f"{retrieved_data}\n\n"
                "User question:\n"
                f"{message}"
            )
            audit_logger.log(
                "chat_data_retrieval",
                {
                    "message_index": message_index,
                    "artifacts": list(retrieved_artifacts),
                    "rows_returned": retrieved_data.count("\n"),
                },
            )

        system_prompt = _load_forensic_system_prompt()

        # Whatever budget remains after the fixed sections goes to history.
        fixed_tokens = (
            chat_manager.estimate_token_count(system_prompt)
            + chat_manager.estimate_token_count(context_block)
            + chat_manager.estimate_token_count(message_for_ai)
        )
        history_budget = max(0, prompt_budget - fixed_tokens)

        recent_history = chat_manager.get_recent_history(max_pairs=CHAT_HISTORY_MAX_PAIRS)
        fitted_history = chat_manager.fit_history(recent_history, history_budget)

        # Assemble: system prompt, context, trimmed history, new question.
        ai_messages: list[dict[str, str]] = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": context_block},
        ]
        for history_message in fitted_history:
            role = str(history_message.get("role", "")).strip().lower()
            content = str(history_message.get("content", "")).strip()
            if role in {"user", "assistant"} and content:
                ai_messages.append({"role": role, "content": content})
        ai_messages.append({"role": "user", "content": message_for_ai})
        chat_user_prompt = _render_chat_messages_for_provider(ai_messages)
        if not chat_user_prompt:
            # Defensive fallback in case rendering produced an empty prompt.
            chat_user_prompt = (
                f"Context Block:\n{context_block}\n\n"
                f"New User Question:\n{message_for_ai}"
            )
        started_at = time.perf_counter()
        chunks: list[str] = []
        # Response cap: ~20% of the configured budget, minimum one token.
        chat_response_max_tokens = max(1, int(chat_max_tokens * 0.2))
        for chunk in provider.analyze_stream(
            system_prompt=system_prompt,
            user_prompt=chat_user_prompt,
            max_tokens=chat_response_max_tokens,
        ):
            chunk_text = str(chunk)
            if not chunk_text:
                continue
            chunks.append(chunk_text)
            emit_progress(CHAT_PROGRESS, case_id, {"type": "token", "content": chunk_text})

        response_text = "".join(chunks).strip()
        duration_ms = int((time.perf_counter() - started_at) * 1000)
        if not response_text:
            raise AIProviderError("Provider returned an empty response.")

        # Persist the exchange only after a successful, non-empty response.
        chat_manager.add_message("user", message, metadata={"message_index": message_index})
        assistant_metadata: dict[str, Any] = {"message_index": message_index}
        if retrieved_artifacts:
            assistant_metadata["data_retrieved"] = list(retrieved_artifacts)
        chat_manager.add_message("assistant", response_text, metadata=assistant_metadata)

        audit_logger.log(
            "chat_response_received",
            {
                "message_index": message_index,
                "duration_ms": duration_ms,
                "response_tokens_estimate": chat_manager.estimate_token_count(response_text),
                "data_retrieved": bool(retrieved_artifacts),
                "retrieved_artifacts": list(retrieved_artifacts),
            },
        )

        set_progress_status(CHAT_PROGRESS, case_id, "completed")
        emit_progress(CHAT_PROGRESS, case_id, {
            "type": "done",
            "data_retrieved": list(retrieved_artifacts),
        })
    except ValueError as error:
        LOGGER.warning("Chat request rejected for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except AIProviderError as error:
        LOGGER.warning("Chat provider request failed for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except Exception:
        LOGGER.exception("Unexpected failure during chat for case %s", case_id)
        error_message = "Unexpected error while generating chat response."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", error_message)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": error_message})
def run_task_with_case_log_context(
    case_id: str,
    task_fn: Any,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Invoke a background task callable inside a case-scoped logging context.

    Generic replacement for the three near-identical
    ``_run_*_with_case_log_context`` wrappers.

    Args:
        case_id: UUID of the case (used for log tagging).
        task_fn: The callable to invoke.
        *args: Positional arguments forwarded to *task_fn*.
        **kwargs: Keyword arguments forwarded to *task_fn*.
    """
    log_context = case_log_context(case_id)
    with log_context:
        task_fn(*args, **kwargs)

Run a background task function within case-scoped logging context.

This replaces the three near-identical _run_*_with_case_log_context wrappers with a single generic version.

Arguments:
  • case_id: UUID of the case (used for log tagging).
  • task_fn: The callable to invoke.
  • *args: Positional arguments forwarded to task_fn.
  • **kwargs: Keyword arguments forwarded to task_fn.
def run_parse(
    case_id: str,
    parse_artifacts: list[str],
    analysis_artifacts: list[str],
    artifact_options: list[dict[str, str]],
    config_snapshot: dict[str, Any],
) -> None:
    """Execute background parsing of selected forensic artifacts.

    Emits SSE events through ``PARSE_PROGRESS`` per artifact
    (``artifact_started`` / ``artifact_progress`` /
    ``artifact_completed``-or-``artifact_failed``) and finishes with a
    ``parse_completed`` or ``parse_failed`` event.

    Args:
        case_id: UUID of the case.
        parse_artifacts: Artifact keys to parse.
        analysis_artifacts: Subset for AI analysis.
        artifact_options: Canonical artifact option dicts.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(PARSE_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "Case not found."})
        return

    # Snapshot mutable case state under the lock so the parse loop below can
    # run without holding it.
    with STATE_LOCK:
        evidence_path = str(case.get("evidence_path", "")).strip()
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        case_snapshot = dict(case)

    if not evidence_path:
        mark_case_status(case_id, "failed")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", "No evidence available for parsing.")
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": "No evidence available for parsing."})
        return

    try:
        csv_output_dir = resolve_case_csv_output_dir(case_snapshot, config_snapshot=config_snapshot)
        with ForensicParser(
            evidence_path=evidence_path,
            case_dir=case_dir,
            audit_logger=audit_logger,
            parsed_dir=csv_output_dir,
        ) as parser:
            results: list[dict[str, Any]] = []
            total = len(parse_artifacts)

            for index, artifact in enumerate(parse_artifacts, start=1):
                # Cooperative cancellation: bail out between artifacts only.
                if cancel_event is not None and cancel_event.is_set():
                    LOGGER.info("Parsing cancelled for case %s before artifact %s", case_id, artifact)
                    return
                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {"type": "artifact_started", "artifact_key": artifact, "index": index, "total": total},
                )

                # NOTE(review): this closure captures the loop variable
                # ``artifact`` late-bound; correct only if the parser calls
                # the callback synchronously within this iteration — confirm.
                def _progress_callback(*args: Any, **_kwargs: Any) -> None:
                    """Emit per-artifact parse progress events."""
                    artifact_key, record_count = extract_parse_progress(artifact, args)
                    emit_progress(
                        PARSE_PROGRESS, case_id,
                        {"type": "artifact_progress", "artifact_key": artifact_key, "record_count": record_count},
                    )

                result = parser.parse_artifact(artifact, progress_callback=_progress_callback)
                result_entry = {"artifact_key": artifact, **result}
                results.append(result_entry)

                emit_progress(
                    PARSE_PROGRESS, case_id,
                    {
                        "type": "artifact_completed" if result.get("success") else "artifact_failed",
                        "artifact_key": artifact,
                        "record_count": safe_int(result.get("record_count", 0)),
                        "duration_seconds": float(result.get("duration_seconds", 0.0)),
                        "csv_path": str(result.get("csv_path", "")),
                        "error": result.get("error"),
                    },
                )

            # Persist parse outcomes back onto the shared case state.
            csv_map = build_csv_map(results)
            with STATE_LOCK:
                case["selected_artifacts"] = list(parse_artifacts)
                case["analysis_artifacts"] = list(analysis_artifacts)
                case["artifact_options"] = list(artifact_options)
                case["parse_results"] = results
                case["artifact_csv_paths"] = csv_map
                case["csv_output_dir"] = str(csv_output_dir)

            completed = sum(1 for item in results if item.get("success"))
            failed = len(results) - completed
            set_progress_status(PARSE_PROGRESS, case_id, "completed")
            emit_progress(
                PARSE_PROGRESS, case_id,
                {
                    "type": "parse_completed",
                    "total_artifacts": len(results),
                    "successful_artifacts": completed,
                    "failed_artifacts": failed,
                },
            )
            mark_case_status(case_id, "parsed")
    except Exception:
        # Broad catch is deliberate: this runs on a background thread, so an
        # escaping exception would otherwise be lost. Log it and surface a
        # generic message to the UI instead.
        LOGGER.exception("Background parse failed for case %s", case_id)
        user_message = (
            "Parsing failed due to an internal error. "
            "Check logs and retry after confirming the evidence file is readable."
        )
        mark_case_status(case_id, "error")
        set_progress_status(PARSE_PROGRESS, case_id, "failed", user_message)
        emit_progress(PARSE_PROGRESS, case_id, {"type": "parse_failed", "error": user_message})

Execute background parsing of selected forensic artifacts.

Arguments:
  • case_id: UUID of the case.
  • parse_artifacts: Artifact keys to parse.
  • analysis_artifacts: Subset for AI analysis.
  • artifact_options: Canonical artifact option dicts.
  • config_snapshot: Deep copy of application config.
def run_analysis(case_id: str, prompt: str, config_snapshot: dict[str, Any]) -> None:
    """Execute background AI-powered forensic analysis.

    Streams per-artifact progress via ``ANALYSIS_PROGRESS``, persists the
    final output to ``analysis_results.json`` in the case directory, and
    auto-generates the HTML report on success (best-effort).

    Args:
        case_id: UUID of the case.
        prompt: Investigation context / user prompt.
        config_snapshot: Deep copy of application config.
    """
    cancel_event = get_cancel_event(ANALYSIS_PROGRESS, case_id)
    case = get_case(case_id)
    if case is None:
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": "Case not found."})
        return

    # Snapshot everything needed from the shared case dict under the lock.
    with STATE_LOCK:
        csv_map = dict(case.get("artifact_csv_paths", {}))
        parse_results_snapshot = list(case.get("parse_results", []))
        analysis_artifacts_state = case.get("analysis_artifacts")
        selected_artifacts_snapshot = list(case.get("selected_artifacts", []))
        case_dir = case["case_dir"]
        audit_logger = case["audit"]
        image_metadata_snapshot = dict(case.get("image_metadata", {}))
        os_type_snapshot = str(case.get("os_type") or "unknown")
        artifact_options_snapshot = list(case.get("artifact_options", []))
        analysis_date_range = case.get("analysis_date_range")

    if not csv_map:
        csv_map = build_csv_map(parse_results_snapshot)
    # Artifact selection: an explicit ``analysis_artifacts`` list wins;
    # otherwise fall back to the parsed selection, then to every known CSV.
    if isinstance(analysis_artifacts_state, list):
        artifacts = [str(item) for item in analysis_artifacts_state if str(item) in csv_map]
    else:
        artifacts = [item for item in selected_artifacts_snapshot if item in csv_map]
    if not artifacts and not isinstance(analysis_artifacts_state, list):
        artifacts = sorted(csv_map.keys())
    if not artifacts:
        message = (
            "No parsed CSV artifacts are marked `Parse and use in AI`."
            if isinstance(analysis_artifacts_state, list)
            else "No parsed CSV artifacts available."
        )
        mark_case_status(case_id, "failed")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": message})
        return

    try:
        analyzer = ForensicAnalyzer(
            case_dir=case_dir,
            config=config_snapshot,
            audit_logger=audit_logger,
            artifact_csv_paths=csv_map,
            os_type=os_type_snapshot,
        )
        metadata = dict(image_metadata_snapshot)
        metadata["os_type"] = os_type_snapshot
        metadata["artifact_csv_paths"] = csv_map
        metadata["parse_results"] = parse_results_snapshot
        metadata["analysis_artifacts"] = list(artifacts)
        metadata["artifact_options"] = artifact_options_snapshot
        if isinstance(analysis_date_range, dict):
            metadata["analysis_date_range"] = {
                "start_date": str(analysis_date_range.get("start_date", "")).strip(),
                "end_date": str(analysis_date_range.get("end_date", "")).strip(),
            }

        def _analysis_progress(*args: Any) -> None:
            """Emit per-artifact analysis progress events.

            Accepts either positional ``(artifact_key, status, result)`` or a
            single payload dict; any other shape is silently ignored.
            """
            artifact_key = ""
            status = ""
            result: dict[str, Any] = {}

            if len(args) >= 3:
                artifact_key = str(args[0])
                status = str(args[1])
                result_payload = args[2]
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            elif len(args) == 1 and isinstance(args[0], dict):
                payload = args[0]
                artifact_key = str(payload.get("artifact_key", ""))
                status = str(payload.get("status", ""))
                result_payload = payload.get("result")
                if isinstance(result_payload, dict):
                    result = dict(result_payload)
            else:
                return

            if status == "started":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_started", "artifact_key": artifact_key, "result": result,
                })
                return

            if status == "thinking":
                emit_progress(ANALYSIS_PROGRESS, case_id, {
                    "type": "artifact_analysis_thinking", "artifact_key": artifact_key, "result": result,
                })
                return

            # Any other status is treated as completion of the artifact.
            emit_progress(ANALYSIS_PROGRESS, case_id, {
                "type": "artifact_analysis_completed",
                "artifact_key": artifact_key,
                "status": status or "complete",
                "result": result,
            })

        output = analyzer.run_full_analysis(
            artifact_keys=artifacts,
            investigation_context=prompt,
            metadata=metadata,
            progress_callback=_analysis_progress,
            cancel_check=(lambda: cancel_event.is_set()) if cancel_event is not None else None,
        )
        # Persist results to disk so they survive process restarts.
        analysis_results_path = Path(case_dir) / "analysis_results.json"
        with analysis_results_path.open("w", encoding="utf-8") as analysis_results_file:
            json.dump(output, analysis_results_file, indent=2, ensure_ascii=True)
            analysis_results_file.write("\n")
        with STATE_LOCK:
            case["investigation_context"] = prompt
            case["analysis_results"] = output

        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_summary",
            "summary": str(output.get("summary", "")),
            "model_info": output.get("model_info", {}),
        })
        set_progress_status(ANALYSIS_PROGRESS, case_id, "completed")
        emit_progress(ANALYSIS_PROGRESS, case_id, {
            "type": "analysis_completed",
            "artifact_count": len(output.get("per_artifact", [])),
            "per_artifact": list(output.get("per_artifact", [])),
        })
        mark_case_status(case_id, "completed")

        # Auto-generate the HTML report so it's ready for download.
        # Best-effort: a report failure never fails the analysis itself.
        try:
            report_result = generate_case_report(case_id)
            if report_result.get("success"):
                LOGGER.info(
                    "Auto-generated report for case %s: %s",
                    case_id, report_result["report_path"].name,
                )
            else:
                LOGGER.warning(
                    "Auto-report generation failed for case %s: %s",
                    case_id, report_result.get("error", "unknown error"),
                )
        except Exception:
            LOGGER.warning(
                "Auto-report generation raised an exception for case %s",
                case_id, exc_info=True,
            )
    except AnalysisCancelledError:
        # Cancellation is logged only; no failure status/event is emitted here.
        LOGGER.info("Analysis cancelled for case %s", case_id)
    except Exception:
        LOGGER.exception("Background analysis failed for case %s", case_id)
        _purge_stale_analysis(case, case_dir)
        user_message = (
            "Analysis failed due to an internal error. "
            "Verify provider settings and retry."
        )
        mark_case_status(case_id, "error")
        set_progress_status(ANALYSIS_PROGRESS, case_id, "failed", user_message)
        emit_progress(ANALYSIS_PROGRESS, case_id, {"type": "analysis_failed", "error": user_message})

Execute background AI-powered forensic analysis.

Arguments:
  • case_id: UUID of the case.
  • prompt: Investigation context / user prompt.
  • config_snapshot: Deep copy of application config.
def run_chat(case_id: str, message: str, config_snapshot: dict[str, Any]) -> None:
    """Execute a background chat interaction about analysis results.

    Streams response tokens via ``CHAT_PROGRESS`` ``token`` events and
    finishes with a ``done`` (or ``error``) event.

    Args:
        case_id: UUID of the case.
        message: The user's chat message.
        config_snapshot: Deep copy of application config.
    """
    case = get_case(case_id)
    if case is None:
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Case not found.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Case not found."})
        return

    with STATE_LOCK:
        case_snapshot = dict(case)
        audit_logger = case["audit"]

    analysis_results = load_case_analysis_results(case_snapshot)
    if not analysis_results:
        message_text = "No analysis results available for this case. Run analysis first."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    if not isinstance(config_snapshot, dict):
        set_progress_status(CHAT_PROGRESS, case_id, "failed", "Invalid in-memory configuration state.")
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": "Invalid in-memory configuration state."})
        return

    try:
        chat_max_tokens = _resolve_chat_max_tokens(config_snapshot)
    except ValueError as error:
        message_text = str(error)
        LOGGER.warning("Chat configuration rejected for case %s: %s", case_id, message_text)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", message_text)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": message_text})
        return

    case_dir = case_snapshot["case_dir"]
    chat_manager = ChatManager(case_dir, max_context_tokens=chat_max_tokens)
    history_snapshot = chat_manager.get_history()
    # 1-based index of this user message within the conversation.
    message_index = (
        sum(1 for entry in history_snapshot if str(entry.get("role", "")).strip().lower() == "user") + 1
    )
    audit_logger.log(
        "chat_message_sent",
        {
            "message_index": message_index,
            "message": sanitize_prompt(message, max_chars=8000),
        },
    )

    try:
        # 80% of the window for the prompt; the rest is reserved for the reply.
        prompt_budget = int(chat_max_tokens * 0.8)
        provider = create_provider(copy.deepcopy(config_snapshot))

        investigation_context = resolve_case_investigation_context(case_snapshot)
        image_metadata = dict(case_snapshot.get("image_metadata", {}))

        context_block = chat_manager.build_chat_context(
            analysis_results=analysis_results,
            investigation_context=investigation_context,
            metadata=image_metadata,
        )

        # If the context overflows the budget, have the AI compress findings.
        if chat_manager.context_needs_compression(context_block, prompt_budget):
            # NOTE(review): reaches into a private ChatManager helper.
            per_artifact_text = chat_manager._format_per_artifact_findings(
                analysis_results if isinstance(analysis_results, Mapping) else {},
            )
            compressed = _compress_findings_with_ai(provider, per_artifact_text, chat_max_tokens)
            if compressed:
                context_block = chat_manager.rebuild_context_with_compressed_findings(
                    analysis_results=analysis_results,
                    investigation_context=investigation_context,
                    metadata=image_metadata,
                    compressed_findings=compressed,
                )

        retrieved_payload = chat_manager.retrieve_csv_data(
            question=message,
            parsed_dir=resolve_case_parsed_dir(case_snapshot),
        )
        retrieved_artifacts: list[str] = []
        if isinstance(retrieved_payload.get("artifacts"), list):
            retrieved_artifacts = [
                str(item).strip()
                for item in retrieved_payload.get("artifacts", [])
                if str(item).strip()
            ]

        # Prepend any retrieved CSV rows to the question sent to the AI.
        message_for_ai = message
        retrieved_data = str(retrieved_payload.get("data", "")).strip()
        if bool(retrieved_payload.get("retrieved")) and retrieved_data:
            message_for_ai = (
                "Retrieved CSV data for this question:\n"
                f"{retrieved_data}\n\n"
                "User question:\n"
                f"{message}"
            )
            audit_logger.log(
                "chat_data_retrieval",
                {
                    "message_index": message_index,
                    "artifacts": list(retrieved_artifacts),
                    "rows_returned": retrieved_data.count("\n"),
                },
            )

        system_prompt = _load_forensic_system_prompt()

        # Whatever budget remains after the fixed parts goes to chat history.
        fixed_tokens = (
            chat_manager.estimate_token_count(system_prompt)
            + chat_manager.estimate_token_count(context_block)
            + chat_manager.estimate_token_count(message_for_ai)
        )
        history_budget = max(0, prompt_budget - fixed_tokens)

        recent_history = chat_manager.get_recent_history(max_pairs=CHAT_HISTORY_MAX_PAIRS)
        fitted_history = chat_manager.fit_history(recent_history, history_budget)

        ai_messages: list[dict[str, str]] = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": context_block},
        ]
        # Only well-formed user/assistant turns with content are forwarded.
        for history_message in fitted_history:
            role = str(history_message.get("role", "")).strip().lower()
            content = str(history_message.get("content", "")).strip()
            if role in {"user", "assistant"} and content:
                ai_messages.append({"role": role, "content": content})
        ai_messages.append({"role": "user", "content": message_for_ai})
        chat_user_prompt = _render_chat_messages_for_provider(ai_messages)
        if not chat_user_prompt:
            # Fallback prompt if the renderer produced nothing.
            chat_user_prompt = (
                f"Context Block:\n{context_block}\n\n"
                f"New User Question:\n{message_for_ai}"
            )
        started_at = time.perf_counter()
        chunks: list[str] = []
        # Reply budget: 20% of the context window, at least one token.
        chat_response_max_tokens = max(1, int(chat_max_tokens * 0.2))
        for chunk in provider.analyze_stream(
            system_prompt=system_prompt,
            user_prompt=chat_user_prompt,
            max_tokens=chat_response_max_tokens,
        ):
            chunk_text = str(chunk)
            if not chunk_text:
                continue
            chunks.append(chunk_text)
            # Stream each token to the client as it arrives.
            emit_progress(CHAT_PROGRESS, case_id, {"type": "token", "content": chunk_text})

        response_text = "".join(chunks).strip()
        duration_ms = int((time.perf_counter() - started_at) * 1000)
        if not response_text:
            raise AIProviderError("Provider returned an empty response.")

        # Only record the exchange in history once a response was produced.
        chat_manager.add_message("user", message, metadata={"message_index": message_index})
        assistant_metadata: dict[str, Any] = {"message_index": message_index}
        if retrieved_artifacts:
            assistant_metadata["data_retrieved"] = list(retrieved_artifacts)
        chat_manager.add_message("assistant", response_text, metadata=assistant_metadata)

        audit_logger.log(
            "chat_response_received",
            {
                "message_index": message_index,
                "duration_ms": duration_ms,
                "response_tokens_estimate": chat_manager.estimate_token_count(response_text),
                "data_retrieved": bool(retrieved_artifacts),
                "retrieved_artifacts": list(retrieved_artifacts),
            },
        )

        set_progress_status(CHAT_PROGRESS, case_id, "completed")
        emit_progress(CHAT_PROGRESS, case_id, {
            "type": "done",
            "data_retrieved": list(retrieved_artifacts),
        })
    except ValueError as error:
        LOGGER.warning("Chat request rejected for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except AIProviderError as error:
        LOGGER.warning("Chat provider request failed for case %s: %s", case_id, error)
        set_progress_status(CHAT_PROGRESS, case_id, "failed", str(error))
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": str(error)})
    except Exception:
        # Last-resort handler: this runs on a background thread, so the error
        # must be logged and surfaced to the UI rather than raised.
        LOGGER.exception("Unexpected failure during chat for case %s", case_id)
        error_message = "Unexpected error while generating chat response."
        set_progress_status(CHAT_PROGRESS, case_id, "failed", error_message)
        emit_progress(CHAT_PROGRESS, case_id, {"type": "error", "message": error_message})

Execute a background chat interaction about analysis results.

Arguments:
  • case_id: UUID of the case.
  • message: The user's chat message.
  • config_snapshot: Deep copy of application config.
def load_case_analysis_results(case: dict[str, Any]) -> dict[str, Any] | None:
    """Load analysis results for a case from memory or disk.

    Prefers a non-empty in-memory result; otherwise tries
    ``analysis_results.json`` in the case directory, falling back to a copy
    of whatever is in memory (or ``None``).

    Args:
        case: The in-memory case state dictionary.

    Returns:
        Analysis results dict, or ``None``.
    """
    cached = case.get("analysis_results")

    def _fallback() -> dict[str, Any] | None:
        """Copy of the in-memory results, or ``None`` when absent."""
        return dict(cached) if isinstance(cached, dict) else None

    if isinstance(cached, dict) and cached:
        return dict(cached)

    results_path = Path(case["case_dir"]) / "analysis_results.json"
    if not results_path.exists():
        return _fallback()

    try:
        loaded = json.loads(results_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        LOGGER.warning("Failed to load analysis results from %s", results_path, exc_info=True)
        return _fallback()

    return loaded if isinstance(loaded, dict) else _fallback()

Load analysis results for a case from memory or disk.

Arguments:
  • case: The in-memory case state dictionary.
Returns:
  • Analysis results dict, or None.

def resolve_case_investigation_context(case: dict[str, Any]) -> str:
    """Resolve the investigation context prompt for a case.

    Prefers the in-memory value; falls back to ``prompt.txt`` on disk.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        The investigation context string, or empty string.
    """
    in_memory = str(case.get("investigation_context", "")).strip()
    if in_memory:
        return in_memory

    prompt_path = Path(case["case_dir"]) / "prompt.txt"
    if prompt_path.exists():
        try:
            return prompt_path.read_text(encoding="utf-8")
        except OSError:
            LOGGER.warning(
                "Failed to read investigation context prompt at %s",
                prompt_path, exc_info=True,
            )
    return ""

Resolve the investigation context prompt for a case.

Arguments:
  • case: The in-memory case state dictionary.
Returns:
  • The investigation context string, or empty string.

def resolve_case_parsed_dir(case: dict[str, Any]) -> Path:
    """Resolve the directory containing parsed CSV files for a case.

    Resolution order: the recorded ``csv_output_dir``, then the parent
    directory of the first known CSV file, then ``<case_dir>/parsed``.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        Path to the parsed CSV directory.
    """
    if recorded := str(case.get("csv_output_dir", "")).strip():
        return Path(recorded)

    known_csvs = collect_case_csv_paths(case)
    if known_csvs:
        return known_csvs[0].parent

    return Path(case["case_dir"]) / "parsed"

Resolve the directory containing parsed CSV files for a case.

Arguments:
  • case: The in-memory case state dictionary.
Returns:
  • Path to the parsed CSV directory.