app.routes.analysis

AI analysis route handlers for the AIFT Flask application.

Handles starting and streaming progress of AI-powered forensic analysis.

Attributes:
  • analysis_bp: Flask Blueprint for analysis routes.
  1"""AI analysis route handlers for the AIFT Flask application.
  2
  3Handles starting and streaming progress of AI-powered forensic analysis.
  4
  5Attributes:
  6    analysis_bp: Flask Blueprint for analysis routes.
  7"""
  8
  9from __future__ import annotations
 10
 11import copy
 12import threading
 13from pathlib import Path
 14from typing import Any
 15
 16from flask import Blueprint, Response, current_app, request
 17
 18from .state import (
 19    STATE_LOCK,
 20    ANALYSIS_PROGRESS,
 21    cancel_progress,
 22    error_response,
 23    success_response,
 24    get_case,
 25    new_progress,
 26    emit_progress,
 27    stream_sse,
 28)
 29from .artifacts import sanitize_prompt
 30from .tasks import run_task_with_case_log_context, run_analysis
 31
 32__all__ = ["analysis_bp"]
 33
 34analysis_bp = Blueprint("analysis", __name__)
 35
 36
 37@analysis_bp.post("/api/cases/<case_id>/analyze")
 38def start_analysis(case_id: str) -> tuple[Response, int]:
 39    """Start background AI-powered analysis.
 40
 41    Args:
 42        case_id: UUID of the case.
 43
 44    Returns:
 45        ``(Response, 202)`` confirming start, or error.
 46    """
 47    case = get_case(case_id)
 48    if case is None:
 49        return error_response(f"Case not found: {case_id}", 404)
 50
 51    with STATE_LOCK:
 52        has_results = bool(case.get("parse_results") or case.get("artifact_csv_paths"))
 53        analysis_artifacts_state = case.get("analysis_artifacts")
 54        case_dir = case["case_dir"]
 55        analysis_date_range = case.get("analysis_date_range")
 56        audit_logger = case["audit"]
 57
 58    if not has_results:
 59        return error_response("No parsed artifacts found. Run parsing first.", 400)
 60    if isinstance(analysis_artifacts_state, list):
 61        configured_analysis_artifacts = [
 62            artifact
 63            for artifact in (str(item).strip() for item in analysis_artifacts_state)
 64            if artifact
 65        ]
 66        if not configured_analysis_artifacts:
 67            return error_response(
 68                "No artifacts are marked `Parse and use in AI`. Select at least one AI-enabled artifact and parse again.",
 69                400,
 70            )
 71
 72    payload = request.get_json(silent=True) or {}
 73    if not isinstance(payload, dict):
 74        return error_response("Request body must be a JSON object.", 400)
 75    prompt = str(payload.get("prompt", "")).strip()
 76
 77    prompt_path = Path(case_dir) / "prompt.txt"
 78    prompt_details: dict[str, Any] = {"prompt": sanitize_prompt(prompt)}
 79    if isinstance(analysis_date_range, dict):
 80        start_date = str(analysis_date_range.get("start_date", "")).strip()
 81        end_date = str(analysis_date_range.get("end_date", "")).strip()
 82        if start_date and end_date:
 83            prompt_details["analysis_date_range"] = {
 84                "start_date": start_date,
 85                "end_date": end_date,
 86            }
 87    with STATE_LOCK:
 88        analysis_state = ANALYSIS_PROGRESS.setdefault(case_id, new_progress())
 89        if analysis_state.get("status") == "running":
 90            return error_response("Analysis is already running for this case.", 409)
 91        prompt_path.write_text(prompt, encoding="utf-8")
 92        ANALYSIS_PROGRESS[case_id] = new_progress(status="running")
 93        case["status"] = "running"
 94        case["investigation_context"] = prompt
 95        # Invalidate prior analysis outputs so a subsequent failure cannot
 96        # leave stale results accessible via chat/report/download routes.
 97        case["analysis_results"] = {}
 98        analysis_artifacts_snapshot = list(case.get("analysis_artifacts", []))
 99
100    # Remove the on-disk results file outside the lock to avoid holding
101    # the lock during I/O.
102    stale_results_path = Path(case_dir) / "analysis_results.json"
103    if stale_results_path.exists():
104        stale_results_path.unlink(missing_ok=True)
105
106    audit_logger.log("prompt_submitted", prompt_details)
107
108    emit_progress(
109        ANALYSIS_PROGRESS, case_id,
110        {
111            "type": "analysis_started",
112            "prompt_provided": bool(prompt),
113            "analysis_artifact_count": len(analysis_artifacts_snapshot),
114        },
115    )
116    config_snapshot = copy.deepcopy(current_app.config.get("AIFT_CONFIG", {}))
117    threading.Thread(
118        target=run_task_with_case_log_context,
119        args=(case_id, run_analysis, case_id, prompt, config_snapshot),
120        daemon=True,
121    ).start()
122
123    return success_response(
124        {
125            "status": "started",
126            "case_id": case_id,
127            "analysis_artifacts": analysis_artifacts_snapshot,
128        },
129        202,
130    )
131
132
133@analysis_bp.get("/api/cases/<case_id>/analyze/progress")
134def stream_analysis_progress(case_id: str) -> Response | tuple[Response, int]:
135    """Stream analysis progress events via SSE.
136
137    Args:
138        case_id: UUID of the case.
139
140    Returns:
141        SSE Response, or 404 error.
142    """
143    if get_case(case_id) is None:
144        return error_response(f"Case not found: {case_id}", 404)
145    return stream_sse(ANALYSIS_PROGRESS, case_id)
146
147
148@analysis_bp.post("/api/cases/<case_id>/analyze/cancel")
149def cancel_analysis_route(case_id: str) -> tuple[Response, int]:
150    """Cancel a running analysis operation for a case.
151
152    Args:
153        case_id: UUID of the case.
154
155    Returns:
156        ``(Response, 200)`` confirming cancellation, or error.
157    """
158    if get_case(case_id) is None:
159        return error_response(f"Case not found: {case_id}", 404)
160    cancelled = cancel_progress(ANALYSIS_PROGRESS, case_id)
161    if not cancelled:
162        return error_response("No running analysis to cancel.", 409)
163    return success_response({"status": "cancelling", "case_id": case_id})
analysis_bp = <Blueprint 'analysis'>