app.routes.artifacts

Artifact option normalisation, profile management, validation, and route handlers.

This module handles:

  • Normalising artifact selection payloads (new artifact_options format and legacy artifacts/ai_artifacts format).
  • Artifact profile CRUD (load, save, list) including the built-in recommended profile.
  • Analysis date-range validation.
  • Parse-progress extraction and prompt sanitisation utilities.
  • Flask route handlers for starting/streaming parse operations and profile CRUD.
Attributes:
  • PROFILE_NAME_RE: Regex for validating artifact profile names.
  • BUILTIN_RECOMMENDED_PROFILE: Name of the built-in recommended profile.
  • PROFILE_DIRNAME: Subdirectory for profile JSON files.
  • PROFILE_FILE_SUFFIX: File extension for profile files.
  • RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS: Artifacts excluded from the recommended profile.
  • artifact_bp: Flask Blueprint for artifact and parse routes.
  1"""Artifact option normalisation, profile management, validation, and route handlers.
  2
  3This module handles:
  4
  5* Normalising artifact selection payloads (new ``artifact_options`` format
  6  and legacy ``artifacts``/``ai_artifacts`` format).
  7* Artifact profile CRUD (load, save, list) including the built-in
  8  ``recommended`` profile.
  9* Analysis date-range validation.
 10* Parse-progress extraction and prompt sanitisation utilities.
 11* Flask route handlers for starting/streaming parse operations and profile CRUD.
 12
 13Attributes:
 14    PROFILE_NAME_RE: Regex for validating artifact profile names.
 15    BUILTIN_RECOMMENDED_PROFILE: Name of the built-in recommended profile.
 16    PROFILE_DIRNAME: Subdirectory for profile JSON files.
 17    PROFILE_FILE_SUFFIX: File extension for profile files.
 18    RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS: Artifacts excluded from the
 19        recommended profile.
 20    artifact_bp: Flask Blueprint for artifact and parse routes.
 21"""
 22
 23from __future__ import annotations
 24
 25import copy
 26import json
 27import logging
 28import shutil
 29import threading
 30from datetime import datetime
 31from pathlib import Path
 32import re
 33from typing import Any
 34
 35from flask import Blueprint, Response, current_app, jsonify, request
 36
 37from ..parser import LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY
 38from .state import (
 39    MODE_PARSE_AND_AI,
 40    MODE_PARSE_ONLY,
 41    PARSE_PROGRESS,
 42    STATE_LOCK,
 43    cancel_progress,
 44    emit_progress,
 45    error_response,
 46    get_case,
 47    new_progress,
 48    safe_int,
 49    safe_name,
 50    stream_sse,
 51    success_response,
 52)
 53
 54# NOTE: .tasks imports are deferred to avoid circular import
 55# (tasks.py imports from artifacts.py). See _get_task_runners().
 56
# Public API of this module; names not listed here are internal helpers.
__all__ = [
    "PROFILE_NAME_RE",
    "BUILTIN_RECOMMENDED_PROFILE",
    "PROFILE_DIRNAME",
    "PROFILE_FILE_SUFFIX",
    "RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS",
    "artifact_bp",
    "normalize_artifact_mode",
    "normalize_artifact_options",
    "artifact_options_to_lists",
    "extract_parse_selection_payload",
    "validate_analysis_date_range",
    "extract_parse_progress",
    "sanitize_prompt",
    "resolve_profiles_root",
    "compose_profile_response",
    "load_profiles_from_directory",
    "normalize_profile_name",
    "profile_path_for_new_name",
    "write_profile_file",
]
 78
# Module-level logger for profile and parse-route diagnostics.
LOGGER = logging.getLogger(__name__)

# Profile names: 1-64 chars, must start with a letter or digit, then
# letters, digits, spaces, underscore, period, or hyphen.
PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9 _.-]{0,63}$")
# Reserved name of the built-in profile (regenerated on every directory load).
BUILTIN_RECOMMENDED_PROFILE = "recommended"
# Subdirectory (next to the config file) where profile JSON files live.
PROFILE_DIRNAME = "profile"
# File extension for profile files.
PROFILE_FILE_SUFFIX = ".json"
# Lower-cased artifact keys omitted from the built-in recommended profile
# (presumably high-volume artifacts — TODO confirm rationale with owners).
RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS = {"mft", "usnjrnl", "evtx", "defender.evtx"}
 86
 87
 88# ---------------------------------------------------------------------------
 89# Artifact option helpers
 90# ---------------------------------------------------------------------------
 91
 92def normalize_artifact_mode(value: Any, default_mode: str = MODE_PARSE_AND_AI) -> str:
 93    """Normalise an artifact processing mode to a valid constant.
 94
 95    Args:
 96        value: Raw mode value.
 97        default_mode: Fallback mode.
 98
 99    Returns:
100        ``MODE_PARSE_AND_AI`` or ``MODE_PARSE_ONLY``.
101    """
102    mode = str(value or "").strip().lower()
103    if mode == MODE_PARSE_ONLY:
104        return MODE_PARSE_ONLY
105    if mode == MODE_PARSE_AND_AI:
106        return MODE_PARSE_AND_AI
107    return default_mode
108
109
110def _normalize_string_list(values: Any) -> list[str]:
111    """Deduplicate and normalise a list of values to non-empty strings.
112
113    Args:
114        values: Input list (or non-list, returns empty).
115
116    Returns:
117        Deduplicated list of non-empty stripped strings.
118    """
119    if not isinstance(values, list):
120        return []
121    normalized: list[str] = []
122    seen: set[str] = set()
123    for value in values:
124        text = str(value).strip()
125        if not text or text in seen:
126            continue
127        seen.add(text)
128        normalized.append(text)
129    return normalized
130
131
def normalize_artifact_options(payload: Any) -> list[dict[str, str]]:
    """Normalise a raw artifact options payload into its canonical form.

    Entries may be plain strings (artifact keys) or dicts using any of the
    supported key spellings (``artifact_key``/``key``, ``mode``/``ai_enabled``/
    ``parse_mode``).

    Args:
        payload: Raw ``artifact_options`` value.

    Returns:
        List of dicts with ``artifact_key`` and ``mode`` keys.

    Raises:
        ValueError: If *payload* is not a list.
    """
    if not isinstance(payload, list):
        raise ValueError("`artifact_options` must be a JSON array.")

    options: list[dict[str, str]] = []
    known_keys: set[str] = set()
    for entry in payload:
        if isinstance(entry, str):
            key = entry.strip()
            mode = MODE_PARSE_AND_AI
        elif isinstance(entry, dict):
            key = str(entry.get("artifact_key") or entry.get("key") or "").strip()
            # Mode precedence: explicit `mode`, then the legacy boolean
            # `ai_enabled`, then the legacy `parse_mode` string.
            if "mode" in entry:
                mode = normalize_artifact_mode(entry.get("mode"))
            elif "ai_enabled" in entry:
                mode = MODE_PARSE_AND_AI if bool(entry.get("ai_enabled")) else MODE_PARSE_ONLY
            else:
                mode = normalize_artifact_mode(entry.get("parse_mode"), default_mode=MODE_PARSE_AND_AI)
        else:
            # Unsupported entry types are silently skipped.
            continue

        if key and key not in known_keys:
            known_keys.add(key)
            options.append({"artifact_key": key, "mode": mode})

    return options
174
175
def artifact_options_to_lists(artifact_options: list[dict[str, str]]) -> tuple[list[str], list[str]]:
    """Split canonical artifact options into parse and AI-analysis key lists.

    Args:
        artifact_options: Canonical artifact option dicts.

    Returns:
        ``(parse_artifacts, analysis_artifacts)`` tuple.
    """
    parse_keys: list[str] = []
    analysis_keys: list[str] = []
    for entry in artifact_options:
        key = str(entry.get("artifact_key", "")).strip()
        if not key:
            continue
        parse_keys.append(key)
        # Only entries whose mode resolves to parse-and-AI feed analysis.
        if normalize_artifact_mode(entry.get("mode")) == MODE_PARSE_AND_AI:
            analysis_keys.append(key)
    return parse_keys, analysis_keys
195
196
def _build_artifact_options_from_lists(
    parse_artifacts: list[str],
    analysis_artifacts: list[str],
) -> list[dict[str, str]]:
    """Construct canonical artifact options from two separate key lists.

    Args:
        parse_artifacts: All artifact keys to parse.
        analysis_artifacts: Subset of keys selected for AI analysis.

    Returns:
        List of dicts with ``artifact_key`` and ``mode``.
    """
    ai_keys = set(analysis_artifacts)
    options: list[dict[str, str]] = []
    for key in parse_artifacts:
        mode = MODE_PARSE_AND_AI if key in ai_keys else MODE_PARSE_ONLY
        options.append({"artifact_key": key, "mode": mode})
    return options
218
219
def extract_parse_selection_payload(
    payload: dict[str, Any],
) -> tuple[list[dict[str, str]], list[str], list[str]]:
    """Normalise the artifact selection carried by a parse request body.

    Both the current ``artifact_options`` format and the legacy
    ``artifacts``/``ai_artifacts`` pair are accepted.

    Args:
        payload: Parsed JSON body from the parse-start request.

    Returns:
        ``(artifact_options, parse_artifacts, analysis_artifacts)`` tuple.

    Raises:
        ValueError: If the payload contains invalid fields.
    """
    # Preferred format: one list of per-artifact option dicts.
    if "artifact_options" in payload:
        options = normalize_artifact_options(payload.get("artifact_options"))
        parse_keys, analysis_keys = artifact_options_to_lists(options)
        return options, parse_keys, analysis_keys

    # Legacy format: `artifacts` plus an optional `ai_artifacts` subset.
    raw_artifacts = payload.get("artifacts", [])
    if not isinstance(raw_artifacts, list):
        raise ValueError("`artifacts` must be a JSON array.")
    parse_keys = _normalize_string_list(raw_artifacts)

    if "ai_artifacts" not in payload:
        # No explicit AI subset: analyse everything that is parsed.
        analysis_keys = list(parse_keys)
    else:
        raw_ai = payload.get("ai_artifacts")
        if not isinstance(raw_ai, list):
            raise ValueError("`ai_artifacts` must be a JSON array.")
        # AI keys must also be selected for parsing to count.
        allowed = set(parse_keys)
        analysis_keys = [key for key in _normalize_string_list(raw_ai) if key in allowed]

    options = _build_artifact_options_from_lists(
        parse_artifacts=parse_keys,
        analysis_artifacts=analysis_keys,
    )
    return options, parse_keys, analysis_keys
261
262
def validate_analysis_date_range(payload: Any) -> dict[str, str] | None:
    """Validate and normalise an optional analysis date range.

    Args:
        payload: Raw ``analysis_date_range`` value from request JSON.

    Returns:
        Dict with ``start_date`` and ``end_date`` (ISO format), or ``None``
        when no range was supplied.

    Raises:
        ValueError: On invalid type, format, or ordering.
    """
    if payload is None:
        return None
    if not isinstance(payload, dict):
        raise ValueError("`analysis_date_range` must be an object.")

    def _as_text(raw: Any) -> str:
        # None becomes "", everything else is stringified and stripped.
        return "" if raw is None else str(raw).strip()

    start_text = _as_text(payload.get("start_date"))
    end_text = _as_text(payload.get("end_date"))
    if not (start_text or end_text):
        return None
    if not (start_text and end_text):
        raise ValueError(
            "Provide both `analysis_date_range.start_date` and `analysis_date_range.end_date`."
        )

    try:
        start = datetime.strptime(start_text, "%Y-%m-%d").date()
        end = datetime.strptime(end_text, "%Y-%m-%d").date()
    except ValueError as error:
        raise ValueError("Date range values must use YYYY-MM-DD format.") from error

    if end < start:
        raise ValueError(
            "`analysis_date_range.start_date` must be earlier than or equal to `end_date`."
        )

    return {"start_date": start.isoformat(), "end_date": end.isoformat()}
307
308
def extract_parse_progress(fallback_artifact: str, args: tuple[Any, ...]) -> tuple[str, int]:
    """Decode a parser progress callback into ``(artifact_key, record_count)``.

    Args:
        fallback_artifact: Artifact key used when none is supplied.
        args: Positional arguments received by the callback.

    Returns:
        ``(artifact_key, record_count)`` tuple.
    """
    if len(args) == 0:
        return fallback_artifact, 0
    head = args[0]
    # Dict payloads carry both fields explicitly.
    if isinstance(head, dict):
        key = str(head.get("artifact_key", fallback_artifact))
        return key, safe_int(head.get("record_count", 0))
    # Two positional values: (artifact_key, record_count).
    if len(args) > 1:
        return str(head or fallback_artifact), safe_int(args[1], 0)
    # Single positional value: just the record count.
    return fallback_artifact, safe_int(head, 0)
327
328
def sanitize_prompt(prompt: str, max_chars: int = 2000) -> str:
    """Collapse whitespace in a user prompt and truncate it for audit logging.

    Args:
        prompt: Raw user prompt text.
        max_chars: Maximum character length. Defaults to 2000.

    Returns:
        Whitespace-normalised (and possibly truncated) prompt string.
    """
    collapsed = " ".join(prompt.split())
    if len(collapsed) > max_chars:
        return f"{collapsed[:max_chars]}... [truncated]"
    return collapsed
343
344
345# ---------------------------------------------------------------------------
346# Profile management
347# ---------------------------------------------------------------------------
348
def _recommended_artifact_options() -> list[dict[str, str]]:
    """Build the artifact options for the built-in 'recommended' profile.

    Merges the Windows and Linux registries so a single profile works
    regardless of the evidence OS; duplicate keys (e.g. ``services``)
    appear only once, and excluded keys are skipped entirely.

    Returns:
        List of artifact option dicts for the recommended profile.
    """
    options: list[dict[str, str]] = []
    emitted: set[str] = set()
    for registry in (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY):
        for raw_key in registry:
            folded = str(raw_key).strip().lower()
            # Skip both explicit exclusions and cross-registry duplicates.
            if folded in RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS or folded in emitted:
                continue
            emitted.add(folded)
            options.append({"artifact_key": str(raw_key), "mode": MODE_PARSE_AND_AI})
    return options
371
372
def resolve_profiles_root(config_path: str | Path) -> Path:
    """Resolve the directory where artifact profiles are stored.

    Args:
        config_path: Path to the AIFT configuration file.

    Returns:
        ``Path`` to the profiles subdirectory beside the config file.
    """
    config_file = Path(config_path)
    return config_file.parent / PROFILE_DIRNAME
383
384
def _recommended_profile_payload() -> dict[str, Any]:
    """Build the full serialisable payload for the built-in recommended profile.

    Returns:
        Dict with ``name``, ``builtin``, and ``artifact_options``.
    """
    payload: dict[str, Any] = {"name": BUILTIN_RECOMMENDED_PROFILE}
    payload["builtin"] = True
    payload["artifact_options"] = _recommended_artifact_options()
    return payload
396
397
def write_profile_file(path: Path, payload: dict[str, Any]) -> None:
    """Serialise an artifact profile to a JSON file, creating parent dirs.

    Args:
        path: Destination file path.
        payload: Profile data to serialise.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=True)
    # Keep a trailing newline so files are POSIX-friendly text.
    path.write_text(serialized + "\n", encoding="utf-8")
408
409
def _load_profile_file(path: Path) -> dict[str, Any] | None:
    """Load and validate one artifact profile from a JSON file.

    Args:
        path: Path to the profile JSON file.

    Returns:
        Validated profile dict (including the source ``path``), or ``None``
        when the file is unreadable or invalid.
    """
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        LOGGER.warning("Skipping unreadable profile file: %s", path)
        return None
    if not isinstance(raw, dict):
        LOGGER.warning("Skipping invalid profile payload in %s", path)
        return None

    # Fall back to the file stem when the payload omits a name.
    name = str(raw.get("name", "")).strip() or path.stem
    if not name:
        return None
    is_builtin_name = name.lower() == BUILTIN_RECOMMENDED_PROFILE
    if not is_builtin_name and not PROFILE_NAME_RE.fullmatch(name):
        LOGGER.warning("Skipping profile with invalid name in %s", path)
        return None

    # Accept the legacy `selections` key as an alias for `artifact_options`.
    options_payload = raw.get("artifact_options")
    if options_payload is None:
        options_payload = raw.get("selections", [])
    if options_payload is None:
        options_payload = []
    try:
        artifact_options = normalize_artifact_options(options_payload)
    except ValueError:
        LOGGER.warning("Skipping profile with invalid artifact options in %s", path)
        return None

    builtin = bool(raw.get("builtin", False))
    if is_builtin_name:
        # The built-in profile always uses the generated option set,
        # regardless of what the file on disk contains.
        builtin = True
        artifact_options = _recommended_artifact_options()
    elif not artifact_options:
        LOGGER.warning("Skipping profile with no artifact options in %s", path)
        return None

    return {
        "name": name,
        "builtin": builtin,
        "artifact_options": artifact_options,
        "path": path,
    }
459
460
def _ensure_recommended_profile(profiles_root: Path) -> None:
    """(Re)write the built-in recommended profile file on disk.

    Args:
        profiles_root: Directory for profile files.
    """
    filename = f"{BUILTIN_RECOMMENDED_PROFILE}{PROFILE_FILE_SUFFIX}"
    write_profile_file(profiles_root / filename, _recommended_profile_payload())
469
470
def load_profiles_from_directory(profiles_root: Path) -> list[dict[str, Any]]:
    """Load every valid artifact profile from the profiles directory.

    The directory is created on demand and the built-in recommended
    profile is regenerated before scanning.

    Args:
        profiles_root: Directory containing profile JSON files.

    Returns:
        Profiles sorted with ``recommended`` first, then by name.
    """
    profiles_root.mkdir(parents=True, exist_ok=True)
    _ensure_recommended_profile(profiles_root)

    candidates = sorted(
        profiles_root.glob(f"*{PROFILE_FILE_SUFFIX}"),
        key=lambda candidate: candidate.name.lower(),
    )
    loaded: list[dict[str, Any]] = []
    taken: set[str] = set()
    for path in candidates:
        profile = _load_profile_file(path)
        if profile is None:
            continue
        # First file wins when two files claim the same (case-folded) name.
        key = str(profile.get("name", "")).strip().lower()
        if key and key not in taken:
            taken.add(key)
            loaded.append(profile)

    def _sort_key(profile: dict[str, Any]) -> tuple[int, str]:
        key = str(profile.get("name", "")).strip().lower()
        return (0 if key == BUILTIN_RECOMMENDED_PROFILE else 1, key)

    loaded.sort(key=_sort_key)
    return loaded
502
503
def profile_path_for_new_name(profiles_root: Path, profile_name: str) -> Path:
    """Compute a unique, non-existent file path for a new artifact profile.

    Args:
        profiles_root: Directory for profile files.
        profile_name: Human-readable profile name.

    Returns:
        The first of ``<stem>.json``, ``<stem>_1.json``, ``<stem>_2.json``,
        ... that does not exist yet.
    """
    stem = safe_name(profile_name.lower(), fallback="profile")
    candidate = profiles_root / f"{stem}{PROFILE_FILE_SUFFIX}"
    suffix_index = 0
    while candidate.exists():
        suffix_index += 1
        candidate = profiles_root / f"{stem}_{suffix_index}{PROFILE_FILE_SUFFIX}"
    return candidate
525
526
def normalize_profile_name(value: Any) -> str:
    """Validate a user-supplied profile name and return it stripped.

    Args:
        value: Raw profile name.

    Returns:
        Stripped, validated profile name.

    Raises:
        ValueError: If the name is empty, reserved, or malformed.
    """
    candidate = str(value or "").strip()
    if not candidate:
        raise ValueError("Profile name is required.")
    # The built-in profile name is reserved and may never be replaced.
    if candidate.lower() == BUILTIN_RECOMMENDED_PROFILE:
        raise ValueError("`recommended` is a built-in profile and cannot be overwritten.")
    if PROFILE_NAME_RE.fullmatch(candidate) is None:
        raise ValueError(
            "Profile name must be 1-64 chars and use letters, numbers, spaces, period, underscore, or hyphen."
        )
    return candidate
549
550
def compose_profile_response(profiles_root: Path) -> list[dict[str, Any]]:
    """Build the API payload describing every available artifact profile.

    Args:
        profiles_root: Directory containing profile files.

    Returns:
        List of dicts with ``name``, ``builtin``, and ``artifact_options``
        (the internal ``path`` field is deliberately omitted).
    """
    response: list[dict[str, Any]] = []
    for profile in load_profiles_from_directory(profiles_root):
        response.append(
            {
                "name": str(profile.get("name", "")).strip(),
                "builtin": bool(profile.get("builtin", False)),
                "artifact_options": list(profile.get("artifact_options", [])),
            }
        )
    return response
568
569
570# ---------------------------------------------------------------------------
571# Route handlers
572# ---------------------------------------------------------------------------
573
# Blueprint exposing the parse-control and artifact-profile API routes.
artifact_bp = Blueprint("artifacts", __name__)
575
576
def _purge_stale_parsed_data(case_dir: Path, prev_csv_output_dir: str) -> None:
    """Remove parsed CSV data from disk before a new parse run.

    Cleans both the default ``case_dir/parsed`` directory and any external
    CSV output directory that was used by the previous parse run.

    Args:
        case_dir: Path to the case directory.
        prev_csv_output_dir: The ``csv_output_dir`` stored from the previous
            parse run.  May be empty if no prior run exists.
    """
    # Clean the default parsed directory inside the case folder.
    default_parsed = case_dir / "parsed"
    if default_parsed.is_dir():
        LOGGER.info("Removing stale parsed output: %s", default_parsed)
        shutil.rmtree(default_parsed, ignore_errors=True)

    # Clean external CSV output directory if configured and different
    # from the default location.
    if not prev_csv_output_dir:
        return
    prev_path = Path(prev_csv_output_dir)
    if not prev_path.is_dir():
        return
    resolved_prev = prev_path.resolve()
    resolved_default = default_parsed.resolve()
    if resolved_prev == resolved_default:
        return  # Already handled above.
    # Safety: refuse to delete filesystem roots or suspiciously short paths.
    # Fix: the previous check compared a Path object against the *str*
    # attributes `.root`/`.anchor`, which is always False in Python, so the
    # root guard never fired.  A filesystem root is a path whose parent is
    # itself (equivalently, one equal to its own anchor as a Path).
    if resolved_prev.parent == resolved_prev or resolved_prev == Path(resolved_prev.anchor):
        LOGGER.warning("Refusing to remove parsed output at filesystem root: %s", resolved_prev)
        return
    if len(resolved_prev.parts) <= 2:
        LOGGER.warning("Refusing to remove parsed output with suspiciously short path: %s", resolved_prev)
        return
    LOGGER.info("Removing stale external parsed output: %s", resolved_prev)
    shutil.rmtree(resolved_prev, ignore_errors=True)
614
615
def _purge_stale_downstream_case_files(case_dir: Path) -> None:
    """Delete stale analysis/chat artifacts before a new parse run.

    Args:
        case_dir: Path to the case directory.
    """
    stale_names = ("analysis_results.json", "prompt.txt", "chat_history.jsonl")
    for stale_name in stale_names:
        target = case_dir / stale_name
        try:
            target.unlink(missing_ok=True)
        except OSError:
            # Best-effort cleanup: log and keep going.
            LOGGER.warning("Failed to remove stale case artifact: %s", target, exc_info=True)
628
629
@artifact_bp.post("/api/cases/<case_id>/parse")
def start_parse(case_id: str) -> tuple[Response, int]:
    """Start background parsing of selected forensic artifacts.

    Validates the request, atomically transitions the case to ``running``
    while clearing prior parse-derived state, purges stale on-disk output,
    then launches the parse in a daemon thread.

    Args:
        case_id: UUID of the case.

    Returns:
        ``(Response, 202)`` confirming start, or an error response
        (404 unknown case, 400 bad request, 409 already running).
    """
    case = get_case(case_id)
    if case is None:
        return error_response(f"Case not found: {case_id}", 404)
    # Read shared case state under the lock; act on the snapshot after.
    with STATE_LOCK:
        has_evidence = bool(str(case.get("evidence_path", "")).strip())
    if not has_evidence:
        return error_response("No evidence loaded for this case.", 400)

    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        return error_response("Request body must be a JSON object.", 400)
    try:
        artifact_options, parse_artifacts, analysis_artifacts = extract_parse_selection_payload(payload)
    except ValueError as error:
        return error_response(str(error), 400)

    if not parse_artifacts:
        return error_response("Provide at least one artifact key to parse.", 400)
    try:
        analysis_date_range = validate_analysis_date_range(payload.get("analysis_date_range"))
    except ValueError as error:
        return error_response(str(error), 400)

    # Check-and-set of the running flag and all case mutations happen in
    # one critical section so concurrent start requests cannot interleave.
    with STATE_LOCK:
        parse_state = PARSE_PROGRESS.setdefault(case_id, new_progress())
        if parse_state.get("status") == "running":
            return error_response("Parsing is already running for this case.", 409)
        case_dir = Path(case["case_dir"])
        PARSE_PROGRESS[case_id] = new_progress(status="running")
        case["status"] = "running"
        # Store copies so later local mutation cannot alias shared state.
        case["selected_artifacts"] = list(parse_artifacts)
        case["analysis_artifacts"] = list(analysis_artifacts)
        case["artifact_options"] = list(artifact_options)
        case["analysis_date_range"] = analysis_date_range

        # Capture previous CSV output dir before clearing so we can
        # remove stale on-disk data outside the case directory.
        prev_csv_output_dir = str(case.get("csv_output_dir", "")).strip()

        # Invalidate prior parse-derived outputs so a failed rerun
        # cannot leave stale data usable by downstream analysis.
        case["parse_results"] = []
        case["artifact_csv_paths"] = {}
        case["analysis_results"] = {}
        case["csv_output_dir"] = ""
        case["investigation_context"] = ""

    # Filesystem cleanup happens outside the lock (may be slow).
    _purge_stale_parsed_data(case_dir, prev_csv_output_dir)
    _purge_stale_downstream_case_files(case_dir)

    parse_started_event: dict[str, Any] = {
        "type": "parse_started",
        "artifacts": parse_artifacts,
        "analysis_artifacts": analysis_artifacts,
        "artifact_options": artifact_options,
        "total_artifacts": len(parse_artifacts),
    }
    if analysis_date_range is not None:
        parse_started_event["analysis_date_range"] = analysis_date_range
    emit_progress(PARSE_PROGRESS, case_id, parse_started_event)
    # Deep-copy the config so the worker thread sees a stable snapshot.
    config_snapshot = copy.deepcopy(current_app.config.get("AIFT_CONFIG", {}))
    from .tasks import run_task_with_case_log_context, run_parse  # deferred to avoid circular import
    threading.Thread(
        target=run_task_with_case_log_context,
        args=(case_id, run_parse, case_id, parse_artifacts, analysis_artifacts, artifact_options, config_snapshot),
        daemon=True,
    ).start()

    response_payload: dict[str, Any] = {
        "status": "started",
        "case_id": case_id,
        "artifacts": parse_artifacts,
        "ai_artifacts": analysis_artifacts,
        "artifact_options": artifact_options,
    }
    if analysis_date_range is not None:
        response_payload["analysis_date_range"] = analysis_date_range
    response_payload["success"] = True
    return jsonify(response_payload), 202
719
720
@artifact_bp.get("/api/cases/<case_id>/parse/progress")
def stream_parse_progress(case_id: str) -> Response | tuple[Response, int]:
    """Stream parsing progress events for a case via SSE.

    Args:
        case_id: UUID of the case.

    Returns:
        SSE Response, or a 404 error for unknown cases.
    """
    case = get_case(case_id)
    if case is None:
        return error_response(f"Case not found: {case_id}", 404)
    return stream_sse(PARSE_PROGRESS, case_id)
734
735
@artifact_bp.post("/api/cases/<case_id>/parse/cancel")
def cancel_parse(case_id: str) -> tuple[Response, int]:
    """Request cancellation of a running parse operation.

    Args:
        case_id: UUID of the case.

    Returns:
        ``(Response, 200)`` confirming cancellation, a 404 for unknown
        cases, or a 409 when nothing is running.
    """
    if get_case(case_id) is None:
        return error_response(f"Case not found: {case_id}", 404)
    if not cancel_progress(PARSE_PROGRESS, case_id):
        return error_response("No running parse to cancel.", 409)
    return success_response({"status": "cancelling", "case_id": case_id})
752
753
@artifact_bp.get("/api/artifact-profiles")
def list_artifact_profiles() -> Response:
    """List every available artifact profile.

    Returns:
        JSON response with the ``profiles`` list.
    """
    raw_config_path = current_app.config.get("AIFT_CONFIG_PATH", "config.yaml")
    profiles_root = resolve_profiles_root(Path(str(raw_config_path)))
    profiles = compose_profile_response(profiles_root)
    return success_response({"profiles": profiles})
764
765
@artifact_bp.post("/api/artifact-profiles")
def save_artifact_profile() -> Response | tuple[Response, int]:
    """Create or update a user-defined artifact profile.

    Validates the name and options, resolves whether a profile with the
    same (case-insensitive) name already exists, and writes the JSON file
    (reusing the existing file path on update).

    Returns:
        JSON with the saved profile and the updated profiles list, or an
        error response (400 bad payload/reserved name, 500 filesystem).
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return error_response("Profile payload must be a JSON object.", 400)

    try:
        profile_name = normalize_profile_name(payload.get("name"))
    except ValueError as error:
        return error_response(str(error), 400)

    try:
        artifact_options = normalize_artifact_options(payload.get("artifact_options"))
    except ValueError as error:
        return error_response(str(error), 400)
    if not artifact_options:
        return error_response("Profile must include at least one artifact option.", 400)

    config_path = Path(str(current_app.config.get("AIFT_CONFIG_PATH", "config.yaml")))
    profiles_root = resolve_profiles_root(config_path)

    try:
        profiles = load_profiles_from_directory(profiles_root)
        # Profile names are matched case-insensitively.
        profile_key = profile_name.lower()
        existing = next(
            (
                profile
                for profile in profiles
                if str(profile.get("name", "")).strip().lower() == profile_key
            ),
            None,
        )
        # Defence in depth: normalize_profile_name already rejects the
        # reserved name, but never overwrite any built-in profile.
        if existing is not None and bool(existing.get("builtin", False)):
            return error_response("`recommended` is a built-in profile and cannot be overwritten.", 400)

        # Updates reuse the existing file; new profiles get a fresh path.
        if existing is not None:
            target_path = Path(existing.get("path"))
        else:
            target_path = profile_path_for_new_name(profiles_root, profile_name)

        response_profile = {
            "name": profile_name,
            "builtin": False,
            "artifact_options": artifact_options,
        }
        write_profile_file(target_path, response_profile)
    except OSError:
        LOGGER.exception("Failed to save artifact profile '%s'", profile_name)
        return error_response(
            "Failed to save the profile due to a filesystem error. "
            "Check directory permissions and retry.",
            500,
        )

    # `response_profile` is always bound here: the only way out of the
    # try block without it is an early return or the OSError handler.
    return success_response(
        {
            "status": "saved",
            "profile": response_profile,
            "profiles": compose_profile_response(profiles_root),
        }
    )
PROFILE_NAME_RE = re.compile('^[A-Za-z0-9][A-Za-z0-9 _.-]{0,63}$')
PROFILE_DIRNAME = 'profile'
PROFILE_FILE_SUFFIX = '.json'
artifact_bp = <Blueprint 'artifacts'>
def normalize_artifact_mode(value: Any, default_mode: str = 'parse_and_ai') -> str:
 93def normalize_artifact_mode(value: Any, default_mode: str = MODE_PARSE_AND_AI) -> str:
 94    """Normalise an artifact processing mode to a valid constant.
 95
 96    Args:
 97        value: Raw mode value.
 98        default_mode: Fallback mode.
 99
100    Returns:
101        ``MODE_PARSE_AND_AI`` or ``MODE_PARSE_ONLY``.
102    """
103    mode = str(value or "").strip().lower()
104    if mode == MODE_PARSE_ONLY:
105        return MODE_PARSE_ONLY
106    if mode == MODE_PARSE_AND_AI:
107        return MODE_PARSE_AND_AI
108    return default_mode

Normalise an artifact processing mode to a valid constant.

Arguments:
  • value: Raw mode value.
  • default_mode: Fallback mode.
Returns:

MODE_PARSE_AND_AI or MODE_PARSE_ONLY.

def normalize_artifact_options(payload: Any) -> list[dict[str, str]]:
def normalize_artifact_options(payload: Any) -> list[dict[str, str]]:
    """Normalise a raw artifact options payload into canonical form.

    Accepts a list whose items are either bare artifact-key strings or dicts
    carrying ``artifact_key``/``key`` plus one of ``mode``, ``ai_enabled``,
    or ``parse_mode``.

    Args:
        payload: Raw ``artifact_options`` value.

    Returns:
        List of dicts with ``artifact_key`` and ``mode`` keys; duplicate
        artifact keys are dropped, keeping the first occurrence.

    Raises:
        ValueError: If *payload* is not a list.
    """
    if not isinstance(payload, list):
        raise ValueError("`artifact_options` must be a JSON array.")

    result: list[dict[str, str]] = []
    known_keys: set[str] = set()
    for entry in payload:
        if isinstance(entry, str):
            key = entry.strip()
            mode = MODE_PARSE_AND_AI
        elif isinstance(entry, dict):
            key = str(entry.get("artifact_key") or entry.get("key") or "").strip()
            # Mode precedence: explicit `mode`, then legacy `ai_enabled`,
            # then legacy `parse_mode`.
            if "mode" in entry:
                mode = normalize_artifact_mode(entry.get("mode"))
            elif "ai_enabled" in entry:
                mode = MODE_PARSE_AND_AI if bool(entry.get("ai_enabled")) else MODE_PARSE_ONLY
            else:
                mode = normalize_artifact_mode(entry.get("parse_mode"), default_mode=MODE_PARSE_AND_AI)
        else:
            # Unsupported item types (numbers, None, nested lists) are skipped.
            continue

        if key and key not in known_keys:
            known_keys.add(key)
            result.append({"artifact_key": key, "mode": mode})

    return result

Normalise a raw artifact options payload into canonical form.

Accepts lists of strings or dicts with various key names.

Arguments:
  • payload: Raw artifact_options value.
Returns:

List of dicts with artifact_key and mode keys.

Raises:
  • ValueError: If payload is not a list.
def artifact_options_to_lists(artifact_options: list[dict[str, str]]) -> tuple[list[str], list[str]]:
def artifact_options_to_lists(artifact_options: list[dict[str, str]]) -> tuple[list[str], list[str]]:
    """Split normalised artifact options into parse and analysis lists.

    Args:
        artifact_options: Canonical artifact option dicts.

    Returns:
        ``(parse_artifacts, analysis_artifacts)`` tuple. Every non-empty key
        appears in the parse list; only keys whose mode normalises to
        ``MODE_PARSE_AND_AI`` also appear in the analysis list.
    """
    parse_keys: list[str] = []
    analysis_keys: list[str] = []
    for entry in artifact_options:
        key = str(entry.get("artifact_key", "")).strip()
        if not key:
            continue
        parse_keys.append(key)
        if normalize_artifact_mode(entry.get("mode")) == MODE_PARSE_AND_AI:
            analysis_keys.append(key)
    return parse_keys, analysis_keys

Split normalised artifact options into parse and analysis lists.

Arguments:
  • artifact_options: Canonical artifact option dicts.
Returns:

(parse_artifacts, analysis_artifacts) tuple.

def extract_parse_selection_payload( payload: dict[str, typing.Any]) -> tuple[list[dict[str, str]], list[str], list[str]]:
def extract_parse_selection_payload(
    payload: dict[str, Any],
) -> tuple[list[dict[str, str]], list[str], list[str]]:
    """Extract and normalise artifact selection from a parse request payload.

    Supports both ``artifact_options`` (new) and ``artifacts``/``ai_artifacts``
    (legacy) formats.

    Args:
        payload: Parsed JSON body from the parse-start request.

    Returns:
        ``(artifact_options, parse_artifacts, analysis_artifacts)`` tuple.

    Raises:
        ValueError: If the payload contains invalid fields.
    """
    # New-style payloads take precedence when the key is present at all.
    if "artifact_options" in payload:
        options = normalize_artifact_options(payload.get("artifact_options"))
        parse_keys, analysis_keys = artifact_options_to_lists(options)
        return options, parse_keys, analysis_keys

    # Legacy format: flat `artifacts` list, optionally narrowed by `ai_artifacts`.
    raw_artifacts = payload.get("artifacts", [])
    if not isinstance(raw_artifacts, list):
        raise ValueError("`artifacts` must be a JSON array.")
    parse_keys = _normalize_string_list(raw_artifacts)

    if "ai_artifacts" not in payload:
        # No narrowing requested: analyse everything that is parsed.
        analysis_keys = list(parse_keys)
    else:
        raw_ai = payload.get("ai_artifacts")
        if not isinstance(raw_ai, list):
            raise ValueError("`ai_artifacts` must be a JSON array.")
        # AI analysis is restricted to artifacts that were also selected for parsing.
        selected = set(parse_keys)
        analysis_keys = [key for key in _normalize_string_list(raw_ai) if key in selected]

    options = _build_artifact_options_from_lists(
        parse_artifacts=parse_keys,
        analysis_artifacts=analysis_keys,
    )
    return options, parse_keys, analysis_keys

Extract and normalise artifact selection from a parse request payload.

Supports both artifact_options (new) and artifacts/ai_artifacts (legacy) formats.

Arguments:
  • payload: Parsed JSON body from the parse-start request.
Returns:

(artifact_options, parse_artifacts, analysis_artifacts) tuple.

Raises:
  • ValueError: If the payload contains invalid fields.
def validate_analysis_date_range(payload: Any) -> dict[str, str] | None:
def validate_analysis_date_range(payload: Any) -> dict[str, str] | None:
    """Validate and normalise an optional analysis date range.

    Args:
        payload: Raw ``analysis_date_range`` value from request JSON.

    Returns:
        Dict with ``start_date`` and ``end_date`` (ISO format), or ``None``
        when no range was supplied.

    Raises:
        ValueError: On invalid format, a partial range, or start after end.
    """
    if payload is None:
        return None
    if not isinstance(payload, dict):
        raise ValueError("`analysis_date_range` must be an object.")

    raw_start = payload.get("start_date")
    raw_end = payload.get("end_date")
    start_text = "" if raw_start is None else str(raw_start).strip()
    end_text = "" if raw_end is None else str(raw_end).strip()

    # Both fields blank means "no range requested"; exactly one blank is an error.
    if start_text == "" and end_text == "":
        return None
    if start_text == "" or end_text == "":
        raise ValueError(
            "Provide both `analysis_date_range.start_date` and `analysis_date_range.end_date`."
        )

    try:
        parsed_start = datetime.strptime(start_text, "%Y-%m-%d").date()
        parsed_end = datetime.strptime(end_text, "%Y-%m-%d").date()
    except ValueError as error:
        raise ValueError("Date range values must use YYYY-MM-DD format.") from error

    if parsed_start > parsed_end:
        raise ValueError(
            "`analysis_date_range.start_date` must be earlier than or equal to `end_date`."
        )

    return {
        "start_date": parsed_start.isoformat(),
        "end_date": parsed_end.isoformat(),
    }

Validate and normalise an optional analysis date range.

Arguments:
  • payload: Raw analysis_date_range value from request JSON.
Returns:

Dict with start_date and end_date, or None.

Raises:
  • ValueError: On invalid format or range.
def extract_parse_progress(fallback_artifact: str, args: tuple[typing.Any, ...]) -> tuple[str, int]:
def extract_parse_progress(fallback_artifact: str, args: tuple[Any, ...]) -> tuple[str, int]:
    """Extract artifact key and record count from a parser progress callback.

    Args:
        fallback_artifact: Default artifact key when the callback omits one.
        args: Positional arguments from the callback.

    Returns:
        ``(artifact_key, record_count)`` tuple.
    """
    if not args:
        return fallback_artifact, 0

    head = args[0]
    if isinstance(head, dict):
        # Dict-style callback: a single mapping carries both values.
        return str(head.get("artifact_key", fallback_artifact)), safe_int(head.get("record_count", 0))
    if len(args) > 1:
        # Positional style: (artifact_key, record_count, ...).
        return str(head or fallback_artifact), safe_int(args[1], 0)
    # Single non-dict argument: treated as a bare record count.
    return fallback_artifact, safe_int(head, 0)

Extract artifact key and record count from a parser progress callback.

Arguments:
  • fallback_artifact: Default artifact key.
  • args: Positional arguments from the callback.
Returns:

(artifact_key, record_count) tuple.

def sanitize_prompt(prompt: str, max_chars: int = 2000) -> str:
def sanitize_prompt(prompt: str, max_chars: int = 2000) -> str:
    """Collapse whitespace in a user prompt and cap its length for audit logging.

    Args:
        prompt: Raw user prompt text.
        max_chars: Maximum character count kept before truncation. Defaults to 2000.

    Returns:
        Whitespace-normalised prompt, suffixed with a truncation marker when
        it exceeds *max_chars*.
    """
    # split()/join collapses all runs of whitespace (including newlines) to single spaces.
    collapsed = " ".join(prompt.split())
    if len(collapsed) > max_chars:
        return f"{collapsed[:max_chars]}... [truncated]"
    return collapsed

Normalise and truncate a user prompt for audit logging.

Arguments:
  • prompt: Raw user prompt text.
  • max_chars: Maximum character length. Defaults to 2000.
Returns:

Normalised (and possibly truncated) prompt string.

def resolve_profiles_root(config_path: str | pathlib.Path) -> pathlib.Path:
def resolve_profiles_root(config_path: str | Path) -> Path:
    """Resolve the directory where artifact profiles are stored.

    Args:
        config_path: Path to the AIFT configuration file.

    Returns:
        ``Path`` to the profiles directory, a sibling of the config file
        named ``PROFILE_DIRNAME``.
    """
    config_file = Path(config_path)
    return config_file.parent / PROFILE_DIRNAME

Resolve the directory where artifact profiles are stored.

Arguments:
  • config_path: Path to the AIFT configuration file.
Returns:

Absolute Path to the profiles directory.

def compose_profile_response(profiles_root: pathlib.Path) -> list[dict[str, typing.Any]]:
def compose_profile_response(profiles_root: Path) -> list[dict[str, Any]]:
    """Build the API response payload for all artifact profiles.

    Args:
        profiles_root: Directory containing profile files.

    Returns:
        List of dicts with ``name``, ``builtin``, and ``artifact_options``.
    """
    response: list[dict[str, Any]] = []
    for profile in load_profiles_from_directory(profiles_root):
        response.append(
            {
                "name": str(profile.get("name", "")).strip(),
                "builtin": bool(profile.get("builtin", False)),
                # Copy the options list so callers cannot mutate cached profiles.
                "artifact_options": list(profile.get("artifact_options", [])),
            }
        )
    return response

Build the API response payload for all artifact profiles.

Arguments:
  • profiles_root: Directory containing profile files.
Returns:

List of dicts with name, builtin, and artifact_options.

def load_profiles_from_directory(profiles_root: pathlib.Path) -> list[dict[str, typing.Any]]:
def load_profiles_from_directory(profiles_root: Path) -> list[dict[str, Any]]:
    """Load all valid artifact profiles from the profiles directory.

    Creates the directory and the built-in recommended profile when missing.
    Profiles that fail to load, have an empty name, or duplicate an earlier
    name (case-insensitively) are skipped.

    Args:
        profiles_root: Directory containing profile JSON files.

    Returns:
        Profile dicts sorted with the built-in recommended profile first,
        then alphabetically by lowercased name.
    """
    profiles_root.mkdir(parents=True, exist_ok=True)
    _ensure_recommended_profile(profiles_root)

    loaded: list[dict[str, Any]] = []
    used_names: set[str] = set()
    candidate_paths = sorted(
        profiles_root.glob(f"*{PROFILE_FILE_SUFFIX}"),
        key=lambda candidate: candidate.name.lower(),
    )
    for candidate_path in candidate_paths:
        parsed = _load_profile_file(candidate_path)
        if parsed is None:
            continue
        name_key = str(parsed.get("name", "")).strip().lower()
        if name_key and name_key not in used_names:
            used_names.add(name_key)
            loaded.append(parsed)

    def _profile_sort_key(profile: dict[str, Any]) -> tuple[int, str]:
        # The built-in recommended profile always sorts ahead of user profiles.
        name_key = str(profile.get("name", "")).strip().lower()
        return (0 if name_key == BUILTIN_RECOMMENDED_PROFILE else 1, name_key)

    loaded.sort(key=_profile_sort_key)
    return loaded

Load all valid artifact profiles from the profiles directory.

Arguments:
  • profiles_root: Directory containing profile JSON files.
Returns:

Sorted list of validated profile dicts.

def normalize_profile_name(value: Any) -> str:
def normalize_profile_name(value: Any) -> str:
    """Validate and normalise a profile name from user input.

    Args:
        value: Raw profile name.

    Returns:
        Stripped, validated profile name.

    Raises:
        ValueError: If the name is empty, reserved, or invalid.
    """
    name = str(value or "").strip()
    if not name:
        raise ValueError("Profile name is required.")
    # The built-in profile name is reserved regardless of case.
    if name.lower() == BUILTIN_RECOMMENDED_PROFILE:
        raise ValueError("`recommended` is a built-in profile and cannot be overwritten.")
    if PROFILE_NAME_RE.fullmatch(name):
        return name
    raise ValueError(
        "Profile name must be 1-64 chars and use letters, numbers, spaces, period, underscore, or hyphen."
    )

Validate and normalise a profile name from user input.

Arguments:
  • value: Raw profile name.
Returns:

Stripped, validated profile name.

Raises:
  • ValueError: If the name is empty, reserved, or invalid.
def profile_path_for_new_name(profiles_root: pathlib.Path, profile_name: str) -> pathlib.Path:
def profile_path_for_new_name(profiles_root: Path, profile_name: str) -> Path:
    """Compute a unique file path for a new artifact profile.

    Args:
        profiles_root: Directory for profile files.
        profile_name: Human-readable profile name.

    Returns:
        A currently non-existent ``Path`` suitable for writing.
    """
    stem = safe_name(profile_name.lower(), fallback="profile")
    base_candidate = profiles_root / f"{stem}{PROFILE_FILE_SUFFIX}"
    if not base_candidate.exists():
        return base_candidate

    # Disambiguate filename collisions with an incrementing numeric suffix.
    suffix_index = 1
    candidate = profiles_root / f"{stem}_{suffix_index}{PROFILE_FILE_SUFFIX}"
    while candidate.exists():
        suffix_index += 1
        candidate = profiles_root / f"{stem}_{suffix_index}{PROFILE_FILE_SUFFIX}"
    return candidate

Compute a unique file path for a new artifact profile.

Arguments:
  • profiles_root: Directory for profile files.
  • profile_name: Human-readable profile name.
Returns:

A non-existent Path suitable for writing.

def write_profile_file(path: pathlib.Path, payload: dict[str, typing.Any]) -> None:
def write_profile_file(path: Path, payload: dict[str, Any]) -> None:
    """Serialise an artifact profile to a JSON file, creating parent dirs.

    Args:
        path: Destination file path.
        payload: Profile data to serialise.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # ASCII-escaped, 2-space-indented JSON with a trailing newline.
    serialized = json.dumps(payload, indent=2, ensure_ascii=True)
    path.write_text(serialized + "\n", encoding="utf-8")

Write an artifact profile to a JSON file.

Arguments:
  • path: Destination file path.
  • payload: Profile data to serialise.