app.routes.handlers
HTTP route definitions for the AIFT (AI Forensic Triage) Flask application.
This module serves as the central registration point for all route blueprints.
It defines the core routes_bp blueprint (static/UI, case management, and
settings routes) and imports sub-blueprints from:
evidence-- evidence helpers and route handlers.artifacts-- artifact/profile helpers and route handlers.analysis-- AI-powered analysis routes.chat-- interactive chat routes.
Supporting logic lives in:
state-- shared state, constants, SSE streaming.evidence-- archive extraction, CSV/hash helpers.artifacts-- artifact option normalisation, profile helpers.tasks-- background parse/analysis/chat runners.
Attributes:
- routes_bp: Flask
Blueprintfor core routes (UI, cases, settings).
1"""HTTP route definitions for the AIFT (AI Forensic Triage) Flask application. 2 3This module serves as the central registration point for all route blueprints. 4It defines the core ``routes_bp`` blueprint (static/UI, case management, and 5settings routes) and imports sub-blueprints from: 6 7* :mod:`evidence` -- evidence helpers and route handlers. 8* :mod:`artifacts` -- artifact/profile helpers and route handlers. 9* :mod:`analysis` -- AI-powered analysis routes. 10* :mod:`chat` -- interactive chat routes. 11 12Supporting logic lives in: 13 14* :mod:`state` -- shared state, constants, SSE streaming. 15* :mod:`evidence` -- archive extraction, CSV/hash helpers. 16* :mod:`artifacts` -- artifact option normalisation, profile helpers. 17* :mod:`tasks` -- background parse/analysis/chat runners. 18 19Attributes: 20 routes_bp: Flask ``Blueprint`` for core routes (UI, cases, settings). 21""" 22 23from __future__ import annotations 24 25import copy 26from datetime import datetime 27import logging 28from pathlib import Path 29import threading # noqa: F401 -- re-exported for test patching 30from urllib.request import urlopen, Request 31from urllib.error import URLError 32from uuid import uuid4 33 34from flask import ( 35 Blueprint, 36 Flask, 37 Response, 38 current_app, 39 g, 40 render_template, 41 request, 42 send_file, 43) 44 45from ..ai_providers import AIProviderError, create_provider # noqa: F401 -- re-exported 46from ..analyzer import ForensicAnalyzer # noqa: F401 -- re-exported for test patching 47from ..audit import AuditLogger 48from ..case_logging import ( 49 case_log_context, # noqa: F401 -- re-exported 50 pop_case_log_context, 51 push_case_log_context, 52 register_case_log_handler, 53) 54from ..config import load_config, save_config, validate_config 55from ..hasher import compute_hashes, verify_hash # noqa: F401 -- re-exported 56from ..parser import WINDOWS_ARTIFACT_REGISTRY, ForensicParser # noqa: F401 -- re-exported 57from ..reporter import ReportGenerator # noqa: F401 -- re-exported 58from ..version import TOOL_VERSION # noqa: F401 -- re-exported 59 60from .state import ( 61 CASES_ROOT, 62 IMAGES_ROOT, 63 CONNECTION_TEST_SYSTEM_PROMPT, 64 CONNECTION_TEST_USER_PROMPT, 65 SSE_INITIAL_IDLE_GRACE_SECONDS, # noqa: F401 -- re-exported for test patching 66 CASE_STATES, 67 PARSE_PROGRESS, 68 ANALYSIS_PROGRESS, 69 CHAT_PROGRESS, 70 STATE_LOCK, 71 now_iso, 72 error_response, 73 success_response, 74 resolve_logo_filename, 75 new_progress, 76 mask_sensitive, 77 deep_merge, 78 audit_config_change, 79 cleanup_terminal_cases, 80 safe_name, 81) 82from .artifacts import ( 83 RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS, # noqa: F401 -- re-exported for test access 84) 85 86# Sub-blueprints 87from .evidence import evidence_bp 88from .artifacts import artifact_bp 89from .analysis import analysis_bp 90from .chat import chat_bp 91 92__all__ = ["register_routes"] 93 94LOGGER = logging.getLogger(__name__) 95 96routes_bp = Blueprint("routes", __name__) 97_REQUEST_CASE_LOG_TOKEN = "_aift_case_log_token" 98 99 100# --------------------------------------------------------------------------- 101# Request lifecycle hooks 102# --------------------------------------------------------------------------- 103 104@routes_bp.before_app_request 105def _bind_case_log_context_for_request() -> None: 106 """Bind case-specific logging context before each request.""" 107 case_id: str | None = None 108 case_id = str((request.view_args or {}).get("case_id", "")).strip() or None 109 setattr(g, _REQUEST_CASE_LOG_TOKEN, push_case_log_context(case_id)) 110 111 112@routes_bp.teardown_app_request 113def _clear_case_log_context_for_request(_error: BaseException | None) -> None: 114 """Pop case-scoped logging context after each request. 115 116 Args: 117 _error: Optional exception (ignored). 118 """ 119 token = getattr(g, _REQUEST_CASE_LOG_TOKEN, None) 120 if token is not None: 121 pop_case_log_context(token) 122 setattr(g, _REQUEST_CASE_LOG_TOKEN, None) 123 124 125# --------------------------------------------------------------------------- 126# Static / UI routes 127# --------------------------------------------------------------------------- 128 129@routes_bp.get("/") 130def index() -> str: 131 """Serve the main single-page application HTML. 132 133 Returns: 134 Rendered ``index.html`` template. 135 """ 136 return render_template( 137 "index.html", 138 logo_filename=resolve_logo_filename(), 139 tool_version=TOOL_VERSION, 140 ) 141 142 143@routes_bp.get("/api/version/check") 144def version_check() -> tuple[Response, int] | Response: 145 """Check whether a newer AIFT release is available on GitHub. 146 147 Fetches the latest release tag from the GitHub API and compares it 148 with the running ``TOOL_VERSION``. The comparison is purely 149 string-based (not semver) so any difference is reported. 150 151 Returns: 152 JSON with ``current``, ``latest``, and ``update_available`` fields, 153 or an error payload when the network is unreachable. 154 """ 155 import json as _json 156 157 github_url = ( 158 "https://api.github.com/repos/FlipForensics/AIFT/releases/latest" 159 ) 160 req = Request(github_url, headers={"Accept": "application/vnd.github+json"}) 161 try: 162 with urlopen(req, timeout=5) as resp: 163 data = _json.loads(resp.read().decode()) 164 tag: str = data.get("tag_name", "") 165 latest = tag.lstrip("vV") 166 return success_response({ 167 "current": TOOL_VERSION, 168 "latest": latest, 169 "update_available": latest != TOOL_VERSION, 170 }) 171 except (URLError, OSError, ValueError) as exc: 172 LOGGER.debug("Version check failed: %s", exc) 173 return error_response("offline", 503) 174 175 176@routes_bp.get("/favicon.ico") 177def favicon() -> Response | tuple[Response, int]: 178 """Serve the application favicon. 179 180 Returns: 181 The logo image file, or a 404 error. 182 """ 183 logo_filename = resolve_logo_filename() 184 if not logo_filename: 185 return error_response("Icon not found.", 404) 186 return image_asset(logo_filename) 187 188 189@routes_bp.get("/images/<path:filename>") 190def image_asset(filename: str) -> Response | tuple[Response, int]: 191 """Serve a static image asset from the images directory. 192 193 Args: 194 filename: Image filename (no directory components). 195 196 Returns: 197 The image file, or a JSON error. 198 """ 199 if not IMAGES_ROOT.is_dir(): 200 return error_response("Image directory not found.", 404) 201 202 normalized = str(filename).strip() 203 if not normalized or Path(normalized).name != normalized: 204 return error_response("Invalid image filename.", 400) 205 if "/" in normalized or "\\" in normalized: 206 return error_response("Invalid image filename.", 400) 207 208 image_path = IMAGES_ROOT / normalized 209 if not image_path.is_file(): 210 return error_response("Image not found.", 404) 211 212 return send_file(image_path) 213 214 215# --------------------------------------------------------------------------- 216# Case management routes 217# --------------------------------------------------------------------------- 218 219@routes_bp.post("/api/cases") 220def create_case() -> tuple[Response, int]: 221 """Create a new forensic analysis case. 222 223 Returns: 224 ``(Response, 201)`` with case_id and case_name, or error. 225 """ 226 cleanup_terminal_cases() 227 228 payload = request.get_json(silent=True) or {} 229 if not isinstance(payload, dict): 230 return error_response("Request body must be a JSON object.", 400) 231 case_name = str(payload.get("case_name", "")).strip() 232 has_custom_name = bool(case_name) 233 if not case_name: 234 case_name = datetime.now().strftime("Case %Y-%m-%d %H:%M:%S") 235 236 # Use sanitised case name + timestamp as folder name for readability; 237 # fall back to UUID if no custom name was provided. 238 if has_custom_name: 239 folder_name = safe_name(case_name, "case") + "_" + datetime.now().strftime("%Y%m%d_%H%M%S") 240 else: 241 folder_name = str(uuid4()) 242 case_id = folder_name 243 case_dir = CASES_ROOT / case_id 244 (case_dir / "evidence").mkdir(parents=True, exist_ok=True) 245 (case_dir / "parsed").mkdir(parents=True, exist_ok=True) 246 (case_dir / "reports").mkdir(parents=True, exist_ok=True) 247 try: 248 log_file_path = register_case_log_handler(case_id=case_id, case_dir=case_dir) 249 except OSError: 250 LOGGER.exception("Failed to initialize case log file for case %s", case_id) 251 return error_response("Failed to initialize case logging due to a filesystem error.", 500) 252 253 with case_log_context(case_id): 254 LOGGER.info("Initialized case logging at %s", log_file_path) 255 256 audit = AuditLogger(case_dir) 257 audit.log( 258 "case_created", 259 { 260 "case_id": case_id, 261 "name": case_name, 262 "creation_time": now_iso(), 263 }, 264 ) 265 266 case_state = { 267 "case_id": case_id, 268 "case_name": case_name, 269 "case_dir": case_dir, 270 "audit": audit, 271 "evidence_mode": "", 272 "source_path": "", 273 "stored_path": "", 274 "uploaded_files": [], 275 "evidence_path": "", 276 "evidence_hashes": {}, 277 "image_metadata": {}, 278 "available_artifacts": [], 279 "selected_artifacts": [], 280 "analysis_artifacts": [], 281 "artifact_options": [], 282 "analysis_date_range": None, 283 "csv_output_dir": "", 284 "parse_results": [], 285 "artifact_csv_paths": {}, 286 "investigation_context": "", 287 "analysis_results": {}, 288 "status": "active", 289 "log_file_path": str(log_file_path), 290 } 291 with STATE_LOCK: 292 CASE_STATES[case_id] = case_state 293 PARSE_PROGRESS[case_id] = new_progress() 294 ANALYSIS_PROGRESS[case_id] = new_progress() 295 CHAT_PROGRESS[case_id] = new_progress() 296 297 return success_response({"case_id": case_id, "case_name": case_name}, 201) 298 299 300# --------------------------------------------------------------------------- 301# Settings routes 302# --------------------------------------------------------------------------- 303 304@routes_bp.get("/api/settings") 305def get_settings() -> Response: 306 """Retrieve current settings with sensitive values masked. 307 308 Returns: 309 JSON with masked configuration. 310 """ 311 config = current_app.config.get("AIFT_CONFIG", {}) 312 if not isinstance(config, dict): 313 config = {} 314 return success_response(mask_sensitive(config)) 315 316 317@routes_bp.post("/api/settings") 318def update_settings() -> Response | tuple[Response, int]: 319 """Update application settings by deep-merging the request payload. 320 321 Returns: 322 JSON with updated masked configuration, or 400 error. 323 """ 324 payload = request.get_json(silent=True) 325 if not isinstance(payload, dict): 326 return error_response("Settings payload must be a JSON object.", 400) 327 328 config_path = Path(str(current_app.config.get("AIFT_CONFIG_PATH", "config.yaml"))) 329 current_config = load_config(config_path, use_env_overrides=False) 330 changed_keys = deep_merge(current_config, payload) 331 332 validation_errors = validate_config(current_config) 333 if validation_errors: 334 return error_response( 335 f"Invalid settings: {'; '.join(validation_errors)}", 400 336 ) 337 338 save_config(current_config, config_path) 339 340 refreshed = load_config(config_path) 341 current_app.config["AIFT_CONFIG"] = refreshed 342 if changed_keys: 343 LOGGER.info("Updated settings: %s", ", ".join(changed_keys)) 344 audit_config_change(changed_keys) 345 346 return success_response(mask_sensitive(refreshed)) 347 348 349@routes_bp.post("/api/settings/test-connection") 350def test_settings_connection() -> Response | tuple[Response, int]: 351 """Test the configured AI provider connection. 352 353 Returns: 354 JSON with model info and response preview, or error. 355 """ 356 config = current_app.config.get("AIFT_CONFIG", {}) 357 if not isinstance(config, dict): 358 return error_response("Invalid in-memory configuration state.", 500) 359 analysis_config = config.get("analysis", {}) 360 if not isinstance(analysis_config, dict): 361 analysis_config = {} 362 raw_connection_tokens = analysis_config.get("connection_test_max_tokens", 256) 363 try: 364 connection_max_tokens = max(1, int(raw_connection_tokens)) 365 except (TypeError, ValueError): 366 connection_max_tokens = 256 367 368 try: 369 provider = create_provider(copy.deepcopy(config)) 370 model_info = provider.get_model_info() 371 reply = provider.analyze( 372 system_prompt=CONNECTION_TEST_SYSTEM_PROMPT, 373 user_prompt=CONNECTION_TEST_USER_PROMPT, 374 max_tokens=connection_max_tokens, 375 ) 376 preview = str(reply).strip() 377 if not preview: 378 return error_response("Provider returned an empty response.", 502) 379 return success_response( 380 { 381 "status": "ok", 382 "model_info": model_info, 383 "response_preview": preview[:240], 384 } 385 ) 386 except ValueError as error: 387 LOGGER.warning("Settings connection test rejected due to configuration: %s", error) 388 return error_response(str(error), 400) 389 except AIProviderError as error: 390 LOGGER.warning("Settings connection test failed: %s", error) 391 return error_response(str(error), 502) 392 except Exception: 393 LOGGER.exception("Unexpected failure during settings connection test.") 394 return error_response("Unexpected error while testing provider connection.", 500) 395 396 397# --------------------------------------------------------------------------- 398# Registration 399# --------------------------------------------------------------------------- 400 401def register_routes(app: Flask) -> None: 402 """Register all HTTP route handlers with the Flask application. 403 404 Registers the core ``routes_bp`` blueprint plus sub-blueprints for 405 evidence, artifact, analysis, and chat routes. 406 407 Args: 408 app: The Flask application instance. 409 """ 410 app.register_blueprint(routes_bp) 411 app.register_blueprint(evidence_bp) 412 app.register_blueprint(artifact_bp) 413 app.register_blueprint(analysis_bp) 414 app.register_blueprint(chat_bp)
def
register_routes(app: flask.app.Flask) -> None:
402def register_routes(app: Flask) -> None: 403 """Register all HTTP route handlers with the Flask application. 404 405 Registers the core ``routes_bp`` blueprint plus sub-blueprints for 406 evidence, artifact, analysis, and chat routes. 407 408 Args: 409 app: The Flask application instance. 410 """ 411 app.register_blueprint(routes_bp) 412 app.register_blueprint(evidence_bp) 413 app.register_blueprint(artifact_bp) 414 app.register_blueprint(analysis_bp) 415 app.register_blueprint(chat_bp)
Register all HTTP route handlers with the Flask application.
Registers the core routes_bp blueprint plus sub-blueprints for
evidence, artifact, analysis, and chat routes.
Arguments:
- app: The Flask application instance.