app.routes.artifacts
Artifact option normalisation, profile management, validation, and route handlers.
This module handles:
- Normalising artifact selection payloads (the new `artifact_options` format and the legacy `artifacts`/`ai_artifacts` format).
- Artifact profile CRUD (load, save, list) including the built-in `recommended` profile.
- Analysis date-range validation.
- Parse-progress extraction and prompt sanitisation utilities.
- Flask route handlers for starting/streaming parse operations and profile CRUD.
Attributes:
- PROFILE_NAME_RE: Regex for validating artifact profile names.
- BUILTIN_RECOMMENDED_PROFILE: Name of the built-in recommended profile.
- PROFILE_DIRNAME: Subdirectory for profile JSON files.
- PROFILE_FILE_SUFFIX: File extension for profile files.
- RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS: Artifacts excluded from the recommended profile.
- artifact_bp: Flask Blueprint for artifact and parse routes.
1"""Artifact option normalisation, profile management, validation, and route handlers. 2 3This module handles: 4 5* Normalising artifact selection payloads (new ``artifact_options`` format 6 and legacy ``artifacts``/``ai_artifacts`` format). 7* Artifact profile CRUD (load, save, list) including the built-in 8 ``recommended`` profile. 9* Analysis date-range validation. 10* Parse-progress extraction and prompt sanitisation utilities. 11* Flask route handlers for starting/streaming parse operations and profile CRUD. 12 13Attributes: 14 PROFILE_NAME_RE: Regex for validating artifact profile names. 15 BUILTIN_RECOMMENDED_PROFILE: Name of the built-in recommended profile. 16 PROFILE_DIRNAME: Subdirectory for profile JSON files. 17 PROFILE_FILE_SUFFIX: File extension for profile files. 18 RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS: Artifacts excluded from the 19 recommended profile. 20 artifact_bp: Flask Blueprint for artifact and parse routes. 21""" 22 23from __future__ import annotations 24 25import copy 26import json 27import logging 28import shutil 29import threading 30from datetime import datetime 31from pathlib import Path 32import re 33from typing import Any 34 35from flask import Blueprint, Response, current_app, jsonify, request 36 37from ..parser import LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY 38from .state import ( 39 MODE_PARSE_AND_AI, 40 MODE_PARSE_ONLY, 41 PARSE_PROGRESS, 42 STATE_LOCK, 43 cancel_progress, 44 emit_progress, 45 error_response, 46 get_case, 47 new_progress, 48 safe_int, 49 safe_name, 50 stream_sse, 51 success_response, 52) 53 54# NOTE: .tasks imports are deferred to avoid circular import 55# (tasks.py imports from artifacts.py). See _get_task_runners(). 
56 57__all__ = [ 58 "PROFILE_NAME_RE", 59 "BUILTIN_RECOMMENDED_PROFILE", 60 "PROFILE_DIRNAME", 61 "PROFILE_FILE_SUFFIX", 62 "RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS", 63 "artifact_bp", 64 "normalize_artifact_mode", 65 "normalize_artifact_options", 66 "artifact_options_to_lists", 67 "extract_parse_selection_payload", 68 "validate_analysis_date_range", 69 "extract_parse_progress", 70 "sanitize_prompt", 71 "resolve_profiles_root", 72 "compose_profile_response", 73 "load_profiles_from_directory", 74 "normalize_profile_name", 75 "profile_path_for_new_name", 76 "write_profile_file", 77] 78 79LOGGER = logging.getLogger(__name__) 80 81PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9 _.-]{0,63}$") 82BUILTIN_RECOMMENDED_PROFILE = "recommended" 83PROFILE_DIRNAME = "profile" 84PROFILE_FILE_SUFFIX = ".json" 85RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS = {"mft", "usnjrnl", "evtx", "defender.evtx"} 86 87 88# --------------------------------------------------------------------------- 89# Artifact option helpers 90# --------------------------------------------------------------------------- 91 92def normalize_artifact_mode(value: Any, default_mode: str = MODE_PARSE_AND_AI) -> str: 93 """Normalise an artifact processing mode to a valid constant. 94 95 Args: 96 value: Raw mode value. 97 default_mode: Fallback mode. 98 99 Returns: 100 ``MODE_PARSE_AND_AI`` or ``MODE_PARSE_ONLY``. 101 """ 102 mode = str(value or "").strip().lower() 103 if mode == MODE_PARSE_ONLY: 104 return MODE_PARSE_ONLY 105 if mode == MODE_PARSE_AND_AI: 106 return MODE_PARSE_AND_AI 107 return default_mode 108 109 110def _normalize_string_list(values: Any) -> list[str]: 111 """Deduplicate and normalise a list of values to non-empty strings. 112 113 Args: 114 values: Input list (or non-list, returns empty). 115 116 Returns: 117 Deduplicated list of non-empty stripped strings. 
118 """ 119 if not isinstance(values, list): 120 return [] 121 normalized: list[str] = [] 122 seen: set[str] = set() 123 for value in values: 124 text = str(value).strip() 125 if not text or text in seen: 126 continue 127 seen.add(text) 128 normalized.append(text) 129 return normalized 130 131 132def normalize_artifact_options(payload: Any) -> list[dict[str, str]]: 133 """Normalise a raw artifact options payload into canonical form. 134 135 Accepts lists of strings or dicts with various key names. 136 137 Args: 138 payload: Raw ``artifact_options`` value. 139 140 Returns: 141 List of dicts with ``artifact_key`` and ``mode`` keys. 142 143 Raises: 144 ValueError: If *payload* is not a list. 145 """ 146 if not isinstance(payload, list): 147 raise ValueError("`artifact_options` must be a JSON array.") 148 149 normalized: list[dict[str, str]] = [] 150 seen: set[str] = set() 151 for item in payload: 152 artifact_key = "" 153 mode = MODE_PARSE_AND_AI 154 155 if isinstance(item, str): 156 artifact_key = item.strip() 157 elif isinstance(item, dict): 158 artifact_key = str(item.get("artifact_key") or item.get("key") or "").strip() 159 if "mode" in item: 160 mode = normalize_artifact_mode(item.get("mode")) 161 elif "ai_enabled" in item: 162 mode = MODE_PARSE_AND_AI if bool(item.get("ai_enabled")) else MODE_PARSE_ONLY 163 else: 164 mode = normalize_artifact_mode(item.get("parse_mode"), default_mode=MODE_PARSE_AND_AI) 165 else: 166 continue 167 168 if not artifact_key or artifact_key in seen: 169 continue 170 seen.add(artifact_key) 171 normalized.append({"artifact_key": artifact_key, "mode": mode}) 172 173 return normalized 174 175 176def artifact_options_to_lists(artifact_options: list[dict[str, str]]) -> tuple[list[str], list[str]]: 177 """Split normalised artifact options into parse and analysis lists. 178 179 Args: 180 artifact_options: Canonical artifact option dicts. 181 182 Returns: 183 ``(parse_artifacts, analysis_artifacts)`` tuple. 
184 """ 185 parse_artifacts: list[str] = [] 186 analysis_artifacts: list[str] = [] 187 for option in artifact_options: 188 artifact_key = str(option.get("artifact_key", "")).strip() 189 if not artifact_key: 190 continue 191 parse_artifacts.append(artifact_key) 192 if normalize_artifact_mode(option.get("mode")) == MODE_PARSE_AND_AI: 193 analysis_artifacts.append(artifact_key) 194 return parse_artifacts, analysis_artifacts 195 196 197def _build_artifact_options_from_lists( 198 parse_artifacts: list[str], 199 analysis_artifacts: list[str], 200) -> list[dict[str, str]]: 201 """Construct canonical artifact options from separate lists. 202 203 Args: 204 parse_artifacts: All artifact keys to parse. 205 analysis_artifacts: Subset for AI analysis. 206 207 Returns: 208 List of dicts with ``artifact_key`` and ``mode``. 209 """ 210 analysis_set = set(analysis_artifacts) 211 return [ 212 { 213 "artifact_key": artifact_key, 214 "mode": MODE_PARSE_AND_AI if artifact_key in analysis_set else MODE_PARSE_ONLY, 215 } 216 for artifact_key in parse_artifacts 217 ] 218 219 220def extract_parse_selection_payload( 221 payload: dict[str, Any], 222) -> tuple[list[dict[str, str]], list[str], list[str]]: 223 """Extract and normalise artifact selection from a parse request payload. 224 225 Supports both ``artifact_options`` (new) and ``artifacts``/``ai_artifacts`` 226 (legacy) formats. 227 228 Args: 229 payload: Parsed JSON body from the parse-start request. 230 231 Returns: 232 ``(artifact_options, parse_artifacts, analysis_artifacts)`` tuple. 233 234 Raises: 235 ValueError: If the payload contains invalid fields. 
236 """ 237 if "artifact_options" in payload: 238 artifact_options = normalize_artifact_options(payload.get("artifact_options")) 239 parse_artifacts, analysis_artifacts = artifact_options_to_lists(artifact_options) 240 return artifact_options, parse_artifacts, analysis_artifacts 241 242 artifacts_raw = payload.get("artifacts", []) 243 if not isinstance(artifacts_raw, list): 244 raise ValueError("`artifacts` must be a JSON array.") 245 parse_artifacts = _normalize_string_list(artifacts_raw) 246 247 if "ai_artifacts" in payload: 248 ai_raw = payload.get("ai_artifacts") 249 if not isinstance(ai_raw, list): 250 raise ValueError("`ai_artifacts` must be a JSON array.") 251 selected_set = set(parse_artifacts) 252 analysis_artifacts = [key for key in _normalize_string_list(ai_raw) if key in selected_set] 253 else: 254 analysis_artifacts = list(parse_artifacts) 255 256 artifact_options = _build_artifact_options_from_lists( 257 parse_artifacts=parse_artifacts, 258 analysis_artifacts=analysis_artifacts, 259 ) 260 return artifact_options, parse_artifacts, analysis_artifacts 261 262 263def validate_analysis_date_range(payload: Any) -> dict[str, str] | None: 264 """Validate and normalise an optional analysis date range. 265 266 Args: 267 payload: Raw ``analysis_date_range`` value from request JSON. 268 269 Returns: 270 Dict with ``start_date`` and ``end_date``, or ``None``. 271 272 Raises: 273 ValueError: On invalid format or range. 
274 """ 275 if payload is None: 276 return None 277 278 if not isinstance(payload, dict): 279 raise ValueError("`analysis_date_range` must be an object.") 280 281 start_raw = payload.get("start_date") 282 end_raw = payload.get("end_date") 283 start_text = str(start_raw).strip() if start_raw is not None else "" 284 end_text = str(end_raw).strip() if end_raw is not None else "" 285 if not start_text and not end_text: 286 return None 287 if not start_text or not end_text: 288 raise ValueError( 289 "Provide both `analysis_date_range.start_date` and `analysis_date_range.end_date`." 290 ) 291 292 try: 293 start_date = datetime.strptime(start_text, "%Y-%m-%d").date() 294 end_date = datetime.strptime(end_text, "%Y-%m-%d").date() 295 except ValueError as error: 296 raise ValueError("Date range values must use YYYY-MM-DD format.") from error 297 298 if start_date > end_date: 299 raise ValueError( 300 "`analysis_date_range.start_date` must be earlier than or equal to `end_date`." 301 ) 302 303 return { 304 "start_date": start_date.isoformat(), 305 "end_date": end_date.isoformat(), 306 } 307 308 309def extract_parse_progress(fallback_artifact: str, args: tuple[Any, ...]) -> tuple[str, int]: 310 """Extract artifact key and record count from a parser progress callback. 311 312 Args: 313 fallback_artifact: Default artifact key. 314 args: Positional arguments from the callback. 315 316 Returns: 317 ``(artifact_key, record_count)`` tuple. 318 """ 319 if not args: 320 return fallback_artifact, 0 321 first = args[0] 322 if isinstance(first, dict): 323 return str(first.get("artifact_key", fallback_artifact)), safe_int(first.get("record_count", 0)) 324 if len(args) >= 2: 325 return str(args[0] or fallback_artifact), safe_int(args[1], 0) 326 return fallback_artifact, safe_int(first, 0) 327 328 329def sanitize_prompt(prompt: str, max_chars: int = 2000) -> str: 330 """Normalise and truncate a user prompt for audit logging. 331 332 Args: 333 prompt: Raw user prompt text. 
334 max_chars: Maximum character length. Defaults to 2000. 335 336 Returns: 337 Normalised (and possibly truncated) prompt string. 338 """ 339 normalized = " ".join(prompt.split()) 340 if len(normalized) <= max_chars: 341 return normalized 342 return f"{normalized[:max_chars]}... [truncated]" 343 344 345# --------------------------------------------------------------------------- 346# Profile management 347# --------------------------------------------------------------------------- 348 349def _recommended_artifact_options() -> list[dict[str, str]]: 350 """Build artifact options for the built-in 'recommended' profile. 351 352 Includes artifacts from both the Windows and Linux registries so that 353 a single profile works regardless of the evidence OS. Duplicate keys 354 (e.g. ``services``) are emitted only once. 355 356 Returns: 357 List of artifact option dicts for the recommended profile. 358 """ 359 profile: list[dict[str, str]] = [] 360 seen: set[str] = set() 361 for registry in (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY): 362 for artifact_key in registry: 363 normalized_key = str(artifact_key).strip().lower() 364 if normalized_key in RECOMMENDED_PROFILE_EXCLUDED_ARTIFACTS: 365 continue 366 if normalized_key in seen: 367 continue 368 seen.add(normalized_key) 369 profile.append({"artifact_key": str(artifact_key), "mode": MODE_PARSE_AND_AI}) 370 return profile 371 372 373def resolve_profiles_root(config_path: str | Path) -> Path: 374 """Resolve the directory where artifact profiles are stored. 375 376 Args: 377 config_path: Path to the AIFT configuration file. 378 379 Returns: 380 Absolute ``Path`` to the profiles directory. 381 """ 382 return Path(config_path).parent / PROFILE_DIRNAME 383 384 385def _recommended_profile_payload() -> dict[str, Any]: 386 """Build the full payload for the built-in recommended profile. 387 388 Returns: 389 Dict with ``name``, ``builtin``, and ``artifact_options``. 
390 """ 391 return { 392 "name": BUILTIN_RECOMMENDED_PROFILE, 393 "builtin": True, 394 "artifact_options": _recommended_artifact_options(), 395 } 396 397 398def write_profile_file(path: Path, payload: dict[str, Any]) -> None: 399 """Write an artifact profile to a JSON file. 400 401 Args: 402 path: Destination file path. 403 payload: Profile data to serialise. 404 """ 405 path.parent.mkdir(parents=True, exist_ok=True) 406 content = json.dumps(payload, indent=2, ensure_ascii=True) 407 path.write_text(f"{content}\n", encoding="utf-8") 408 409 410def _load_profile_file(path: Path) -> dict[str, Any] | None: 411 """Load and validate a single artifact profile from a JSON file. 412 413 Args: 414 path: Path to the profile JSON file. 415 416 Returns: 417 Validated profile dict, or ``None`` if invalid. 418 """ 419 try: 420 raw = json.loads(path.read_text(encoding="utf-8")) 421 except (OSError, json.JSONDecodeError): 422 LOGGER.warning("Skipping unreadable profile file: %s", path) 423 return None 424 425 if not isinstance(raw, dict): 426 LOGGER.warning("Skipping invalid profile payload in %s", path) 427 return None 428 429 name = str(raw.get("name", "")).strip() or path.stem 430 if not name: 431 return None 432 if name.lower() != BUILTIN_RECOMMENDED_PROFILE and not PROFILE_NAME_RE.fullmatch(name): 433 LOGGER.warning("Skipping profile with invalid name in %s", path) 434 return None 435 436 options_payload = raw.get("artifact_options") 437 if options_payload is None: 438 options_payload = raw.get("selections", []) 439 try: 440 artifact_options = normalize_artifact_options(options_payload if options_payload is not None else []) 441 except ValueError: 442 LOGGER.warning("Skipping profile with invalid artifact options in %s", path) 443 return None 444 445 builtin = bool(raw.get("builtin", False)) 446 if name.lower() == BUILTIN_RECOMMENDED_PROFILE: 447 builtin = True 448 artifact_options = _recommended_artifact_options() 449 elif not artifact_options: 450 LOGGER.warning("Skipping 
profile with no artifact options in %s", path) 451 return None 452 453 return { 454 "name": name, 455 "builtin": builtin, 456 "artifact_options": artifact_options, 457 "path": path, 458 } 459 460 461def _ensure_recommended_profile(profiles_root: Path) -> None: 462 """Ensure the built-in recommended profile exists on disk. 463 464 Args: 465 profiles_root: Directory for profile files. 466 """ 467 recommended_path = profiles_root / f"{BUILTIN_RECOMMENDED_PROFILE}{PROFILE_FILE_SUFFIX}" 468 write_profile_file(recommended_path, _recommended_profile_payload()) 469 470 471def load_profiles_from_directory(profiles_root: Path) -> list[dict[str, Any]]: 472 """Load all valid artifact profiles from the profiles directory. 473 474 Args: 475 profiles_root: Directory containing profile JSON files. 476 477 Returns: 478 Sorted list of validated profile dicts. 479 """ 480 profiles_root.mkdir(parents=True, exist_ok=True) 481 _ensure_recommended_profile(profiles_root) 482 483 profiles: list[dict[str, Any]] = [] 484 seen_names: set[str] = set() 485 for path in sorted(profiles_root.glob(f"*{PROFILE_FILE_SUFFIX}"), key=lambda item: item.name.lower()): 486 profile = _load_profile_file(path) 487 if profile is None: 488 continue 489 profile_key = str(profile.get("name", "")).strip().lower() 490 if not profile_key or profile_key in seen_names: 491 continue 492 seen_names.add(profile_key) 493 profiles.append(profile) 494 495 profiles.sort( 496 key=lambda item: ( 497 0 if str(item.get("name", "")).strip().lower() == BUILTIN_RECOMMENDED_PROFILE else 1, 498 str(item.get("name", "")).strip().lower(), 499 ) 500 ) 501 return profiles 502 503 504def profile_path_for_new_name(profiles_root: Path, profile_name: str) -> Path: 505 """Compute a unique file path for a new artifact profile. 506 507 Args: 508 profiles_root: Directory for profile files. 509 profile_name: Human-readable profile name. 510 511 Returns: 512 A non-existent ``Path`` suitable for writing. 
513 """ 514 stem = safe_name(profile_name.lower(), fallback="profile") 515 candidate = profiles_root / f"{stem}{PROFILE_FILE_SUFFIX}" 516 if not candidate.exists(): 517 return candidate 518 519 index = 1 520 while True: 521 candidate = profiles_root / f"{stem}_{index}{PROFILE_FILE_SUFFIX}" 522 if not candidate.exists(): 523 return candidate 524 index += 1 525 526 527def normalize_profile_name(value: Any) -> str: 528 """Validate and normalise a profile name from user input. 529 530 Args: 531 value: Raw profile name. 532 533 Returns: 534 Stripped, validated profile name. 535 536 Raises: 537 ValueError: If the name is empty, reserved, or invalid. 538 """ 539 name = str(value or "").strip() 540 if not name: 541 raise ValueError("Profile name is required.") 542 if name.lower() == BUILTIN_RECOMMENDED_PROFILE: 543 raise ValueError("`recommended` is a built-in profile and cannot be overwritten.") 544 if not PROFILE_NAME_RE.fullmatch(name): 545 raise ValueError( 546 "Profile name must be 1-64 chars and use letters, numbers, spaces, period, underscore, or hyphen." 547 ) 548 return name 549 550 551def compose_profile_response(profiles_root: Path) -> list[dict[str, Any]]: 552 """Build the API response payload for all artifact profiles. 553 554 Args: 555 profiles_root: Directory containing profile files. 556 557 Returns: 558 List of dicts with ``name``, ``builtin``, and ``artifact_options``. 
559 """ 560 return [ 561 { 562 "name": str(profile.get("name", "")).strip(), 563 "builtin": bool(profile.get("builtin", False)), 564 "artifact_options": list(profile.get("artifact_options", [])), 565 } 566 for profile in load_profiles_from_directory(profiles_root) 567 ] 568 569 570# --------------------------------------------------------------------------- 571# Route handlers 572# --------------------------------------------------------------------------- 573 574artifact_bp = Blueprint("artifacts", __name__) 575 576 577def _purge_stale_parsed_data(case_dir: Path, prev_csv_output_dir: str) -> None: 578 """Remove parsed CSV data from disk before a new parse run. 579 580 Cleans both the default ``case_dir/parsed`` directory and any external 581 CSV output directory that was used by the previous parse run. 582 583 Args: 584 case_dir: Path to the case directory. 585 prev_csv_output_dir: The ``csv_output_dir`` stored from the previous 586 parse run. May be empty if no prior run exists. 587 """ 588 # Clean the default parsed directory inside the case folder. 589 default_parsed = case_dir / "parsed" 590 if default_parsed.is_dir(): 591 LOGGER.info("Removing stale parsed output: %s", default_parsed) 592 shutil.rmtree(default_parsed, ignore_errors=True) 593 594 # Clean external CSV output directory if configured and different 595 # from the default location. 596 if not prev_csv_output_dir: 597 return 598 prev_path = Path(prev_csv_output_dir) 599 if not prev_path.is_dir(): 600 return 601 resolved_prev = prev_path.resolve() 602 resolved_default = default_parsed.resolve() 603 if resolved_prev == resolved_default: 604 return # Already handled above. 605 # Safety: refuse to delete filesystem roots or suspiciously short paths. 
606 if resolved_prev == resolved_prev.root or resolved_prev == resolved_prev.anchor: 607 LOGGER.warning("Refusing to remove parsed output at filesystem root: %s", resolved_prev) 608 return 609 if len(resolved_prev.parts) <= 2: 610 LOGGER.warning("Refusing to remove parsed output with suspiciously short path: %s", resolved_prev) 611 return 612 LOGGER.info("Removing stale external parsed output: %s", resolved_prev) 613 shutil.rmtree(resolved_prev, ignore_errors=True) 614 615 616def _purge_stale_downstream_case_files(case_dir: Path) -> None: 617 """Remove stale analysis/chat artifacts before a new parse run. 618 619 Args: 620 case_dir: Path to the case directory. 621 """ 622 for stale_name in ("analysis_results.json", "prompt.txt", "chat_history.jsonl"): 623 stale_path = case_dir / stale_name 624 try: 625 stale_path.unlink(missing_ok=True) 626 except OSError: 627 LOGGER.warning("Failed to remove stale case artifact: %s", stale_path, exc_info=True) 628 629 630@artifact_bp.post("/api/cases/<case_id>/parse") 631def start_parse(case_id: str) -> tuple[Response, int]: 632 """Start background parsing of selected forensic artifacts. 633 634 Args: 635 case_id: UUID of the case. 636 637 Returns: 638 ``(Response, 202)`` confirming start, or error. 
639 """ 640 case = get_case(case_id) 641 if case is None: 642 return error_response(f"Case not found: {case_id}", 404) 643 with STATE_LOCK: 644 has_evidence = bool(str(case.get("evidence_path", "")).strip()) 645 if not has_evidence: 646 return error_response("No evidence loaded for this case.", 400) 647 648 payload = request.get_json(silent=True) or {} 649 if not isinstance(payload, dict): 650 return error_response("Request body must be a JSON object.", 400) 651 try: 652 artifact_options, parse_artifacts, analysis_artifacts = extract_parse_selection_payload(payload) 653 except ValueError as error: 654 return error_response(str(error), 400) 655 656 if not parse_artifacts: 657 return error_response("Provide at least one artifact key to parse.", 400) 658 try: 659 analysis_date_range = validate_analysis_date_range(payload.get("analysis_date_range")) 660 except ValueError as error: 661 return error_response(str(error), 400) 662 663 with STATE_LOCK: 664 parse_state = PARSE_PROGRESS.setdefault(case_id, new_progress()) 665 if parse_state.get("status") == "running": 666 return error_response("Parsing is already running for this case.", 409) 667 case_dir = Path(case["case_dir"]) 668 PARSE_PROGRESS[case_id] = new_progress(status="running") 669 case["status"] = "running" 670 case["selected_artifacts"] = list(parse_artifacts) 671 case["analysis_artifacts"] = list(analysis_artifacts) 672 case["artifact_options"] = list(artifact_options) 673 case["analysis_date_range"] = analysis_date_range 674 675 # Capture previous CSV output dir before clearing so we can 676 # remove stale on-disk data outside the case directory. 677 prev_csv_output_dir = str(case.get("csv_output_dir", "")).strip() 678 679 # Invalidate prior parse-derived outputs so a failed rerun 680 # cannot leave stale data usable by downstream analysis. 
681 case["parse_results"] = [] 682 case["artifact_csv_paths"] = {} 683 case["analysis_results"] = {} 684 case["csv_output_dir"] = "" 685 case["investigation_context"] = "" 686 687 _purge_stale_parsed_data(case_dir, prev_csv_output_dir) 688 _purge_stale_downstream_case_files(case_dir) 689 690 parse_started_event: dict[str, Any] = { 691 "type": "parse_started", 692 "artifacts": parse_artifacts, 693 "analysis_artifacts": analysis_artifacts, 694 "artifact_options": artifact_options, 695 "total_artifacts": len(parse_artifacts), 696 } 697 if analysis_date_range is not None: 698 parse_started_event["analysis_date_range"] = analysis_date_range 699 emit_progress(PARSE_PROGRESS, case_id, parse_started_event) 700 config_snapshot = copy.deepcopy(current_app.config.get("AIFT_CONFIG", {})) 701 from .tasks import run_task_with_case_log_context, run_parse # deferred to avoid circular import 702 threading.Thread( 703 target=run_task_with_case_log_context, 704 args=(case_id, run_parse, case_id, parse_artifacts, analysis_artifacts, artifact_options, config_snapshot), 705 daemon=True, 706 ).start() 707 708 response_payload: dict[str, Any] = { 709 "status": "started", 710 "case_id": case_id, 711 "artifacts": parse_artifacts, 712 "ai_artifacts": analysis_artifacts, 713 "artifact_options": artifact_options, 714 } 715 if analysis_date_range is not None: 716 response_payload["analysis_date_range"] = analysis_date_range 717 response_payload["success"] = True 718 return jsonify(response_payload), 202 719 720 721@artifact_bp.get("/api/cases/<case_id>/parse/progress") 722def stream_parse_progress(case_id: str) -> Response | tuple[Response, int]: 723 """Stream parsing progress events via SSE. 724 725 Args: 726 case_id: UUID of the case. 727 728 Returns: 729 SSE Response, or 404 error. 
730 """ 731 if get_case(case_id) is None: 732 return error_response(f"Case not found: {case_id}", 404) 733 return stream_sse(PARSE_PROGRESS, case_id) 734 735 736@artifact_bp.post("/api/cases/<case_id>/parse/cancel") 737def cancel_parse(case_id: str) -> tuple[Response, int]: 738 """Cancel a running parse operation for a case. 739 740 Args: 741 case_id: UUID of the case. 742 743 Returns: 744 ``(Response, 200)`` confirming cancellation, or error. 745 """ 746 if get_case(case_id) is None: 747 return error_response(f"Case not found: {case_id}", 404) 748 cancelled = cancel_progress(PARSE_PROGRESS, case_id) 749 if not cancelled: 750 return error_response("No running parse to cancel.", 409) 751 return success_response({"status": "cancelling", "case_id": case_id}) 752 753 754@artifact_bp.get("/api/artifact-profiles") 755def list_artifact_profiles() -> Response: 756 """List all available artifact profiles. 757 758 Returns: 759 JSON response with the ``profiles`` list. 760 """ 761 config_path = Path(str(current_app.config.get("AIFT_CONFIG_PATH", "config.yaml"))) 762 profiles_root = resolve_profiles_root(config_path) 763 return success_response({"profiles": compose_profile_response(profiles_root)}) 764 765 766@artifact_bp.post("/api/artifact-profiles") 767def save_artifact_profile() -> Response | tuple[Response, int]: 768 """Create or update a user-defined artifact profile. 769 770 Returns: 771 JSON with saved profile and updated profiles list, or error. 
772 """ 773 payload = request.get_json(silent=True) 774 if not isinstance(payload, dict): 775 return error_response("Profile payload must be a JSON object.", 400) 776 777 try: 778 profile_name = normalize_profile_name(payload.get("name")) 779 except ValueError as error: 780 return error_response(str(error), 400) 781 782 try: 783 artifact_options = normalize_artifact_options(payload.get("artifact_options")) 784 except ValueError as error: 785 return error_response(str(error), 400) 786 if not artifact_options: 787 return error_response("Profile must include at least one artifact option.", 400) 788 789 config_path = Path(str(current_app.config.get("AIFT_CONFIG_PATH", "config.yaml"))) 790 profiles_root = resolve_profiles_root(config_path) 791 792 try: 793 profiles = load_profiles_from_directory(profiles_root) 794 profile_key = profile_name.lower() 795 existing = next( 796 ( 797 profile 798 for profile in profiles 799 if str(profile.get("name", "")).strip().lower() == profile_key 800 ), 801 None, 802 ) 803 if existing is not None and bool(existing.get("builtin", False)): 804 return error_response("`recommended` is a built-in profile and cannot be overwritten.", 400) 805 806 if existing is not None: 807 target_path = Path(existing.get("path")) 808 else: 809 target_path = profile_path_for_new_name(profiles_root, profile_name) 810 811 response_profile = { 812 "name": profile_name, 813 "builtin": False, 814 "artifact_options": artifact_options, 815 } 816 write_profile_file(target_path, response_profile) 817 except OSError: 818 LOGGER.exception("Failed to save artifact profile '%s'", profile_name) 819 return error_response( 820 "Failed to save the profile due to a filesystem error. " 821 "Check directory permissions and retry.", 822 500, 823 ) 824 825 return success_response( 826 { 827 "status": "saved", 828 "profile": response_profile, 829 "profiles": compose_profile_response(profiles_root), 830 } 831 )
def normalize_artifact_mode(value: Any, default_mode: str = MODE_PARSE_AND_AI) -> str:
    """Coerce a raw mode value to one of the recognised mode constants.

    Args:
        value: Raw mode value.
        default_mode: Fallback mode.

    Returns:
        ``MODE_PARSE_AND_AI`` or ``MODE_PARSE_ONLY``.
    """
    candidate = str(value or "").strip().lower()
    if candidate in (MODE_PARSE_ONLY, MODE_PARSE_AND_AI):
        return candidate
    return default_mode
Normalise an artifact processing mode to a valid constant.
Arguments:
- value: Raw mode value.
- default_mode: Fallback mode.
Returns:
`MODE_PARSE_AND_AI` or `MODE_PARSE_ONLY`.
def normalize_artifact_options(payload: Any) -> list[dict[str, str]]:
    """Normalise a raw artifact options payload into canonical form.

    Accepts lists of strings or dicts with various key names.

    Args:
        payload: Raw ``artifact_options`` value.

    Returns:
        List of dicts with ``artifact_key`` and ``mode`` keys.

    Raises:
        ValueError: If *payload* is not a list.
    """
    if not isinstance(payload, list):
        raise ValueError("`artifact_options` must be a JSON array.")

    result: list[dict[str, str]] = []
    known_keys: set[str] = set()
    for entry in payload:
        if isinstance(entry, str):
            key = entry.strip()
            mode = MODE_PARSE_AND_AI
        elif isinstance(entry, dict):
            key = str(entry.get("artifact_key") or entry.get("key") or "").strip()
            # Precedence: explicit `mode`, then legacy `ai_enabled` flag,
            # then `parse_mode` (defaulting to parse-and-AI).
            if "mode" in entry:
                mode = normalize_artifact_mode(entry.get("mode"))
            elif "ai_enabled" in entry:
                mode = MODE_PARSE_AND_AI if bool(entry.get("ai_enabled")) else MODE_PARSE_ONLY
            else:
                mode = normalize_artifact_mode(entry.get("parse_mode"), default_mode=MODE_PARSE_AND_AI)
        else:
            continue

        if key and key not in known_keys:
            known_keys.add(key)
            result.append({"artifact_key": key, "mode": mode})

    return result
Normalise a raw artifact options payload into canonical form.
Accepts lists of strings or dicts with various key names.
Arguments:
- payload: Raw `artifact_options` value.
Returns:
List of dicts with `artifact_key` and `mode` keys.
Raises:
- ValueError: If payload is not a list.
def artifact_options_to_lists(artifact_options: list[dict[str, str]]) -> tuple[list[str], list[str]]:
    """Split normalised artifact options into parse and analysis lists.

    Args:
        artifact_options: Canonical artifact option dicts.

    Returns:
        ``(parse_artifacts, analysis_artifacts)`` tuple.
    """
    to_parse: list[str] = []
    to_analyse: list[str] = []
    for entry in artifact_options:
        key = str(entry.get("artifact_key", "")).strip()
        if not key:
            continue
        to_parse.append(key)
        # Only parse-and-AI artifacts flow into the analysis list.
        if normalize_artifact_mode(entry.get("mode")) == MODE_PARSE_AND_AI:
            to_analyse.append(key)
    return to_parse, to_analyse
Split normalised artifact options into parse and analysis lists.
Arguments:
- artifact_options: Canonical artifact option dicts.
Returns:
`(parse_artifacts, analysis_artifacts)` tuple.
def extract_parse_selection_payload(
    payload: dict[str, Any],
) -> tuple[list[dict[str, str]], list[str], list[str]]:
    """Extract and normalise artifact selection from a parse request payload.

    Supports both ``artifact_options`` (new) and ``artifacts``/``ai_artifacts``
    (legacy) formats.

    Args:
        payload: Parsed JSON body from the parse-start request.

    Returns:
        ``(artifact_options, parse_artifacts, analysis_artifacts)`` tuple.

    Raises:
        ValueError: If the payload contains invalid fields.
    """
    # New-style payloads carry everything in `artifact_options`.
    if "artifact_options" in payload:
        options = normalize_artifact_options(payload.get("artifact_options"))
        to_parse, to_analyse = artifact_options_to_lists(options)
        return options, to_parse, to_analyse

    # Legacy payloads use parallel `artifacts` / `ai_artifacts` lists.
    legacy_artifacts = payload.get("artifacts", [])
    if not isinstance(legacy_artifacts, list):
        raise ValueError("`artifacts` must be a JSON array.")
    to_parse = _normalize_string_list(legacy_artifacts)

    if "ai_artifacts" not in payload:
        # Absent AI list means "analyse everything that is parsed".
        to_analyse = list(to_parse)
    else:
        legacy_ai = payload.get("ai_artifacts")
        if not isinstance(legacy_ai, list):
            raise ValueError("`ai_artifacts` must be a JSON array.")
        allowed = set(to_parse)
        to_analyse = [key for key in _normalize_string_list(legacy_ai) if key in allowed]

    options = _build_artifact_options_from_lists(
        parse_artifacts=to_parse,
        analysis_artifacts=to_analyse,
    )
    return options, to_parse, to_analyse
Extract and normalise artifact selection from a parse request payload.
Supports both ``artifact_options`` (new) and ``artifacts``/``ai_artifacts``
(legacy) formats.
Arguments:
- payload: Parsed JSON body from the parse-start request.
Returns:
``(artifact_options, parse_artifacts, analysis_artifacts)`` tuple.
Raises:
- ValueError: If the payload contains invalid fields.
def validate_analysis_date_range(payload: Any) -> dict[str, str] | None:
    """Validate and normalise an optional analysis date range.

    Args:
        payload: Raw ``analysis_date_range`` value from request JSON.

    Returns:
        Dict with ``start_date`` and ``end_date``, or ``None``.

    Raises:
        ValueError: On invalid format or range.
    """
    if payload is None:
        return None
    if not isinstance(payload, dict):
        raise ValueError("`analysis_date_range` must be an object.")

    def _clean(value: Any) -> str:
        # Treat None as absent; everything else is stringified and trimmed.
        return "" if value is None else str(value).strip()

    start_text = _clean(payload.get("start_date"))
    end_text = _clean(payload.get("end_date"))

    if not (start_text or end_text):
        # Both empty means "no range requested".
        return None
    if not (start_text and end_text):
        raise ValueError(
            "Provide both `analysis_date_range.start_date` and `analysis_date_range.end_date`."
        )

    try:
        parsed = [datetime.strptime(text, "%Y-%m-%d").date() for text in (start_text, end_text)]
    except ValueError as error:
        raise ValueError("Date range values must use YYYY-MM-DD format.") from error

    start_date, end_date = parsed
    if end_date < start_date:
        raise ValueError(
            "`analysis_date_range.start_date` must be earlier than or equal to `end_date`."
        )

    return {
        "start_date": start_date.isoformat(),
        "end_date": end_date.isoformat(),
    }
Validate and normalise an optional analysis date range.
Arguments:
- payload: Raw ``analysis_date_range`` value from request JSON.
Returns:
Dict with ``start_date`` and ``end_date``, or ``None``.
Raises:
- ValueError: On invalid format or range.
def extract_parse_progress(fallback_artifact: str, args: tuple[Any, ...]) -> tuple[str, int]:
    """Extract artifact key and record count from a parser progress callback.

    Args:
        fallback_artifact: Default artifact key.
        args: Positional arguments from the callback.

    Returns:
        ``(artifact_key, record_count)`` tuple.
    """
    if not args:
        return fallback_artifact, 0
    head = args[0]
    # Dict-style callbacks carry both values in a single mapping.
    if isinstance(head, dict):
        key = str(head.get("artifact_key", fallback_artifact))
        count = safe_int(head.get("record_count", 0))
        return key, count
    # Two positional args: (artifact_key, record_count).
    if len(args) > 1:
        return str(head or fallback_artifact), safe_int(args[1], 0)
    # A single positional arg is just the record count.
    return fallback_artifact, safe_int(head, 0)
Extract artifact key and record count from a parser progress callback.
Arguments:
- fallback_artifact: Default artifact key.
- args: Positional arguments from the callback.
Returns:
``(artifact_key, record_count)`` tuple.
def sanitize_prompt(prompt: str, max_chars: int = 2000) -> str:
    """Normalise and truncate a user prompt for audit logging.

    Args:
        prompt: Raw user prompt text.
        max_chars: Maximum character length. Defaults to 2000.

    Returns:
        Normalised (and possibly truncated) prompt string.
    """
    # Collapse all whitespace runs (including newlines) to single spaces.
    collapsed = " ".join(prompt.split())
    if len(collapsed) > max_chars:
        # Truncation marker deliberately sits outside the max_chars budget.
        return f"{collapsed[:max_chars]}... [truncated]"
    return collapsed
Normalise and truncate a user prompt for audit logging.
Arguments:
- prompt: Raw user prompt text.
- max_chars: Maximum character length. Defaults to 2000.
Returns:
Normalised (and possibly truncated) prompt string.
def resolve_profiles_root(config_path: str | Path) -> Path:
    """Resolve the directory where artifact profiles are stored.

    Args:
        config_path: Path to the AIFT configuration file.

    Returns:
        Absolute ``Path`` to the profiles directory.
    """
    # Profiles live in a fixed subdirectory next to the config file.
    config_dir = Path(config_path).parent
    return config_dir / PROFILE_DIRNAME
Resolve the directory where artifact profiles are stored.
Arguments:
- config_path: Path to the AIFT configuration file.
Returns:
Absolute ``Path`` to the profiles directory.
def compose_profile_response(profiles_root: Path) -> list[dict[str, Any]]:
    """Build the API response payload for all artifact profiles.

    Args:
        profiles_root: Directory containing profile files.

    Returns:
        List of dicts with ``name``, ``builtin``, and ``artifact_options``.
    """
    response: list[dict[str, Any]] = []
    for profile in load_profiles_from_directory(profiles_root):
        # Copy only the public fields, normalising types defensively.
        response.append(
            {
                "name": str(profile.get("name", "")).strip(),
                "builtin": bool(profile.get("builtin", False)),
                "artifact_options": list(profile.get("artifact_options", [])),
            }
        )
    return response
Build the API response payload for all artifact profiles.
Arguments:
- profiles_root: Directory containing profile files.
Returns:
List of dicts with ``name``, ``builtin``, and ``artifact_options``.
def load_profiles_from_directory(profiles_root: Path) -> list[dict[str, Any]]:
    """Load all valid artifact profiles from the profiles directory.

    Args:
        profiles_root: Directory containing profile JSON files.

    Returns:
        Sorted list of validated profile dicts.
    """
    profiles_root.mkdir(parents=True, exist_ok=True)
    # Guarantee the built-in "recommended" profile exists before scanning.
    _ensure_recommended_profile(profiles_root)

    collected: list[dict[str, Any]] = []
    seen: set[str] = set()
    candidates = sorted(
        profiles_root.glob(f"*{PROFILE_FILE_SUFFIX}"),
        key=lambda path: path.name.lower(),
    )
    for path in candidates:
        profile = _load_profile_file(path)
        if profile is None:
            # Invalid or unreadable profile files are skipped.
            continue
        normalized_name = str(profile.get("name", "")).strip().lower()
        # Drop unnamed profiles and case-insensitive duplicates.
        if not normalized_name or normalized_name in seen:
            continue
        seen.add(normalized_name)
        collected.append(profile)

    def _sort_key(profile: dict[str, Any]) -> tuple[int, str]:
        # The built-in recommended profile sorts first, then alphabetical.
        name = str(profile.get("name", "")).strip().lower()
        return (0 if name == BUILTIN_RECOMMENDED_PROFILE else 1, name)

    collected.sort(key=_sort_key)
    return collected
Load all valid artifact profiles from the profiles directory.
Arguments:
- profiles_root: Directory containing profile JSON files.
Returns:
Sorted list of validated profile dicts.
def normalize_profile_name(value: Any) -> str:
    """Validate and normalise a profile name from user input.

    Args:
        value: Raw profile name.

    Returns:
        Stripped, validated profile name.

    Raises:
        ValueError: If the name is empty, reserved, or invalid.
    """
    name = str(value or "").strip()
    if not name:
        raise ValueError("Profile name is required.")
    # The built-in profile name is reserved (case-insensitive check).
    if name.lower() == BUILTIN_RECOMMENDED_PROFILE:
        raise ValueError("`recommended` is a built-in profile and cannot be overwritten.")
    if PROFILE_NAME_RE.fullmatch(name) is None:
        raise ValueError(
            "Profile name must be 1-64 chars and use letters, numbers, spaces, period, underscore, or hyphen."
        )
    return name
Validate and normalise a profile name from user input.
Arguments:
- value: Raw profile name.
Returns:
Stripped, validated profile name.
Raises:
- ValueError: If the name is empty, reserved, or invalid.
def profile_path_for_new_name(profiles_root: Path, profile_name: str) -> Path:
    """Compute a unique file path for a new artifact profile.

    Args:
        profiles_root: Directory for profile files.
        profile_name: Human-readable profile name.

    Returns:
        A non-existent ``Path`` suitable for writing.
    """
    stem = safe_name(profile_name.lower(), fallback="profile")
    candidate = profiles_root / f"{stem}{PROFILE_FILE_SUFFIX}"
    suffix_index = 0
    # Append an incrementing numeric suffix until the name is free.
    while candidate.exists():
        suffix_index += 1
        candidate = profiles_root / f"{stem}_{suffix_index}{PROFILE_FILE_SUFFIX}"
    return candidate
Compute a unique file path for a new artifact profile.
Arguments:
- profiles_root: Directory for profile files.
- profile_name: Human-readable profile name.
Returns:
A non-existent ``Path`` suitable for writing.
def write_profile_file(path: Path, payload: dict[str, Any]) -> None:
    """Write an artifact profile to a JSON file.

    Args:
        path: Destination file path.
        payload: Profile data to serialise.
    """
    # Create missing parent directories before writing.
    path.parent.mkdir(parents=True, exist_ok=True)
    # ASCII-safe, pretty-printed JSON with a trailing newline.
    serialized = json.dumps(payload, indent=2, ensure_ascii=True)
    path.write_text(serialized + "\n", encoding="utf-8")
Write an artifact profile to a JSON file.
Arguments:
- path: Destination file path.
- payload: Profile data to serialise.