app.routes.state
In-memory state management, progress tracking, and SSE streaming for AIFT routes.
This module centralises all shared mutable state (case dictionaries, progress stores, threading lock), the SSE event streaming machinery, response helpers, and configuration-change auditing used across the route layer.
Attributes:
- LOGGER: Module-level logger instance.
- PROJECT_ROOT: Absolute Path to the AIFT project root directory.
- CASES_ROOT: Absolute Path to the cases/ directory.
- IMAGES_ROOT: Absolute Path to the images/ directory.
- SENSITIVE_KEYS: Set of lowercase key names whose values must be masked.
- MASKED: Placeholder string for masked sensitive values.
- SAFE_NAME_RE: Regex matching characters unsafe for filenames.
- DISSECT_EVIDENCE_EXTENSIONS: Frozenset of extensions Dissect can open.
- MODE_PARSE_AND_AI: Constant for parse-and-analyse mode.
- MODE_PARSE_ONLY: Constant for parse-only mode.
- CONNECTION_TEST_SYSTEM_PROMPT: System prompt for AI connectivity tests.
- CONNECTION_TEST_USER_PROMPT: User prompt for AI connectivity tests.
- DEFAULT_FORENSIC_SYSTEM_PROMPT: Fallback forensic AI system prompt.
- CHAT_HISTORY_MAX_PAIRS: Max user/assistant message pairs in chat context.
- TERMINAL_CASE_STATUSES: Case statuses that indicate a terminal state.
- SSE_POLL_INTERVAL_SECONDS: Sleep interval between SSE poll iterations.
- SSE_INITIAL_IDLE_GRACE_SECONDS: Grace period before idle SSE termination.
- CASE_TTL_SECONDS: Max age for in-memory case state before eviction.
- CASE_STATES: In-memory dict mapping case IDs to state dicts.
- PARSE_PROGRESS: In-memory dict mapping case IDs to parse progress.
- ANALYSIS_PROGRESS: In-memory dict mapping case IDs to analysis progress.
- CHAT_PROGRESS: In-memory dict mapping case IDs to chat progress.
- STATE_LOCK: Reentrant threading lock protecting all state dicts.
"""In-memory state management, progress tracking, and SSE streaming for AIFT routes.

This module centralises all shared mutable state (case dictionaries, progress
stores, threading lock), the SSE event streaming machinery, response helpers,
and configuration-change auditing used across the route layer.

Attributes:
    LOGGER: Module-level logger instance.
    PROJECT_ROOT: Absolute ``Path`` to the AIFT project root directory.
    CASES_ROOT: Absolute ``Path`` to the ``cases/`` directory.
    IMAGES_ROOT: Absolute ``Path`` to the ``images/`` directory.
    SENSITIVE_KEYS: Set of lowercase key names whose values must be masked.
    MASKED: Placeholder string for masked sensitive values.
    SAFE_NAME_RE: Regex matching characters unsafe for filenames.
    DISSECT_EVIDENCE_EXTENSIONS: Frozenset of extensions Dissect can open.
    MODE_PARSE_AND_AI: Constant for parse-and-analyse mode.
    MODE_PARSE_ONLY: Constant for parse-only mode.
    CONNECTION_TEST_SYSTEM_PROMPT: System prompt for AI connectivity tests.
    CONNECTION_TEST_USER_PROMPT: User prompt for AI connectivity tests.
    DEFAULT_FORENSIC_SYSTEM_PROMPT: Fallback forensic AI system prompt.
    CHAT_HISTORY_MAX_PAIRS: Max user/assistant message pairs in chat context.
    TERMINAL_CASE_STATUSES: Case statuses that indicate a terminal state.
    SSE_POLL_INTERVAL_SECONDS: Sleep interval between SSE poll iterations.
    SSE_INITIAL_IDLE_GRACE_SECONDS: Grace period before idle SSE termination.
    CASE_TTL_SECONDS: Max age for in-memory case state before eviction.
    CASE_STATES: In-memory dict mapping case IDs to state dicts.
    PARSE_PROGRESS: In-memory dict mapping case IDs to parse progress.
    ANALYSIS_PROGRESS: In-memory dict mapping case IDs to analysis progress.
    CHAT_PROGRESS: In-memory dict mapping case IDs to chat progress.
    STATE_LOCK: Reentrant threading lock protecting all state dicts.
"""

from __future__ import annotations

import copy
from datetime import datetime, timezone
import json
import logging
from pathlib import Path
import re
import threading
import time
from typing import Any

from flask import Response, jsonify, stream_with_context

from ..case_logging import unregister_case_log_handler
from ..config import LOGO_FILE_CANDIDATES

__all__ = [
    "LOGGER",
    "PROJECT_ROOT",
    "CASES_ROOT",
    "IMAGES_ROOT",
    "SENSITIVE_KEYS",
    "MASKED",
    "SAFE_NAME_RE",
    "DISSECT_EVIDENCE_EXTENSIONS",
    "MODE_PARSE_AND_AI",
    "MODE_PARSE_ONLY",
    "CONNECTION_TEST_SYSTEM_PROMPT",
    "CONNECTION_TEST_USER_PROMPT",
    "DEFAULT_FORENSIC_SYSTEM_PROMPT",
    "CHAT_HISTORY_MAX_PAIRS",
    "TERMINAL_CASE_STATUSES",
    "SSE_POLL_INTERVAL_SECONDS",
    "SSE_INITIAL_IDLE_GRACE_SECONDS",
    "CASE_TTL_SECONDS",
    "CASE_STATES",
    "PARSE_PROGRESS",
    "ANALYSIS_PROGRESS",
    "CHAT_PROGRESS",
    "STATE_LOCK",
    "now_iso",
    "error_response",
    "success_response",
    "safe_name",
    "resolve_logo_filename",
    "safe_int",
    "normalize_case_status",
    "new_progress",
    "set_progress_status",
    "emit_progress",
    "stream_sse",
    "get_case",
    "mark_case_status",
    "cancel_progress",
    "is_cancelled",
    "get_cancel_event",
    "cleanup_case_entries",
    "cleanup_terminal_cases",
    "mask_sensitive",
    "deep_merge",
    "sanitize_changed_keys",
    "audit_config_change",
]

LOGGER = logging.getLogger(__name__)

PROJECT_ROOT = Path(__file__).resolve().parents[2]
CASES_ROOT = PROJECT_ROOT / "cases"
IMAGES_ROOT = PROJECT_ROOT / "images"
SENSITIVE_KEYS = {"api_key", "token", "secret", "password"}
MASKED = "********"
SAFE_NAME_RE = re.compile(r"[^A-Za-z0-9._-]+")

DISSECT_EVIDENCE_EXTENSIONS = frozenset({
    ".e01", ".ex01", ".s01", ".l01",
    ".dd", ".img", ".raw", ".bin", ".iso",
    ".000", ".001",
    ".vmdk", ".vhd", ".vhdx", ".vdi", ".qcow2", ".hdd", ".hds",
    ".vmx",
    ".vmwarevm", ".vbox", ".vmcx", ".ovf", ".ova", ".pvm", ".pvs", ".utm", ".xva", ".vma",
    ".vbk",
    ".asdf", ".asif",
    ".ad1",
    ".tar", ".gz", ".tgz",
    ".zip", ".7z",
})

MODE_PARSE_AND_AI = "parse_and_ai"
MODE_PARSE_ONLY = "parse_only"
CONNECTION_TEST_SYSTEM_PROMPT = "You are a connectivity test assistant. Reply briefly."
CONNECTION_TEST_USER_PROMPT = "Reply with: Connection OK."
DEFAULT_FORENSIC_SYSTEM_PROMPT = (
    "You are a digital forensic analyst. "
    "Analyze ONLY the data provided to you. "
    "Do not fabricate evidence. "
    "Prioritize incident-relevant findings and response actions; use baseline only as supporting context."
)
CHAT_HISTORY_MAX_PAIRS = 20
TERMINAL_CASE_STATUSES = frozenset({"completed", "failed", "error"})
SSE_POLL_INTERVAL_SECONDS = 0.2
SSE_INITIAL_IDLE_GRACE_SECONDS = 1.0
CASE_TTL_SECONDS = 21600

CASE_STATES: dict[str, dict[str, Any]] = {}
PARSE_PROGRESS: dict[str, dict[str, Any]] = {}
ANALYSIS_PROGRESS: dict[str, dict[str, Any]] = {}
CHAT_PROGRESS: dict[str, dict[str, Any]] = {}
STATE_LOCK = threading.RLock()


# ---------------------------------------------------------------------------
# Simple helpers
# ---------------------------------------------------------------------------

def now_iso() -> str:
    """Return the current UTC timestamp as an ISO 8601 string with ``Z`` suffix.

    Returns:
        A string like ``"2025-01-15T08:30:00Z"``.
    """
    return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")


def error_response(message: str, status: int = 400) -> tuple[Response, int]:
    """Create a standardised JSON error response tuple.

    Args:
        message: Human-readable error description.
        status: HTTP status code. Defaults to 400.

    Returns:
        A ``(Response, int)`` tuple with ``"success": false``.
    """
    return jsonify({"success": False, "error": message}), status


def success_response(data: dict[str, Any] | None = None, status: int = 200) -> tuple[Response, int]:
    """Create a standardised JSON success response tuple.

    Args:
        data: Optional dict of response data merged with ``"success": true``.
        status: HTTP status code. Defaults to 200.

    Returns:
        A ``(Response, int)`` tuple with ``"success": true``.
    """
    payload: dict[str, Any] = {"success": True}
    if data:
        payload.update(data)
    return jsonify(payload), status


def safe_name(value: str, fallback: str = "item") -> str:
    """Sanitise a string for safe use as a filesystem or identifier name.

    Args:
        value: The raw string to sanitise.
        fallback: Value to return if sanitisation produces an empty string.

    Returns:
        A sanitised string safe for file paths and identifiers.
    """
    cleaned = SAFE_NAME_RE.sub("_", value).strip("_")
    return cleaned or fallback


def resolve_logo_filename() -> str:
    """Resolve the application logo filename from the images directory.

    Returns:
        The logo filename, or an empty string if none is found.
    """
    if IMAGES_ROOT.is_dir():
        for filename in LOGO_FILE_CANDIDATES:
            if (IMAGES_ROOT / filename).is_file():
                return filename
        image_files = sorted(
            path.name
            for path in IMAGES_ROOT.iterdir()
            if path.is_file() and path.suffix.lower() in {".png", ".jpg", ".jpeg", ".webp", ".svg"}
        )
        if image_files:
            return image_files[0]
    return ""


def safe_int(value: Any, default: int = 0) -> int:
    """Safely convert a value to int, returning *default* on failure.

    Args:
        value: Value to convert.
        default: Fallback integer. Defaults to 0.

    Returns:
        The integer representation, or *default*.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def normalize_case_status(value: Any) -> str:
    """Normalise a case status value to a lowercase, stripped string.

    Args:
        value: Raw status value.

    Returns:
        Lowercase, stripped string.
    """
    return str(value or "").strip().lower()


# ---------------------------------------------------------------------------
# Progress / SSE helpers
# ---------------------------------------------------------------------------

def new_progress(status: str = "idle") -> dict[str, Any]:
    """Create a fresh progress-tracking dictionary for SSE event stores.

    Args:
        status: Initial status string. Defaults to ``"idle"``.

    Returns:
        A progress dict with ``status``, ``events``, ``error``,
        ``created_at``, and ``cancel_event`` keys.
    """
    return {
        "status": status,
        "events": [],
        "error": None,
        "created_at": time.monotonic(),
        "cancel_event": threading.Event(),
    }


def set_progress_status(
    store: dict[str, dict[str, Any]],
    case_id: str,
    status: str,
    error: str | None = None,
) -> None:
    """Update the status (and optionally error) in a progress store.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
        status: New status string.
        error: Optional error message.
    """
    with STATE_LOCK:
        state = store.setdefault(case_id, new_progress())
        state["status"] = status
        state["error"] = error


def cancel_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Mark a running progress entry as cancelled and signal its cancel event.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` if the entry was running and is now cancelled, ``False`` otherwise.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        if state is None or state.get("status") != "running":
            return False
        state["status"] = "cancelled"
        cancel_event = state.get("cancel_event")
        if isinstance(cancel_event, threading.Event):
            cancel_event.set()
        return True


def is_cancelled(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Check whether a progress entry has been cancelled.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` if the entry status is ``"cancelled"``.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        return state is not None and state.get("status") == "cancelled"


def get_cancel_event(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> threading.Event | None:
    """Return the cancel event for the current progress entry.

    Thread-safe: acquires ``STATE_LOCK``. The caller should hold a
    reference to the returned event so that it remains valid even if
    the progress dict is later replaced by a new run.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        The ``threading.Event``, or ``None`` if no entry exists.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        if state is None:
            return None
        event = state.get("cancel_event")
        return event if isinstance(event, threading.Event) else None


def emit_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
    payload: dict[str, Any],
) -> None:
    """Append a progress event to a case's SSE event store.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
        payload: Event dict (must include a ``"type"`` key).
    """
    event = dict(payload)
    event.setdefault("timestamp", now_iso())
    with STATE_LOCK:
        state = store.setdefault(case_id, new_progress())
        event["sequence"] = len(state["events"])
        state["events"].append(event)


def _cleanup_progress_store(store: dict[str, dict[str, Any]], case_id: str) -> None:
    """Mark a finished case's progress entry as drained rather than removing it.

    Terminal entries are retained so that reconnecting SSE clients receive a
    proper completion signal instead of a misleading "Case not found" error.
    Actual removal is handled later by ``cleanup_terminal_cases``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
    """
    with STATE_LOCK:
        entry = store.get(case_id)
        if entry is not None and entry.get("status") in TERMINAL_CASE_STATUSES:
            entry["_drained"] = True


def stream_sse(store: dict[str, dict[str, Any]], case_id: str) -> Response:
    """Create an SSE streaming ``Response`` that polls a progress event store.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        A Flask ``Response`` with ``text/event-stream`` MIME type.
    """
    @stream_with_context
    def stream() -> Any:
        """Generate SSE data frames by polling the progress event store."""
        last = 0
        initial_idle_deadline = time.monotonic() + SSE_INITIAL_IDLE_GRACE_SECONDS
        try:
            while True:
                with STATE_LOCK:
                    state = store.get(case_id)
                    if state is None:
                        # Progress entry absent — check whether the case
                        # itself still exists. If it does, the progress was
                        # already drained/cleaned; tell the client the
                        # operation finished rather than emitting a
                        # misleading "Case not found" error.
                        case_exists = case_id in CASE_STATES
                        if case_exists:
                            synthetic = {"type": "complete", "message": "Already completed."}
                        else:
                            synthetic = {"type": "error", "message": "Case not found."}
                        yield f"data: {json.dumps(synthetic, separators=(',', ':'))}\n\n"
                        break

                    status = str(state.get("status", "idle"))
                    events = list(state.get("events", []))
                    pending: list[dict[str, Any]] = events[last:]
                    last = len(events)

                if not pending and status == "idle":
                    if time.monotonic() < initial_idle_deadline:
                        yield ": keep-alive\n\n"
                        time.sleep(SSE_POLL_INTERVAL_SECONDS)
                        continue
                    idle = {"type": "idle", "status": "idle"}
                    yield f"data: {json.dumps(idle, separators=(',', ':'))}\n\n"
                    break

                for event in pending:
                    yield f"data: {json.dumps(event, separators=(',', ':'))}\n\n"

                if status in TERMINAL_CASE_STATUSES and not pending:
                    break

                if not pending:
                    yield ": keep-alive\n\n"
                    time.sleep(SSE_POLL_INTERVAL_SECONDS)
        finally:
            _cleanup_progress_store(store, case_id)

    return Response(
        stream(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )


# ---------------------------------------------------------------------------
# Case state helpers
# ---------------------------------------------------------------------------

def get_case(case_id: str) -> dict[str, Any] | None:
    """Retrieve the in-memory state dictionary for a case.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.

    Returns:
        The case state dictionary, or ``None``.
    """
    with STATE_LOCK:
        return CASE_STATES.get(case_id)


def mark_case_status(case_id: str, status: str) -> None:
    """Update the in-memory status of a case. No-op if case missing.

    When transitioning to a terminal status, records the monotonic timestamp
    in ``_terminal_since`` so cleanup can apply a TTL grace period.

    Args:
        case_id: UUID of the case.
        status: New status string.
    """
    normalized = normalize_case_status(status)
    with STATE_LOCK:
        case = CASE_STATES.get(case_id)
        if case is not None:
            case["status"] = normalized
            if normalized in TERMINAL_CASE_STATUSES and "_terminal_since" not in case:
                case["_terminal_since"] = time.monotonic()


def cleanup_case_entries(case_id: str) -> None:
    """Remove all in-memory state entries for a case.

    Args:
        case_id: UUID of the case.
    """
    with STATE_LOCK:
        CASE_STATES.pop(case_id, None)
        PARSE_PROGRESS.pop(case_id, None)
        ANALYSIS_PROGRESS.pop(case_id, None)
        CHAT_PROGRESS.pop(case_id, None)
    unregister_case_log_handler(case_id)


def _is_case_expired(case_id: str, now: float) -> bool:
    """Check whether a case's progress entries have exceeded the TTL.

    Must be called while holding ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.
        now: Current monotonic timestamp.

    Returns:
        ``True`` if the case has exceeded the TTL.
    """
    latest_created = 0.0
    for store in (PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
        entry = store.get(case_id)
        if entry is not None:
            latest_created = max(latest_created, entry.get("created_at", 0.0))
    if latest_created == 0.0:
        return False
    return (now - latest_created) > CASE_TTL_SECONDS


def _evict_orphaned_progress(now: float) -> None:
    """Remove progress entries with no corresponding CASE_STATES entry.

    Must be called while holding ``STATE_LOCK``.

    Args:
        now: Current monotonic timestamp.
    """
    for store in (PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
        orphan_ids = [
            cid for cid in store
            if cid not in CASE_STATES
            and (now - store[cid].get("created_at", 0.0)) > CASE_TTL_SECONDS
        ]
        for cid in orphan_ids:
            store.pop(cid, None)


def cleanup_terminal_cases(exclude_case_id: str | None = None) -> None:
    """Remove in-memory state for TTL-expired cases.

    Terminal cases (completed, failed, error) are only evicted once their
    ``_terminal_since`` timestamp exceeds ``CASE_TTL_SECONDS``, so that
    post-analysis actions (chat, report, download) continue to work.
    Non-terminal cases are evicted if their progress entries exceed the TTL.

    Only in-memory state is removed; case data on disk is never deleted.

    Args:
        exclude_case_id: Optional case ID to exempt from cleanup.
    """
    now = time.monotonic()
    with STATE_LOCK:
        evict_case_ids = []
        for case_id, case in CASE_STATES.items():
            if case_id == exclude_case_id:
                continue
            is_terminal = normalize_case_status(case.get("status")) in TERMINAL_CASE_STATUSES
            if is_terminal:
                terminal_since = case.get("_terminal_since", 0.0)
                if terminal_since and (now - terminal_since) > CASE_TTL_SECONDS:
                    evict_case_ids.append(case_id)
            elif _is_case_expired(case_id, now):
                evict_case_ids.append(case_id)
        for case_id in evict_case_ids:
            CASE_STATES.pop(case_id, None)
            PARSE_PROGRESS.pop(case_id, None)
            ANALYSIS_PROGRESS.pop(case_id, None)
            CHAT_PROGRESS.pop(case_id, None)
        _evict_orphaned_progress(now)
    for case_id in evict_case_ids:
        unregister_case_log_handler(case_id)


# ---------------------------------------------------------------------------
# Config / sensitive-data helpers
# ---------------------------------------------------------------------------

def mask_sensitive(data: Any) -> Any:
    """Recursively mask sensitive values in a data structure.

    Args:
        data: Input data structure (dict, list, or scalar).

    Returns:
        A new structure with sensitive values replaced by ``MASKED``.
    """
    if isinstance(data, dict):
        masked: dict[str, Any] = {}
        for key, value in data.items():
            if key.lower() in SENSITIVE_KEYS:
                masked[key] = MASKED if str(value).strip() else ""
            else:
                masked[key] = mask_sensitive(value)
        return masked
    if isinstance(data, list):
        return [mask_sensitive(item) for item in data]
    return data


def deep_merge(current: dict[str, Any], updates: dict[str, Any], prefix: str = "") -> list[str]:
    """Recursively merge *updates* into *current*, tracking changed keys.

    Sensitive keys whose value equals ``MASKED`` are skipped.

    Args:
        current: Target dictionary (mutated in place).
        updates: Source dictionary with new values.
        prefix: Dot-separated key prefix for recursive tracking.

    Returns:
        List of dot-separated key paths that were changed.
    """
    changed: list[str] = []
    for key, value in updates.items():
        if not isinstance(key, str):
            continue
        full_key = f"{prefix}{key}"
        if key in current and isinstance(current[key], dict) and isinstance(value, dict):
            changed.extend(deep_merge(current[key], value, f"{full_key}."))
            continue
        if key.lower() in SENSITIVE_KEYS and isinstance(value, str) and value == MASKED:
            continue
        if current.get(key) != value:
            current[key] = copy.deepcopy(value)
            changed.append(full_key)
    return changed


def _is_sensitive_path(path: str) -> bool:
    """Check whether a dot-separated key path contains a sensitive segment.

    Args:
        path: Dot-separated key path.

    Returns:
        ``True`` if any segment matches a key in ``SENSITIVE_KEYS``.
    """
    return any(segment.strip().lower() in SENSITIVE_KEYS for segment in path.split("."))


def sanitize_changed_keys(changed_keys: list[str]) -> list[str]:
    """Sanitise changed config key paths for audit logging.

    Args:
        changed_keys: Raw list of dot-separated key paths.

    Returns:
        Deduplicated, sanitised list with sensitive paths redacted.
    """
    sanitized: list[str] = []
    for key in changed_keys:
        if not isinstance(key, str):
            continue
        normalized = key.strip()
        if not normalized:
            continue
        if _is_sensitive_path(normalized):
            normalized = f"{normalized} (redacted)"
        if normalized not in sanitized:
            sanitized.append(normalized)
    return sanitized


def audit_config_change(changed_keys: list[str]) -> None:
    """Write a ``config_changed`` audit entry to all active cases.

    Args:
        changed_keys: List of changed config key paths.
    """
    sanitized_keys = sanitize_changed_keys(changed_keys)
    if not sanitized_keys:
        return

    with STATE_LOCK:
        audit_loggers = [
            case.get("audit")
            for case in CASE_STATES.values()
            if isinstance(case, dict) and case.get("audit") is not None
        ]

    details = {
        "changed_keys": sanitized_keys,
        "changed_count": len(sanitized_keys),
    }
    for audit_logger in audit_loggers:
        try:
            audit_logger.log("config_changed", details)
        except Exception:
            LOGGER.exception("Failed to write config_changed audit entry.")
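The config helpers are designed as a round trip: mask_sensitive protects secrets on the way out to the UI, and deep_merge refuses to write an echoed-back mask on the way in. A self-contained run, using the two functions exactly as defined in the source above (the config values are made up for illustration):

```python
import copy
from typing import Any

SENSITIVE_KEYS = {"api_key", "token", "secret", "password"}
MASKED = "********"

def mask_sensitive(data: Any) -> Any:
    # Copied from the module source: recursively replace sensitive values.
    if isinstance(data, dict):
        masked: dict[str, Any] = {}
        for key, value in data.items():
            if key.lower() in SENSITIVE_KEYS:
                masked[key] = MASKED if str(value).strip() else ""
            else:
                masked[key] = mask_sensitive(value)
        return masked
    if isinstance(data, list):
        return [mask_sensitive(item) for item in data]
    return data

def deep_merge(current: dict[str, Any], updates: dict[str, Any], prefix: str = "") -> list[str]:
    # Copied from the module source: merge in place, skip masked secrets.
    changed: list[str] = []
    for key, value in updates.items():
        if not isinstance(key, str):
            continue
        full_key = f"{prefix}{key}"
        if key in current and isinstance(current[key], dict) and isinstance(value, dict):
            changed.extend(deep_merge(current[key], value, f"{full_key}."))
            continue
        if key.lower() in SENSITIVE_KEYS and isinstance(value, str) and value == MASKED:
            continue
        if current.get(key) != value:
            current[key] = copy.deepcopy(value)
            changed.append(full_key)
    return changed

config = {"ai": {"api_key": "sk-real", "model": "gpt"}, "port": 8080}
shown = mask_sensitive(config)          # what a settings page would receive

# The client echoes the masked key back; deep_merge must not overwrite it.
changed = deep_merge(config, {"ai": {"api_key": MASKED, "model": "claude"}, "port": 8080})
```

Only the genuinely changed path is reported, and the real secret survives the merge untouched.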
now_iso() -> str
Return the current UTC timestamp as an ISO 8601 string with Z suffix.
Returns:
A string like "2025-01-15T08:30:00Z".
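A quick check of the format, using the one-line definition from the source above: the aware UTC datetime renders with a "+00:00" offset, which is rewritten to the "Z" suffix.

```python
from datetime import datetime, timezone

def now_iso() -> str:
    # isoformat() on an aware UTC datetime yields "...+00:00"; swap in "Z".
    return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")

stamp = now_iso()  # e.g. "2025-01-15T08:30:00Z": always 20 characters
```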
error_response(message: str, status: int = 400) -> tuple[Response, int]
Create a standardised JSON error response tuple.
Arguments:
- message: Human-readable error description.
- status: HTTP status code. Defaults to 400.
Returns:
A (Response, int) tuple with "success": false.
success_response(data: dict[str, Any] | None = None, status: int = 200) -> tuple[Response, int]
Create a standardised JSON success response tuple.
Arguments:
- data: Optional dict of response data merged with "success": true.
- status: HTTP status code. Defaults to 200.
Returns:
A (Response, int) tuple with "success": true.
safe_name(value: str, fallback: str = "item") -> str
Sanitise a string for safe use as a filesystem or identifier name.
Arguments:
- value: The raw string to sanitise.
- fallback: Value to return if sanitisation produces an empty string.
Returns:
A sanitised string safe for file paths and identifiers.
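Using the regex and definition from the source above: each run of disallowed characters collapses to a single underscore, and a string with nothing salvageable falls back.

```python
import re

SAFE_NAME_RE = re.compile(r"[^A-Za-z0-9._-]+")

def safe_name(value: str, fallback: str = "item") -> str:
    # Replace every run of unsafe characters with "_", trim stray underscores.
    cleaned = SAFE_NAME_RE.sub("_", value).strip("_")
    return cleaned or fallback

a = safe_name("disk image (copy).E01")       # spaces/parens collapse to "_"
b = safe_name("///", fallback="evidence")    # nothing left -> fallback
```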
resolve_logo_filename() -> str
Resolve the application logo filename from the images directory.
Returns:
The logo filename, or an empty string if none is found.
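A sketch of the lookup order on a temporary directory. The real helper reads the module's IMAGES_ROOT and the app's LOGO_FILE_CANDIDATES; here the directory is a parameter and the candidate list is an assumed example value, so this illustrates the precedence only: named candidates win, otherwise the alphabetically first known image type.

```python
import tempfile
from pathlib import Path

LOGO_FILE_CANDIDATES = ["logo.png", "logo.svg"]  # assumed example candidates

def resolve_logo(images_root: Path) -> str:
    # Same lookup logic as the module's helper, directory passed in.
    if images_root.is_dir():
        for filename in LOGO_FILE_CANDIDATES:
            if (images_root / filename).is_file():
                return filename
        image_files = sorted(
            p.name for p in images_root.iterdir()
            if p.is_file() and p.suffix.lower() in {".png", ".jpg", ".jpeg", ".webp", ".svg"}
        )
        if image_files:
            return image_files[0]
    return ""

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "banner.png").touch()
    first = resolve_logo(root)    # no candidate present -> alphabetical fallback
    (root / "logo.png").touch()
    second = resolve_logo(root)   # a named candidate now wins
```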
safe_int(value: Any, default: int = 0) -> int
Safely convert a value to int, returning default on failure.
Arguments:
- value: Value to convert.
- default: Fallback integer. Defaults to 0.
Returns:
The integer representation, or default.
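The definition from the source above, exercised on the usual edge cases. One subtlety worth knowing: int() accepts a float value but rejects a numeric string containing a decimal point, so "3.5" falls back to the default.

```python
from typing import Any

def safe_int(value: Any, default: int = 0) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return default

x = safe_int("42")               # plain numeric string converts
y = safe_int(None, default=-1)   # TypeError -> default
z = safe_int("3.5")              # ValueError: int() rejects "3.5" -> default
```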
normalize_case_status(value: Any) -> str
Normalise a case status value to a lowercase, stripped string.
Arguments:
- value: Raw status value.
Returns:
Lowercase, stripped string.
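Using the definition from the source above: the `value or ""` guard means any falsy input (None, "", 0) normalises to the empty string rather than the string "None".

```python
from typing import Any

def normalize_case_status(value: Any) -> str:
    # "or" short-circuits falsy inputs to "" before str() runs.
    return str(value or "").strip().lower()

a = normalize_case_status("  Completed ")  # whitespace and case folded
b = normalize_case_status(None)            # falsy -> ""
```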
new_progress(status: str = "idle") -> dict[str, Any]
Create a fresh progress-tracking dictionary for SSE event stores.
Arguments:
- status: Initial status string. Defaults to "idle".
Returns:
A progress dict with status, events, error, created_at, and cancel_event keys.
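The factory from the source above, run once to show the shape of a fresh entry: an unset cancel event, an empty event list, and a monotonic creation time used later for TTL eviction.

```python
import threading
import time
from typing import Any

def new_progress(status: str = "idle") -> dict[str, Any]:
    return {
        "status": status,
        "events": [],
        "error": None,
        "created_at": time.monotonic(),   # monotonic: safe against clock changes
        "cancel_event": threading.Event(),
    }

entry = new_progress("running")
```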
set_progress_status(store: dict[str, dict[str, Any]], case_id: str, status: str, error: str | None = None) -> None
Update the status (and optionally error) in a progress store.
Thread-safe: acquires STATE_LOCK.
Arguments:
- store: One of the progress dicts.
- case_id: UUID of the case.
- status: New status string.
- error: Optional error message.
emit_progress(store: dict[str, dict[str, Any]], case_id: str, payload: dict[str, Any]) -> None
Append a progress event to a case's SSE event store.
Thread-safe: acquires STATE_LOCK.
Arguments:
- store: One of the progress dicts.
- case_id: UUID of the case.
- payload: Event dict (must include a "type" key).
def stream_sse(store: dict[str, dict[str, Any]], case_id: str) -> Response:
    """Create an SSE streaming ``Response`` that polls a progress event store.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        A Flask ``Response`` with ``text/event-stream`` MIME type.
    """
    @stream_with_context
    def stream() -> Any:
        """Generate SSE data frames by polling the progress event store."""
        last = 0
        initial_idle_deadline = time.monotonic() + SSE_INITIAL_IDLE_GRACE_SECONDS
        try:
            while True:
                with STATE_LOCK:
                    state = store.get(case_id)
                    if state is None:
                        # Progress entry absent — check whether the case
                        # itself still exists. If it does, the progress was
                        # already drained/cleaned; tell the client the
                        # operation finished rather than emitting a
                        # misleading "Case not found" error.
                        case_exists = case_id in CASE_STATES
                        if case_exists:
                            synthetic = {"type": "complete", "message": "Already completed."}
                        else:
                            synthetic = {"type": "error", "message": "Case not found."}
                        yield f"data: {json.dumps(synthetic, separators=(',', ':'))}\n\n"
                        break

                    status = str(state.get("status", "idle"))
                    events = list(state.get("events", []))
                    pending: list[dict[str, Any]] = events[last:]
                    last = len(events)

                if not pending and status == "idle":
                    if time.monotonic() < initial_idle_deadline:
                        yield ": keep-alive\n\n"
                        time.sleep(SSE_POLL_INTERVAL_SECONDS)
                        continue
                    idle = {"type": "idle", "status": "idle"}
                    yield f"data: {json.dumps(idle, separators=(',', ':'))}\n\n"
                    break

                for event in pending:
                    yield f"data: {json.dumps(event, separators=(',', ':'))}\n\n"

                if status in TERMINAL_CASE_STATUSES and not pending:
                    break

                if not pending:
                    yield ": keep-alive\n\n"
                    time.sleep(SSE_POLL_INTERVAL_SECONDS)
        finally:
            _cleanup_progress_store(store, case_id)

    return Response(
        stream(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
Create an SSE streaming Response that polls a progress event store.
Arguments:
- store: One of the progress dicts.
- case_id: UUID of the case.
Returns:
A Flask `Response` with `text/event-stream` MIME type.
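Each event is serialised once per frame with compact JSON separators and terminated by a blank line, as the SSE wire format requires. A standalone sketch of just that framing step (the `sse_frame` helper name is ours, not part of the module):

```python
import json
from typing import Any


def sse_frame(event: dict[str, Any]) -> str:
    """Serialise one progress event as a Server-Sent Events data frame.

    Compact separators match the route layer's json.dumps call; the
    trailing blank line terminates the frame per the SSE format.
    """
    return f"data: {json.dumps(event, separators=(',', ':'))}\n\n"


# The synthetic frames stream_sse emits when the progress entry is gone:
complete = sse_frame({"type": "complete", "message": "Already completed."})
missing = sse_frame({"type": "error", "message": "Case not found."})
```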
def get_case(case_id: str) -> dict[str, Any] | None:
    """Retrieve the in-memory state dictionary for a case.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.

    Returns:
        The case state dictionary, or ``None``.
    """
    with STATE_LOCK:
        return CASE_STATES.get(case_id)
Retrieve the in-memory state dictionary for a case.
Thread-safe: acquires STATE_LOCK.
Arguments:
- case_id: UUID of the case.
Returns:
The case state dictionary, or `None`.
def mark_case_status(case_id: str, status: str) -> None:
    """Update the in-memory status of a case. No-op if case missing.

    When transitioning to a terminal status, records the monotonic timestamp
    in ``_terminal_since`` so cleanup can apply a TTL grace period.

    Args:
        case_id: UUID of the case.
        status: New status string.
    """
    normalized = normalize_case_status(status)
    with STATE_LOCK:
        case = CASE_STATES.get(case_id)
        if case is not None:
            case["status"] = normalized
            if normalized in TERMINAL_CASE_STATUSES and "_terminal_since" not in case:
                case["_terminal_since"] = time.monotonic()
Update the in-memory status of a case. No-op if case missing.
When transitioning to a terminal status, records the monotonic timestamp
in _terminal_since so cleanup can apply a TTL grace period.
Arguments:
- case_id: UUID of the case.
- status: New status string.
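Because `_terminal_since` is only set when absent, repeated terminal transitions cannot push the eviction deadline back. A minimal sketch of that stamping rule (the contents of `TERMINAL_CASE_STATUSES` shown here are an assumption based on the cleanup docstring):

```python
import time

# Assumed terminal set; the real constant lives in app.routes.state.
TERMINAL_CASE_STATUSES = {"completed", "failed", "error"}


def stamp_terminal(case: dict, status: str) -> None:
    """Set the status, stamping the monotonic time only on the FIRST
    transition into a terminal status so the TTL clock never resets."""
    case["status"] = status
    if status in TERMINAL_CASE_STATUSES and "_terminal_since" not in case:
        case["_terminal_since"] = time.monotonic()


case = {"status": "running"}
stamp_terminal(case, "completed")
first_stamp = case["_terminal_since"]
stamp_terminal(case, "failed")      # still terminal: the stamp is unchanged
```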
def cancel_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Mark a running progress entry as cancelled and signal its cancel event.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` if the entry was running and is now cancelled, ``False`` otherwise.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        if state is None or state.get("status") != "running":
            return False
        state["status"] = "cancelled"
        cancel_event = state.get("cancel_event")
        if isinstance(cancel_event, threading.Event):
            cancel_event.set()
        return True
Mark a running progress entry as cancelled and signal its cancel event.
Thread-safe: acquires STATE_LOCK.
Arguments:
- store: One of the progress dicts.
- case_id: UUID of the case.
Returns:
`True` if the entry was running and is now cancelled, `False` otherwise.
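On the worker side, cancellation is cooperative: the job must poll the shared `threading.Event` between units of work. A sketch of that worker half (the `run_steps` helper is illustrative, not part of the module):

```python
import threading


def run_steps(cancel_event: threading.Event, steps: int, done: list) -> None:
    """Worker-side half of the handshake: poll the Event between steps."""
    for i in range(steps):
        if cancel_event.is_set():   # set by cancel_progress() on the route side
            return
        done.append(i)


ev = threading.Event()
done: list = []
worker = threading.Thread(target=run_steps, args=(ev, 1_000_000, done))
ev.set()                 # cancel before the worker starts its loop
worker.start()
worker.join()
```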
def is_cancelled(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Check whether a progress entry has been cancelled.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` if the entry status is ``"cancelled"``.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        return state is not None and state.get("status") == "cancelled"
Check whether a progress entry has been cancelled.
Thread-safe: acquires STATE_LOCK.
Arguments:
- store: One of the progress dicts.
- case_id: UUID of the case.
Returns:
`True` if the entry status is `"cancelled"`.
def get_cancel_event(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> threading.Event | None:
    """Return the cancel event for the current progress entry.

    Thread-safe: acquires ``STATE_LOCK``. The caller should hold a
    reference to the returned event so that it remains valid even if
    the progress dict is later replaced by a new run.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        The ``threading.Event``, or ``None`` if no entry exists.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        if state is None:
            return None
        event = state.get("cancel_event")
        return event if isinstance(event, threading.Event) else None
Return the cancel event for the current progress entry.
Thread-safe: acquires STATE_LOCK. The caller should hold a
reference to the returned event so that it remains valid even if
the progress dict is later replaced by a new run.
Arguments:
- store: One of the progress dicts.
- case_id: UUID of the case.
Returns:
The `threading.Event`, or `None` if no entry exists.
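Why holding a reference matters: replacing the per-case progress dict does not invalidate an `Event` object a worker already captured. A small standalone demonstration (plain dicts, no lock, for brevity):

```python
import threading

store = {"case1": {"cancel_event": threading.Event()}}
ev = store["case1"]["cancel_event"]      # worker grabs its own reference

# A new run replaces the progress entry with a fresh Event:
store["case1"] = {"cancel_event": threading.Event()}

ev.set()    # the old Event is still a live, usable object
```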
def cleanup_case_entries(case_id: str) -> None:
    """Remove all in-memory state entries for a case.

    Args:
        case_id: UUID of the case.
    """
    with STATE_LOCK:
        CASE_STATES.pop(case_id, None)
        PARSE_PROGRESS.pop(case_id, None)
        ANALYSIS_PROGRESS.pop(case_id, None)
        CHAT_PROGRESS.pop(case_id, None)
        unregister_case_log_handler(case_id)
Remove all in-memory state entries for a case.
Arguments:
- case_id: UUID of the case.
def cleanup_terminal_cases(exclude_case_id: str | None = None) -> None:
    """Remove in-memory state for TTL-expired cases.

    Terminal cases (completed, failed, error) are only evicted once their
    ``_terminal_since`` timestamp exceeds ``CASE_TTL_SECONDS``, so that
    post-analysis actions (chat, report, download) continue to work.
    Non-terminal cases are evicted if their progress entries exceed the TTL.

    Only in-memory state is removed; case data on disk is never deleted.

    Args:
        exclude_case_id: Optional case ID to exempt from cleanup.
    """
    now = time.monotonic()
    with STATE_LOCK:
        evict_case_ids = []
        for case_id, case in CASE_STATES.items():
            if case_id == exclude_case_id:
                continue
            is_terminal = normalize_case_status(case.get("status")) in TERMINAL_CASE_STATUSES
            if is_terminal:
                terminal_since = case.get("_terminal_since", 0.0)
                if terminal_since and (now - terminal_since) > CASE_TTL_SECONDS:
                    evict_case_ids.append(case_id)
            elif _is_case_expired(case_id, now):
                evict_case_ids.append(case_id)
        for case_id in evict_case_ids:
            CASE_STATES.pop(case_id, None)
            PARSE_PROGRESS.pop(case_id, None)
            ANALYSIS_PROGRESS.pop(case_id, None)
            CHAT_PROGRESS.pop(case_id, None)
        _evict_orphaned_progress(now)
        for case_id in evict_case_ids:
            unregister_case_log_handler(case_id)
Remove in-memory state for TTL-expired cases.
Terminal cases (completed, failed, error) are only evicted once their
_terminal_since timestamp exceeds CASE_TTL_SECONDS, so that
post-analysis actions (chat, report, download) continue to work.
Non-terminal cases are evicted if their progress entries exceed the TTL.
Only in-memory state is removed; case data on disk is never deleted.
Arguments:
- exclude_case_id: Optional case ID to exempt from cleanup.
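For terminal cases the eviction decision is a pure function of two timestamps and the TTL, which makes the grace-period rule easy to test in isolation. A sketch (the TTL value here is illustrative; the real constant is `CASE_TTL_SECONDS`):

```python
CASE_TTL_SECONDS = 3600.0   # illustrative TTL; the real value lives in app.routes.state


def terminal_case_expired(case: dict, now: float) -> bool:
    """True when a terminal case has outlived its TTL grace period.

    A zero or missing _terminal_since means the stamp was never recorded,
    so the case is kept (mirroring the ``if terminal_since and ...`` guard
    in cleanup_terminal_cases).
    """
    terminal_since = case.get("_terminal_since", 0.0)
    return bool(terminal_since) and (now - terminal_since) > CASE_TTL_SECONDS
```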
def mask_sensitive(data: Any) -> Any:
    """Recursively mask sensitive values in a data structure.

    Args:
        data: Input data structure (dict, list, or scalar).

    Returns:
        A new structure with sensitive values replaced by ``MASKED``.
    """
    if isinstance(data, dict):
        masked: dict[str, Any] = {}
        for key, value in data.items():
            if key.lower() in SENSITIVE_KEYS:
                masked[key] = MASKED if str(value).strip() else ""
            else:
                masked[key] = mask_sensitive(value)
        return masked
    if isinstance(data, list):
        return [mask_sensitive(item) for item in data]
    return data
Recursively mask sensitive values in a data structure.
Arguments:
- data: Input data structure (dict, list, or scalar).
Returns:
A new structure with sensitive values replaced by `MASKED`.
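Note the asymmetry: a non-empty sensitive value becomes the placeholder, while an empty one becomes `""`, so a client can still tell "set" apart from "unset". A self-contained run of the same logic (the `SENSITIVE_KEYS` contents and `MASKED` value below are assumptions):

```python
from typing import Any

SENSITIVE_KEYS = {"api_key", "password"}   # assumed contents; the real set is in the module
MASKED = "********"                        # assumed placeholder string


def mask_sensitive(data: Any) -> Any:
    """Return a copy with sensitive values masked, recursing into containers."""
    if isinstance(data, dict):
        masked: dict[str, Any] = {}
        for key, value in data.items():
            if key.lower() in SENSITIVE_KEYS:
                # Empty values stay empty so "unset" remains visible.
                masked[key] = MASKED if str(value).strip() else ""
            else:
                masked[key] = mask_sensitive(value)
        return masked
    if isinstance(data, list):
        return [mask_sensitive(item) for item in data]
    return data
```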
def deep_merge(current: dict[str, Any], updates: dict[str, Any], prefix: str = "") -> list[str]:
    """Recursively merge *updates* into *current*, tracking changed keys.

    Sensitive keys whose value equals ``MASKED`` are skipped.

    Args:
        current: Target dictionary (mutated in place).
        updates: Source dictionary with new values.
        prefix: Dot-separated key prefix for recursive tracking.

    Returns:
        List of dot-separated key paths that were changed.
    """
    changed: list[str] = []
    for key, value in updates.items():
        if not isinstance(key, str):
            continue
        full_key = f"{prefix}{key}"
        if key in current and isinstance(current[key], dict) and isinstance(value, dict):
            changed.extend(deep_merge(current[key], value, f"{full_key}."))
            continue
        if key.lower() in SENSITIVE_KEYS and isinstance(value, str) and value == MASKED:
            continue
        if current.get(key) != value:
            current[key] = copy.deepcopy(value)
            changed.append(full_key)
    return changed
Recursively merge updates into current, tracking changed keys.
Sensitive keys whose value equals MASKED are skipped.
Arguments:
- current: Target dictionary (mutated in place).
- updates: Source dictionary with new values.
- prefix: Dot-separated key prefix for recursive tracking.
Returns:
List of dot-separated key paths that were changed.
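The `MASKED` skip is what lets a client round-trip a masked config back to the server without clobbering stored secrets. A self-contained sketch reproducing the merge logic (the `SENSITIVE_KEYS` contents and `MASKED` value are assumptions):

```python
import copy
from typing import Any

SENSITIVE_KEYS = {"api_key"}   # assumed
MASKED = "********"            # assumed


def deep_merge(current: dict[str, Any], updates: dict[str, Any], prefix: str = "") -> list[str]:
    changed: list[str] = []
    for key, value in updates.items():
        if not isinstance(key, str):
            continue
        full_key = f"{prefix}{key}"
        if key in current and isinstance(current[key], dict) and isinstance(value, dict):
            changed.extend(deep_merge(current[key], value, f"{full_key}."))
            continue
        # A masked sensitive value means "unchanged" from the client's view.
        if key.lower() in SENSITIVE_KEYS and isinstance(value, str) and value == MASKED:
            continue
        if current.get(key) != value:
            current[key] = copy.deepcopy(value)
            changed.append(full_key)
    return changed


cfg = {"ai": {"api_key": "real-secret", "model": "m1"}}
changed = deep_merge(cfg, {"ai": {"api_key": MASKED, "model": "m2"}})
```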
def sanitize_changed_keys(changed_keys: list[str]) -> list[str]:
    """Sanitise changed config key paths for audit logging.

    Args:
        changed_keys: Raw list of dot-separated key paths.

    Returns:
        Deduplicated, sanitised list with sensitive paths redacted.
    """
    sanitized: list[str] = []
    for key in changed_keys:
        if not isinstance(key, str):
            continue
        normalized = key.strip()
        if not normalized:
            continue
        if _is_sensitive_path(normalized):
            normalized = f"{normalized} (redacted)"
        if normalized not in sanitized:
            sanitized.append(normalized)
    return sanitized
Sanitise changed config key paths for audit logging.
Arguments:
- changed_keys: Raw list of dot-separated key paths.
Returns:
Deduplicated, sanitised list with sensitive paths redacted.
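Since `_is_sensitive_path` is private and not shown here, the sketch below assumes it flags any path whose final segment is a sensitive key name; the strip, redact, and dedupe behaviour mirrors the source:

```python
SENSITIVE_KEYS = {"api_key"}   # assumed


def _is_sensitive_path(path: str) -> bool:
    # Assumption: a path is sensitive when its final segment names a sensitive key.
    return path.split(".")[-1].lower() in SENSITIVE_KEYS


def sanitize_changed_keys(changed_keys: list) -> list:
    sanitized: list = []
    for key in changed_keys:
        if not isinstance(key, str):
            continue
        normalized = key.strip()
        if not normalized:
            continue
        if _is_sensitive_path(normalized):
            normalized = f"{normalized} (redacted)"
        if normalized not in sanitized:
            sanitized.append(normalized)
    return sanitized
```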
def audit_config_change(changed_keys: list[str]) -> None:
    """Write a ``config_changed`` audit entry to all active cases.

    Args:
        changed_keys: List of changed config key paths.
    """
    sanitized_keys = sanitize_changed_keys(changed_keys)
    if not sanitized_keys:
        return

    with STATE_LOCK:
        audit_loggers = [
            case.get("audit")
            for case in CASE_STATES.values()
            if isinstance(case, dict) and case.get("audit") is not None
        ]

    details = {
        "changed_keys": sanitized_keys,
        "changed_count": len(sanitized_keys),
    }
    for audit_logger in audit_loggers:
        try:
            audit_logger.log("config_changed", details)
        except Exception:
            LOGGER.exception("Failed to write config_changed audit entry.")
Write a config_changed audit entry to all active cases.
Arguments:
- changed_keys: List of changed config key paths.
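The snapshot-then-log shape matters: loggers are collected under `STATE_LOCK` but invoked outside it, so a slow audit write never blocks other threads touching `CASE_STATES`. A standalone sketch with a recording stand-in for the audit logger (the `RecordingAudit` class and its `.log()` interface are assumptions drawn from the call above):

```python
import threading
from typing import Any

STATE_LOCK = threading.RLock()


class RecordingAudit:
    """Stand-in for the per-case audit logger (assumed interface: .log(event, details))."""

    def __init__(self) -> None:
        self.entries: list = []

    def log(self, event: str, details: dict) -> None:
        self.entries.append((event, details))


CASE_STATES: dict[str, dict[str, Any]] = {
    "c1": {"audit": RecordingAudit()},
    "c2": {},                      # a case without an audit logger is skipped
}


def audit_config_change(changed_keys: list) -> None:
    # Snapshot the loggers under the lock, then log outside it so a slow
    # audit write never blocks other threads.
    with STATE_LOCK:
        loggers = [
            case.get("audit")
            for case in CASE_STATES.values()
            if isinstance(case, dict) and case.get("audit") is not None
        ]
    details = {"changed_keys": changed_keys, "changed_count": len(changed_keys)}
    for logger in loggers:
        logger.log("config_changed", details)
```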