app.routes.state

In-memory state management, progress tracking, and SSE streaming for AIFT routes.

This module centralises all shared mutable state (case dictionaries, progress stores, threading lock), the SSE event streaming machinery, response helpers, and configuration-change auditing used across the route layer.

Attributes:
  • LOGGER: Module-level logger instance.
  • PROJECT_ROOT: Absolute Path to the AIFT project root directory.
  • CASES_ROOT: Absolute Path to the cases/ directory.
  • IMAGES_ROOT: Absolute Path to the images/ directory.
  • SENSITIVE_KEYS: Set of lowercase key names whose values must be masked.
  • MASKED: Placeholder string for masked sensitive values.
  • SAFE_NAME_RE: Regex matching characters unsafe for filenames.
  • DISSECT_EVIDENCE_EXTENSIONS: Frozenset of extensions Dissect can open.
  • MODE_PARSE_AND_AI: Constant for parse-and-analyse mode.
  • MODE_PARSE_ONLY: Constant for parse-only mode.
  • CONNECTION_TEST_SYSTEM_PROMPT: System prompt for AI connectivity tests.
  • CONNECTION_TEST_USER_PROMPT: User prompt for AI connectivity tests.
  • DEFAULT_FORENSIC_SYSTEM_PROMPT: Fallback forensic AI system prompt.
  • CHAT_HISTORY_MAX_PAIRS: Max user/assistant message pairs in chat context.
  • TERMINAL_CASE_STATUSES: Case statuses that indicate a terminal state.
  • SSE_POLL_INTERVAL_SECONDS: Sleep interval between SSE poll iterations.
  • SSE_INITIAL_IDLE_GRACE_SECONDS: Grace period before idle SSE termination.
  • CASE_TTL_SECONDS: Max age for in-memory case state before eviction.
  • CASE_STATES: In-memory dict mapping case IDs to state dicts.
  • PARSE_PROGRESS: In-memory dict mapping case IDs to parse progress.
  • ANALYSIS_PROGRESS: In-memory dict mapping case IDs to analysis progress.
  • CHAT_PROGRESS: In-memory dict mapping case IDs to chat progress.
  • STATE_LOCK: Reentrant threading lock protecting all state dicts.
  1"""In-memory state management, progress tracking, and SSE streaming for AIFT routes.
  2
  3This module centralises all shared mutable state (case dictionaries, progress
  4stores, threading lock), the SSE event streaming machinery, response helpers,
  5and configuration-change auditing used across the route layer.
  6
  7Attributes:
  8    LOGGER: Module-level logger instance.
  9    PROJECT_ROOT: Absolute ``Path`` to the AIFT project root directory.
 10    CASES_ROOT: Absolute ``Path`` to the ``cases/`` directory.
 11    IMAGES_ROOT: Absolute ``Path`` to the ``images/`` directory.
 12    SENSITIVE_KEYS: Set of lowercase key names whose values must be masked.
 13    MASKED: Placeholder string for masked sensitive values.
 14    SAFE_NAME_RE: Regex matching characters unsafe for filenames.
 15    DISSECT_EVIDENCE_EXTENSIONS: Frozenset of extensions Dissect can open.
 16    MODE_PARSE_AND_AI: Constant for parse-and-analyse mode.
 17    MODE_PARSE_ONLY: Constant for parse-only mode.
 18    CONNECTION_TEST_SYSTEM_PROMPT: System prompt for AI connectivity tests.
 19    CONNECTION_TEST_USER_PROMPT: User prompt for AI connectivity tests.
 20    DEFAULT_FORENSIC_SYSTEM_PROMPT: Fallback forensic AI system prompt.
 21    CHAT_HISTORY_MAX_PAIRS: Max user/assistant message pairs in chat context.
 22    TERMINAL_CASE_STATUSES: Case statuses that indicate a terminal state.
 23    SSE_POLL_INTERVAL_SECONDS: Sleep interval between SSE poll iterations.
 24    SSE_INITIAL_IDLE_GRACE_SECONDS: Grace period before idle SSE termination.
 25    CASE_TTL_SECONDS: Max age for in-memory case state before eviction.
 26    CASE_STATES: In-memory dict mapping case IDs to state dicts.
 27    PARSE_PROGRESS: In-memory dict mapping case IDs to parse progress.
 28    ANALYSIS_PROGRESS: In-memory dict mapping case IDs to analysis progress.
 29    CHAT_PROGRESS: In-memory dict mapping case IDs to chat progress.
 30    STATE_LOCK: Reentrant threading lock protecting all state dicts.
 31"""
 32
 33from __future__ import annotations
 34
 35import copy
 36from datetime import datetime, timezone
 37import json
 38import logging
 39from pathlib import Path
 40import re
 41import threading
 42import time
 43from typing import Any
 44
 45from flask import Response, jsonify, stream_with_context
 46
 47from ..case_logging import unregister_case_log_handler
 48from ..config import LOGO_FILE_CANDIDATES
 49
 50__all__ = [
 51    "LOGGER",
 52    "PROJECT_ROOT",
 53    "CASES_ROOT",
 54    "IMAGES_ROOT",
 55    "SENSITIVE_KEYS",
 56    "MASKED",
 57    "SAFE_NAME_RE",
 58    "DISSECT_EVIDENCE_EXTENSIONS",
 59    "MODE_PARSE_AND_AI",
 60    "MODE_PARSE_ONLY",
 61    "CONNECTION_TEST_SYSTEM_PROMPT",
 62    "CONNECTION_TEST_USER_PROMPT",
 63    "DEFAULT_FORENSIC_SYSTEM_PROMPT",
 64    "CHAT_HISTORY_MAX_PAIRS",
 65    "TERMINAL_CASE_STATUSES",
 66    "SSE_POLL_INTERVAL_SECONDS",
 67    "SSE_INITIAL_IDLE_GRACE_SECONDS",
 68    "CASE_TTL_SECONDS",
 69    "CASE_STATES",
 70    "PARSE_PROGRESS",
 71    "ANALYSIS_PROGRESS",
 72    "CHAT_PROGRESS",
 73    "STATE_LOCK",
 74    "now_iso",
 75    "error_response",
 76    "success_response",
 77    "safe_name",
 78    "resolve_logo_filename",
 79    "safe_int",
 80    "normalize_case_status",
 81    "new_progress",
 82    "set_progress_status",
 83    "emit_progress",
 84    "stream_sse",
 85    "get_case",
 86    "mark_case_status",
 87    "cancel_progress",
 88    "is_cancelled",
 89    "get_cancel_event",
 90    "cleanup_case_entries",
 91    "cleanup_terminal_cases",
 92    "mask_sensitive",
 93    "deep_merge",
 94    "sanitize_changed_keys",
 95    "audit_config_change",
 96]
 97
 98LOGGER = logging.getLogger(__name__)
 99
100PROJECT_ROOT = Path(__file__).resolve().parents[2]
101CASES_ROOT = PROJECT_ROOT / "cases"
102IMAGES_ROOT = PROJECT_ROOT / "images"
103SENSITIVE_KEYS = {"api_key", "token", "secret", "password"}
104MASKED = "********"
105SAFE_NAME_RE = re.compile(r"[^A-Za-z0-9._-]+")
106
107DISSECT_EVIDENCE_EXTENSIONS = frozenset({
108    ".e01", ".ex01", ".s01", ".l01",
109    ".dd", ".img", ".raw", ".bin", ".iso",
110    ".000", ".001",
111    ".vmdk", ".vhd", ".vhdx", ".vdi", ".qcow2", ".hdd", ".hds",
112    ".vmx", ".vmwarevm", ".vbox", ".vmcx", ".ovf", ".ova", ".pvm", ".pvs", ".utm", ".xva", ".vma",
113    ".vbk",
114    ".asdf", ".asif",
115    ".ad1",
116    ".tar", ".gz", ".tgz",
117    ".zip", ".7z",
118})
119
120MODE_PARSE_AND_AI = "parse_and_ai"
121MODE_PARSE_ONLY = "parse_only"
122CONNECTION_TEST_SYSTEM_PROMPT = "You are a connectivity test assistant. Reply briefly."
123CONNECTION_TEST_USER_PROMPT = "Reply with: Connection OK."
124DEFAULT_FORENSIC_SYSTEM_PROMPT = (
125    "You are a digital forensic analyst. "
126    "Analyze ONLY the data provided to you. "
127    "Do not fabricate evidence. "
128    "Prioritize incident-relevant findings and response actions; use baseline only as supporting context."
129)
130CHAT_HISTORY_MAX_PAIRS = 20
131TERMINAL_CASE_STATUSES = frozenset({"completed", "failed", "error"})
132SSE_POLL_INTERVAL_SECONDS = 0.2
133SSE_INITIAL_IDLE_GRACE_SECONDS = 1.0
134CASE_TTL_SECONDS = 21600
135
136CASE_STATES: dict[str, dict[str, Any]] = {}
137PARSE_PROGRESS: dict[str, dict[str, Any]] = {}
138ANALYSIS_PROGRESS: dict[str, dict[str, Any]] = {}
139CHAT_PROGRESS: dict[str, dict[str, Any]] = {}
140STATE_LOCK = threading.RLock()
141
142
143# ---------------------------------------------------------------------------
144# Simple helpers
145# ---------------------------------------------------------------------------
146
def now_iso() -> str:
    """Format the current UTC time as a second-precision ISO 8601 string.

    Returns:
        A timestamp ending in ``Z``, e.g. ``"2025-01-15T08:30:00Z"``.
    """
    current_utc = datetime.now(timezone.utc)
    return current_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
154
155
def error_response(message: str, status: int = 400) -> tuple[Response, int]:
    """Build the standard JSON error reply.

    Args:
        message: Human-readable description placed under ``"error"``.
        status: HTTP status code returned with the body. Defaults to 400.

    Returns:
        A ``(Response, int)`` pair whose JSON body has ``"success": false``.
    """
    body = {"success": False, "error": message}
    return jsonify(body), status
167
168
def success_response(data: dict[str, Any] | None = None, status: int = 200) -> tuple[Response, int]:
    """Build the standard JSON success reply.

    Args:
        data: Optional mapping merged on top of ``{"success": true}``; keys in
            *data* win on collision. ``None`` or an empty dict adds nothing.
        status: HTTP status code returned with the body. Defaults to 200.

    Returns:
        A ``(Response, int)`` pair carrying the merged JSON body.
    """
    if not data:
        return jsonify({"success": True}), status
    return jsonify({"success": True, **data}), status
183
184
def safe_name(value: str, fallback: str = "item") -> str:
    """Sanitise a string for safe use as a filesystem or identifier name.

    Every run of characters outside ``[A-Za-z0-9._-]`` is collapsed to a
    single underscore, then leading and trailing underscores are removed.

    Args:
        value: The raw string to sanitise.
        fallback: Value returned when nothing usable remains.

    Returns:
        The sanitised name, or *fallback* when the result is empty or made
        up solely of dots.
    """
    cleaned = SAFE_NAME_RE.sub("_", value).strip("_")
    # Dots are allowed by the pattern, so "." and ".." would pass through
    # unchanged; as path components they mean the current/parent directory,
    # so a dot-only result is treated like an empty one.
    if not cleaned.strip("."):
        return fallback
    return cleaned
197
198
def resolve_logo_filename() -> str:
    """Pick the logo file name to use from ``IMAGES_ROOT``.

    Names listed in ``LOGO_FILE_CANDIDATES`` are tried in order first. If
    none exists as a file, the alphabetically first image file in the
    directory (``.png``, ``.jpg``, ``.jpeg``, ``.webp`` or ``.svg``,
    case-insensitive) is used.

    Returns:
        The chosen file name, or ``""`` when the directory is missing or
        holds no suitable file.
    """
    if not IMAGES_ROOT.is_dir():
        return ""

    for candidate in LOGO_FILE_CANDIDATES:
        if (IMAGES_ROOT / candidate).is_file():
            return candidate

    image_suffixes = {".png", ".jpg", ".jpeg", ".webp", ".svg"}
    names = [
        entry.name
        for entry in IMAGES_ROOT.iterdir()
        if entry.is_file() and entry.suffix.lower() in image_suffixes
    ]
    return min(names, default="")
217
218
def safe_int(value: Any, default: int = 0) -> int:
    """Safely convert a value to int, returning *default* on failure.

    Args:
        value: Value to convert.
        default: Fallback integer. Defaults to 0.

    Returns:
        ``int(value)``, or *default* when the conversion raises
        ``TypeError``, ``ValueError`` or ``OverflowError``.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite floats: int(float("inf")) raises it
        # rather than ValueError.
        return default
233
234
def normalize_case_status(value: Any) -> str:
    """Convert a raw status value to its canonical string form.

    Args:
        value: Raw status value; any falsy value is treated as ``""``.

    Returns:
        ``str(value)`` with surrounding whitespace removed, in lowercase.
    """
    text = str(value) if value else ""
    return text.strip().lower()
245
246
247# ---------------------------------------------------------------------------
248# Progress / SSE helpers
249# ---------------------------------------------------------------------------
250
def new_progress(status: str = "idle") -> dict[str, Any]:
    """Build a fresh progress record for one of the SSE event stores.

    Args:
        status: Initial status string. Defaults to ``"idle"``.

    Returns:
        A new dict with the keys ``status``, ``events`` (empty list),
        ``error`` (``None``), ``created_at`` (``time.monotonic()`` reading)
        and ``cancel_event`` (an unset ``threading.Event``).
    """
    progress: dict[str, Any] = dict(status=status, events=[], error=None)
    progress["created_at"] = time.monotonic()
    progress["cancel_event"] = threading.Event()
    return progress
268
269
def set_progress_status(
    store: dict[str, dict[str, Any]],
    case_id: str,
    status: str,
    error: str | None = None,
) -> None:
    """Overwrite the status and error fields of a case's progress record.

    The record is created with ``new_progress()`` when the case has none
    yet. ``error`` is always written, so omitting it resets it to ``None``.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
        status: New status string.
        error: Optional error message.
    """
    with STATE_LOCK:
        if case_id not in store:
            store[case_id] = new_progress()
        record = store[case_id]
        record.update(status=status, error=error)
290
291
def cancel_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Flip a running progress record to ``"cancelled"`` and fire its event.

    Only records whose status is exactly ``"running"`` are affected.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` when a running record was cancelled; ``False`` when the
        record is missing or not running.
    """
    with STATE_LOCK:
        record = store.get(case_id)
        is_running = record is not None and record.get("status") == "running"
        if not is_running:
            return False

        record["status"] = "cancelled"
        signal = record.get("cancel_event")
        # Tolerate records whose event is missing or of another type.
        if isinstance(signal, threading.Event):
            signal.set()
    return True
316
317
def is_cancelled(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Report whether a case's progress record is in the cancelled state.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` only when a record exists and its status is
        ``"cancelled"``.
    """
    with STATE_LOCK:
        record = store.get(case_id)
        if record is None:
            return False
        return record.get("status") == "cancelled"
336
337
def get_cancel_event(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> threading.Event | None:
    """Fetch the cancel event stored on a case's current progress record.

    Thread-safe: acquires ``STATE_LOCK``. Callers should keep their own
    reference to the returned event so it stays usable even if the
    progress record is later replaced by a new run.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        The ``threading.Event``, or ``None`` when there is no record or the
        stored value is not an event.
    """
    with STATE_LOCK:
        record = store.get(case_id)
        candidate = None if record is None else record.get("cancel_event")
    if isinstance(candidate, threading.Event):
        return candidate
    return None
361
362
def emit_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
    payload: dict[str, Any],
) -> None:
    """Queue one progress event on a case's SSE event list.

    The payload is shallow-copied, given a ``timestamp`` (unless it already
    has one) and a ``sequence`` equal to its index in the event list. A
    progress record is created when the case has none.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
        payload: Event dict (must include a ``"type"`` key).
    """
    record = dict(payload)
    if "timestamp" not in record:
        record["timestamp"] = now_iso()

    with STATE_LOCK:
        progress = store.setdefault(case_id, new_progress())
        history = progress["events"]
        record["sequence"] = len(history)
        history.append(record)
383
384
def _cleanup_progress_store(store: dict[str, dict[str, Any]], case_id: str) -> None:
    """Flag a finished case's progress record as drained without deleting it.

    Records in a terminal status are kept so that a reconnecting SSE client
    still gets a proper completion signal instead of a misleading
    "Case not found" error. Removal happens later in
    ``cleanup_terminal_cases``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
    """
    with STATE_LOCK:
        record = store.get(case_id)
        if record is None:
            return
        if record.get("status") in TERMINAL_CASE_STATUSES:
            record["_drained"] = True
400
401
def stream_sse(store: dict[str, dict[str, Any]], case_id: str) -> Response:
    """Create an SSE streaming ``Response`` that polls a progress event store.

    The generator wakes every ``SSE_POLL_INTERVAL_SECONDS``, sends events
    appended since the previous poll, and emits ``: keep-alive`` comment
    frames while nothing is pending. It ends when:

    * the progress record is missing (one synthetic ``complete`` frame if
      the case is still in ``CASE_STATES``, otherwise an ``error`` frame);
    * the record stays ``"idle"`` with no events past the initial grace
      period (one ``idle`` frame); or
    * the status is in ``TERMINAL_CASE_STATUSES`` and all events were sent.

    NOTE(review): ``"cancelled"`` is not in ``TERMINAL_CASE_STATUSES``, so a
    cancelled run keeps the stream open until some other code moves the
    record to a terminal status — confirm that this is intended.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        A Flask ``Response`` with ``text/event-stream`` MIME type.
    """
    def _frame(payload: dict[str, Any]) -> str:
        """Serialise one payload as a compact SSE ``data:`` frame."""
        return f"data: {json.dumps(payload, separators=(',', ':'))}\n\n"

    @stream_with_context
    def stream() -> Any:
        """Generate SSE data frames by polling the progress event store."""
        last = 0
        initial_idle_deadline = time.monotonic() + SSE_INITIAL_IDLE_GRACE_SECONDS
        try:
            while True:
                synthetic: dict[str, Any] | None = None
                status = "idle"
                pending: list[dict[str, Any]] = []

                # Only read shared state under the lock. Every yield happens
                # after it is released, so the global lock is never held
                # while the generator is suspended on a client write.
                with STATE_LOCK:
                    state = store.get(case_id)
                    if state is None:
                        # Progress entry absent — if the case itself still
                        # exists, the progress was already cleaned up; tell
                        # the client the operation finished rather than
                        # emitting a misleading "Case not found" error.
                        if case_id in CASE_STATES:
                            synthetic = {"type": "complete", "message": "Already completed."}
                        else:
                            synthetic = {"type": "error", "message": "Case not found."}
                    else:
                        status = str(state.get("status", "idle"))
                        events = state.get("events", [])
                        # Copy only the unseen tail, not the whole history.
                        pending = events[last:]
                        last = len(events)

                if synthetic is not None:
                    yield _frame(synthetic)
                    break

                if not pending and status == "idle":
                    if time.monotonic() < initial_idle_deadline:
                        # Give a just-started operation a moment to leave
                        # the idle state before giving up on it.
                        yield ": keep-alive\n\n"
                        time.sleep(SSE_POLL_INTERVAL_SECONDS)
                        continue
                    yield _frame({"type": "idle", "status": "idle"})
                    break

                for event in pending:
                    yield _frame(event)

                # Stop only on a poll that found nothing new, so events
                # appended together with the final status are still sent.
                if status in TERMINAL_CASE_STATUSES and not pending:
                    break

                if not pending:
                    yield ": keep-alive\n\n"
                time.sleep(SSE_POLL_INTERVAL_SECONDS)
        finally:
            # Runs on normal completion and when the client disconnects.
            _cleanup_progress_store(store, case_id)

    return Response(
        stream(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
470
471
472# ---------------------------------------------------------------------------
473# Case state helpers
474# ---------------------------------------------------------------------------
475
def get_case(case_id: str) -> dict[str, Any] | None:
    """Look up the in-memory state dictionary of a case.

    Thread-safe: acquires ``STATE_LOCK``. The stored dict itself is
    returned, not a copy.

    Args:
        case_id: UUID of the case.

    Returns:
        The case state dictionary, or ``None`` when the case is unknown.
    """
    with STATE_LOCK:
        case = CASE_STATES.get(case_id)
    return case
489
490
def mark_case_status(case_id: str, status: str) -> None:
    """Update the in-memory status of a case. No-op if case missing.

    When the case enters a terminal status, the monotonic timestamp is
    recorded in ``_terminal_since`` (an existing value is kept) so cleanup
    can apply a TTL grace period. When the case moves to a non-terminal
    status, ``_terminal_since`` is removed so that a later terminal
    transition starts a fresh grace period instead of reusing the timestamp
    of an earlier run.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.
        status: New status string; stored after normalisation.
    """
    normalized = normalize_case_status(status)
    with STATE_LOCK:
        case = CASE_STATES.get(case_id)
        if case is None:
            return
        case["status"] = normalized
        if normalized in TERMINAL_CASE_STATUSES:
            if "_terminal_since" not in case:
                case["_terminal_since"] = time.monotonic()
        else:
            # Leaving the terminal state invalidates the old timestamp.
            case.pop("_terminal_since", None)
508
509
def cleanup_case_entries(case_id: str) -> None:
    """Drop every in-memory record held for a case.

    The case is removed from ``CASE_STATES`` and the three progress stores
    under ``STATE_LOCK``; its case log handler is unregistered afterwards,
    outside the lock.

    Args:
        case_id: UUID of the case.
    """
    with STATE_LOCK:
        for registry in (CASE_STATES, PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
            registry.pop(case_id, None)
    unregister_case_log_handler(case_id)
522
523
def _is_case_expired(case_id: str, now: float) -> bool:
    """Decide whether a case's newest progress record is older than the TTL.

    Must be called while holding ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.
        now: Current monotonic timestamp.

    Returns:
        ``True`` when the most recent ``created_at`` across the three
        progress stores is more than ``CASE_TTL_SECONDS`` before *now*;
        ``False`` when the case has no usable progress timestamp.
    """
    records = (
        progress_store.get(case_id)
        for progress_store in (PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS)
    )
    stamps = [record.get("created_at", 0.0) for record in records if record is not None]
    # 0.0 doubles as the "no timestamp seen" marker.
    newest = max([0.0, *stamps])
    if newest == 0.0:
        return False
    age = now - newest
    return age > CASE_TTL_SECONDS
544
545
def _evict_orphaned_progress(now: float) -> None:
    """Drop stale progress records whose case is no longer in CASE_STATES.

    A record is removed only when it is both orphaned and older than
    ``CASE_TTL_SECONDS`` according to its ``created_at`` value.

    Must be called while holding ``STATE_LOCK``.

    Args:
        now: Current monotonic timestamp.
    """
    for progress_store in (PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
        # Collect first: the dict must not change size while iterated.
        stale_ids = []
        for cid, record in progress_store.items():
            if cid in CASE_STATES:
                continue
            age = now - record.get("created_at", 0.0)
            if age > CASE_TTL_SECONDS:
                stale_ids.append(cid)
        for cid in stale_ids:
            progress_store.pop(cid, None)
562
563
def cleanup_terminal_cases(exclude_case_id: str | None = None) -> None:
    """Evict in-memory state of cases that have outlived the TTL.

    Terminal cases (completed, failed, error) are evicted only once their
    ``_terminal_since`` timestamp is older than ``CASE_TTL_SECONDS``, so
    that post-analysis actions (chat, report, download) keep working. A
    terminal case without that timestamp is not evicted here. Non-terminal
    cases are evicted when their progress records exceed the TTL. Orphaned
    progress records are swept in the same pass.

    Only in-memory state is removed; case data on disk is never deleted.

    Args:
        exclude_case_id: Optional case ID to exempt from cleanup.
    """
    now = time.monotonic()
    evict_case_ids: list[str] = []

    with STATE_LOCK:
        for case_id, case in CASE_STATES.items():
            if case_id == exclude_case_id:
                continue
            if normalize_case_status(case.get("status")) in TERMINAL_CASE_STATUSES:
                since = case.get("_terminal_since", 0.0)
                expired = bool(since) and (now - since) > CASE_TTL_SECONDS
            else:
                expired = _is_case_expired(case_id, now)
            if expired:
                evict_case_ids.append(case_id)

        for case_id in evict_case_ids:
            for registry in (CASE_STATES, PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
                registry.pop(case_id, None)

        _evict_orphaned_progress(now)

    # Log handlers are released after the lock has been dropped.
    for case_id in evict_case_ids:
        unregister_case_log_handler(case_id)
598
599
600# ---------------------------------------------------------------------------
601# Config / sensitive-data helpers
602# ---------------------------------------------------------------------------
603
def mask_sensitive(data: Any) -> Any:
    """Recursively mask sensitive values in a data structure.

    Dicts and lists are rebuilt; any other value is returned as-is. For a
    dict entry whose key is a string matching ``SENSITIVE_KEYS``
    (case-insensitive), the value becomes ``MASKED`` when ``str(value)`` is
    non-blank and ``""`` otherwise. Note that ``None`` is therefore masked,
    because ``str(None)`` is non-blank. Non-string keys cannot be sensitive
    and only have their values processed recursively.

    Args:
        data: Input data structure (dict, list, or scalar).

    Returns:
        A new structure with sensitive values replaced by ``MASKED``.
    """
    if isinstance(data, dict):
        masked: dict[Any, Any] = {}
        for key, value in data.items():
            # The isinstance guard avoids AttributeError on non-string keys.
            if isinstance(key, str) and key.lower() in SENSITIVE_KEYS:
                masked[key] = MASKED if str(value).strip() else ""
            else:
                masked[key] = mask_sensitive(value)
        return masked
    if isinstance(data, list):
        return [mask_sensitive(item) for item in data]
    return data
624
625
def deep_merge(current: dict[str, Any], updates: dict[str, Any], prefix: str = "") -> list[str]:
    """Fold *updates* into *current* in place and report what changed.

    Nested dicts present on both sides are merged recursively. Non-string
    keys in *updates* are ignored. A sensitive key whose incoming value is
    the ``MASKED`` placeholder is skipped, leaving the stored value as it
    was. Other values are deep-copied into *current* when they differ from
    the existing value.

    Args:
        current: Target dictionary (mutated in place).
        updates: Source dictionary with new values.
        prefix: Dot-separated key prefix for recursive tracking.

    Returns:
        List of dot-separated key paths that were changed.
    """
    changed_paths: list[str] = []
    for key, incoming in updates.items():
        if not isinstance(key, str):
            continue

        path = f"{prefix}{key}"
        existing = current.get(key)

        if isinstance(existing, dict) and isinstance(incoming, dict):
            changed_paths += deep_merge(existing, incoming, f"{path}.")
            continue

        is_placeholder = (
            key.lower() in SENSITIVE_KEYS
            and isinstance(incoming, str)
            and incoming == MASKED
        )
        if is_placeholder:
            continue

        if existing != incoming:
            current[key] = copy.deepcopy(incoming)
            changed_paths.append(path)
    return changed_paths
653
654
def _is_sensitive_path(path: str) -> bool:
    """Tell whether any segment of a dotted key path names a sensitive key.

    Args:
        path: Dot-separated key path.

    Returns:
        ``True`` if a segment, stripped and lowercased, is in
        ``SENSITIVE_KEYS``.
    """
    for segment in path.split("."):
        if segment.strip().lower() in SENSITIVE_KEYS:
            return True
    return False
665
666
def sanitize_changed_keys(changed_keys: list[str]) -> list[str]:
    """Prepare changed config key paths for audit logging.

    Non-string and blank entries are dropped, surrounding whitespace is
    stripped, paths containing a sensitive segment get a ``" (redacted)"``
    suffix, and duplicates are removed keeping first-seen order.

    Args:
        changed_keys: Raw list of dot-separated key paths.

    Returns:
        Deduplicated, sanitised list with sensitive paths redacted.
    """
    # Dict keys give ordered de-duplication.
    ordered: dict[str, None] = {}
    for raw in changed_keys:
        if not isinstance(raw, str):
            continue
        label = raw.strip()
        if not label:
            continue
        if _is_sensitive_path(label):
            label = f"{label} (redacted)"
        ordered.setdefault(label, None)
    return list(ordered)
688
689
def audit_config_change(changed_keys: list[str]) -> None:
    """Record a ``config_changed`` audit entry on every case with an audit logger.

    Nothing is written when no keys remain after sanitising. The audit
    loggers are collected under ``STATE_LOCK`` and written to after it is
    released; a failure on one logger is logged and does not stop the rest.

    Args:
        changed_keys: List of changed config key paths.
    """
    sanitized_keys = sanitize_changed_keys(changed_keys)
    if not sanitized_keys:
        return

    sinks = []
    with STATE_LOCK:
        for case in CASE_STATES.values():
            if not isinstance(case, dict):
                continue
            sink = case.get("audit")
            if sink is not None:
                sinks.append(sink)

    details = dict(changed_keys=sanitized_keys, changed_count=len(sanitized_keys))
    for sink in sinks:
        try:
            sink.log("config_changed", details)
        except Exception:
            LOGGER.exception("Failed to write config_changed audit entry.")
LOGGER = <TraceLogger app.routes.state (WARNING)>
PROJECT_ROOT = PosixPath('/home/runner/work/AIFT/AIFT')
CASES_ROOT = PosixPath('/home/runner/work/AIFT/AIFT/cases')
IMAGES_ROOT = PosixPath('/home/runner/work/AIFT/AIFT/images')
SENSITIVE_KEYS = {'token', 'password', 'api_key', 'secret'}
MASKED = '********'
SAFE_NAME_RE = re.compile('[^A-Za-z0-9._-]+')
DISSECT_EVIDENCE_EXTENSIONS = frozenset({'.vmdk', '.ovf', '.ova', '.hdd', '.bin', '.pvm', '.vbox', '.s01', '.img', '.tar', '.vbk', '.vma', '.ex01', '.vdi', '.000', '.l01', '.vmwarevm', '.e01', '.ad1', '.tgz', '.asif', '.qcow2', '.001', '.utm', '.asdf', '.vhdx', '.hds', '.pvs', '.raw', '.xva', '.7z', '.zip', '.iso', '.vmx', '.vhd', '.vmcx', '.gz', '.dd'})
MODE_PARSE_AND_AI = 'parse_and_ai'
MODE_PARSE_ONLY = 'parse_only'
CONNECTION_TEST_SYSTEM_PROMPT = 'You are a connectivity test assistant. Reply briefly.'
CONNECTION_TEST_USER_PROMPT = 'Reply with: Connection OK.'
DEFAULT_FORENSIC_SYSTEM_PROMPT = 'You are a digital forensic analyst. Analyze ONLY the data provided to you. Do not fabricate evidence. Prioritize incident-relevant findings and response actions; use baseline only as supporting context.'
CHAT_HISTORY_MAX_PAIRS = 20
TERMINAL_CASE_STATUSES = frozenset({'error', 'completed', 'failed'})
SSE_POLL_INTERVAL_SECONDS = 0.2
SSE_INITIAL_IDLE_GRACE_SECONDS = 1.0
CASE_TTL_SECONDS = 21600
CASE_STATES: dict[str, dict[str, typing.Any]] = {}
PARSE_PROGRESS: dict[str, dict[str, typing.Any]] = {}
ANALYSIS_PROGRESS: dict[str, dict[str, typing.Any]] = {}
CHAT_PROGRESS: dict[str, dict[str, typing.Any]] = {}
STATE_LOCK = <unlocked _thread.RLock object owner=0 count=0>
def now_iso() -> str:
def now_iso() -> str:
    """Format the current UTC time as a second-precision ISO 8601 string.

    Returns:
        A timestamp ending in ``Z``, e.g. ``"2025-01-15T08:30:00Z"``.
    """
    current_utc = datetime.now(timezone.utc)
    return current_utc.strftime("%Y-%m-%dT%H:%M:%SZ")

Return the current UTC timestamp as an ISO 8601 string with Z suffix.

Returns:

A string like "2025-01-15T08:30:00Z".

def error_response(message: str, status: int = 400) -> tuple[flask.wrappers.Response, int]:
def error_response(message: str, status: int = 400) -> tuple[Response, int]:
    """Build the standard JSON error reply.

    Args:
        message: Human-readable description placed under ``"error"``.
        status: HTTP status code returned with the body. Defaults to 400.

    Returns:
        A ``(Response, int)`` pair whose JSON body has ``"success": false``.
    """
    body = {"success": False, "error": message}
    return jsonify(body), status

Create a standardised JSON error response tuple.

Arguments:
  • message: Human-readable error description.
  • status: HTTP status code. Defaults to 400.
Returns:

A (Response, int) tuple with "success": false.

def success_response( data: dict[str, typing.Any] | None = None, status: int = 200) -> tuple[flask.wrappers.Response, int]:
def success_response(data: dict[str, Any] | None = None, status: int = 200) -> tuple[Response, int]:
    """Build the standard JSON success reply.

    Args:
        data: Optional mapping merged on top of ``{"success": true}``; keys in
            *data* win on collision. ``None`` or an empty dict adds nothing.
        status: HTTP status code returned with the body. Defaults to 200.

    Returns:
        A ``(Response, int)`` pair carrying the merged JSON body.
    """
    if not data:
        return jsonify({"success": True}), status
    return jsonify({"success": True, **data}), status

Create a standardised JSON success response tuple.

Arguments:
  • data: Optional dict of response data merged with "success": true.
  • status: HTTP status code. Defaults to 200.
Returns:

A (Response, int) tuple with "success": true.

def safe_name(value: str, fallback: str = 'item') -> str:
def safe_name(value: str, fallback: str = "item") -> str:
    """Sanitise a string for safe use as a filesystem or identifier name.

    Every run of characters outside ``[A-Za-z0-9._-]`` is collapsed to a
    single underscore, then leading and trailing underscores are removed.

    Args:
        value: The raw string to sanitise.
        fallback: Value returned when nothing usable remains.

    Returns:
        The sanitised name, or *fallback* when the result is empty or made
        up solely of dots.
    """
    cleaned = SAFE_NAME_RE.sub("_", value).strip("_")
    # Dots are allowed by the pattern, so "." and ".." would pass through
    # unchanged; as path components they mean the current/parent directory,
    # so a dot-only result is treated like an empty one.
    if not cleaned.strip("."):
        return fallback
    return cleaned

Sanitise a string for safe use as a filesystem or identifier name.

Arguments:
  • value: The raw string to sanitise.
  • fallback: Value to return if sanitisation produces an empty string.
Returns:

A sanitised string safe for file paths and identifiers.

def resolve_logo_filename() -> str:
def resolve_logo_filename() -> str:
    """Pick the filename of the application logo inside ``IMAGES_ROOT``.

    Returns:
        The first entry of ``LOGO_FILE_CANDIDATES`` that exists as a file;
        otherwise the first image file name (png/jpg/jpeg/webp/svg) in
        sorted order; otherwise an empty string. Also an empty string when
        ``IMAGES_ROOT`` is not a directory.
    """
    if not IMAGES_ROOT.is_dir():
        return ""

    # Explicitly listed candidates win, in the order they are declared.
    for candidate in LOGO_FILE_CANDIDATES:
        if (IMAGES_ROOT / candidate).is_file():
            return candidate

    # Otherwise fall back to whichever image file name sorts first.
    image_suffixes = {".png", ".jpg", ".jpeg", ".webp", ".svg"}
    found = [
        entry.name
        for entry in IMAGES_ROOT.iterdir()
        if entry.is_file() and entry.suffix.lower() in image_suffixes
    ]
    found.sort()
    return found[0] if found else ""

Resolve the application logo filename from the images directory.

Returns:

The logo filename, or an empty string if none is found.

def safe_int(value: Any, default: int = 0) -> int:
def safe_int(value: Any, default: int = 0) -> int:
    """Safely convert a value to int, returning *default* on failure.

    Args:
        value: Value to convert.
        default: Fallback integer. Defaults to 0.

    Returns:
        ``int(value)``, or *default* when the conversion is not possible.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported types such as ``None``.
        # ValueError: unparsable text or ``float("nan")``.
        # OverflowError: infinite floats, e.g. ``int(float("inf"))``.
        return default

Safely convert a value to int, returning default on failure.

Arguments:
  • value: Value to convert.
  • default: Fallback integer. Defaults to 0.
Returns:

The integer representation, or default.

def normalize_case_status(value: Any) -> str:
def normalize_case_status(value: Any) -> str:
    """Reduce a raw case status to its canonical comparison form.

    Args:
        value: Raw status value; any falsy value is treated as empty.

    Returns:
        The string form of *value* with surrounding whitespace removed and
        all characters lowercased, or ``""`` for falsy input.
    """
    if not value:
        return ""
    text = str(value)
    return text.strip().lower()

Normalise a case status value to a lowercase, stripped string.

Arguments:
  • value: Raw status value.
Returns:

Lowercase, stripped string.

def new_progress(status: str = 'idle') -> dict[str, typing.Any]:
def new_progress(status: str = "idle") -> dict[str, Any]:
    """Build an empty progress record for one of the SSE event stores.

    Args:
        status: Initial status string. Defaults to ``"idle"``.

    Returns:
        A new dict with the keys ``status``, ``events`` (empty list),
        ``error`` (``None``), ``created_at`` (``time.monotonic()`` reading)
        and ``cancel_event`` (a fresh, unset ``threading.Event``).
    """
    progress: dict[str, Any] = {"status": status, "events": [], "error": None}
    progress["created_at"] = time.monotonic()
    progress["cancel_event"] = threading.Event()
    return progress

Create a fresh progress-tracking dictionary for SSE event stores.

Arguments:
  • status: Initial status string. Defaults to "idle".
Returns:

A progress dict with status, events, error, created_at, and cancel_event keys.

def set_progress_status( store: dict[str, dict[str, typing.Any]], case_id: str, status: str, error: str | None = None) -> None:
def set_progress_status(
    store: dict[str, dict[str, Any]],
    case_id: str,
    status: str,
    error: str | None = None,
) -> None:
    """Update the status and error fields of a case's progress entry.

    Thread-safe: acquires ``STATE_LOCK``. A missing entry is created with
    ``new_progress()`` first.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
        status: New status string.
        error: Error message to store. The stored value is always
            overwritten, so omitting it resets the field to ``None``.
    """
    with STATE_LOCK:
        state = store.get(case_id)
        if state is None:
            # Create the default lazily: ``setdefault(case_id, new_progress())``
            # would build a throwaway dict and threading.Event on every call,
            # even when the entry already exists.
            state = store[case_id] = new_progress()
        state["status"] = status
        state["error"] = error

Update the status (and optionally error) in a progress store.

Thread-safe: acquires STATE_LOCK.

Arguments:
  • store: One of the progress dicts.
  • case_id: UUID of the case.
  • status: New status string.
  • error: Optional error message.
def emit_progress( store: dict[str, dict[str, typing.Any]], case_id: str, payload: dict[str, typing.Any]) -> None:
def emit_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
    payload: dict[str, Any],
) -> None:
    """Append a progress event to a case's SSE event store.

    The payload is shallow-copied, so the caller's dict is not modified.
    A ``"timestamp"`` is added when the payload has none, and a
    ``"sequence"`` equal to the event's index in the store is always set.

    Thread-safe: acquires ``STATE_LOCK``. A missing entry is created with
    ``new_progress()`` first.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.
        payload: Event dict (must include a ``"type"`` key).
    """
    event = dict(payload)
    if "timestamp" not in event:
        # Only compute a timestamp when the caller did not supply one.
        event["timestamp"] = now_iso()
    with STATE_LOCK:
        state = store.get(case_id)
        if state is None:
            # Create the default lazily instead of via
            # ``setdefault(case_id, new_progress())``, which would allocate a
            # throwaway dict and threading.Event for every emitted event.
            state = store[case_id] = new_progress()
        events = state["events"]
        event["sequence"] = len(events)
        events.append(event)

Append a progress event to a case's SSE event store.

Thread-safe: acquires STATE_LOCK.

Arguments:
  • store: One of the progress dicts.
  • case_id: UUID of the case.
  • payload: Event dict (must include a "type" key).
def stream_sse( store: dict[str, dict[str, typing.Any]], case_id: str) -> flask.wrappers.Response:
def stream_sse(store: dict[str, dict[str, Any]], case_id: str) -> Response:
    """Create an SSE streaming ``Response`` that polls a progress event store.

    The stream emits every event not yet sent, interleaved with
    ``: keep-alive`` comments while nothing is pending, and ends when:

    * the progress entry is missing (a synthetic ``complete`` frame if the
      case is still in ``CASE_STATES``, otherwise an ``error`` frame);
    * the entry is still ``"idle"`` with no events after the initial grace
      period (an ``idle`` frame is sent); or
    * the status is in ``TERMINAL_CASE_STATUSES`` and nothing is pending.

    ``_cleanup_progress_store`` is called when the generator finishes or is
    closed.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        A Flask ``Response`` with ``text/event-stream`` MIME type.
    """

    def frame(payload: dict[str, Any]) -> str:
        """Serialise *payload* as one compact SSE ``data:`` frame."""
        return f"data: {json.dumps(payload, separators=(',', ':'))}\n\n"

    @stream_with_context
    def stream() -> Any:
        """Generate SSE data frames by polling the progress event store."""
        last = 0
        initial_idle_deadline = time.monotonic() + SSE_INITIAL_IDLE_GRACE_SECONDS
        try:
            while True:
                synthetic: dict[str, Any] | None = None
                status = "idle"
                pending: list[dict[str, Any]] = []

                # Only snapshot shared state under the lock. Every ``yield``
                # happens after the lock is released, so a slow or
                # disconnected client can never leave STATE_LOCK held while
                # the generator is suspended.
                with STATE_LOCK:
                    state = store.get(case_id)
                    if state is None:
                        # Progress entry absent — check whether the case
                        # itself still exists.  If it does, the progress was
                        # already drained/cleaned; tell the client the
                        # operation finished rather than emitting a
                        # misleading "Case not found" error.
                        if case_id in CASE_STATES:
                            synthetic = {"type": "complete", "message": "Already completed."}
                        else:
                            synthetic = {"type": "error", "message": "Case not found."}
                    else:
                        status = str(state.get("status", "idle"))
                        events = state.get("events", [])
                        # Copy only the unsent tail so it can be serialised
                        # safely outside the lock.
                        pending = list(events[last:])
                        last = len(events)

                if synthetic is not None:
                    yield frame(synthetic)
                    break

                if not pending and status == "idle":
                    # Nothing has started yet: wait out the grace period
                    # before telling the client the stream is idle.
                    if time.monotonic() < initial_idle_deadline:
                        yield ": keep-alive\n\n"
                        time.sleep(SSE_POLL_INTERVAL_SECONDS)
                        continue
                    yield frame({"type": "idle", "status": "idle"})
                    break

                for event in pending:
                    yield frame(event)

                # Stop only once a terminal status is seen with nothing left
                # to send; events flushed this round get one more poll.
                if status in TERMINAL_CASE_STATUSES and not pending:
                    break

                if not pending:
                    yield ": keep-alive\n\n"
                time.sleep(SSE_POLL_INTERVAL_SECONDS)
        finally:
            _cleanup_progress_store(store, case_id)

    return Response(
        stream(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )

Create an SSE streaming Response that polls a progress event store.

Arguments:
  • store: One of the progress dicts.
  • case_id: UUID of the case.
Returns:

A Flask Response with text/event-stream MIME type.

def get_case(case_id: str) -> dict[str, typing.Any] | None:
def get_case(case_id: str) -> dict[str, Any] | None:
    """Look up a case's in-memory state dictionary.

    Thread-safe: the lookup happens while holding ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.

    Returns:
        The state dictionary stored in ``CASE_STATES`` for *case_id*, or
        ``None`` when there is no such entry.
    """
    with STATE_LOCK:
        case_state = CASE_STATES.get(case_id)
    return case_state

Retrieve the in-memory state dictionary for a case.

Thread-safe: acquires STATE_LOCK.

Arguments:
  • case_id: UUID of the case.
Returns:

The case state dictionary, or None.

def mark_case_status(case_id: str, status: str) -> None:
def mark_case_status(case_id: str, status: str) -> None:
    """Update the in-memory status of a case. No-op if case missing.

    The status is normalised with ``normalize_case_status`` before being
    stored. When the case enters a terminal status, the monotonic timestamp
    is recorded in ``_terminal_since`` so cleanup can apply a TTL grace
    period; repeated terminal updates keep the first timestamp. When the
    case moves to a non-terminal status the timestamp is cleared, so a
    later terminal transition starts a fresh TTL window instead of reusing
    a stale one from an earlier run.

    Thread-safe: acquires ``STATE_LOCK``.

    Args:
        case_id: UUID of the case.
        status: New status string.
    """
    normalized = normalize_case_status(status)
    with STATE_LOCK:
        case = CASE_STATES.get(case_id)
        if case is None:
            return
        case["status"] = normalized
        if normalized in TERMINAL_CASE_STATUSES:
            if "_terminal_since" not in case:
                case["_terminal_since"] = time.monotonic()
        else:
            # Leaving the terminal state: drop the old stamp so the TTL is
            # measured from the next time the case becomes terminal.
            case.pop("_terminal_since", None)

Update the in-memory status of a case. No-op if case missing.

When transitioning to a terminal status, records the monotonic timestamp in _terminal_since so cleanup can apply a TTL grace period.

Arguments:
  • case_id: UUID of the case.
  • status: New status string.
def cancel_progress(store: dict[str, dict[str, typing.Any]], case_id: str) -> bool:
def cancel_progress(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Cancel a running progress entry and fire its cancel event.

    Thread-safe: the whole check-and-update runs under ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` if an entry with status ``"running"`` was found and has been
        switched to ``"cancelled"``; ``False`` if there is no entry or it is
        in any other status.
    """
    with STATE_LOCK:
        entry = store.get(case_id)
        if entry is None:
            return False
        if entry.get("status") != "running":
            return False

        entry["status"] = "cancelled"
        signal = entry.get("cancel_event")
        # Only signal when the slot really holds a threading.Event.
        if isinstance(signal, threading.Event):
            signal.set()
        return True

Mark a running progress entry as cancelled and signal its cancel event.

Thread-safe: acquires STATE_LOCK.

Arguments:
  • store: One of the progress dicts.
  • case_id: UUID of the case.
Returns:

True if the entry was running and is now cancelled, False otherwise.

def is_cancelled(store: dict[str, dict[str, typing.Any]], case_id: str) -> bool:
def is_cancelled(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> bool:
    """Report whether a case's progress entry is in the cancelled state.

    Thread-safe: reads the entry under ``STATE_LOCK``.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        ``True`` only if an entry exists and its status is ``"cancelled"``.
    """
    with STATE_LOCK:
        entry = store.get(case_id)
        if entry is None:
            return False
        return entry.get("status") == "cancelled"

Check whether a progress entry has been cancelled.

Thread-safe: acquires STATE_LOCK.

Arguments:
  • store: One of the progress dicts.
  • case_id: UUID of the case.
Returns:

True if the entry status is "cancelled".

def get_cancel_event( store: dict[str, dict[str, typing.Any]], case_id: str) -> threading.Event | None:
def get_cancel_event(
    store: dict[str, dict[str, Any]],
    case_id: str,
) -> threading.Event | None:
    """Fetch the cancel event attached to a case's current progress entry.

    Thread-safe: reads the entry under ``STATE_LOCK``. Callers should keep
    their own reference to the returned event so it stays valid even if the
    progress dict is later replaced by a new run.

    Args:
        store: One of the progress dicts.
        case_id: UUID of the case.

    Returns:
        The entry's ``threading.Event``, or ``None`` when there is no entry
        or its ``cancel_event`` slot does not hold an ``Event``.
    """
    with STATE_LOCK:
        entry = store.get(case_id)
        candidate = None if entry is None else entry.get("cancel_event")
    if isinstance(candidate, threading.Event):
        return candidate
    return None

Return the cancel event for the current progress entry.

Thread-safe: acquires STATE_LOCK. The caller should hold a reference to the returned event so that it remains valid even if the progress dict is later replaced by a new run.

Arguments:
  • store: One of the progress dicts.
  • case_id: UUID of the case.
Returns:

The threading.Event, or None if no entry exists.

def cleanup_case_entries(case_id: str) -> None:
def cleanup_case_entries(case_id: str) -> None:
    """Drop every in-memory entry that belongs to a case.

    The case is removed from ``CASE_STATES`` and from all three progress
    stores under ``STATE_LOCK``; its log handler is unregistered afterwards,
    outside the lock. Missing entries are ignored.

    Args:
        case_id: UUID of the case.
    """
    with STATE_LOCK:
        for registry in (CASE_STATES, PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
            registry.pop(case_id, None)
    unregister_case_log_handler(case_id)

Remove all in-memory state entries for a case.

Arguments:
  • case_id: UUID of the case.
def cleanup_terminal_cases(exclude_case_id: str | None = None) -> None:
def cleanup_terminal_cases(exclude_case_id: str | None = None) -> None:
    """Evict in-memory state for cases whose TTL has run out.

    A case in a terminal status is evicted only when it carries a truthy
    ``_terminal_since`` stamp that is more than ``CASE_TTL_SECONDS`` old, so
    post-analysis actions keep working during the grace period. A case in
    any other status is evicted when ``_is_case_expired`` says so. After the
    evictions, ``_evict_orphaned_progress`` is run, still under the lock.

    Only in-memory state is touched here. Log handlers of evicted cases are
    unregistered after ``STATE_LOCK`` has been released.

    Args:
        exclude_case_id: Optional case ID that must never be evicted.
    """
    now = time.monotonic()
    expired: list[str] = []
    with STATE_LOCK:
        # Pass 1: decide which cases to evict without mutating the dict
        # that is being iterated.
        for candidate_id, case in CASE_STATES.items():
            if candidate_id == exclude_case_id:
                continue
            if normalize_case_status(case.get("status")) in TERMINAL_CASE_STATUSES:
                stamped = case.get("_terminal_since", 0.0)
                should_evict = bool(stamped) and (now - stamped) > CASE_TTL_SECONDS
            else:
                should_evict = _is_case_expired(candidate_id, now)
            if should_evict:
                expired.append(candidate_id)

        # Pass 2: remove the selected cases from every store.
        for expired_id in expired:
            for registry in (CASE_STATES, PARSE_PROGRESS, ANALYSIS_PROGRESS, CHAT_PROGRESS):
                registry.pop(expired_id, None)
        _evict_orphaned_progress(now)

    for expired_id in expired:
        unregister_case_log_handler(expired_id)

Remove in-memory state for TTL-expired cases.

Terminal cases (completed, failed, error) are only evicted once their _terminal_since timestamp exceeds CASE_TTL_SECONDS, so that post-analysis actions (chat, report, download) continue to work. Non-terminal cases are evicted if their progress entries exceed the TTL.

Only in-memory state is removed; case data on disk is never deleted.

Arguments:
  • exclude_case_id: Optional case ID to exempt from cleanup.
def mask_sensitive(data: Any) -> Any:
def mask_sensitive(data: Any) -> Any:
    """Recursively mask sensitive values in a data structure.

    Dicts and lists are rebuilt; any other value is returned unchanged.
    A dict entry is sensitive when its key is a string whose lowercase form
    is in ``SENSITIVE_KEYS``. Its value becomes ``MASKED``, or ``""`` when
    the value's string form is blank. Note that ``None`` has the non-blank
    string form ``"None"`` and is therefore masked.

    Args:
        data: Input data structure (dict, list, or scalar).

    Returns:
        A new structure with sensitive values replaced by ``MASKED``.
    """
    if isinstance(data, dict):
        masked: dict[Any, Any] = {}
        for key, value in data.items():
            # Non-string keys can never name a sensitive field. Checking the
            # type first avoids an AttributeError from ``key.lower()``.
            if isinstance(key, str) and key.lower() in SENSITIVE_KEYS:
                masked[key] = MASKED if str(value).strip() else ""
            else:
                masked[key] = mask_sensitive(value)
        return masked
    if isinstance(data, list):
        return [mask_sensitive(item) for item in data]
    return data

Recursively mask sensitive values in a data structure.

Arguments:
  • data: Input data structure (dict, list, or scalar).
Returns:

A new structure with sensitive values replaced by MASKED.

def deep_merge( current: dict[str, typing.Any], updates: dict[str, typing.Any], prefix: str = '') -> list[str]:
def deep_merge(current: dict[str, Any], updates: dict[str, Any], prefix: str = "") -> list[str]:
    """Fold *updates* into *current* in place and report what changed.

    Nested dicts present on both sides are merged recursively. Any other
    differing value is deep-copied into *current*. Entries with non-string
    keys are ignored, and a sensitive key whose incoming value is the
    ``MASKED`` placeholder is left untouched.

    Args:
        current: Target dictionary (mutated in place).
        updates: Source dictionary with new values.
        prefix: Dot-terminated key prefix used when recursing.

    Returns:
        Dot-separated paths of every key whose value was replaced.
    """
    touched: list[str] = []
    for name, incoming in updates.items():
        if not isinstance(name, str):
            continue

        path = prefix + name
        existing = current.get(name)

        # Both sides are dicts: descend instead of replacing wholesale.
        if isinstance(existing, dict) and isinstance(incoming, dict):
            touched += deep_merge(existing, incoming, path + ".")
            continue

        # The masked placeholder means "unchanged", so keep the stored value.
        if name.lower() in SENSITIVE_KEYS and isinstance(incoming, str) and incoming == MASKED:
            continue

        if existing != incoming:
            current[name] = copy.deepcopy(incoming)
            touched.append(path)
    return touched

Recursively merge updates into current, tracking changed keys.

Sensitive keys whose value equals MASKED are skipped.

Arguments:
  • current: Target dictionary (mutated in place).
  • updates: Source dictionary with new values.
  • prefix: Dot-separated key prefix for recursive tracking.
Returns:

List of dot-separated key paths that were changed.

def sanitize_changed_keys(changed_keys: list[str]) -> list[str]:
def sanitize_changed_keys(changed_keys: list[str]) -> list[str]:
    """Prepare changed config key paths for inclusion in an audit entry.

    Non-string and blank entries are dropped, surrounding whitespace is
    stripped, paths flagged by ``_is_sensitive_path`` get a
    ``" (redacted)"`` suffix, and duplicates are removed while keeping
    first-seen order.

    Args:
        changed_keys: Raw list of dot-separated key paths.

    Returns:
        Deduplicated, sanitised list with sensitive paths redacted.
    """
    # A dict keeps insertion order, giving ordered de-duplication.
    unique: dict[str, None] = {}
    for raw in changed_keys:
        if not isinstance(raw, str):
            continue
        path = raw.strip()
        if not path:
            continue
        label = f"{path} (redacted)" if _is_sensitive_path(path) else path
        unique.setdefault(label, None)
    return list(unique)

Sanitise changed config key paths for audit logging.

Arguments:
  • changed_keys: Raw list of dot-separated key paths.
Returns:

Deduplicated, sanitised list with sensitive paths redacted.

def audit_config_change(changed_keys: list[str]) -> None:
def audit_config_change(changed_keys: list[str]) -> None:
    """Record a ``config_changed`` audit entry on every case that has a logger.

    The key paths are first passed through ``sanitize_changed_keys``; nothing
    is written when that leaves no keys. Audit loggers are collected under
    ``STATE_LOCK`` but written to after it is released. A failure on one
    logger is logged and does not stop the remaining writes.

    Args:
        changed_keys: List of changed config key paths.
    """
    keys_for_log = sanitize_changed_keys(changed_keys)
    if not keys_for_log:
        return

    # Snapshot the loggers under the lock; do the writes outside it.
    targets = []
    with STATE_LOCK:
        for case in CASE_STATES.values():
            if not isinstance(case, dict):
                continue
            audit = case.get("audit")
            if audit is not None:
                targets.append(audit)

    details = {
        "changed_keys": keys_for_log,
        "changed_count": len(keys_for_log),
    }
    for target in targets:
        try:
            target.log("config_changed", details)
        except Exception:
            LOGGER.exception("Failed to write config_changed audit entry.")

Write a config_changed audit entry to all active cases.

Arguments:
  • changed_keys: List of changed config key paths.