`app.routes.analysis` — AI analysis route handlers for the AIFT Flask application.
Handles starting, streaming progress of, and cancelling AI-powered forensic analysis.
Attributes:
- analysis_bp: Flask Blueprint registering the analysis routes.
1"""AI analysis route handlers for the AIFT Flask application. 2 3Handles starting and streaming progress of AI-powered forensic analysis. 4 5Attributes: 6 analysis_bp: Flask Blueprint for analysis routes. 7""" 8 9from __future__ import annotations 10 11import copy 12import threading 13from pathlib import Path 14from typing import Any 15 16from flask import Blueprint, Response, current_app, request 17 18from .state import ( 19 STATE_LOCK, 20 ANALYSIS_PROGRESS, 21 cancel_progress, 22 error_response, 23 success_response, 24 get_case, 25 new_progress, 26 emit_progress, 27 stream_sse, 28) 29from .artifacts import sanitize_prompt 30from .tasks import run_task_with_case_log_context, run_analysis 31 32__all__ = ["analysis_bp"] 33 34analysis_bp = Blueprint("analysis", __name__) 35 36 37@analysis_bp.post("/api/cases/<case_id>/analyze") 38def start_analysis(case_id: str) -> tuple[Response, int]: 39 """Start background AI-powered analysis. 40 41 Args: 42 case_id: UUID of the case. 43 44 Returns: 45 ``(Response, 202)`` confirming start, or error. 46 """ 47 case = get_case(case_id) 48 if case is None: 49 return error_response(f"Case not found: {case_id}", 404) 50 51 with STATE_LOCK: 52 has_results = bool(case.get("parse_results") or case.get("artifact_csv_paths")) 53 analysis_artifacts_state = case.get("analysis_artifacts") 54 case_dir = case["case_dir"] 55 analysis_date_range = case.get("analysis_date_range") 56 audit_logger = case["audit"] 57 58 if not has_results: 59 return error_response("No parsed artifacts found. Run parsing first.", 400) 60 if isinstance(analysis_artifacts_state, list): 61 configured_analysis_artifacts = [ 62 artifact 63 for artifact in (str(item).strip() for item in analysis_artifacts_state) 64 if artifact 65 ] 66 if not configured_analysis_artifacts: 67 return error_response( 68 "No artifacts are marked `Parse and use in AI`. 
Select at least one AI-enabled artifact and parse again.", 69 400, 70 ) 71 72 payload = request.get_json(silent=True) or {} 73 if not isinstance(payload, dict): 74 return error_response("Request body must be a JSON object.", 400) 75 prompt = str(payload.get("prompt", "")).strip() 76 77 prompt_path = Path(case_dir) / "prompt.txt" 78 prompt_details: dict[str, Any] = {"prompt": sanitize_prompt(prompt)} 79 if isinstance(analysis_date_range, dict): 80 start_date = str(analysis_date_range.get("start_date", "")).strip() 81 end_date = str(analysis_date_range.get("end_date", "")).strip() 82 if start_date and end_date: 83 prompt_details["analysis_date_range"] = { 84 "start_date": start_date, 85 "end_date": end_date, 86 } 87 with STATE_LOCK: 88 analysis_state = ANALYSIS_PROGRESS.setdefault(case_id, new_progress()) 89 if analysis_state.get("status") == "running": 90 return error_response("Analysis is already running for this case.", 409) 91 prompt_path.write_text(prompt, encoding="utf-8") 92 ANALYSIS_PROGRESS[case_id] = new_progress(status="running") 93 case["status"] = "running" 94 case["investigation_context"] = prompt 95 # Invalidate prior analysis outputs so a subsequent failure cannot 96 # leave stale results accessible via chat/report/download routes. 97 case["analysis_results"] = {} 98 analysis_artifacts_snapshot = list(case.get("analysis_artifacts", [])) 99 100 # Remove the on-disk results file outside the lock to avoid holding 101 # the lock during I/O. 
102 stale_results_path = Path(case_dir) / "analysis_results.json" 103 if stale_results_path.exists(): 104 stale_results_path.unlink(missing_ok=True) 105 106 audit_logger.log("prompt_submitted", prompt_details) 107 108 emit_progress( 109 ANALYSIS_PROGRESS, case_id, 110 { 111 "type": "analysis_started", 112 "prompt_provided": bool(prompt), 113 "analysis_artifact_count": len(analysis_artifacts_snapshot), 114 }, 115 ) 116 config_snapshot = copy.deepcopy(current_app.config.get("AIFT_CONFIG", {})) 117 threading.Thread( 118 target=run_task_with_case_log_context, 119 args=(case_id, run_analysis, case_id, prompt, config_snapshot), 120 daemon=True, 121 ).start() 122 123 return success_response( 124 { 125 "status": "started", 126 "case_id": case_id, 127 "analysis_artifacts": analysis_artifacts_snapshot, 128 }, 129 202, 130 ) 131 132 133@analysis_bp.get("/api/cases/<case_id>/analyze/progress") 134def stream_analysis_progress(case_id: str) -> Response | tuple[Response, int]: 135 """Stream analysis progress events via SSE. 136 137 Args: 138 case_id: UUID of the case. 139 140 Returns: 141 SSE Response, or 404 error. 142 """ 143 if get_case(case_id) is None: 144 return error_response(f"Case not found: {case_id}", 404) 145 return stream_sse(ANALYSIS_PROGRESS, case_id) 146 147 148@analysis_bp.post("/api/cases/<case_id>/analyze/cancel") 149def cancel_analysis_route(case_id: str) -> tuple[Response, int]: 150 """Cancel a running analysis operation for a case. 151 152 Args: 153 case_id: UUID of the case. 154 155 Returns: 156 ``(Response, 200)`` confirming cancellation, or error. 157 """ 158 if get_case(case_id) is None: 159 return error_response(f"Case not found: {case_id}", 404) 160 cancelled = cancel_progress(ANALYSIS_PROGRESS, case_id) 161 if not cancelled: 162 return error_response("No running analysis to cancel.", 409) 163 return success_response({"status": "cancelling", "case_id": case_id})
analysis_bp =
<Blueprint 'analysis'>