app.routes.evidence

Evidence intake, archive extraction, CSV/hash helpers, and route handlers.

This module handles all evidence-related logic: uploading files, resolving paths, extracting ZIP/tar/7z archives, computing and verifying hashes, collecting parsed CSV paths, reading audit log entries, and the Flask route handlers for evidence intake, report generation, and CSV bundle downloads.

Attributes:
  • EWF_SEGMENT_RE: Compiled regex for EWF split segment filenames.
  • SPLIT_RAW_SEGMENT_RE: Compiled regex for split raw disk image segments.
  • evidence_bp: Flask Blueprint for evidence-related routes.
   1"""Evidence intake, archive extraction, CSV/hash helpers, and route handlers.
   2
   3This module handles all evidence-related logic: uploading files, resolving
   4paths, extracting ZIP/tar/7z archives, computing and verifying hashes,
   5collecting parsed CSV paths, reading audit log entries, and the Flask route
   6handlers for evidence intake, report generation, and CSV bundle downloads.
   7
   8Attributes:
   9    EWF_SEGMENT_RE: Compiled regex for EWF split segment filenames.
  10    SPLIT_RAW_SEGMENT_RE: Compiled regex for split raw disk image segments.
  11    evidence_bp: Flask Blueprint for evidence-related routes.
  12"""
  13
  14from __future__ import annotations
  15
  16import json
  17import logging
  18import re
  19import shutil
  20import tarfile
  21import time
  22import uuid
  23from datetime import datetime, timezone
  24from pathlib import Path
  25from typing import Any, Callable
  26from zipfile import BadZipFile, ZipFile, ZIP_DEFLATED
  27
  28import py7zr
  29
  30from flask import Blueprint, Response, current_app, request, send_file
  31from werkzeug.utils import secure_filename
  32
  33from ..hasher import compute_hashes, verify_hash
  34from ..parser import ForensicParser
  35from ..reporter import ReportGenerator
  36
  37from .state import (
  38    ANALYSIS_PROGRESS,
  39    CASES_ROOT,
  40    CHAT_PROGRESS,
  41    PARSE_PROGRESS,
  42    PROJECT_ROOT,
  43    STATE_LOCK,
  44    error_response,
  45    get_case,
  46    mark_case_status,
  47    safe_name,
  48    success_response,
  49)
  50
# Explicit public API; underscore-prefixed helpers in this module are internal.
__all__ = [
    "EWF_SEGMENT_RE",
    "SPLIT_RAW_SEGMENT_RE",
    "evidence_bp",
    "resolve_evidence_payload",
    "resolve_hash_verification_path",
    "resolve_case_csv_output_dir",
    "collect_case_csv_paths",
    "build_csv_map",
    "read_audit_entries",
]
  63
# Module-level logger for evidence intake/cleanup diagnostics.
LOGGER = logging.getLogger(__name__)

# EWF split segment names: "<base>." plus a family prefix (e/ex/s/l) and a
# two-digit segment number, e.g. "image.E01", matched case-insensitively.
EWF_SEGMENT_RE = re.compile(r"^(?P<base>.+)\.(?:e|ex|s|l)(?P<segment>\d{2})$", re.IGNORECASE)
# Split raw disk image names: "<base>.001", "<base>.002", ... (three digits).
SPLIT_RAW_SEGMENT_RE = re.compile(r"^(?P<base>.+)\.(?P<segment>\d{3})$")

# Extensions for evidence files we look for inside extracted archives.
_EVIDENCE_FILE_EXTENSIONS = frozenset({
    ".e01", ".ex01", ".s01", ".l01",
    ".dd", ".img", ".raw", ".bin", ".iso",
    ".vmdk", ".vhd", ".vhdx", ".vdi", ".qcow2", ".hdd", ".hds",
    ".vmx", ".vbox", ".vmcx", ".ovf", ".ova",
    ".asdf", ".asif", ".ad1",
    ".000", ".001",
})
  78
  79
  80# ---------------------------------------------------------------------------
  81# Archive extraction
  82# ---------------------------------------------------------------------------
  83
  84def _extract_archive_members(
  85    destination: Path,
  86    members: list[tuple[str, Any]],
  87    *,
  88    empty_message: str,
  89    unsafe_paths_message: str,
  90    no_files_message: str,
  91    extract_member: Callable[[Any, Path], None] | None = None,
  92    extract_all_members: Callable[[list[tuple[Any, Path]]], None] | None = None,
  93) -> Path:
  94    """Extract archive members safely and return the best Dissect target path.
  95
  96    Validates path traversal, extracts, then locates the best evidence file.
  97    Exactly one of *extract_member* or *extract_all_members* must be provided.
  98
  99    Args:
 100        destination: Root directory to extract into.
 101        members: List of ``(member_name, member_object)`` tuples.
 102        empty_message: Error for empty archives.
 103        unsafe_paths_message: Error for path traversal.
 104        no_files_message: Error when extraction produces no files.
 105        extract_member: Callback to extract a single member.
 106        extract_all_members: Callback to extract all members at once.
 107
 108    Returns:
 109        Path to the best evidence file or extraction directory.
 110
 111    Raises:
 112        ValueError: On empty, unsafe, or failed extraction.
 113    """
 114    if (extract_member is None) == (extract_all_members is None):
 115        raise ValueError("Exactly one extraction callback must be provided.")
 116
 117    if destination.exists():
 118        shutil.rmtree(destination)
 119    destination.mkdir(parents=True, exist_ok=True)
 120    root = destination.resolve()
 121
 122    if not members:
 123        raise ValueError(empty_message)
 124
 125    validated_members: list[tuple[Any, Path]] = []
 126    for member_name, member in members:
 127        member_path = Path(member_name)
 128        if member_path.is_absolute() or ".." in member_path.parts:
 129            raise ValueError(unsafe_paths_message)
 130        target = (root / member_path).resolve()
 131        if not target.is_relative_to(root):
 132            raise ValueError(unsafe_paths_message)
 133        target.parent.mkdir(parents=True, exist_ok=True)
 134        validated_members.append((member, target))
 135
 136    if extract_all_members is not None:
 137        extract_all_members(validated_members)
 138    else:
 139        for member, target in validated_members:
 140            extract_member(member, target)
 141
 142    files = sorted(path for path in destination.rglob("*") if path.is_file())
 143    if not files:
 144        raise ValueError(no_files_message)
 145    evidence_files = [
 146        path for path in files if path.suffix.lower() in _EVIDENCE_FILE_EXTENSIONS
 147    ]
 148    if evidence_files:
 149        for ef in evidence_files:
 150            if ef.suffix.lower() == ".e01":
 151                return ef
 152        return evidence_files[0]
 153
 154    top_level_entries: set[str] = set()
 155    has_top_level_file = False
 156    for file_path in files:
 157        relative_parts = file_path.relative_to(destination).parts
 158        if not relative_parts:
 159            continue
 160        top_level_entries.add(relative_parts[0])
 161        if len(relative_parts) == 1:
 162            has_top_level_file = True
 163
 164    if not has_top_level_file and len(top_level_entries) == 1:
 165        wrapper_dir = destination / sorted(top_level_entries)[0]
 166        if wrapper_dir.is_dir():
 167            return wrapper_dir
 168
 169    return destination
 170
 171
 172def _extract_zip(zip_path: Path, destination: Path) -> Path:
 173    """Extract a ZIP archive and return the best Dissect target path.
 174
 175    Args:
 176        zip_path: Path to the ZIP file.
 177        destination: Directory to extract into.
 178
 179    Returns:
 180        Path to the best evidence file or directory.
 181
 182    Raises:
 183        ValueError: If the ZIP is invalid, empty, or contains unsafe paths.
 184    """
 185    try:
 186        with ZipFile(zip_path, "r") as archive:
 187            members = [(member.filename, member) for member in archive.infolist() if not member.is_dir()]
 188
 189            def _extract_member(member: Any, target: Path) -> None:
 190                """Extract a single ZIP member to the target path."""
 191                with archive.open(member, "r") as src, target.open("wb") as dst:
 192                    shutil.copyfileobj(src, dst)
 193            return _extract_archive_members(
 194                destination,
 195                members,
 196                empty_message="Evidence ZIP is empty.",
 197                unsafe_paths_message="Archive rejected: contains unsafe file paths",
 198                no_files_message="Evidence ZIP extraction produced no files.",
 199                extract_member=_extract_member,
 200            )
 201    except BadZipFile as error:
 202        raise ValueError(f"Invalid ZIP evidence file: {zip_path.name}") from error
 203
 204
 205def _extract_tar(tar_path: Path, destination: Path) -> Path:
 206    """Extract a tar archive and return the best Dissect target path.
 207
 208    Args:
 209        tar_path: Path to the tar file.
 210        destination: Directory to extract into.
 211
 212    Returns:
 213        Path to the best evidence file or directory.
 214
 215    Raises:
 216        ValueError: If the tar is invalid, empty, or contains unsafe paths.
 217    """
 218    try:
 219        with tarfile.open(tar_path, "r:*") as archive:
 220            raw_members = archive.getmembers()
 221            for member in raw_members:
 222                if member.islnk() or member.issym():
 223                    raise ValueError("Archive rejected: contains unsafe file paths")
 224            members = [(member.name, member) for member in raw_members if member.isfile()]
 225
 226            def _extract_member(member: Any, target: Path) -> None:
 227                """Extract a single tar member to the target path."""
 228                src = archive.extractfile(member)
 229                if src is None:
 230                    return
 231                with src, target.open("wb") as dst:
 232                    shutil.copyfileobj(src, dst)
 233            return _extract_archive_members(
 234                destination,
 235                members,
 236                empty_message="Evidence tar archive is empty.",
 237                unsafe_paths_message="Archive rejected: contains unsafe file paths",
 238                no_files_message="Evidence tar extraction produced no files.",
 239                extract_member=_extract_member,
 240            )
 241    except tarfile.TarError as error:
 242        raise ValueError(f"Invalid tar evidence file: {tar_path.name}") from error
 243
 244
 245def _extract_7z(archive_path: Path, destination: Path) -> Path:
 246    """Extract a 7z archive and return the best Dissect target path.
 247
 248    Args:
 249        archive_path: Path to the 7z file.
 250        destination: Directory to extract into.
 251
 252    Returns:
 253        Path to the best evidence file or directory.
 254
 255    Raises:
 256        ValueError: If the 7z is invalid, empty, or contains unsafe paths.
 257    """
 258    try:
 259        with py7zr.SevenZipFile(archive_path, mode="r") as archive:
 260            members = [(name, name) for name in archive.getnames() if not name.endswith("/")]
 261
 262            def _extract_members(validated: list[tuple[Any, Path]]) -> None:
 263                """Extract 7z members via temp directory for path-traversal safety."""
 264                import tempfile
 265                with tempfile.TemporaryDirectory() as tmpdir:
 266                    tmp = Path(tmpdir)
 267                    archive.extractall(path=tmp)
 268                    for member_name, target in validated:
 269                        src = tmp / member_name
 270                        if src.is_file():
 271                            target.parent.mkdir(parents=True, exist_ok=True)
 272                            shutil.copy2(src, target)
 273
 274            return _extract_archive_members(
 275                destination,
 276                members,
 277                empty_message="Evidence 7z archive is empty.",
 278                unsafe_paths_message="Archive rejected: contains unsafe file paths",
 279                no_files_message="Evidence 7z extraction produced no files.",
 280                extract_all_members=_extract_members,
 281            )
 282    except py7zr.Bad7zFile as error:
 283        raise ValueError(f"Invalid 7z evidence file: {archive_path.name}") from error
 284
 285
 286# ---------------------------------------------------------------------------
 287# Upload / path resolution
 288# ---------------------------------------------------------------------------
 289
 290def _collect_uploaded_files() -> list[Any]:
 291    """Collect all uploaded ``FileStorage`` objects from the current request.
 292
 293    Returns:
 294        A list of ``FileStorage`` objects with non-empty filenames.
 295    """
 296    uploaded: list[Any] = []
 297    for key in request.files:
 298        for file_storage in request.files.getlist(key):
 299            if file_storage and file_storage.filename:
 300                uploaded.append(file_storage)
 301    return uploaded
 302
 303
 304_SAVE_CHUNK_SIZE = 4 * 1024 * 1024  # 4 MiB
 305
 306
 307def _save_with_limit(
 308    file_storage: Any,
 309    dest: Path,
 310    max_bytes: int,
 311    cumulative: int,
 312) -> int:
 313    """Stream-save an uploaded file, enforcing an optional size limit.
 314
 315    Args:
 316        file_storage: Werkzeug ``FileStorage`` to save.
 317        dest: Destination path on disk.
 318        max_bytes: Maximum allowed total bytes across all files (0 = unlimited).
 319        cumulative: Bytes already written by prior files in this upload batch.
 320
 321    Returns:
 322        Updated cumulative byte count after this file.
 323
 324    Raises:
 325        ValueError: If the cumulative size exceeds *max_bytes*.
 326    """
 327    if max_bytes <= 0:
 328        file_storage.save(dest)
 329        return cumulative + dest.stat().st_size
 330
 331    written = 0
 332    stream = file_storage.stream
 333    with open(dest, "wb") as out:
 334        while True:
 335            chunk = stream.read(_SAVE_CHUNK_SIZE)
 336            if not chunk:
 337                break
 338            written += len(chunk)
 339            if cumulative + written > max_bytes:
 340                out.close()
 341                dest.unlink(missing_ok=True)
 342                limit_gb = max_bytes / (1024 * 1024 * 1024)
 343                raise ValueError(
 344                    f"Upload exceeds the Evidence Size Threshold "
 345                    f"({limit_gb:.1f} GB). Use path mode instead, or "
 346                    f"increase the threshold in Settings \u2192 Advanced."
 347                )
 348            out.write(chunk)
 349    return cumulative + written
 350
 351
 352def _unique_destination(path: Path) -> Path:
 353    """Generate a unique file path by appending a numeric suffix if needed.
 354
 355    Args:
 356        path: Desired file path.
 357
 358    Returns:
 359        A ``Path`` guaranteed not to exist on disk.
 360    """
 361    if not path.exists():
 362        return path
 363    counter = 1
 364    while True:
 365        candidate = path.with_name(f"{path.stem}_{counter}{path.suffix}")
 366        if not candidate.exists():
 367            return candidate
 368        counter += 1
 369
 370
 371def _segment_identity(path_or_name: Path | str) -> tuple[str, str, int] | None:
 372    """Parse split-image segment identity from a filename.
 373
 374    Args:
 375        path_or_name: Path or filename to inspect.
 376
 377    Returns:
 378        ``(kind, base_name, segment_number)`` for known split-image naming
 379        schemes, or ``None`` if the name is not a recognized segment.
 380    """
 381    name = Path(path_or_name).name if isinstance(path_or_name, Path) else str(path_or_name)
 382    for kind, pattern in (("ewf", EWF_SEGMENT_RE), ("raw", SPLIT_RAW_SEGMENT_RE)):
 383        match = pattern.match(name)
 384        if match is not None:
 385            return kind, match.group("base").lower(), int(match.group("segment"))
 386    return None
 387
 388
 389def _collect_segment_group_paths(source_path: Path) -> list[Path]:
 390    """Collect all sibling segment paths for a split-image source file.
 391
 392    Args:
 393        source_path: Candidate source evidence file.
 394
 395    Returns:
 396        Sorted list of sibling segment paths for the same split-image set, or
 397        an empty list when the path is not a recognized split-image segment.
 398    """
 399    if not source_path.is_file():
 400        return []
 401
 402    identity = _segment_identity(source_path)
 403    if identity is None:
 404        return []
 405
 406    kind, base_name, _segment_number = identity
 407    segment_paths: list[tuple[int, Path]] = []
 408    try:
 409        siblings = source_path.parent.iterdir()
 410    except OSError:
 411        return [source_path]
 412
 413    for sibling in siblings:
 414        if not sibling.is_file():
 415            continue
 416        sibling_identity = _segment_identity(sibling)
 417        if sibling_identity is None:
 418            continue
 419        sibling_kind, sibling_base_name, sibling_segment_number = sibling_identity
 420        if sibling_kind == kind and sibling_base_name == base_name:
 421            segment_paths.append((sibling_segment_number, sibling))
 422
 423    if not segment_paths:
 424        return [source_path]
 425    return [path for _segment_number, path in sorted(segment_paths, key=lambda item: item[0])]
 426
 427
 428def _resolve_uploaded_dissect_path(uploaded_paths: list[Path]) -> Path:
 429    """Determine the primary Dissect target path from uploaded files.
 430
 431    Handles single files, split EWF/raw segment sets, and rejects mixed
 432    archive-plus-segment uploads.
 433
 434    Args:
 435        uploaded_paths: List of uploaded evidence file paths.
 436
 437    Returns:
 438        The ``Path`` to pass to Dissect's ``Target.open()``.
 439
 440    Raises:
 441        ValueError: If no files uploaded or archive mixed with segments.
 442    """
 443    if not uploaded_paths:
 444        raise ValueError("No uploaded evidence files were provided.")
 445
 446    if len(uploaded_paths) == 1:
 447        return uploaded_paths[0]
 448
 449    archive_exts = {".zip", ".tar", ".gz", ".tgz", ".7z"}
 450    archive_paths = [path for path in uploaded_paths if path.suffix.lower() in archive_exts]
 451    if archive_paths and len(uploaded_paths) > 1:
 452        raise ValueError("Upload either one archive file or raw evidence segments, not both.")
 453
 454    segment_groups: dict[tuple[str, str], list[tuple[int, Path]]] = {}
 455    for path in uploaded_paths:
 456        identity = _segment_identity(path)
 457        if identity is None:
 458            continue
 459        kind, base_name, segment_number = identity
 460        segment_groups.setdefault((kind, base_name), []).append((segment_number, path))
 461
 462    if segment_groups:
 463        if len(segment_groups) > 1:
 464            group_names = sorted({base_name for _kind, base_name in segment_groups})
 465            raise ValueError(
 466                "Ambiguous upload: multiple segment groups detected "
 467                f"({', '.join(group_names)}). "
 468                "Upload only one split segment set at a time."
 469            )
 470        only_group = next(iter(segment_groups.values()))
 471        return min(only_group, key=lambda item: item[0])[1]
 472
 473    # Multiple files that are neither a single archive nor a recognized
 474    # segment set — reject rather than silently analyzing only the first.
 475    raise ValueError(
 476        "Ambiguous upload: multiple files were provided but they do not "
 477        "form a recognized segment set. Upload a single evidence file, "
 478        "one archive, or a complete split-image segment set."
 479    )
 480
 481
 482def _normalize_user_path(value: str) -> str:
 483    """Strip surrounding quotes and whitespace from a user-supplied path.
 484
 485    Args:
 486        value: Raw path string.
 487
 488    Returns:
 489        Cleaned path string.
 490    """
 491    return (
 492        str(value)
 493        .replace('"', "")
 494        .replace("\u201c", "")
 495        .replace("\u201d", "")
 496        .strip()
 497    )
 498
 499
 500def _make_extract_dir(evidence_dir: Path, source_path: Path) -> Path:
 501    """Build a unique extraction directory path for an archive.
 502
 503    Args:
 504        evidence_dir: Parent evidence directory.
 505        source_path: Path to the archive being extracted.
 506
 507    Returns:
 508        A timestamped extraction directory path.
 509    """
 510    return evidence_dir / f"extracted_{safe_name(source_path.stem, 'evidence')}_{uuid.uuid4().hex[:12]}"
 511
 512
 513def resolve_evidence_payload(case_dir: Path) -> dict[str, Any]:
 514    """Resolve the evidence source from the current request.
 515
 516    Handles upload and JSON path reference modes. Archives are extracted.
 517
 518    Args:
 519        case_dir: Path to the case's root directory.
 520
 521    Returns:
 522        Dict with ``mode``, ``filename``, ``source_path``, ``stored_path``,
 523        ``dissect_path``, and ``uploaded_files``.
 524
 525    Raises:
 526        ValueError: If no evidence provided or archive extraction fails.
 527        FileNotFoundError: If the referenced path does not exist.
 528    """
 529    evidence_dir = case_dir / "evidence"
 530    evidence_dir.mkdir(parents=True, exist_ok=True)
 531
 532    uploaded_files = _collect_uploaded_files()
 533    uploaded_paths: list[Path] = []
 534    if uploaded_files:
 535        aift_config = current_app.config.get("AIFT_CONFIG", {})
 536        threshold_mb = aift_config.get("evidence", {}).get("large_file_threshold_mb", 0)
 537        max_bytes = int(threshold_mb) * 1024 * 1024 if threshold_mb and threshold_mb > 0 else 0
 538        cumulative_bytes = 0
 539        timestamp = int(time.time())
 540        for index, uploaded_file in enumerate(uploaded_files, start=1):
 541            filename = secure_filename(uploaded_file.filename) or f"evidence_{timestamp}_{index}.bin"
 542            stored_path = _unique_destination(evidence_dir / filename)
 543            cumulative_bytes = _save_with_limit(uploaded_file, stored_path, max_bytes, cumulative_bytes)
 544            uploaded_paths.append(stored_path)
 545
 546        source_path = _resolve_uploaded_dissect_path(uploaded_paths)
 547        mode = "upload"
 548    else:
 549        payload = request.get_json(silent=True) or {}
 550        if not isinstance(payload, dict):
 551            raise ValueError("Request body must be a JSON object.")
 552        path_value = payload.get("path")
 553        if not isinstance(path_value, str):
 554            raise ValueError(
 555                "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}."
 556            )
 557        normalized_path = _normalize_user_path(path_value)
 558        if not normalized_path:
 559            raise ValueError(
 560                "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}."
 561            )
 562        source_path = Path(normalized_path).expanduser()
 563        if not source_path.exists():
 564            raise FileNotFoundError(f"Evidence path does not exist: {source_path}")
 565        if not source_path.is_file() and not source_path.is_dir():
 566            raise ValueError(f"Evidence path is not a file or directory: {source_path}")
 567        uploaded_paths = []
 568        mode = "path"
 569
 570    # Extract archives into the evidence directory.
 571    _ARCHIVE_EXTRACTORS = {
 572        ".zip": _extract_zip,
 573        ".tar": _extract_tar,
 574        ".gz": _extract_tar,
 575        ".tgz": _extract_tar,
 576        ".7z": _extract_7z,
 577    }
 578    dissect_path = source_path
 579    suffix = source_path.suffix.lower()
 580    extractor = _ARCHIVE_EXTRACTORS.get(suffix)
 581    if source_path.is_file() and extractor is not None:
 582        extract_dir = _make_extract_dir(evidence_dir, source_path)
 583        dissect_path = extractor(source_path, extract_dir)
 584
 585    # Determine the files to hash for integrity verification.
 586    # Archives are intentionally verified as the original container file.
 587    # Split-image uploads hash all uploaded segments, and path-based split
 588    # images hash all matching sibling segments on disk. Directories get N/A.
 589    if source_path.is_file() and len(uploaded_paths) > 1:
 590        evidence_files_to_hash = sorted(set(str(p) for p in uploaded_paths))
 591    elif source_path.is_file():
 592        segment_paths = _collect_segment_group_paths(source_path)
 593        evidence_files_to_hash = [str(path) for path in segment_paths] if segment_paths else [str(source_path)]
 594    else:
 595        evidence_files_to_hash = []
 596
 597    return {
 598        "mode": mode,
 599        "filename": source_path.name,
 600        "source_path": str(source_path),
 601        "stored_path": str(source_path) if mode == "upload" else "",
 602        "dissect_path": str(dissect_path),
 603        "uploaded_files": [str(path) for path in uploaded_paths],
 604        "evidence_files_to_hash": evidence_files_to_hash,
 605    }
 606
 607
 608# ---------------------------------------------------------------------------
 609# Hash / CSV / audit helpers
 610# ---------------------------------------------------------------------------
 611
 612def resolve_hash_verification_path(case: dict[str, Any]) -> Path | None:
 613    """Resolve the file path for evidence hash verification.
 614
 615    Args:
 616        case: The in-memory case state dictionary.
 617
 618    Returns:
 619        Path to the evidence file, or ``None``.
 620    """
 621    source_path = str(case.get("source_path", "")).strip()
 622    if source_path:
 623        return Path(source_path)
 624    evidence_path = str(case.get("evidence_path", "")).strip()
 625    if evidence_path:
 626        return Path(evidence_path)
 627    return None
 628
 629
 630def resolve_case_csv_output_dir(case: dict[str, Any], config_snapshot: dict[str, Any]) -> Path:
 631    """Resolve the output directory for parsed CSV files.
 632
 633    Args:
 634        case: The in-memory case state dictionary.
 635        config_snapshot: Application configuration snapshot.
 636
 637    Returns:
 638        Absolute ``Path`` to the CSV output directory.
 639    """
 640    config = config_snapshot if isinstance(config_snapshot, dict) else {}
 641    evidence_config = config.get("evidence", {}) if isinstance(config, dict) else {}
 642    configured = str(evidence_config.get("csv_output_dir", "")).strip() if isinstance(evidence_config, dict) else ""
 643    case_dir = Path(case["case_dir"])
 644    case_id = str(case.get("case_id", "")).strip()
 645
 646    if not configured:
 647        return case_dir / "parsed"
 648
 649    output_root = Path(configured).expanduser()
 650    if not output_root.is_absolute():
 651        output_root = (PROJECT_ROOT / output_root).resolve()
 652    if case_id:
 653        return output_root / case_id / "parsed"
 654    return output_root / "parsed"
 655
 656
 657def collect_case_csv_paths(case: dict[str, Any]) -> list[Path]:
 658    """Collect all parsed CSV file paths for a case.
 659
 660    Args:
 661        case: The in-memory case state dictionary.
 662
 663    Returns:
 664        A sorted list of existing CSV file paths.
 665    """
 666    collected: list[Path] = []
 667    seen: set[str] = set()
 668
 669    def _add_path(candidate: Any) -> None:
 670        """Add a CSV path if it exists and is not a duplicate."""
 671        path_text = str(candidate or "").strip()
 672        if not path_text:
 673            return
 674        path = Path(path_text)
 675        if not path.exists() or not path.is_file():
 676            return
 677        key = str(path.resolve())
 678        if key in seen:
 679            return
 680        seen.add(key)
 681        collected.append(path)
 682
 683    csv_map = case.get("artifact_csv_paths")
 684    if isinstance(csv_map, dict):
 685        for csv_path in csv_map.values():
 686            if isinstance(csv_path, list):
 687                for p in csv_path:
 688                    _add_path(p)
 689            else:
 690                _add_path(csv_path)
 691
 692    parse_results = case.get("parse_results")
 693    if isinstance(parse_results, list):
 694        for result in parse_results:
 695            if not isinstance(result, dict) or not result.get("success"):
 696                continue
 697            _add_path(result.get("csv_path"))
 698            csv_paths = result.get("csv_paths")
 699            if isinstance(csv_paths, list):
 700                for path in csv_paths:
 701                    _add_path(path)
 702
 703    if collected:
 704        return sorted(collected, key=lambda path: path.name.lower())
 705
 706    parsed_dir = Path(case["case_dir"]) / "parsed"
 707    return sorted(path for path in parsed_dir.glob("*.csv") if path.is_file())
 708
 709
 710def build_csv_map(parse_results: list[dict[str, Any]]) -> dict[str, str | list[str]]:
 711    """Build a mapping of artifact keys to their parsed CSV file paths.
 712
 713    Split artifacts (e.g. EVTX) that produce multiple CSV files are
 714    represented as a ``list[str]`` value.  Single-file artifacts remain
 715    a plain ``str`` so existing callers are unaffected.
 716
 717    Args:
 718        parse_results: List of per-artifact parse result dicts.
 719
 720    Returns:
 721        Dict mapping artifact key strings to a single CSV path string
 722        or a list of CSV path strings for split artifacts.
 723    """
 724    mapping: dict[str, str | list[str]] = {}
 725    for result in parse_results:
 726        artifact = str(result.get("artifact_key", "")).strip()
 727        if not artifact or not result.get("success"):
 728            continue
 729        csv_paths = result.get("csv_paths")
 730        if isinstance(csv_paths, list) and csv_paths:
 731            non_empty = [str(p) for p in csv_paths if str(p).strip()]
 732            if len(non_empty) > 1:
 733                mapping[artifact] = non_empty
 734                continue
 735            if non_empty:
 736                mapping[artifact] = non_empty[0]
 737                continue
 738        csv_path = str(result.get("csv_path", "")).strip()
 739        if csv_path:
 740            mapping[artifact] = csv_path
 741    return mapping
 742
 743
 744def read_audit_entries(case_dir: Path) -> list[dict[str, Any]]:
 745    """Read all audit log entries from a case's ``audit.jsonl`` file.
 746
 747    Args:
 748        case_dir: Path to the case's root directory.
 749
 750    Returns:
 751        A list of parsed audit entry dicts, or empty list if missing.
 752    """
 753    audit_path = case_dir / "audit.jsonl"
 754    if not audit_path.exists():
 755        return []
 756    entries: list[dict[str, Any]] = []
 757    with audit_path.open("r", encoding="utf-8", errors="replace") as stream:
 758        for line in stream:
 759            text = line.strip()
 760            if not text:
 761                continue
 762            try:
 763                parsed = json.loads(text)
 764            except json.JSONDecodeError:
 765                continue
 766            if isinstance(parsed, dict):
 767                entries.append(parsed)
 768    return entries
 769
 770
 771# ---------------------------------------------------------------------------
 772# Cleanup helpers
 773# ---------------------------------------------------------------------------
 774
 775
 776def _cleanup_parsed_output(case_dir: Path, prev_csv_output_dir: str) -> None:
 777    """Remove stale parsed CSV output from a previous parse run.
 778
 779    Handles both the default ``case_dir/parsed`` location and external
 780    directories configured via ``evidence.csv_output_dir``.  Only the
 781    case-specific parsed directory is removed — parent directories and
 782    unrelated paths are never touched.
 783
 784    Args:
 785        case_dir: Path to the case's root directory.
 786        prev_csv_output_dir: The ``csv_output_dir`` value stored from the
 787            previous parse run (may be empty).
 788    """
 789    if not prev_csv_output_dir:
 790        return
 791
 792    prev_path = Path(prev_csv_output_dir)
 793
 794    # Nothing to do if the directory doesn't exist.
 795    if not prev_path.is_dir():
 796        return
 797
 798    resolved_prev = prev_path.resolve()
 799    resolved_case = case_dir.resolve()
 800
 801    # If the previous output dir is inside the case directory, the normal
 802    # ``case_dir/parsed`` cleanup already handles it — skip.
 803    try:
 804        if resolved_prev.is_relative_to(resolved_case):
 805            return
 806    except (TypeError, ValueError):
 807        return
 808
 809    # Safety: refuse to delete filesystem roots or very short paths that
 810    # could indicate misconfiguration.
 811    if resolved_prev == resolved_prev.root or resolved_prev == resolved_prev.anchor:
 812        LOGGER.warning(
 813            "Refusing to remove parsed output at filesystem root: %s",
 814            resolved_prev,
 815        )
 816        return
 817    if len(resolved_prev.parts) <= 2:
 818        LOGGER.warning(
 819            "Refusing to remove parsed output with suspiciously short path: %s",
 820            resolved_prev,
 821        )
 822        return
 823
 824    LOGGER.info("Removing stale external parsed output: %s", resolved_prev)
 825    shutil.rmtree(resolved_prev, ignore_errors=True)
 826
 827
 828# ---------------------------------------------------------------------------
 829# Route handlers
 830# ---------------------------------------------------------------------------
 831
# Blueprint collecting all evidence-related routes below; registered with
# the Flask application by the app factory.
evidence_bp = Blueprint("evidence", __name__)
 833
 834
@evidence_bp.post("/api/cases/<case_id>/evidence")
def intake_evidence(case_id: str) -> Response | tuple[Response, int]:
    """Ingest evidence for an existing case.

    Accepts either a multipart upload or a JSON body referencing an
    on-disk path (see ``resolve_evidence_payload``).  Computes per-file
    integrity hashes (unless the client set ``skip_hashing``), opens the
    evidence with Dissect to collect image metadata and the artifact
    catalogue, then resets all downstream case state and on-disk output
    derived from any previously loaded evidence.

    Args:
        case_id: UUID of the case.

    Returns:
        JSON with evidence metadata, hashes, and available artifacts.
        400 for invalid input, 404 for an unknown case, 500 on
        unexpected failure.
    """
    case = get_case(case_id)
    if case is None:
        return error_response(f"Case not found: {case_id}", 404)

    # Copy what we need out of shared state under the lock; the slow work
    # below (hashing, Dissect open) runs without holding it.
    with STATE_LOCK:
        case_dir = case["case_dir"]
        audit_logger = case["audit"]

    try:
        evidence_payload = resolve_evidence_payload(case_dir)
        source_path = Path(evidence_payload["source_path"])
        dissect_path = Path(evidence_payload["dissect_path"])

        # Determine whether the user opted to skip hashing.  The flag may
        # arrive either as a multipart form field or in the JSON body.
        skip_hashing = False
        if request.content_type and "multipart" in request.content_type:
            skip_hashing = bool(request.form.get("skip_hashing"))
        else:
            payload = request.get_json(silent=True) or {}
            if isinstance(payload, dict):
                skip_hashing = bool(payload.get("skip_hashing"))

        files_to_hash = evidence_payload.get("evidence_files_to_hash", [])
        if skip_hashing:
            hashes = {"sha256": "N/A (skipped)", "md5": "N/A (skipped)", "size_bytes": 0}
            file_hashes = []
        elif files_to_hash:
            # Hash every file individually — split images can span many
            # segment files and each must be verifiable on its own.
            file_hashes: list[dict[str, Any]] = []
            for fpath in files_to_hash:
                h = dict(compute_hashes(fpath))
                h["path"] = fpath
                file_hashes.append(h)
            if len(file_hashes) == 1:
                hashes = dict(file_hashes[0])
            else:
                # Summary entry for backward compat — individual hashes
                # are persisted separately in evidence_file_hashes.
                hashes = {
                    "sha256": file_hashes[0]["sha256"],
                    "md5": file_hashes[0]["md5"],
                    "size_bytes": sum(h["size_bytes"] for h in file_hashes),
                }
        else:
            # Directory evidence has no single file to hash.
            hashes = {"sha256": "N/A (directory)", "md5": "N/A (directory)", "size_bytes": 0}
            file_hashes = []
        hashes["filename"] = source_path.name

        try:
            with ForensicParser(
                evidence_path=dissect_path,
                case_dir=case_dir,
                audit_logger=audit_logger,
            ) as parser:
                metadata = parser.get_image_metadata()
                available_artifacts = parser.get_available_artifacts()
                detected_os_type = parser.os_type
        except Exception:
            # Intake still succeeds with placeholder metadata so the UI
            # can show its unsupported-evidence screen instead of a 500.
            LOGGER.warning(
                "Failed to open evidence with Dissect for case %s — "
                "returning degraded response so the user sees the "
                "unsupported-evidence screen.",
                case_id,
                exc_info=True,
            )
            metadata = {
                "hostname": "Unknown",
                "os_version": "Unknown",
                "domain": "Unknown",
            }
            available_artifacts = []
            detected_os_type = "unknown"

        # Record the intake and image-open events in the audit trail.
        audit_logger.log(
            "evidence_intake",
            {
                "filename": source_path.name,
                "source_mode": evidence_payload["mode"],
                "source_path": evidence_payload["source_path"],
                "stored_path": evidence_payload["stored_path"],
                "uploaded_files": list(evidence_payload.get("uploaded_files", [])),
                "dissect_path": str(dissect_path),
                "sha256": hashes["sha256"],
                "md5": hashes["md5"],
                "file_size_bytes": hashes["size_bytes"],
                "evidence_file_hashes": [
                    {"path": h["path"], "sha256": h["sha256"], "md5": h["md5"], "size_bytes": h["size_bytes"]}
                    for h in file_hashes
                ],
            },
        )
        audit_logger.log(
            "image_opened",
            {
                "hostname": metadata.get("hostname", "Unknown"),
                "os_version": metadata.get("os_version", "Unknown"),
                "os_type": detected_os_type,
                "domain": metadata.get("domain", "Unknown"),
                "available_artifacts": [
                    str(item.get("key"))
                    for item in available_artifacts
                    if item.get("available")
                ],
            },
        )

        with STATE_LOCK:
            # Capture the previous csv_output_dir before clearing it so
            # we can remove stale parsed CSVs even when they live outside
            # the case directory (external csv_output_dir).
            prev_csv_output_dir = str(case.get("csv_output_dir", "")).strip()

            # Set new evidence metadata.
            case["evidence_mode"] = evidence_payload["mode"]
            case["source_path"] = evidence_payload["source_path"]
            case["stored_path"] = evidence_payload["stored_path"]
            case["uploaded_files"] = list(evidence_payload.get("uploaded_files", []))
            case["evidence_path"] = str(dissect_path)
            case["evidence_hashes"] = hashes
            case["evidence_file_hashes"] = [
                {"path": h["path"], "sha256": h["sha256"], "md5": h["md5"], "size_bytes": h["size_bytes"]}
                for h in file_hashes
            ]
            case["image_metadata"] = metadata
            case["os_type"] = detected_os_type
            case["available_artifacts"] = available_artifacts

            # Invalidate all downstream state derived from prior evidence.
            case["parse_results"] = []
            case["artifact_csv_paths"] = {}
            case["analysis_results"] = {}
            case["csv_output_dir"] = ""
            case["selected_artifacts"] = []
            case["analysis_artifacts"] = []
            case["artifact_options"] = []
            case["analysis_date_range"] = None
            case["investigation_context"] = ""
            case["status"] = "evidence_loaded"

            # Clear progress stores so stale SSE streams are not reused.
            PARSE_PROGRESS.pop(case_id, None)
            ANALYSIS_PROGRESS.pop(case_id, None)
            CHAT_PROGRESS.pop(case_id, None)

        # Remove stale on-disk artifacts so disk fallbacks cannot
        # resurrect results from prior evidence.
        _cleanup_parsed_output(case_dir, prev_csv_output_dir)
        parsed_dir = case_dir / "parsed"
        if parsed_dir.is_dir():
            shutil.rmtree(parsed_dir, ignore_errors=True)
        for stale_file in ("analysis_results.json", "prompt.txt", "chat_history.jsonl"):
            stale_path = case_dir / stale_file
            if stale_path.exists():
                stale_path.unlink(missing_ok=True)

        os_warning = ""
        if detected_os_type == "unknown":
            os_warning = (
                "Could not detect the operating system of this image. "
                "Artifact availability may be incomplete — verify that the "
                "image format is supported by Dissect."
            )

        response_data: dict[str, Any] = {
            "case_id": case_id,
            "source_mode": evidence_payload["mode"],
            "source_path": evidence_payload["source_path"],
            "evidence_path": str(dissect_path),
            "uploaded_files": list(evidence_payload.get("uploaded_files", [])),
            "hashes": hashes,
            "metadata": metadata,
            "os_type": detected_os_type,
            "available_artifacts": available_artifacts,
        }
        if os_warning:
            response_data["os_warning"] = os_warning

        return success_response(response_data)
    except (ValueError, FileNotFoundError) as error:
        # Payload-resolution errors carry user-actionable messages.
        return error_response(str(error), 400)
    except Exception:
        LOGGER.exception("Evidence intake failed for case %s", case_id)
        return error_response(
            "Evidence intake failed due to an unexpected error. "
            "Confirm the evidence file is supported and try again.",
            500,
        )
1031
1032
def generate_case_report(case_id: str) -> dict[str, Any]:
    """Generate the HTML forensic report for a case and save it to disk.

    Performs hash verification, assembles analysis context, renders the
    report via :class:`ReportGenerator`, and logs the result to the audit
    trail.  This function can be called from both the download route and
    from background tasks (e.g. auto-generation after analysis).

    Args:
        case_id: UUID of the case.

    Returns:
        A result dict with keys ``success`` (bool), and on success:
        ``report_path`` (:class:`~pathlib.Path`), ``hash_ok`` (bool).
        On failure: ``error`` (str).
    """
    case = get_case(case_id)
    if case is None:
        return {"success": False, "error": f"Case not found: {case_id}"}

    # Snapshot shared state under the lock; verification and rendering
    # below run on the copy.
    with STATE_LOCK:
        case_snapshot = dict(case)
        audit_logger = case["audit"]

    hashes = dict(case_snapshot.get("evidence_hashes", {}))
    intake_sha256 = str(hashes.get("sha256", "")).strip()
    file_hash_entries = list(case_snapshot.get("evidence_file_hashes", []))

    # Sentinel values written at intake: "N/A (skipped)" when the user
    # opted out of hashing, other "N/A ..." for unhashable sources.
    hashing_skipped = intake_sha256 == "N/A (skipped)"

    if hashing_skipped:
        hash_ok = True
        computed_sha256 = intake_sha256
        verify_details: list[dict[str, object]] = []
    elif intake_sha256.startswith("N/A"):
        # e.g. "N/A (directory)" — nothing to verify.
        hash_ok = True
        computed_sha256 = intake_sha256
        verify_details = []
    elif file_hash_entries:
        # Verify every file that was hashed at intake.
        hash_ok = True
        verify_details = []
        for entry in file_hash_entries:
            fpath = Path(str(entry["path"]))
            expected = str(entry["sha256"]).strip().lower()
            if not fpath.exists():
                # A missing file counts as a verification failure.
                verify_details.append({
                    "path": str(fpath), "match": False,
                    "expected": expected, "computed": "FILE_MISSING",
                })
                hash_ok = False
                continue
            ok, computed = verify_hash(fpath, expected, return_computed=True)
            verify_details.append({
                "path": str(fpath), "match": ok,
                "expected": expected, "computed": computed,
            })
            if not ok:
                hash_ok = False
        # Single file: report its hash directly; multiple files: join all
        # computed hashes into one display string.
        computed_sha256 = (
            str(verify_details[0]["computed"]) if len(verify_details) == 1
            else "; ".join(str(d["computed"]) for d in verify_details)
        )
    else:
        # Fallback for cases created before evidence_file_hashes existed.
        verification_path = resolve_hash_verification_path(case_snapshot)
        if verification_path is None or not intake_sha256:
            return {"success": False, "error": "Evidence hash context is missing for this case."}
        if not verification_path.exists():
            return {"success": False, "error": "Evidence file is no longer available for hash verification."}
        hash_ok, computed_sha256 = verify_hash(
            verification_path, intake_sha256, return_computed=True,
        )
        verify_details = [{
            "path": str(verification_path),
            "match": hash_ok,
            "expected": intake_sha256,
            "computed": computed_sha256,
        }]

    # Record the verification outcome (including per-file detail) before
    # deciding whether report generation can proceed.
    audit_logger.log(
        "hash_verification",
        {
            "expected_sha256": intake_sha256,
            "computed_sha256": computed_sha256,
            "match": hash_ok,
            "skipped": hashing_skipped,
            "verified_files": verify_details,
        },
    )

    hashes["case_id"] = case_id
    hashes["expected_sha256"] = intake_sha256
    # Deliberately mixed-typed: the string "skipped" or a bool result.
    hashes["hash_verified"] = "skipped" if hashing_skipped else hash_ok

    analysis_results = dict(case_snapshot.get("analysis_results", {}))

    # Both legacy and current result keys are accepted here.
    has_per_artifact = bool(analysis_results.get("per_artifact") or analysis_results.get("per_artifact_findings"))
    has_summary = bool(
        str(analysis_results.get("summary", "")).strip()
        or str(analysis_results.get("executive_summary", "")).strip()
    )
    if not has_per_artifact and not has_summary:
        return {
            "success": False,
            "error": "Analysis has not been completed for this case.",
        }

    # Ensure the keys the report template relies on are present.
    analysis_results.setdefault("case_id", case_id)
    analysis_results.setdefault("case_name", str(case_snapshot.get("case_name", "")))
    analysis_results.setdefault("per_artifact", [])
    analysis_results.setdefault("summary", "")

    case_dir = case_snapshot["case_dir"]
    investigation_context = str(case_snapshot.get("investigation_context", ""))
    if not investigation_context:
        # Disk fallback: the prompt file persisted by the analysis step.
        prompt_path = Path(case_dir) / "prompt.txt"
        if prompt_path.exists():
            investigation_context = prompt_path.read_text(encoding="utf-8")

    report_generator = ReportGenerator(cases_root=CASES_ROOT)
    report_path = report_generator.generate(
        analysis_results=analysis_results,
        image_metadata=dict(case_snapshot.get("image_metadata", {})),
        evidence_hashes=hashes,
        investigation_context=investigation_context,
        audit_log_entries=read_audit_entries(Path(case_dir)),
    )
    audit_logger.log(
        "report_generated",
        {"report_filename": report_path.name, "hash_verified": hash_ok},
    )
    mark_case_status(case_id, "completed")

    return {"success": True, "report_path": report_path, "hash_ok": hash_ok}
1168
1169
@evidence_bp.get("/api/cases/<case_id>/report")
def download_report(case_id: str) -> Response | tuple[Response, int]:
    """Generate and download the HTML forensic analysis report.

    If a report was already auto-generated after analysis, serves the
    existing file.  Otherwise generates a new one.

    Args:
        case_id: UUID of the case.

    Returns:
        The HTML report as an attachment, or error.
    """
    case = get_case(case_id)
    if case is None:
        return error_response(f"Case not found: {case_id}", 404)

    with STATE_LOCK:
        case_dir = case["case_dir"]

    def _serve(path: Path) -> Response:
        """Send *path* to the client as an HTML attachment."""
        return send_file(
            path,
            as_attachment=True,
            download_name=path.name,
            mimetype="text/html",
        )

    # Prefer a report that was already auto-generated after analysis;
    # the lexicographically last timestamped filename is the newest.
    reports_dir = Path(case_dir) / "reports"
    if reports_dir.is_dir():
        candidates = sorted(reports_dir.glob("report_*.html"))
        if candidates:
            return _serve(candidates[-1])

    result = generate_case_report(case_id)
    if not result["success"]:
        return error_response(str(result["error"]), 400)
    return _serve(result["report_path"])
1213
1214
@evidence_bp.get("/api/cases/<case_id>/csvs")
def download_csv_bundle(case_id: str) -> Response | tuple[Response, int]:
    """Download all parsed CSV files as a ZIP archive.

    Args:
        case_id: UUID of the case.

    Returns:
        ZIP archive as attachment, or 404 error.
    """
    case = get_case(case_id)
    if case is None:
        return error_response(f"Case not found: {case_id}", 404)

    with STATE_LOCK:
        snapshot = dict(case)

    csv_files = collect_case_csv_paths(snapshot)
    if not csv_files:
        return error_response("No parsed CSV files available for this case.", 404)

    bundle_dir = Path(snapshot["case_dir"]) / "reports"
    bundle_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    bundle_path = bundle_dir / f"parsed_csvs_{stamp}.zip"

    taken: set[str] = set()
    with ZipFile(bundle_path, "w", compression=ZIP_DEFLATED) as bundle:
        for csv_file in csv_files:
            # De-duplicate member names by appending a numeric counter,
            # since CSVs from different directories may share a filename.
            stem = Path(csv_file.name).stem
            suffix = Path(csv_file.name).suffix
            member = csv_file.name
            attempt = 1
            while member in taken:
                member = f"{stem}_{attempt}{suffix}"
                attempt += 1
            taken.add(member)
            bundle.write(csv_file, arcname=member)

    return send_file(
        bundle_path,
        as_attachment=True,
        download_name=f"{case_id}_parsed_csvs.zip",
        mimetype="application/zip",
    )
EWF_SEGMENT_RE = re.compile('^(?P<base>.+)\\.(?:e|ex|s|l)(?P<segment>\\d{2})$', re.IGNORECASE)
SPLIT_RAW_SEGMENT_RE = re.compile('^(?P<base>.+)\\.(?P<segment>\\d{3})$')
evidence_bp = <Blueprint 'evidence'>
def resolve_evidence_payload(case_dir: pathlib.Path) -> dict[str, typing.Any]:
def resolve_evidence_payload(case_dir: Path) -> dict[str, Any]:
    """Resolve the evidence source from the current request.

    Handles upload and JSON path reference modes. Archives are extracted.

    Args:
        case_dir: Path to the case's root directory.

    Returns:
        Dict with ``mode``, ``filename``, ``source_path``, ``stored_path``,
        ``dissect_path``, ``uploaded_files``, and ``evidence_files_to_hash``.

    Raises:
        ValueError: If no evidence provided or archive extraction fails.
        FileNotFoundError: If the referenced path does not exist.
    """
    evidence_dir = case_dir / "evidence"
    evidence_dir.mkdir(parents=True, exist_ok=True)

    uploaded_files = _collect_uploaded_files()
    uploaded_paths: list[Path] = []
    if uploaded_files:
        # Upload mode: persist each file into the case's evidence dir,
        # enforcing the configured cumulative size limit (0 = unlimited).
        aift_config = current_app.config.get("AIFT_CONFIG", {})
        threshold_mb = aift_config.get("evidence", {}).get("large_file_threshold_mb", 0)
        max_bytes = int(threshold_mb) * 1024 * 1024 if threshold_mb and threshold_mb > 0 else 0
        cumulative_bytes = 0
        timestamp = int(time.time())
        for index, uploaded_file in enumerate(uploaded_files, start=1):
            # Fall back to a synthetic name when the client filename is
            # empty or fully stripped by secure_filename.
            filename = secure_filename(uploaded_file.filename) or f"evidence_{timestamp}_{index}.bin"
            stored_path = _unique_destination(evidence_dir / filename)
            cumulative_bytes = _save_with_limit(uploaded_file, stored_path, max_bytes, cumulative_bytes)
            uploaded_paths.append(stored_path)

        source_path = _resolve_uploaded_dissect_path(uploaded_paths)
        mode = "upload"
    else:
        # Path-reference mode: the JSON body names an existing local path.
        payload = request.get_json(silent=True) or {}
        if not isinstance(payload, dict):
            raise ValueError("Request body must be a JSON object.")
        path_value = payload.get("path")
        if not isinstance(path_value, str):
            raise ValueError(
                "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}."
            )
        normalized_path = _normalize_user_path(path_value)
        if not normalized_path:
            raise ValueError(
                "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}."
            )
        source_path = Path(normalized_path).expanduser()
        if not source_path.exists():
            raise FileNotFoundError(f"Evidence path does not exist: {source_path}")
        if not source_path.is_file() and not source_path.is_dir():
            raise ValueError(f"Evidence path is not a file or directory: {source_path}")
        uploaded_paths = []
        mode = "path"

    # Extract archives into the evidence directory.  Note: extraction is
    # keyed on the final suffix only, so ``foo.tar.gz`` routes through the
    # tar extractor via its ``.gz`` suffix.
    _ARCHIVE_EXTRACTORS = {
        ".zip": _extract_zip,
        ".tar": _extract_tar,
        ".gz": _extract_tar,
        ".tgz": _extract_tar,
        ".7z": _extract_7z,
    }
    dissect_path = source_path
    suffix = source_path.suffix.lower()
    extractor = _ARCHIVE_EXTRACTORS.get(suffix)
    if source_path.is_file() and extractor is not None:
        extract_dir = _make_extract_dir(evidence_dir, source_path)
        dissect_path = extractor(source_path, extract_dir)

    # Determine the files to hash for integrity verification.
    # Archives are intentionally verified as the original container file.
    # Split-image uploads hash all uploaded segments, and path-based split
    # images hash all matching sibling segments on disk. Directories get N/A.
    if source_path.is_file() and len(uploaded_paths) > 1:
        evidence_files_to_hash = sorted(set(str(p) for p in uploaded_paths))
    elif source_path.is_file():
        segment_paths = _collect_segment_group_paths(source_path)
        evidence_files_to_hash = [str(path) for path in segment_paths] if segment_paths else [str(source_path)]
    else:
        evidence_files_to_hash = []

    return {
        "mode": mode,
        "filename": source_path.name,
        "source_path": str(source_path),
        # stored_path is only meaningful for uploads; path mode leaves it "".
        "stored_path": str(source_path) if mode == "upload" else "",
        "dissect_path": str(dissect_path),
        "uploaded_files": [str(path) for path in uploaded_paths],
        "evidence_files_to_hash": evidence_files_to_hash,
    }

Resolve the evidence source from the current request.

Handles upload and JSON path reference modes. Archives are extracted.

Arguments:
  • case_dir: Path to the case's root directory.
Returns:

Dict with mode, filename, source_path, stored_path, dissect_path, and uploaded_files.

Raises:
  • ValueError: If no evidence provided or archive extraction fails.
  • FileNotFoundError: If the referenced path does not exist.
def resolve_hash_verification_path(case: dict[str, typing.Any]) -> pathlib.Path | None:
613def resolve_hash_verification_path(case: dict[str, Any]) -> Path | None:
614    """Resolve the file path for evidence hash verification.
615
616    Args:
617        case: The in-memory case state dictionary.
618
619    Returns:
620        Path to the evidence file, or ``None``.
621    """
622    source_path = str(case.get("source_path", "")).strip()
623    if source_path:
624        return Path(source_path)
625    evidence_path = str(case.get("evidence_path", "")).strip()
626    if evidence_path:
627        return Path(evidence_path)
628    return None

Resolve the file path for evidence hash verification.

Arguments:
  • case: The in-memory case state dictionary.
Returns:

Path to the evidence file, or None.

def resolve_case_csv_output_dir( case: dict[str, typing.Any], config_snapshot: dict[str, typing.Any]) -> pathlib.Path:
def resolve_case_csv_output_dir(case: dict[str, Any], config_snapshot: dict[str, Any]) -> Path:
    """Resolve the output directory for parsed CSV files.

    Args:
        case: The in-memory case state dictionary.
        config_snapshot: Application configuration snapshot.

    Returns:
        Absolute ``Path`` to the CSV output directory.
    """
    case_dir = Path(case["case_dir"])
    case_id = str(case.get("case_id", "")).strip()

    # Read the configured external output dir, tolerating malformed config.
    configured = ""
    if isinstance(config_snapshot, dict):
        evidence_config = config_snapshot.get("evidence", {})
        if isinstance(evidence_config, dict):
            configured = str(evidence_config.get("csv_output_dir", "")).strip()

    if not configured:
        return case_dir / "parsed"

    output_root = Path(configured).expanduser()
    if not output_root.is_absolute():
        output_root = (PROJECT_ROOT / output_root).resolve()
    if case_id:
        return output_root / case_id / "parsed"
    return output_root / "parsed"

Resolve the output directory for parsed CSV files.

Arguments:
  • case: The in-memory case state dictionary.
  • config_snapshot: Application configuration snapshot.
Returns:

Absolute Path to the CSV output directory.

def collect_case_csv_paths(case: dict[str, typing.Any]) -> list[pathlib.Path]:
def collect_case_csv_paths(case: dict[str, Any]) -> list[Path]:
    """Collect all parsed CSV file paths for a case.

    Looks at ``artifact_csv_paths`` and successful ``parse_results``
    first; falls back to globbing ``case_dir/parsed`` when neither
    yields anything.

    Args:
        case: The in-memory case state dictionary.

    Returns:
        A sorted list of existing CSV file paths.
    """
    found: list[Path] = []
    resolved_keys: set[str] = set()

    def _register(value: Any) -> None:
        """Record *value* as a CSV path if it names an existing, unseen file."""
        text = str(value or "").strip()
        if not text:
            return
        candidate = Path(text)
        if not candidate.exists() or not candidate.is_file():
            return
        resolved = str(candidate.resolve())
        if resolved in resolved_keys:
            return
        resolved_keys.add(resolved)
        found.append(candidate)

    artifact_map = case.get("artifact_csv_paths")
    if isinstance(artifact_map, dict):
        for value in artifact_map.values():
            for item in (value if isinstance(value, list) else [value]):
                _register(item)

    results = case.get("parse_results")
    if isinstance(results, list):
        for entry in results:
            if not isinstance(entry, dict) or not entry.get("success"):
                continue
            _register(entry.get("csv_path"))
            extra = entry.get("csv_paths")
            if isinstance(extra, list):
                for item in extra:
                    _register(item)

    if found:
        return sorted(found, key=lambda p: p.name.lower())

    fallback_dir = Path(case["case_dir"]) / "parsed"
    return sorted(p for p in fallback_dir.glob("*.csv") if p.is_file())

Collect all parsed CSV file paths for a case.

Arguments:
  • case: The in-memory case state dictionary.
Returns:

A sorted list of existing CSV file paths.

def build_csv_map(parse_results: list[dict[str, typing.Any]]) -> dict[str, str | list[str]]:
711def build_csv_map(parse_results: list[dict[str, Any]]) -> dict[str, str | list[str]]:
712    """Build a mapping of artifact keys to their parsed CSV file paths.
713
714    Split artifacts (e.g. EVTX) that produce multiple CSV files are
715    represented as a ``list[str]`` value.  Single-file artifacts remain
716    a plain ``str`` so existing callers are unaffected.
717
718    Args:
719        parse_results: List of per-artifact parse result dicts.
720
721    Returns:
722        Dict mapping artifact key strings to a single CSV path string
723        or a list of CSV path strings for split artifacts.
724    """
725    mapping: dict[str, str | list[str]] = {}
726    for result in parse_results:
727        artifact = str(result.get("artifact_key", "")).strip()
728        if not artifact or not result.get("success"):
729            continue
730        csv_paths = result.get("csv_paths")
731        if isinstance(csv_paths, list) and csv_paths:
732            non_empty = [str(p) for p in csv_paths if str(p).strip()]
733            if len(non_empty) > 1:
734                mapping[artifact] = non_empty
735                continue
736            if non_empty:
737                mapping[artifact] = non_empty[0]
738                continue
739        csv_path = str(result.get("csv_path", "")).strip()
740        if csv_path:
741            mapping[artifact] = csv_path
742    return mapping

Build a mapping of artifact keys to their parsed CSV file paths.

Split artifacts (e.g. EVTX) that produce multiple CSV files are represented as a list[str] value. Single-file artifacts remain a plain str so existing callers are unaffected.

Arguments:
  • parse_results: List of per-artifact parse result dicts.
Returns:

Dict mapping artifact key strings to a single CSV path string or a list of CSV path strings for split artifacts.

def read_audit_entries(case_dir: pathlib.Path) -> list[dict[str, typing.Any]]:
def read_audit_entries(case_dir: Path) -> list[dict[str, Any]]:
    """Read all audit log entries from a case's ``audit.jsonl`` file.

    Args:
        case_dir: Path to the case's root directory.

    Returns:
        A list of parsed audit entry dicts, or empty list if missing.
    """
    log_file = case_dir / "audit.jsonl"
    if not log_file.exists():
        return []
    collected: list[dict[str, Any]] = []
    # errors="replace" lets us keep reading past any corrupt bytes.
    with log_file.open("r", encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError:
                # Malformed lines are skipped, not fatal.
                continue
            if isinstance(record, dict):
                collected.append(record)
    return collected

Read all audit log entries from a case's audit.jsonl file.

Arguments:
  • case_dir: Path to the case's root directory.
Returns:

A list of parsed audit entry dicts, or empty list if missing.

def generate_case_report(case_id: str) -> dict[str, typing.Any]:
def _verify_evidence_hashes(
    case_snapshot: dict[str, Any],
    intake_sha256: str,
) -> tuple[bool, str, list[dict[str, object]], str | None]:
    """Verify the evidence hashes recorded at intake against files on disk.

    Args:
        case_snapshot: Shallow copy of the case state dict.
        intake_sha256: SHA-256 recorded at intake; may be an ``N/A`` marker
            when hashing was skipped or unavailable.

    Returns:
        Tuple ``(hash_ok, computed_sha256, verify_details, error)``.
        ``error`` is a user-facing message when verification cannot run at
        all (and the other values should be ignored), else ``None``.
    """
    if intake_sha256.startswith("N/A"):
        # Hashing was skipped or unavailable at intake; nothing to verify.
        # (Covers both "N/A (skipped)" and any other N/A marker.)
        return True, intake_sha256, [], None

    file_hash_entries = list(case_snapshot.get("evidence_file_hashes", []))
    if file_hash_entries:
        # Verify every file that was hashed at intake.
        hash_ok = True
        details: list[dict[str, object]] = []
        for entry in file_hash_entries:
            fpath = Path(str(entry["path"]))
            expected = str(entry["sha256"]).strip().lower()
            if not fpath.exists():
                details.append({
                    "path": str(fpath), "match": False,
                    "expected": expected, "computed": "FILE_MISSING",
                })
                hash_ok = False
                continue
            ok, computed = verify_hash(fpath, expected, return_computed=True)
            details.append({
                "path": str(fpath), "match": ok,
                "expected": expected, "computed": computed,
            })
            if not ok:
                hash_ok = False
        # Single file: report its hash directly; multiple: join for display.
        computed_sha256 = (
            str(details[0]["computed"]) if len(details) == 1
            else "; ".join(str(d["computed"]) for d in details)
        )
        return hash_ok, computed_sha256, details, None

    # Fallback for cases created before evidence_file_hashes existed.
    verification_path = resolve_hash_verification_path(case_snapshot)
    if verification_path is None or not intake_sha256:
        return False, "", [], "Evidence hash context is missing for this case."
    if not verification_path.exists():
        return False, "", [], "Evidence file is no longer available for hash verification."
    hash_ok, computed_sha256 = verify_hash(
        verification_path, intake_sha256, return_computed=True,
    )
    details = [{
        "path": str(verification_path),
        "match": hash_ok,
        "expected": intake_sha256,
        "computed": computed_sha256,
    }]
    return hash_ok, computed_sha256, details, None


def generate_case_report(case_id: str) -> dict[str, Any]:
    """Generate the HTML forensic report for a case and save it to disk.

    Performs hash verification, assembles analysis context, renders the
    report via :class:`ReportGenerator`, and logs the result to the audit
    trail.  This function can be called from both the download route and
    from background tasks (e.g. auto-generation after analysis).

    Args:
        case_id: UUID of the case.

    Returns:
        A result dict with keys ``success`` (bool), and on success:
        ``report_path`` (:class:`~pathlib.Path`), ``hash_ok`` (bool).
        On failure: ``error`` (str).
    """
    case = get_case(case_id)
    if case is None:
        return {"success": False, "error": f"Case not found: {case_id}"}

    with STATE_LOCK:
        # Snapshot mutable case state so the rest of this function reads a
        # consistent view without holding the lock during hashing/rendering.
        case_snapshot = dict(case)
        audit_logger = case["audit"]

    hashes = dict(case_snapshot.get("evidence_hashes", {}))
    intake_sha256 = str(hashes.get("sha256", "")).strip()
    hashing_skipped = intake_sha256 == "N/A (skipped)"

    hash_ok, computed_sha256, verify_details, error = _verify_evidence_hashes(
        case_snapshot, intake_sha256,
    )
    if error is not None:
        return {"success": False, "error": error}

    audit_logger.log(
        "hash_verification",
        {
            "expected_sha256": intake_sha256,
            "computed_sha256": computed_sha256,
            "match": hash_ok,
            "skipped": hashing_skipped,
            "verified_files": verify_details,
        },
    )

    hashes["case_id"] = case_id
    hashes["expected_sha256"] = intake_sha256
    # Deliberately mixed-typed: the string "skipped" distinguishes the
    # skipped state from a boolean pass/fail in the rendered report.
    hashes["hash_verified"] = "skipped" if hashing_skipped else hash_ok

    analysis_results = dict(case_snapshot.get("analysis_results", {}))

    has_per_artifact = bool(
        analysis_results.get("per_artifact")
        or analysis_results.get("per_artifact_findings")
    )
    has_summary = bool(
        str(analysis_results.get("summary", "")).strip()
        or str(analysis_results.get("executive_summary", "")).strip()
    )
    if not has_per_artifact and not has_summary:
        return {
            "success": False,
            "error": "Analysis has not been completed for this case.",
        }

    # Fill in the fields the report template always expects.
    analysis_results.setdefault("case_id", case_id)
    analysis_results.setdefault("case_name", str(case_snapshot.get("case_name", "")))
    analysis_results.setdefault("per_artifact", [])
    analysis_results.setdefault("summary", "")

    case_dir = case_snapshot["case_dir"]
    investigation_context = str(case_snapshot.get("investigation_context", ""))
    if not investigation_context:
        # Fall back to the investigation prompt captured at intake, if any.
        prompt_path = Path(case_dir) / "prompt.txt"
        if prompt_path.exists():
            investigation_context = prompt_path.read_text(encoding="utf-8")

    report_generator = ReportGenerator(cases_root=CASES_ROOT)
    report_path = report_generator.generate(
        analysis_results=analysis_results,
        image_metadata=dict(case_snapshot.get("image_metadata", {})),
        evidence_hashes=hashes,
        investigation_context=investigation_context,
        audit_log_entries=read_audit_entries(Path(case_dir)),
    )
    audit_logger.log(
        "report_generated",
        {"report_filename": report_path.name, "hash_verified": hash_ok},
    )
    mark_case_status(case_id, "completed")

    return {"success": True, "report_path": report_path, "hash_ok": hash_ok}

Generate the HTML forensic report for a case and save it to disk.

Performs hash verification, assembles analysis context, renders the report via ReportGenerator, and logs the result to the audit trail. This function can be called from both the download route and from background tasks (e.g. auto-generation after analysis).

Arguments:
  • case_id: UUID of the case.
Returns:
  A result dict with a success key (bool); on success it also contains report_path (a pathlib.Path) and hash_ok (bool), and on failure an error message (str).