app.routes.handlers

HTTP route definitions for the AIFT (AI Forensic Triage) Flask application.

This module serves as the central registration point for all route blueprints. It defines the core routes_bp blueprint (static/UI, case management, and settings routes) and imports sub-blueprints from:

  • evidence -- evidence helpers and route handlers.
  • artifacts -- artifact/profile helpers and route handlers.
  • analysis -- AI-powered analysis routes.
  • chat -- interactive chat routes.

Supporting logic lives in:

  • state -- shared state, constants, SSE streaming.
  • evidence -- archive extraction, CSV/hash helpers.
  • artifacts -- artifact option normalisation, profile helpers.
  • tasks -- background parse/analysis/chat runners.
Attributes:
  • routes_bp: Flask Blueprint for core routes (UI, cases, settings).
  1"""HTTP route definitions for the AIFT (AI Forensic Triage) Flask application.
  2
  3This module serves as the central registration point for all route blueprints.
  4It defines the core ``routes_bp`` blueprint (static/UI, case management, and
  5settings routes) and imports sub-blueprints from:
  6
  7* :mod:`evidence` -- evidence helpers and route handlers.
  8* :mod:`artifacts` -- artifact/profile helpers and route handlers.
  9* :mod:`analysis` -- AI-powered analysis routes.
 10* :mod:`chat` -- interactive chat routes.
 11
 12Supporting logic lives in:
 13
 14* :mod:`state` -- shared state, constants, SSE streaming.
 15* :mod:`evidence` -- archive extraction, CSV/hash helpers.
 16* :mod:`artifacts` -- artifact option normalisation, profile helpers.
 17* :mod:`tasks` -- background parse/analysis/chat runners.
 18
 19Attributes:
 20    routes_bp: Flask ``Blueprint`` for core routes (UI, cases, settings).
 21"""
 22
 23from __future__ import annotations
 24
 25import copy
 26from datetime import datetime
 27import logging
 28from pathlib import Path
 29import threading  # noqa: F401 -- re-exported for test patching
 30from urllib.request import urlopen, Request
 31from urllib.error import URLError
 32from uuid import uuid4
 33
 34from flask import (
 35    Blueprint,
 36    Flask,
 37    Response,
 38    current_app,
 39    g,
 40    render_template,
 41    request,
 42    send_file,
 43)
 44
 45from ..ai_providers import AIProviderError, create_provider  # noqa: F401 -- re-exported
 46from ..analyzer import ForensicAnalyzer  # noqa: F401 -- re-exported for test patching
 47from ..audit import AuditLogger
 48from ..case_logging import (
 49    case_log_context,  # noqa: F401 -- re-exported
 50    pop_case_log_context,
 51    push_case_log_context,
 52    register_case_log_handler,
 53)
 54from ..config import load_config, save_config, validate_config
 55from ..hasher import compute_hashes, verify_hash  # noqa: F401 -- re-exported
 56from ..parser import WINDOWS_ARTIFACT_REGISTRY, ForensicParser  # noqa: F401 -- re-exported
 57from ..reporter import ReportGenerator  # noqa: F401 -- re-exported
 58from ..version import TOOL_VERSION  # noqa: F401 -- re-exported
 59
 60from .state import (
 61    CASES_ROOT,
 62    IMAGES_ROOT,
 63    CONNECTION_TEST_SYSTEM_PROMPT,
 64    CONNECTION_TEST_USER_PROMPT,
 65    SSE_INITIAL_IDLE_GRACE_SECONDS,  # noqa: F401 -- re-exported for test patching
 66    CASE_STATES,
 67    PARSE_PROGRESS,
 68    ANALYSIS_PROGRESS,
 69    CHAT_PROGRESS,
 70    STATE_LOCK,
 71    now_iso,
 72    error_response,
 73    success_response,
 74    resolve_logo_filename,
 75    new_progress,
 76    mask_sensitive,
 77    deep_merge,
 78    audit_config_change,
 79    cleanup_terminal_cases,
 80    safe_name,
 81)
 82from .artifacts import (
 83    RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS,  # noqa: F401 -- re-exported for test access
 84)
 85
 86# Sub-blueprints
 87from .evidence import evidence_bp
 88from .artifacts import artifact_bp
 89from .analysis import analysis_bp
 90from .chat import chat_bp
 91
# Names exported by ``from <module> import *``; only the registration entry
# point is listed here.
__all__ = ["register_routes"]

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)

# Blueprint that the core UI, case-management and settings routes below
# attach to.
routes_bp = Blueprint("routes", __name__)
# Attribute name on ``flask.g`` under which the request hooks below store
# the token returned by ``push_case_log_context``.
_REQUEST_CASE_LOG_TOKEN = "_aift_case_log_token"
 98
 99
100# ---------------------------------------------------------------------------
101# Request lifecycle hooks
102# ---------------------------------------------------------------------------
103
@routes_bp.before_app_request
def _bind_case_log_context_for_request() -> None:
    """Push the request's case id (if any) as the active case log context.

    The ``case_id`` view argument of the matched route is stringified and
    stripped; an absent or blank value is passed on as ``None``.  The token
    returned by ``push_case_log_context`` is stored on ``flask.g`` under
    ``_REQUEST_CASE_LOG_TOKEN``.
    """
    view_args = request.view_args or {}
    stripped_id = str(view_args.get("case_id", "")).strip()
    case_id: str | None = stripped_id if stripped_id else None
    token = push_case_log_context(case_id)
    setattr(g, _REQUEST_CASE_LOG_TOKEN, token)
110
111
@routes_bp.teardown_app_request
def _clear_case_log_context_for_request(_error: BaseException | None) -> None:
    """Release the case log context stored on ``flask.g`` for this request.

    Does nothing when no token is stored; otherwise pops the context and
    resets the stored token to ``None``.

    Args:
        _error: Exception handed to the teardown hook, if any (unused).
    """
    token = getattr(g, _REQUEST_CASE_LOG_TOKEN, None)
    if token is None:
        return
    pop_case_log_context(token)
    setattr(g, _REQUEST_CASE_LOG_TOKEN, None)
123
124
125# ---------------------------------------------------------------------------
126# Static / UI routes
127# ---------------------------------------------------------------------------
128
@routes_bp.get("/")
def index() -> str:
    """Render the single-page application shell.

    Returns:
        The rendered ``index.html`` template, supplied with the resolved
        logo filename and the running tool version.
    """
    template_context = {
        "logo_filename": resolve_logo_filename(),
        "tool_version": TOOL_VERSION,
    }
    return render_template("index.html", **template_context)
141
142
@routes_bp.get("/api/version/check")
def version_check() -> tuple[Response, int] | Response:
    """Check whether a newer AIFT release is available on GitHub.

    Fetches the latest release from the GitHub API and compares its tag
    (with any leading ``v``/``V`` removed) with the running
    ``TOOL_VERSION``.  The comparison is purely string-based (not semver),
    so any difference is reported as an available update.

    A response that is not a JSON object, or whose ``tag_name`` is missing,
    not a string, or blank, is treated as a failed check instead of being
    reported as an update to an empty version.

    Returns:
        JSON with ``current``, ``latest``, and ``update_available`` fields,
        or a 503 ``"offline"`` error payload when the release information
        cannot be fetched or is unusable.
    """
    import json as _json

    github_url = (
        "https://api.github.com/repos/FlipForensics/AIFT/releases/latest"
    )
    req = Request(github_url, headers={"Accept": "application/vnd.github+json"})
    try:
        with urlopen(req, timeout=5) as resp:
            data = _json.loads(resp.read().decode())
    except (URLError, OSError, ValueError) as exc:
        # ValueError also covers JSON and UTF-8 decoding failures.
        LOGGER.debug("Version check failed: %s", exc)
        return error_response("offline", 503)

    # Validate the payload shape before using it: a list/str/null body or a
    # non-string tag would otherwise raise AttributeError out of the handler.
    tag = data.get("tag_name") if isinstance(data, dict) else None
    if not isinstance(tag, str) or not tag.strip():
        LOGGER.debug("Version check failed: release payload has no usable tag_name")
        return error_response("offline", 503)

    latest = tag.strip().lstrip("vV")
    return success_response({
        "current": TOOL_VERSION,
        "latest": latest,
        "update_available": latest != TOOL_VERSION,
    })
174
175
@routes_bp.get("/favicon.ico")
def favicon() -> Response | tuple[Response, int]:
    """Serve the resolved logo image as the site favicon.

    Returns:
        The response of ``image_asset`` for the resolved logo filename, or
        a 404 error payload when no logo filename is resolved.
    """
    logo_filename = resolve_logo_filename()
    if logo_filename:
        return image_asset(logo_filename)
    return error_response("Icon not found.", 404)
187
188
@routes_bp.get("/images/<path:filename>")
def image_asset(filename: str) -> Response | tuple[Response, int]:
    """Serve a single file from the images directory.

    Args:
        filename: Bare image filename; values that are blank, contain a
            ``/`` or ``\\``, or differ from their own final path component
            are rejected.

    Returns:
        The image file, a 400 error for an invalid filename, or a 404 error
        when the images directory or the requested file does not exist.
    """
    if not IMAGES_ROOT.is_dir():
        return error_response("Image directory not found.", 404)

    candidate = str(filename).strip()
    # Both checks are needed: on POSIX a backslash is an ordinary filename
    # character, so the ``Path.name`` comparison alone would let it through.
    is_bare_name = bool(candidate) and Path(candidate).name == candidate
    has_separator = "/" in candidate or "\\" in candidate
    if has_separator or not is_bare_name:
        return error_response("Invalid image filename.", 400)

    target = IMAGES_ROOT / candidate
    if target.is_file():
        return send_file(target)
    return error_response("Image not found.", 404)
213
214
215# ---------------------------------------------------------------------------
216# Case management routes
217# ---------------------------------------------------------------------------
218
@routes_bp.post("/api/cases")
def create_case() -> tuple[Response, int]:
    """Create a new forensic analysis case.

    Reads an optional ``case_name`` from the JSON body, creates the case
    directory with ``evidence``, ``parsed`` and ``reports`` subdirectories,
    registers the per-case log handler, writes a ``case_created`` audit
    entry, and records fresh case state and progress trackers under
    ``STATE_LOCK``.

    A missing, ``null`` or blank ``case_name`` yields a timestamp-based
    display name and a UUID case id.  A custom name yields a case id built
    from the sanitised name plus a timestamp; if that folder already exists
    a short random suffix is appended so an existing case is never reused.

    Returns:
        ``(Response, 201)`` with ``case_id`` and ``case_name``, a 400 error
        when the body is not a JSON object, or a 500 error when the case
        directories or the case log file cannot be created.
    """
    cleanup_terminal_cases()

    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        return error_response("Request body must be a JSON object.", 400)

    # Treat JSON ``null`` like a missing name; ``str(None)`` would otherwise
    # produce a case literally called "None".
    raw_name = payload.get("case_name")
    case_name = "" if raw_name is None else str(raw_name).strip()
    has_custom_name = bool(case_name)
    created_at = datetime.now()
    if not has_custom_name:
        case_name = created_at.strftime("Case %Y-%m-%d %H:%M:%S")

    # Use sanitised case name + timestamp as folder name for readability;
    # fall back to UUID if no custom name was provided.
    if has_custom_name:
        case_id = safe_name(case_name, "case") + "_" + created_at.strftime("%Y%m%d_%H%M%S")
    else:
        case_id = str(uuid4())
    case_dir = CASES_ROOT / case_id
    if case_dir.exists():
        # The timestamp has one-second resolution, so a same-named case
        # created in the same second would share this folder and overwrite
        # the earlier case's state.  Disambiguate with a random suffix.
        case_id = case_id + "_" + uuid4().hex[:8]
        case_dir = CASES_ROOT / case_id

    try:
        for subdir in ("evidence", "parsed", "reports"):
            (case_dir / subdir).mkdir(parents=True, exist_ok=True)
    except OSError:
        LOGGER.exception("Failed to create case directories for case %s", case_id)
        return error_response("Failed to create case directories due to a filesystem error.", 500)

    try:
        log_file_path = register_case_log_handler(case_id=case_id, case_dir=case_dir)
    except OSError:
        LOGGER.exception("Failed to initialize case log file for case %s", case_id)
        return error_response("Failed to initialize case logging due to a filesystem error.", 500)

    with case_log_context(case_id):
        LOGGER.info("Initialized case logging at %s", log_file_path)

    audit = AuditLogger(case_dir)
    audit.log(
        "case_created",
        {
            "case_id": case_id,
            "name": case_name,
            "creation_time": now_iso(),
        },
    )

    case_state = {
        "case_id": case_id,
        "case_name": case_name,
        "case_dir": case_dir,
        "audit": audit,
        "evidence_mode": "",
        "source_path": "",
        "stored_path": "",
        "uploaded_files": [],
        "evidence_path": "",
        "evidence_hashes": {},
        "image_metadata": {},
        "available_artifacts": [],
        "selected_artifacts": [],
        "analysis_artifacts": [],
        "artifact_options": [],
        "analysis_date_range": None,
        "csv_output_dir": "",
        "parse_results": [],
        "artifact_csv_paths": {},
        "investigation_context": "",
        "analysis_results": {},
        "status": "active",
        "log_file_path": str(log_file_path),
    }
    # Register the state and all three progress trackers together under the
    # shared lock.
    with STATE_LOCK:
        CASE_STATES[case_id] = case_state
        PARSE_PROGRESS[case_id] = new_progress()
        ANALYSIS_PROGRESS[case_id] = new_progress()
        CHAT_PROGRESS[case_id] = new_progress()

    return success_response({"case_id": case_id, "case_name": case_name}, 201)
298
299
300# ---------------------------------------------------------------------------
301# Settings routes
302# ---------------------------------------------------------------------------
303
@routes_bp.get("/api/settings")
def get_settings() -> Response:
    """Return the in-memory settings after passing them through ``mask_sensitive``.

    A stored ``AIFT_CONFIG`` value that is not a dict is treated as an empty
    configuration.

    Returns:
        JSON with the masked configuration.
    """
    stored = current_app.config.get("AIFT_CONFIG", {})
    config = stored if isinstance(stored, dict) else {}
    return success_response(mask_sensitive(config))
315
316
@routes_bp.post("/api/settings")
def update_settings() -> Response | tuple[Response, int]:
    """Merge the request payload into the saved settings and persist them.

    The configuration file is loaded without environment overrides, merged
    with the payload via ``deep_merge``, validated, and saved.  It is then
    reloaded and stored as the app's ``AIFT_CONFIG``.  Changed keys, if any,
    are logged and passed to ``audit_config_change``.

    Returns:
        JSON with the reloaded configuration after ``mask_sensitive``, or a
        400 error when the payload is not a JSON object or validation fails.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return error_response("Settings payload must be a JSON object.", 400)

    raw_path = current_app.config.get("AIFT_CONFIG_PATH", "config.yaml")
    config_file = Path(str(raw_path))
    merged_config = load_config(config_file, use_env_overrides=False)
    changed = deep_merge(merged_config, body)

    problems = validate_config(merged_config)
    if problems:
        joined_problems = "; ".join(problems)
        return error_response(f"Invalid settings: {joined_problems}", 400)

    save_config(merged_config, config_file)

    effective_config = load_config(config_file)
    current_app.config["AIFT_CONFIG"] = effective_config
    if changed:
        LOGGER.info("Updated settings: %s", ", ".join(changed))
        audit_config_change(changed)

    return success_response(mask_sensitive(effective_config))
347
348
@routes_bp.post("/api/settings/test-connection")
def test_settings_connection() -> Response | tuple[Response, int]:
    """Send a short test prompt to the configured AI provider.

    The token limit comes from ``analysis.connection_test_max_tokens`` in
    the in-memory configuration.  It is clamped to at least 1 and falls
    back to 256 when missing or not convertible to ``int``.

    Returns:
        JSON with ``status``, ``model_info`` and a ``response_preview`` of
        at most 240 characters, or an error payload: 400 for a
        ``ValueError``, 502 for an ``AIProviderError`` or an empty reply,
        and 500 for a non-dict configuration or any other failure.
    """
    config = current_app.config.get("AIFT_CONFIG", {})
    if not isinstance(config, dict):
        return error_response("Invalid in-memory configuration state.", 500)

    analysis_section = config.get("analysis", {})
    if isinstance(analysis_section, dict):
        token_setting = analysis_section.get("connection_test_max_tokens", 256)
    else:
        token_setting = 256
    try:
        token_budget = max(1, int(token_setting))
    except (TypeError, ValueError):
        token_budget = 256

    try:
        # The provider receives a deep copy of the configuration.
        provider = create_provider(copy.deepcopy(config))
        model_info = provider.get_model_info()
        reply = provider.analyze(
            system_prompt=CONNECTION_TEST_SYSTEM_PROMPT,
            user_prompt=CONNECTION_TEST_USER_PROMPT,
            max_tokens=token_budget,
        )
        reply_text = str(reply).strip()
        if reply_text:
            result = {
                "status": "ok",
                "model_info": model_info,
                "response_preview": reply_text[:240],
            }
            return success_response(result)
        return error_response("Provider returned an empty response.", 502)
    except ValueError as error:
        LOGGER.warning("Settings connection test rejected due to configuration: %s", error)
        return error_response(str(error), 400)
    except AIProviderError as error:
        LOGGER.warning("Settings connection test failed: %s", error)
        return error_response(str(error), 502)
    except Exception:
        LOGGER.exception("Unexpected failure during settings connection test.")
        return error_response("Unexpected error while testing provider connection.", 500)
395
396
397# ---------------------------------------------------------------------------
398# Registration
399# ---------------------------------------------------------------------------
400
def register_routes(app: Flask) -> None:
    """Attach every route blueprint to the Flask application.

    Registers the core ``routes_bp`` blueprint first, followed by the
    evidence, artifact, analysis, and chat sub-blueprints, in that order.

    Args:
        app: The Flask application instance.
    """
    blueprints = (routes_bp, evidence_bp, artifact_bp, analysis_bp, chat_bp)
    for blueprint in blueprints:
        app.register_blueprint(blueprint)
def register_routes(app: flask.app.Flask) -> None:
def register_routes(app: Flask) -> None:
    """Attach every route blueprint to the Flask application.

    Registers the core ``routes_bp`` blueprint first, followed by the
    evidence, artifact, analysis, and chat sub-blueprints, in that order.

    Args:
        app: The Flask application instance.
    """
    blueprints = (routes_bp, evidence_bp, artifact_bp, analysis_bp, chat_bp)
    for blueprint in blueprints:
        app.register_blueprint(blueprint)

Register all HTTP route handlers with the Flask application.

Registers the core routes_bp blueprint plus sub-blueprints for evidence, artifact, analysis, and chat routes.

Arguments:
  • app: The Flask application instance.