app.routes.evidence
Evidence intake, archive extraction, CSV/hash helpers, and route handlers.
This module handles all evidence-related logic: uploading files, resolving paths, extracting ZIP/tar/7z archives, computing and verifying hashes, collecting parsed CSV paths, reading audit log entries, and the Flask route handlers for evidence intake, report generation, and CSV bundle downloads.
Attributes:
- EWF_SEGMENT_RE: Compiled regex for EWF split segment filenames.
- SPLIT_RAW_SEGMENT_RE: Compiled regex for split raw disk image segments.
- evidence_bp: Flask Blueprint for evidence-related routes.
1"""Evidence intake, archive extraction, CSV/hash helpers, and route handlers. 2 3This module handles all evidence-related logic: uploading files, resolving 4paths, extracting ZIP/tar/7z archives, computing and verifying hashes, 5collecting parsed CSV paths, reading audit log entries, and the Flask route 6handlers for evidence intake, report generation, and CSV bundle downloads. 7 8Attributes: 9 EWF_SEGMENT_RE: Compiled regex for EWF split segment filenames. 10 SPLIT_RAW_SEGMENT_RE: Compiled regex for split raw disk image segments. 11 evidence_bp: Flask Blueprint for evidence-related routes. 12""" 13 14from __future__ import annotations 15 16import json 17import logging 18import re 19import shutil 20import tarfile 21import time 22import uuid 23from datetime import datetime, timezone 24from pathlib import Path 25from typing import Any, Callable 26from zipfile import BadZipFile, ZipFile, ZIP_DEFLATED 27 28import py7zr 29 30from flask import Blueprint, Response, current_app, request, send_file 31from werkzeug.utils import secure_filename 32 33from ..hasher import compute_hashes, verify_hash 34from ..parser import ForensicParser 35from ..reporter import ReportGenerator 36 37from .state import ( 38 ANALYSIS_PROGRESS, 39 CASES_ROOT, 40 CHAT_PROGRESS, 41 PARSE_PROGRESS, 42 PROJECT_ROOT, 43 STATE_LOCK, 44 error_response, 45 get_case, 46 mark_case_status, 47 safe_name, 48 success_response, 49) 50 51__all__ = [ 52 "EWF_SEGMENT_RE", 53 "SPLIT_RAW_SEGMENT_RE", 54 "evidence_bp", 55 "resolve_evidence_payload", 56 "resolve_hash_verification_path", 57 "resolve_case_csv_output_dir", 58 "collect_case_csv_paths", 59 "build_csv_map", 60 "read_audit_entries", 61 "generate_case_report", 62] 63 64LOGGER = logging.getLogger(__name__) 65 66EWF_SEGMENT_RE = re.compile(r"^(?P<base>.+)\.(?:e|ex|s|l)(?P<segment>\d{2})$", re.IGNORECASE) 67SPLIT_RAW_SEGMENT_RE = re.compile(r"^(?P<base>.+)\.(?P<segment>\d{3})$") 68 69# Extensions for evidence files we look for inside extracted archives. 70_EVIDENCE_FILE_EXTENSIONS = frozenset({ 71 ".e01", ".ex01", ".s01", ".l01", 72 ".dd", ".img", ".raw", ".bin", ".iso", 73 ".vmdk", ".vhd", ".vhdx", ".vdi", ".qcow2", ".hdd", ".hds", 74 ".vmx", ".vbox", ".vmcx", ".ovf", ".ova", 75 ".asdf", ".asif", ".ad1", 76 ".000", ".001", 77}) 78 79 80# --------------------------------------------------------------------------- 81# Archive extraction 82# --------------------------------------------------------------------------- 83 84def _extract_archive_members( 85 destination: Path, 86 members: list[tuple[str, Any]], 87 *, 88 empty_message: str, 89 unsafe_paths_message: str, 90 no_files_message: str, 91 extract_member: Callable[[Any, Path], None] | None = None, 92 extract_all_members: Callable[[list[tuple[Any, Path]]], None] | None = None, 93) -> Path: 94 """Extract archive members safely and return the best Dissect target path. 95 96 Validates path traversal, extracts, then locates the best evidence file. 97 Exactly one of *extract_member* or *extract_all_members* must be provided. 98 99 Args: 100 destination: Root directory to extract into. 101 members: List of ``(member_name, member_object)`` tuples. 102 empty_message: Error for empty archives. 103 unsafe_paths_message: Error for path traversal. 104 no_files_message: Error when extraction produces no files. 105 extract_member: Callback to extract a single member. 106 extract_all_members: Callback to extract all members at once. 107 108 Returns: 109 Path to the best evidence file or extraction directory. 110 111 Raises: 112 ValueError: On empty, unsafe, or failed extraction. 113 """ 114 if (extract_member is None) == (extract_all_members is None): 115 raise ValueError("Exactly one extraction callback must be provided.") 116 117 if destination.exists(): 118 shutil.rmtree(destination) 119 destination.mkdir(parents=True, exist_ok=True) 120 root = destination.resolve() 121 122 if not members: 123 raise ValueError(empty_message) 124 125 validated_members: list[tuple[Any, Path]] = [] 126 for member_name, member in members: 127 member_path = Path(member_name) 128 if member_path.is_absolute() or ".." in member_path.parts: 129 raise ValueError(unsafe_paths_message) 130 target = (root / member_path).resolve() 131 if not target.is_relative_to(root): 132 raise ValueError(unsafe_paths_message) 133 target.parent.mkdir(parents=True, exist_ok=True) 134 validated_members.append((member, target)) 135 136 if extract_all_members is not None: 137 extract_all_members(validated_members) 138 else: 139 for member, target in validated_members: 140 extract_member(member, target) 141 142 files = sorted(path for path in destination.rglob("*") if path.is_file()) 143 if not files: 144 raise ValueError(no_files_message) 145 evidence_files = [ 146 path for path in files if path.suffix.lower() in _EVIDENCE_FILE_EXTENSIONS 147 ] 148 if evidence_files: 149 for ef in evidence_files: 150 if ef.suffix.lower() == ".e01": 151 return ef 152 return evidence_files[0] 153 154 top_level_entries: set[str] = set() 155 has_top_level_file = False 156 for file_path in files: 157 relative_parts = file_path.relative_to(destination).parts 158 if not relative_parts: 159 continue 160 top_level_entries.add(relative_parts[0]) 161 if len(relative_parts) == 1: 162 has_top_level_file = True 163 164 if not has_top_level_file and len(top_level_entries) == 1: 165 wrapper_dir = destination / sorted(top_level_entries)[0] 166 if wrapper_dir.is_dir(): 167 return wrapper_dir 168 169 return destination 170 171 172def _extract_zip(zip_path: Path, destination: Path) -> Path: 173 """Extract a ZIP archive and return the best Dissect target path. 174 175 Args: 176 zip_path: Path to the ZIP file. 177 destination: Directory to extract into. 178 179 Returns: 180 Path to the best evidence file or directory. 181 182 Raises: 183 ValueError: If the ZIP is invalid, empty, or contains unsafe paths. 184 """ 185 try: 186 with ZipFile(zip_path, "r") as archive: 187 members = [(member.filename, member) for member in archive.infolist() if not member.is_dir()] 188 189 def _extract_member(member: Any, target: Path) -> None: 190 """Extract a single ZIP member to the target path.""" 191 with archive.open(member, "r") as src, target.open("wb") as dst: 192 shutil.copyfileobj(src, dst) 193 return _extract_archive_members( 194 destination, 195 members, 196 empty_message="Evidence ZIP is empty.", 197 unsafe_paths_message="Archive rejected: contains unsafe file paths", 198 no_files_message="Evidence ZIP extraction produced no files.", 199 extract_member=_extract_member, 200 ) 201 except BadZipFile as error: 202 raise ValueError(f"Invalid ZIP evidence file: {zip_path.name}") from error 203 204 205def _extract_tar(tar_path: Path, destination: Path) -> Path: 206 """Extract a tar archive and return the best Dissect target path. 207 208 Args: 209 tar_path: Path to the tar file. 210 destination: Directory to extract into. 211 212 Returns: 213 Path to the best evidence file or directory. 214 215 Raises: 216 ValueError: If the tar is invalid, empty, or contains unsafe paths. 217 """ 218 try: 219 with tarfile.open(tar_path, "r:*") as archive: 220 raw_members = archive.getmembers() 221 for member in raw_members: 222 if member.islnk() or member.issym(): 223 raise ValueError("Archive rejected: contains unsafe file paths") 224 members = [(member.name, member) for member in raw_members if member.isfile()] 225 226 def _extract_member(member: Any, target: Path) -> None: 227 """Extract a single tar member to the target path.""" 228 src = archive.extractfile(member) 229 if src is None: 230 return 231 with src, target.open("wb") as dst: 232 shutil.copyfileobj(src, dst) 233 return _extract_archive_members( 234 destination, 235 members, 236 empty_message="Evidence tar archive is empty.", 237 unsafe_paths_message="Archive rejected: contains unsafe file paths", 238 no_files_message="Evidence tar extraction produced no files.", 239 extract_member=_extract_member, 240 ) 241 except tarfile.TarError as error: 242 raise ValueError(f"Invalid tar evidence file: {tar_path.name}") from error 243 244 245def _extract_7z(archive_path: Path, destination: Path) -> Path: 246 """Extract a 7z archive and return the best Dissect target path. 247 248 Args: 249 archive_path: Path to the 7z file. 250 destination: Directory to extract into. 251 252 Returns: 253 Path to the best evidence file or directory. 254 255 Raises: 256 ValueError: If the 7z is invalid, empty, or contains unsafe paths. 257 """ 258 try: 259 with py7zr.SevenZipFile(archive_path, mode="r") as archive: 260 members = [(name, name) for name in archive.getnames() if not name.endswith("/")] 261 262 def _extract_members(validated: list[tuple[Any, Path]]) -> None: 263 """Extract 7z members via temp directory for path-traversal safety.""" 264 import tempfile 265 with tempfile.TemporaryDirectory() as tmpdir: 266 tmp = Path(tmpdir) 267 archive.extractall(path=tmp) 268 for member_name, target in validated: 269 src = tmp / member_name 270 if src.is_file(): 271 target.parent.mkdir(parents=True, exist_ok=True) 272 shutil.copy2(src, target) 273 274 return _extract_archive_members( 275 destination, 276 members, 277 empty_message="Evidence 7z archive is empty.", 278 unsafe_paths_message="Archive rejected: contains unsafe file paths", 279 no_files_message="Evidence 7z extraction produced no files.", 280 extract_all_members=_extract_members, 281 ) 282 except py7zr.Bad7zFile as error: 283 raise ValueError(f"Invalid 7z evidence file: {archive_path.name}") from error 284 285 286# --------------------------------------------------------------------------- 287# Upload / path resolution 288# --------------------------------------------------------------------------- 289 290def _collect_uploaded_files() -> list[Any]: 291 """Collect all uploaded ``FileStorage`` objects from the current request. 292 293 Returns: 294 A list of ``FileStorage`` objects with non-empty filenames. 295 """ 296 uploaded: list[Any] = [] 297 for key in request.files: 298 for file_storage in request.files.getlist(key): 299 if file_storage and file_storage.filename: 300 uploaded.append(file_storage) 301 return uploaded 302 303 304_SAVE_CHUNK_SIZE = 4 * 1024 * 1024 # 4 MiB 305 306 307def _save_with_limit( 308 file_storage: Any, 309 dest: Path, 310 max_bytes: int, 311 cumulative: int, 312) -> int: 313 """Stream-save an uploaded file, enforcing an optional size limit. 314 315 Args: 316 file_storage: Werkzeug ``FileStorage`` to save. 317 dest: Destination path on disk. 318 max_bytes: Maximum allowed total bytes across all files (0 = unlimited). 319 cumulative: Bytes already written by prior files in this upload batch. 320 321 Returns: 322 Updated cumulative byte count after this file. 323 324 Raises: 325 ValueError: If the cumulative size exceeds *max_bytes*. 326 """ 327 if max_bytes <= 0: 328 file_storage.save(dest) 329 return cumulative + dest.stat().st_size 330 331 written = 0 332 stream = file_storage.stream 333 with open(dest, "wb") as out: 334 while True: 335 chunk = stream.read(_SAVE_CHUNK_SIZE) 336 if not chunk: 337 break 338 written += len(chunk) 339 if cumulative + written > max_bytes: 340 out.close() 341 dest.unlink(missing_ok=True) 342 limit_gb = max_bytes / (1024 * 1024 * 1024) 343 raise ValueError( 344 f"Upload exceeds the Evidence Size Threshold " 345 f"({limit_gb:.1f} GB). Use path mode instead, or " 346 f"increase the threshold in Settings \u2192 Advanced." 347 ) 348 out.write(chunk) 349 return cumulative + written 350 351 352def _unique_destination(path: Path) -> Path: 353 """Generate a unique file path by appending a numeric suffix if needed. 354 355 Args: 356 path: Desired file path. 357 358 Returns: 359 A ``Path`` guaranteed not to exist on disk. 360 """ 361 if not path.exists(): 362 return path 363 counter = 1 364 while True: 365 candidate = path.with_name(f"{path.stem}_{counter}{path.suffix}") 366 if not candidate.exists(): 367 return candidate 368 counter += 1 369 370 371def _segment_identity(path_or_name: Path | str) -> tuple[str, str, int] | None: 372 """Parse split-image segment identity from a filename. 373 374 Args: 375 path_or_name: Path or filename to inspect. 376 377 Returns: 378 ``(kind, base_name, segment_number)`` for known split-image naming 379 schemes, or ``None`` if the name is not a recognized segment. 380 """ 381 name = Path(path_or_name).name if isinstance(path_or_name, Path) else str(path_or_name) 382 for kind, pattern in (("ewf", EWF_SEGMENT_RE), ("raw", SPLIT_RAW_SEGMENT_RE)): 383 match = pattern.match(name) 384 if match is not None: 385 return kind, match.group("base").lower(), int(match.group("segment")) 386 return None 387 388 389def _collect_segment_group_paths(source_path: Path) -> list[Path]: 390 """Collect all sibling segment paths for a split-image source file. 391 392 Args: 393 source_path: Candidate source evidence file. 394 395 Returns: 396 Sorted list of sibling segment paths for the same split-image set, or 397 an empty list when the path is not a recognized split-image segment. 398 """ 399 if not source_path.is_file(): 400 return [] 401 402 identity = _segment_identity(source_path) 403 if identity is None: 404 return [] 405 406 kind, base_name, _segment_number = identity 407 segment_paths: list[tuple[int, Path]] = [] 408 try: 409 siblings = source_path.parent.iterdir() 410 except OSError: 411 return [source_path] 412 413 for sibling in siblings: 414 if not sibling.is_file(): 415 continue 416 sibling_identity = _segment_identity(sibling) 417 if sibling_identity is None: 418 continue 419 sibling_kind, sibling_base_name, sibling_segment_number = sibling_identity 420 if sibling_kind == kind and sibling_base_name == base_name: 421 segment_paths.append((sibling_segment_number, sibling)) 422 423 if not segment_paths: 424 return [source_path] 425 return [path for _segment_number, path in sorted(segment_paths, key=lambda item: item[0])] 426 427 428def _resolve_uploaded_dissect_path(uploaded_paths: list[Path]) -> Path: 429 """Determine the primary Dissect target path from uploaded files. 430 431 Handles single files, split EWF/raw segment sets, and rejects mixed 432 archive-plus-segment uploads. 433 434 Args: 435 uploaded_paths: List of uploaded evidence file paths. 436 437 Returns: 438 The ``Path`` to pass to Dissect's ``Target.open()``. 439 440 Raises: 441 ValueError: If no files uploaded or archive mixed with segments. 442 """ 443 if not uploaded_paths: 444 raise ValueError("No uploaded evidence files were provided.") 445 446 if len(uploaded_paths) == 1: 447 return uploaded_paths[0] 448 449 archive_exts = {".zip", ".tar", ".gz", ".tgz", ".7z"} 450 archive_paths = [path for path in uploaded_paths if path.suffix.lower() in archive_exts] 451 if archive_paths and len(uploaded_paths) > 1: 452 raise ValueError("Upload either one archive file or raw evidence segments, not both.") 453 454 segment_groups: dict[tuple[str, str], list[tuple[int, Path]]] = {} 455 for path in uploaded_paths: 456 identity = _segment_identity(path) 457 if identity is None: 458 continue 459 kind, base_name, segment_number = identity 460 segment_groups.setdefault((kind, base_name), []).append((segment_number, path)) 461 462 if segment_groups: 463 if len(segment_groups) > 1: 464 group_names = sorted({base_name for _kind, base_name in segment_groups}) 465 raise ValueError( 466 "Ambiguous upload: multiple segment groups detected " 467 f"({', '.join(group_names)}). " 468 "Upload only one split segment set at a time." 469 ) 470 only_group = next(iter(segment_groups.values())) 471 return min(only_group, key=lambda item: item[0])[1] 472 473 # Multiple files that are neither a single archive nor a recognized 474 # segment set — reject rather than silently analyzing only the first. 475 raise ValueError( 476 "Ambiguous upload: multiple files were provided but they do not " 477 "form a recognized segment set. Upload a single evidence file, " 478 "one archive, or a complete split-image segment set." 479 ) 480 481 482def _normalize_user_path(value: str) -> str: 483 """Strip surrounding quotes and whitespace from a user-supplied path. 484 485 Args: 486 value: Raw path string. 487 488 Returns: 489 Cleaned path string. 490 """ 491 return ( 492 str(value) 493 .replace('"', "") 494 .replace("\u201c", "") 495 .replace("\u201d", "") 496 .strip() 497 ) 498 499 500def _make_extract_dir(evidence_dir: Path, source_path: Path) -> Path: 501 """Build a unique extraction directory path for an archive. 502 503 Args: 504 evidence_dir: Parent evidence directory. 505 source_path: Path to the archive being extracted. 506 507 Returns: 508 A timestamped extraction directory path. 509 """ 510 return evidence_dir / f"extracted_{safe_name(source_path.stem, 'evidence')}_{uuid.uuid4().hex[:12]}" 511 512 513def resolve_evidence_payload(case_dir: Path) -> dict[str, Any]: 514 """Resolve the evidence source from the current request. 515 516 Handles upload and JSON path reference modes. Archives are extracted. 517 518 Args: 519 case_dir: Path to the case's root directory. 520 521 Returns: 522 Dict with ``mode``, ``filename``, ``source_path``, ``stored_path``, 523 ``dissect_path``, and ``uploaded_files``. 524 525 Raises: 526 ValueError: If no evidence provided or archive extraction fails. 527 FileNotFoundError: If the referenced path does not exist. 528 """ 529 evidence_dir = case_dir / "evidence" 530 evidence_dir.mkdir(parents=True, exist_ok=True) 531 532 uploaded_files = _collect_uploaded_files() 533 uploaded_paths: list[Path] = [] 534 if uploaded_files: 535 aift_config = current_app.config.get("AIFT_CONFIG", {}) 536 threshold_mb = aift_config.get("evidence", {}).get("large_file_threshold_mb", 0) 537 max_bytes = int(threshold_mb) * 1024 * 1024 if threshold_mb and threshold_mb > 0 else 0 538 cumulative_bytes = 0 539 timestamp = int(time.time()) 540 for index, uploaded_file in enumerate(uploaded_files, start=1): 541 filename = secure_filename(uploaded_file.filename) or f"evidence_{timestamp}_{index}.bin" 542 stored_path = _unique_destination(evidence_dir / filename) 543 cumulative_bytes = _save_with_limit(uploaded_file, stored_path, max_bytes, cumulative_bytes) 544 uploaded_paths.append(stored_path) 545 546 source_path = _resolve_uploaded_dissect_path(uploaded_paths) 547 mode = "upload" 548 else: 549 payload = request.get_json(silent=True) or {} 550 if not isinstance(payload, dict): 551 raise ValueError("Request body must be a JSON object.") 552 path_value = payload.get("path") 553 if not isinstance(path_value, str): 554 raise ValueError( 555 "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}." 556 ) 557 normalized_path = _normalize_user_path(path_value) 558 if not normalized_path: 559 raise ValueError( 560 "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}." 561 ) 562 source_path = Path(normalized_path).expanduser() 563 if not source_path.exists(): 564 raise FileNotFoundError(f"Evidence path does not exist: {source_path}") 565 if not source_path.is_file() and not source_path.is_dir(): 566 raise ValueError(f"Evidence path is not a file or directory: {source_path}") 567 uploaded_paths = [] 568 mode = "path" 569 570 # Extract archives into the evidence directory. 571 _ARCHIVE_EXTRACTORS = { 572 ".zip": _extract_zip, 573 ".tar": _extract_tar, 574 ".gz": _extract_tar, 575 ".tgz": _extract_tar, 576 ".7z": _extract_7z, 577 } 578 dissect_path = source_path 579 suffix = source_path.suffix.lower() 580 extractor = _ARCHIVE_EXTRACTORS.get(suffix) 581 if source_path.is_file() and extractor is not None: 582 extract_dir = _make_extract_dir(evidence_dir, source_path) 583 dissect_path = extractor(source_path, extract_dir) 584 585 # Determine the files to hash for integrity verification. 586 # Archives are intentionally verified as the original container file. 587 # Split-image uploads hash all uploaded segments, and path-based split 588 # images hash all matching sibling segments on disk. Directories get N/A. 589 if source_path.is_file() and len(uploaded_paths) > 1: 590 evidence_files_to_hash = sorted(set(str(p) for p in uploaded_paths)) 591 elif source_path.is_file(): 592 segment_paths = _collect_segment_group_paths(source_path) 593 evidence_files_to_hash = [str(path) for path in segment_paths] if segment_paths else [str(source_path)] 594 else: 595 evidence_files_to_hash = [] 596 597 return { 598 "mode": mode, 599 "filename": source_path.name, 600 "source_path": str(source_path), 601 "stored_path": str(source_path) if mode == "upload" else "", 602 "dissect_path": str(dissect_path), 603 "uploaded_files": [str(path) for path in uploaded_paths], 604 "evidence_files_to_hash": evidence_files_to_hash, 605 } 606 607 608# --------------------------------------------------------------------------- 609# Hash / CSV / audit helpers 610# --------------------------------------------------------------------------- 611 612def resolve_hash_verification_path(case: dict[str, Any]) -> Path | None: 613 """Resolve the file path for evidence hash verification. 614 615 Args: 616 case: The in-memory case state dictionary. 617 618 Returns: 619 Path to the evidence file, or ``None``. 620 """ 621 source_path = str(case.get("source_path", "")).strip() 622 if source_path: 623 return Path(source_path) 624 evidence_path = str(case.get("evidence_path", "")).strip() 625 if evidence_path: 626 return Path(evidence_path) 627 return None 628 629 630def resolve_case_csv_output_dir(case: dict[str, Any], config_snapshot: dict[str, Any]) -> Path: 631 """Resolve the output directory for parsed CSV files. 632 633 Args: 634 case: The in-memory case state dictionary. 635 config_snapshot: Application configuration snapshot. 636 637 Returns: 638 Absolute ``Path`` to the CSV output directory. 639 """ 640 config = config_snapshot if isinstance(config_snapshot, dict) else {} 641 evidence_config = config.get("evidence", {}) if isinstance(config, dict) else {} 642 configured = str(evidence_config.get("csv_output_dir", "")).strip() if isinstance(evidence_config, dict) else "" 643 case_dir = Path(case["case_dir"]) 644 case_id = str(case.get("case_id", "")).strip() 645 646 if not configured: 647 return case_dir / "parsed" 648 649 output_root = Path(configured).expanduser() 650 if not output_root.is_absolute(): 651 output_root = (PROJECT_ROOT / output_root).resolve() 652 if case_id: 653 return output_root / case_id / "parsed" 654 return output_root / "parsed" 655 656 657def collect_case_csv_paths(case: dict[str, Any]) -> list[Path]: 658 """Collect all parsed CSV file paths for a case. 659 660 Args: 661 case: The in-memory case state dictionary. 662 663 Returns: 664 A sorted list of existing CSV file paths. 665 """ 666 collected: list[Path] = [] 667 seen: set[str] = set() 668 669 def _add_path(candidate: Any) -> None: 670 """Add a CSV path if it exists and is not a duplicate.""" 671 path_text = str(candidate or "").strip() 672 if not path_text: 673 return 674 path = Path(path_text) 675 if not path.exists() or not path.is_file(): 676 return 677 key = str(path.resolve()) 678 if key in seen: 679 return 680 seen.add(key) 681 collected.append(path) 682 683 csv_map = case.get("artifact_csv_paths") 684 if isinstance(csv_map, dict): 685 for csv_path in csv_map.values(): 686 if isinstance(csv_path, list): 687 for p in csv_path: 688 _add_path(p) 689 else: 690 _add_path(csv_path) 691 692 parse_results = case.get("parse_results") 693 if isinstance(parse_results, list): 694 for result in parse_results: 695 if not isinstance(result, dict) or not result.get("success"): 696 continue 697 _add_path(result.get("csv_path")) 698 csv_paths = result.get("csv_paths") 699 if isinstance(csv_paths, list): 700 for path in csv_paths: 701 _add_path(path) 702 703 if collected: 704 return sorted(collected, key=lambda path: path.name.lower()) 705 706 parsed_dir = Path(case["case_dir"]) / "parsed" 707 return sorted(path for path in parsed_dir.glob("*.csv") if path.is_file()) 708 709 710def build_csv_map(parse_results: list[dict[str, Any]]) -> dict[str, str | list[str]]: 711 """Build a mapping of artifact keys to their parsed CSV file paths. 712 713 Split artifacts (e.g. EVTX) that produce multiple CSV files are 714 represented as a ``list[str]`` value. Single-file artifacts remain 715 a plain ``str`` so existing callers are unaffected. 716 717 Args: 718 parse_results: List of per-artifact parse result dicts. 719 720 Returns: 721 Dict mapping artifact key strings to a single CSV path string 722 or a list of CSV path strings for split artifacts. 723 """ 724 mapping: dict[str, str | list[str]] = {} 725 for result in parse_results: 726 artifact = str(result.get("artifact_key", "")).strip() 727 if not artifact or not result.get("success"): 728 continue 729 csv_paths = result.get("csv_paths") 730 if isinstance(csv_paths, list) and csv_paths: 731 non_empty = [str(p) for p in csv_paths if str(p).strip()] 732 if len(non_empty) > 1: 733 mapping[artifact] = non_empty 734 continue 735 if non_empty: 736 mapping[artifact] = non_empty[0] 737 continue 738 csv_path = str(result.get("csv_path", "")).strip() 739 if csv_path: 740 mapping[artifact] = csv_path 741 return mapping 742 743 744def read_audit_entries(case_dir: Path) -> list[dict[str, Any]]: 745 """Read all audit log entries from a case's ``audit.jsonl`` file. 746 747 Args: 748 case_dir: Path to the case's root directory. 749 750 Returns: 751 A list of parsed audit entry dicts, or empty list if missing. 752 """ 753 audit_path = case_dir / "audit.jsonl" 754 if not audit_path.exists(): 755 return [] 756 entries: list[dict[str, Any]] = [] 757 with audit_path.open("r", encoding="utf-8", errors="replace") as stream: 758 for line in stream: 759 text = line.strip() 760 if not text: 761 continue 762 try: 763 parsed = json.loads(text) 764 except json.JSONDecodeError: 765 continue 766 if isinstance(parsed, dict): 767 entries.append(parsed) 768 return entries 769 770 771# --------------------------------------------------------------------------- 772# Cleanup helpers 773# --------------------------------------------------------------------------- 774 775 776def _cleanup_parsed_output(case_dir: Path, prev_csv_output_dir: str) -> None: 777 """Remove stale parsed CSV output from a previous parse run. 778 779 Handles both the default ``case_dir/parsed`` location and external 780 directories configured via ``evidence.csv_output_dir``. Only the 781 case-specific parsed directory is removed — parent directories and 782 unrelated paths are never touched. 783 784 Args: 785 case_dir: Path to the case's root directory. 786 prev_csv_output_dir: The ``csv_output_dir`` value stored from the 787 previous parse run (may be empty). 788 """ 789 if not prev_csv_output_dir: 790 return 791 792 prev_path = Path(prev_csv_output_dir) 793 794 # Nothing to do if the directory doesn't exist. 795 if not prev_path.is_dir(): 796 return 797 798 resolved_prev = prev_path.resolve() 799 resolved_case = case_dir.resolve() 800 801 # If the previous output dir is inside the case directory, the normal 802 # ``case_dir/parsed`` cleanup already handles it — skip. 803 try: 804 if resolved_prev.is_relative_to(resolved_case): 805 return 806 except (TypeError, ValueError): 807 return 808 809 # Safety: refuse to delete filesystem roots or very short paths that 810 # could indicate misconfiguration. 811 if resolved_prev == resolved_prev.root or resolved_prev == resolved_prev.anchor: 812 LOGGER.warning( 813 "Refusing to remove parsed output at filesystem root: %s", 814 resolved_prev, 815 ) 816 return 817 if len(resolved_prev.parts) <= 2: 818 LOGGER.warning( 819 "Refusing to remove parsed output with suspiciously short path: %s", 820 resolved_prev, 821 ) 822 return 823 824 LOGGER.info("Removing stale external parsed output: %s", resolved_prev) 825 shutil.rmtree(resolved_prev, ignore_errors=True) 826 827 828# --------------------------------------------------------------------------- 829# Route handlers 830# --------------------------------------------------------------------------- 831 832evidence_bp = Blueprint("evidence", __name__) 833 834 835@evidence_bp.post("/api/cases/<case_id>/evidence") 836def intake_evidence(case_id: str) -> Response | tuple[Response, int]: 837 """Ingest evidence for an existing case. 838 839 Args: 840 case_id: UUID of the case. 841 842 Returns: 843 JSON with evidence metadata, hashes, and available artifacts. 844 """ 845 case = get_case(case_id) 846 if case is None: 847 return error_response(f"Case not found: {case_id}", 404) 848 849 with STATE_LOCK: 850 case_dir = case["case_dir"] 851 audit_logger = case["audit"] 852 853 try: 854 evidence_payload = resolve_evidence_payload(case_dir) 855 source_path = Path(evidence_payload["source_path"]) 856 dissect_path = Path(evidence_payload["dissect_path"]) 857 858 # Determine whether the user opted to skip hashing. 859 skip_hashing = False 860 if request.content_type and "multipart" in request.content_type: 861 skip_hashing = bool(request.form.get("skip_hashing")) 862 else: 863 payload = request.get_json(silent=True) or {} 864 if isinstance(payload, dict): 865 skip_hashing = bool(payload.get("skip_hashing")) 866 867 files_to_hash = evidence_payload.get("evidence_files_to_hash", []) 868 if skip_hashing: 869 hashes = {"sha256": "N/A (skipped)", "md5": "N/A (skipped)", "size_bytes": 0} 870 file_hashes = [] 871 elif files_to_hash: 872 file_hashes: list[dict[str, Any]] = [] 873 for fpath in files_to_hash: 874 h = dict(compute_hashes(fpath)) 875 h["path"] = fpath 876 file_hashes.append(h) 877 if len(file_hashes) == 1: 878 hashes = dict(file_hashes[0]) 879 else: 880 # Summary entry for backward compat — individual hashes 881 # are persisted separately in evidence_file_hashes. 882 hashes = { 883 "sha256": file_hashes[0]["sha256"], 884 "md5": file_hashes[0]["md5"], 885 "size_bytes": sum(h["size_bytes"] for h in file_hashes), 886 } 887 else: 888 hashes = {"sha256": "N/A (directory)", "md5": "N/A (directory)", "size_bytes": 0} 889 file_hashes = [] 890 hashes["filename"] = source_path.name 891 892 try: 893 with ForensicParser( 894 evidence_path=dissect_path, 895 case_dir=case_dir, 896 audit_logger=audit_logger, 897 ) as parser: 898 metadata = parser.get_image_metadata() 899 available_artifacts = parser.get_available_artifacts() 900 detected_os_type = parser.os_type 901 except Exception: 902 LOGGER.warning( 903 "Failed to open evidence with Dissect for case %s — " 904 "returning degraded response so the user sees the " 905 "unsupported-evidence screen.", 906 case_id, 907 exc_info=True, 908 ) 909 metadata = { 910 "hostname": "Unknown", 911 "os_version": "Unknown", 912 "domain": "Unknown", 913 } 914 available_artifacts = [] 915 detected_os_type = "unknown" 916 917 audit_logger.log( 918 "evidence_intake", 919 { 920 "filename": source_path.name, 921 "source_mode": evidence_payload["mode"], 922 "source_path": evidence_payload["source_path"], 923 "stored_path": evidence_payload["stored_path"], 924 "uploaded_files": list(evidence_payload.get("uploaded_files", [])), 925 "dissect_path": str(dissect_path), 926 "sha256": hashes["sha256"], 927 "md5": hashes["md5"], 928 "file_size_bytes": hashes["size_bytes"], 929 "evidence_file_hashes": [ 930 {"path": h["path"], "sha256": h["sha256"], "md5": h["md5"], "size_bytes": h["size_bytes"]} 931 for h in file_hashes 932 ], 933 }, 934 ) 935 audit_logger.log( 936 "image_opened", 937 { 938 "hostname": metadata.get("hostname", "Unknown"), 939 "os_version": metadata.get("os_version", "Unknown"), 940 "os_type": detected_os_type, 941 "domain": metadata.get("domain", "Unknown"), 942 "available_artifacts": [ 943 str(item.get("key")) 944 for item in available_artifacts 945 if item.get("available") 946 ], 947 }, 948 ) 949 950 with STATE_LOCK: 951 # Capture the previous csv_output_dir before clearing it so 952 # we can remove stale parsed CSVs even when they live outside 953 # the case directory (external csv_output_dir). 954 prev_csv_output_dir = str(case.get("csv_output_dir", "")).strip() 955 956 # Set new evidence metadata. 957 case["evidence_mode"] = evidence_payload["mode"] 958 case["source_path"] = evidence_payload["source_path"] 959 case["stored_path"] = evidence_payload["stored_path"] 960 case["uploaded_files"] = list(evidence_payload.get("uploaded_files", [])) 961 case["evidence_path"] = str(dissect_path) 962 case["evidence_hashes"] = hashes 963 case["evidence_file_hashes"] = [ 964 {"path": h["path"], "sha256": h["sha256"], "md5": h["md5"], "size_bytes": h["size_bytes"]} 965 for h in file_hashes 966 ] 967 case["image_metadata"] = metadata 968 case["os_type"] = detected_os_type 969 case["available_artifacts"] = available_artifacts 970 971 # Invalidate all downstream state derived from prior evidence. 972 case["parse_results"] = [] 973 case["artifact_csv_paths"] = {} 974 case["analysis_results"] = {} 975 case["csv_output_dir"] = "" 976 case["selected_artifacts"] = [] 977 case["analysis_artifacts"] = [] 978 case["artifact_options"] = [] 979 case["analysis_date_range"] = None 980 case["investigation_context"] = "" 981 case["status"] = "evidence_loaded" 982 983 # Clear progress stores so stale SSE streams are not reused. 984 PARSE_PROGRESS.pop(case_id, None) 985 ANALYSIS_PROGRESS.pop(case_id, None) 986 CHAT_PROGRESS.pop(case_id, None) 987 988 # Remove stale on-disk artifacts so disk fallbacks cannot 989 # resurrect results from prior evidence. 990 _cleanup_parsed_output(case_dir, prev_csv_output_dir) 991 parsed_dir = case_dir / "parsed" 992 if parsed_dir.is_dir(): 993 shutil.rmtree(parsed_dir, ignore_errors=True) 994 for stale_file in ("analysis_results.json", "prompt.txt", "chat_history.jsonl"): 995 stale_path = case_dir / stale_file 996 if stale_path.exists(): 997 stale_path.unlink(missing_ok=True) 998 999 os_warning = "" 1000 if detected_os_type == "unknown": 1001 os_warning = ( 1002 "Could not detect the operating system of this image. " 1003 "Artifact availability may be incomplete — verify that the " 1004 "image format is supported by Dissect." 1005 ) 1006 1007 response_data: dict[str, Any] = { 1008 "case_id": case_id, 1009 "source_mode": evidence_payload["mode"], 1010 "source_path": evidence_payload["source_path"], 1011 "evidence_path": str(dissect_path), 1012 "uploaded_files": list(evidence_payload.get("uploaded_files", [])), 1013 "hashes": hashes, 1014 "metadata": metadata, 1015 "os_type": detected_os_type, 1016 "available_artifacts": available_artifacts, 1017 } 1018 if os_warning: 1019 response_data["os_warning"] = os_warning 1020 1021 return success_response(response_data) 1022 except (ValueError, FileNotFoundError) as error: 1023 return error_response(str(error), 400) 1024 except Exception: 1025 LOGGER.exception("Evidence intake failed for case %s", case_id) 1026 return error_response( 1027 "Evidence intake failed due to an unexpected error. " 1028 "Confirm the evidence file is supported and try again.", 1029 500, 1030 ) 1031 1032 1033def generate_case_report(case_id: str) -> dict[str, Any]: 1034 """Generate the HTML forensic report for a case and save it to disk. 1035 1036 Performs hash verification, assembles analysis context, renders the 1037 report via :class:`ReportGenerator`, and logs the result to the audit 1038 trail. This function can be called from both the download route and 1039 from background tasks (e.g. auto-generation after analysis). 1040 1041 Args: 1042 case_id: UUID of the case. 1043 1044 Returns: 1045 A result dict with keys ``success`` (bool), and on success: 1046 ``report_path`` (:class:`~pathlib.Path`), ``hash_ok`` (bool). 1047 On failure: ``error`` (str). 1048 """ 1049 case = get_case(case_id) 1050 if case is None: 1051 return {"success": False, "error": f"Case not found: {case_id}"} 1052 1053 with STATE_LOCK: 1054 case_snapshot = dict(case) 1055 audit_logger = case["audit"] 1056 1057 hashes = dict(case_snapshot.get("evidence_hashes", {})) 1058 intake_sha256 = str(hashes.get("sha256", "")).strip() 1059 file_hash_entries = list(case_snapshot.get("evidence_file_hashes", [])) 1060 1061 hashing_skipped = intake_sha256 == "N/A (skipped)" 1062 1063 if hashing_skipped: 1064 hash_ok = True 1065 computed_sha256 = intake_sha256 1066 verify_details: list[dict[str, object]] = [] 1067 elif intake_sha256.startswith("N/A"): 1068 hash_ok = True 1069 computed_sha256 = intake_sha256 1070 verify_details = [] 1071 elif file_hash_entries: 1072 # Verify every file that was hashed at intake. 1073 hash_ok = True 1074 verify_details = [] 1075 for entry in file_hash_entries: 1076 fpath = Path(str(entry["path"])) 1077 expected = str(entry["sha256"]).strip().lower() 1078 if not fpath.exists(): 1079 verify_details.append({ 1080 "path": str(fpath), "match": False, 1081 "expected": expected, "computed": "FILE_MISSING", 1082 }) 1083 hash_ok = False 1084 continue 1085 ok, computed = verify_hash(fpath, expected, return_computed=True) 1086 verify_details.append({ 1087 "path": str(fpath), "match": ok, 1088 "expected": expected, "computed": computed, 1089 }) 1090 if not ok: 1091 hash_ok = False 1092 computed_sha256 = ( 1093 str(verify_details[0]["computed"]) if len(verify_details) == 1 1094 else "; ".join(str(d["computed"]) for d in verify_details) 1095 ) 1096 else: 1097 # Fallback for cases created before evidence_file_hashes existed. 1098 verification_path = resolve_hash_verification_path(case_snapshot) 1099 if verification_path is None or not intake_sha256: 1100 return {"success": False, "error": "Evidence hash context is missing for this case."} 1101 if not verification_path.exists(): 1102 return {"success": False, "error": "Evidence file is no longer available for hash verification."} 1103 hash_ok, computed_sha256 = verify_hash( 1104 verification_path, intake_sha256, return_computed=True, 1105 ) 1106 verify_details = [{ 1107 "path": str(verification_path), 1108 "match": hash_ok, 1109 "expected": intake_sha256, 1110 "computed": computed_sha256, 1111 }] 1112 1113 audit_logger.log( 1114 "hash_verification", 1115 { 1116 "expected_sha256": intake_sha256, 1117 "computed_sha256": computed_sha256, 1118 "match": hash_ok, 1119 "skipped": hashing_skipped, 1120 "verified_files": verify_details, 1121 }, 1122 ) 1123 1124 hashes["case_id"] = case_id 1125 hashes["expected_sha256"] = intake_sha256 1126 hashes["hash_verified"] = "skipped" if hashing_skipped else hash_ok 1127 1128 analysis_results = dict(case_snapshot.get("analysis_results", {})) 1129 1130 has_per_artifact = bool(analysis_results.get("per_artifact") or analysis_results.get("per_artifact_findings")) 1131 has_summary = bool( 1132 str(analysis_results.get("summary", "")).strip() 1133 or str(analysis_results.get("executive_summary", "")).strip() 1134 ) 1135 if not has_per_artifact and not has_summary: 1136 return { 1137 "success": False, 1138 "error": "Analysis has not been completed for this case.", 1139 } 1140 1141 analysis_results.setdefault("case_id", case_id) 1142 analysis_results.setdefault("case_name", str(case_snapshot.get("case_name", ""))) 1143 analysis_results.setdefault("per_artifact", []) 1144 analysis_results.setdefault("summary", "") 1145 1146 case_dir = case_snapshot["case_dir"] 1147 investigation_context = str(case_snapshot.get("investigation_context", "")) 1148 if not investigation_context: 1149 prompt_path = Path(case_dir) / "prompt.txt" 1150 if prompt_path.exists(): 1151 investigation_context = prompt_path.read_text(encoding="utf-8") 1152 1153 report_generator = ReportGenerator(cases_root=CASES_ROOT) 1154 report_path = report_generator.generate( 1155 analysis_results=analysis_results, 1156 image_metadata=dict(case_snapshot.get("image_metadata", {})), 1157 evidence_hashes=hashes, 1158 investigation_context=investigation_context, 1159 audit_log_entries=read_audit_entries(Path(case_dir)), 1160 ) 1161 audit_logger.log( 1162 "report_generated", 1163 {"report_filename": report_path.name, "hash_verified": hash_ok}, 1164 ) 1165 mark_case_status(case_id, "completed") 1166 1167 return {"success": True, "report_path": report_path, "hash_ok": hash_ok} 1168 1169 1170@evidence_bp.get("/api/cases/<case_id>/report") 1171def download_report(case_id: str) -> Response | tuple[Response, int]: 1172 """Generate and download the HTML forensic analysis report. 1173 1174 If a report was already auto-generated after analysis, serves the 1175 existing file. Otherwise generates a new one. 1176 1177 Args: 1178 case_id: UUID of the case. 1179 1180 Returns: 1181 The HTML report as an attachment, or error. 1182 """ 1183 case = get_case(case_id) 1184 if case is None: 1185 return error_response(f"Case not found: {case_id}", 404) 1186 1187 # Check if a report was already auto-generated after analysis. 1188 with STATE_LOCK: 1189 case_dir = case["case_dir"] 1190 reports_dir = Path(case_dir) / "reports" 1191 if reports_dir.is_dir(): 1192 existing = sorted(reports_dir.glob("report_*.html")) 1193 if existing: 1194 report_path = existing[-1] 1195 return send_file( 1196 report_path, 1197 as_attachment=True, 1198 download_name=report_path.name, 1199 mimetype="text/html", 1200 ) 1201 1202 result = generate_case_report(case_id) 1203 if not result["success"]: 1204 return error_response(str(result["error"]), 400) 1205 1206 report_path = result["report_path"] 1207 return send_file( 1208 report_path, 1209 as_attachment=True, 1210 download_name=report_path.name, 1211 mimetype="text/html", 1212 ) 1213 1214 1215@evidence_bp.get("/api/cases/<case_id>/csvs") 1216def download_csv_bundle(case_id: str) -> Response | tuple[Response, int]: 1217 """Download all parsed CSV files as a ZIP archive. 1218 1219 Args: 1220 case_id: UUID of the case. 1221 1222 Returns: 1223 ZIP archive as attachment, or 404 error. 1224 """ 1225 case = get_case(case_id) 1226 if case is None: 1227 return error_response(f"Case not found: {case_id}", 404) 1228 1229 with STATE_LOCK: 1230 case_snapshot = dict(case) 1231 1232 csv_paths = collect_case_csv_paths(case_snapshot) 1233 if not csv_paths: 1234 return error_response("No parsed CSV files available for this case.", 404) 1235 1236 reports_dir = Path(case_snapshot["case_dir"]) / "reports" 1237 reports_dir.mkdir(parents=True, exist_ok=True) 1238 timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") 1239 zip_path = reports_dir / f"parsed_csvs_{timestamp}.zip" 1240 used_names: set[str] = set() 1241 with ZipFile(zip_path, "w", compression=ZIP_DEFLATED) as archive: 1242 for csv_path in csv_paths: 1243 base_name = csv_path.name 1244 arcname = base_name 1245 counter = 1 1246 while arcname in used_names: 1247 stem = Path(base_name).stem 1248 suffix = Path(base_name).suffix 1249 arcname = f"{stem}_{counter}{suffix}" 1250 counter += 1 1251 used_names.add(arcname) 1252 archive.write(csv_path, arcname=arcname) 1253 1254 return send_file( 1255 zip_path, 1256 as_attachment=True, 1257 download_name=f"{case_id}_parsed_csvs.zip", 1258 mimetype="application/zip", 1259 )
514def resolve_evidence_payload(case_dir: Path) -> dict[str, Any]: 515 """Resolve the evidence source from the current request. 516 517 Handles upload and JSON path reference modes. Archives are extracted. 518 519 Args: 520 case_dir: Path to the case's root directory. 521 522 Returns: 523 Dict with ``mode``, ``filename``, ``source_path``, ``stored_path``, 524 ``dissect_path``, and ``uploaded_files``. 525 526 Raises: 527 ValueError: If no evidence provided or archive extraction fails. 528 FileNotFoundError: If the referenced path does not exist. 529 """ 530 evidence_dir = case_dir / "evidence" 531 evidence_dir.mkdir(parents=True, exist_ok=True) 532 533 uploaded_files = _collect_uploaded_files() 534 uploaded_paths: list[Path] = [] 535 if uploaded_files: 536 aift_config = current_app.config.get("AIFT_CONFIG", {}) 537 threshold_mb = aift_config.get("evidence", {}).get("large_file_threshold_mb", 0) 538 max_bytes = int(threshold_mb) * 1024 * 1024 if threshold_mb and threshold_mb > 0 else 0 539 cumulative_bytes = 0 540 timestamp = int(time.time()) 541 for index, uploaded_file in enumerate(uploaded_files, start=1): 542 filename = secure_filename(uploaded_file.filename) or f"evidence_{timestamp}_{index}.bin" 543 stored_path = _unique_destination(evidence_dir / filename) 544 cumulative_bytes = _save_with_limit(uploaded_file, stored_path, max_bytes, cumulative_bytes) 545 uploaded_paths.append(stored_path) 546 547 source_path = _resolve_uploaded_dissect_path(uploaded_paths) 548 mode = "upload" 549 else: 550 payload = request.get_json(silent=True) or {} 551 if not isinstance(payload, dict): 552 raise ValueError("Request body must be a JSON object.") 553 path_value = payload.get("path") 554 if not isinstance(path_value, str): 555 raise ValueError( 556 "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}." 557 ) 558 normalized_path = _normalize_user_path(path_value) 559 if not normalized_path: 560 raise ValueError( 561 "Provide evidence via multipart upload or JSON body with {'path': 'C:\\Evidence\\disk-image.E01'}." 562 ) 563 source_path = Path(normalized_path).expanduser() 564 if not source_path.exists(): 565 raise FileNotFoundError(f"Evidence path does not exist: {source_path}") 566 if not source_path.is_file() and not source_path.is_dir(): 567 raise ValueError(f"Evidence path is not a file or directory: {source_path}") 568 uploaded_paths = [] 569 mode = "path" 570 571 # Extract archives into the evidence directory. 572 _ARCHIVE_EXTRACTORS = { 573 ".zip": _extract_zip, 574 ".tar": _extract_tar, 575 ".gz": _extract_tar, 576 ".tgz": _extract_tar, 577 ".7z": _extract_7z, 578 } 579 dissect_path = source_path 580 suffix = source_path.suffix.lower() 581 extractor = _ARCHIVE_EXTRACTORS.get(suffix) 582 if source_path.is_file() and extractor is not None: 583 extract_dir = _make_extract_dir(evidence_dir, source_path) 584 dissect_path = extractor(source_path, extract_dir) 585 586 # Determine the files to hash for integrity verification. 587 # Archives are intentionally verified as the original container file. 588 # Split-image uploads hash all uploaded segments, and path-based split 589 # images hash all matching sibling segments on disk. Directories get N/A. 590 if source_path.is_file() and len(uploaded_paths) > 1: 591 evidence_files_to_hash = sorted(set(str(p) for p in uploaded_paths)) 592 elif source_path.is_file(): 593 segment_paths = _collect_segment_group_paths(source_path) 594 evidence_files_to_hash = [str(path) for path in segment_paths] if segment_paths else [str(source_path)] 595 else: 596 evidence_files_to_hash = [] 597 598 return { 599 "mode": mode, 600 "filename": source_path.name, 601 "source_path": str(source_path), 602 "stored_path": str(source_path) if mode == "upload" else "", 603 "dissect_path": str(dissect_path), 604 "uploaded_files": [str(path) for path in uploaded_paths], 605 "evidence_files_to_hash": evidence_files_to_hash, 606 }
Resolve the evidence source from the current request.
Handles upload and JSON path reference modes. Archives are extracted.
Arguments:
- case_dir: Path to the case's root directory.
Returns:
Dict with
mode,filename,source_path,stored_path,dissect_path, anduploaded_files.
Raises:
- ValueError: If no evidence provided or archive extraction fails.
- FileNotFoundError: If the referenced path does not exist.
613def resolve_hash_verification_path(case: dict[str, Any]) -> Path | None: 614 """Resolve the file path for evidence hash verification. 615 616 Args: 617 case: The in-memory case state dictionary. 618 619 Returns: 620 Path to the evidence file, or ``None``. 621 """ 622 source_path = str(case.get("source_path", "")).strip() 623 if source_path: 624 return Path(source_path) 625 evidence_path = str(case.get("evidence_path", "")).strip() 626 if evidence_path: 627 return Path(evidence_path) 628 return None
Resolve the file path for evidence hash verification.
Arguments:
- case: The in-memory case state dictionary.
Returns:
Path to the evidence file, or
None.
631def resolve_case_csv_output_dir(case: dict[str, Any], config_snapshot: dict[str, Any]) -> Path: 632 """Resolve the output directory for parsed CSV files. 633 634 Args: 635 case: The in-memory case state dictionary. 636 config_snapshot: Application configuration snapshot. 637 638 Returns: 639 Absolute ``Path`` to the CSV output directory. 640 """ 641 config = config_snapshot if isinstance(config_snapshot, dict) else {} 642 evidence_config = config.get("evidence", {}) if isinstance(config, dict) else {} 643 configured = str(evidence_config.get("csv_output_dir", "")).strip() if isinstance(evidence_config, dict) else "" 644 case_dir = Path(case["case_dir"]) 645 case_id = str(case.get("case_id", "")).strip() 646 647 if not configured: 648 return case_dir / "parsed" 649 650 output_root = Path(configured).expanduser() 651 if not output_root.is_absolute(): 652 output_root = (PROJECT_ROOT / output_root).resolve() 653 if case_id: 654 return output_root / case_id / "parsed" 655 return output_root / "parsed"
Resolve the output directory for parsed CSV files.
Arguments:
- case: The in-memory case state dictionary.
- config_snapshot: Application configuration snapshot.
Returns:
Absolute
Pathto the CSV output directory.
658def collect_case_csv_paths(case: dict[str, Any]) -> list[Path]: 659 """Collect all parsed CSV file paths for a case. 660 661 Args: 662 case: The in-memory case state dictionary. 663 664 Returns: 665 A sorted list of existing CSV file paths. 666 """ 667 collected: list[Path] = [] 668 seen: set[str] = set() 669 670 def _add_path(candidate: Any) -> None: 671 """Add a CSV path if it exists and is not a duplicate.""" 672 path_text = str(candidate or "").strip() 673 if not path_text: 674 return 675 path = Path(path_text) 676 if not path.exists() or not path.is_file(): 677 return 678 key = str(path.resolve()) 679 if key in seen: 680 return 681 seen.add(key) 682 collected.append(path) 683 684 csv_map = case.get("artifact_csv_paths") 685 if isinstance(csv_map, dict): 686 for csv_path in csv_map.values(): 687 if isinstance(csv_path, list): 688 for p in csv_path: 689 _add_path(p) 690 else: 691 _add_path(csv_path) 692 693 parse_results = case.get("parse_results") 694 if isinstance(parse_results, list): 695 for result in parse_results: 696 if not isinstance(result, dict) or not result.get("success"): 697 continue 698 _add_path(result.get("csv_path")) 699 csv_paths = result.get("csv_paths") 700 if isinstance(csv_paths, list): 701 for path in csv_paths: 702 _add_path(path) 703 704 if collected: 705 return sorted(collected, key=lambda path: path.name.lower()) 706 707 parsed_dir = Path(case["case_dir"]) / "parsed" 708 return sorted(path for path in parsed_dir.glob("*.csv") if path.is_file())
Collect all parsed CSV file paths for a case.
Arguments:
- case: The in-memory case state dictionary.
Returns:
A sorted list of existing CSV file paths.
711def build_csv_map(parse_results: list[dict[str, Any]]) -> dict[str, str | list[str]]: 712 """Build a mapping of artifact keys to their parsed CSV file paths. 713 714 Split artifacts (e.g. EVTX) that produce multiple CSV files are 715 represented as a ``list[str]`` value. Single-file artifacts remain 716 a plain ``str`` so existing callers are unaffected. 717 718 Args: 719 parse_results: List of per-artifact parse result dicts. 720 721 Returns: 722 Dict mapping artifact key strings to a single CSV path string 723 or a list of CSV path strings for split artifacts. 724 """ 725 mapping: dict[str, str | list[str]] = {} 726 for result in parse_results: 727 artifact = str(result.get("artifact_key", "")).strip() 728 if not artifact or not result.get("success"): 729 continue 730 csv_paths = result.get("csv_paths") 731 if isinstance(csv_paths, list) and csv_paths: 732 non_empty = [str(p) for p in csv_paths if str(p).strip()] 733 if len(non_empty) > 1: 734 mapping[artifact] = non_empty 735 continue 736 if non_empty: 737 mapping[artifact] = non_empty[0] 738 continue 739 csv_path = str(result.get("csv_path", "")).strip() 740 if csv_path: 741 mapping[artifact] = csv_path 742 return mapping
Build a mapping of artifact keys to their parsed CSV file paths.
Split artifacts (e.g. EVTX) that produce multiple CSV files are
represented as a list[str] value. Single-file artifacts remain
a plain str so existing callers are unaffected.
Arguments:
- parse_results: List of per-artifact parse result dicts.
Returns:
Dict mapping artifact key strings to a single CSV path string or a list of CSV path strings for split artifacts.
745def read_audit_entries(case_dir: Path) -> list[dict[str, Any]]: 746 """Read all audit log entries from a case's ``audit.jsonl`` file. 747 748 Args: 749 case_dir: Path to the case's root directory. 750 751 Returns: 752 A list of parsed audit entry dicts, or empty list if missing. 753 """ 754 audit_path = case_dir / "audit.jsonl" 755 if not audit_path.exists(): 756 return [] 757 entries: list[dict[str, Any]] = [] 758 with audit_path.open("r", encoding="utf-8", errors="replace") as stream: 759 for line in stream: 760 text = line.strip() 761 if not text: 762 continue 763 try: 764 parsed = json.loads(text) 765 except json.JSONDecodeError: 766 continue 767 if isinstance(parsed, dict): 768 entries.append(parsed) 769 return entries
Read all audit log entries from a case's audit.jsonl file.
Arguments:
- case_dir: Path to the case's root directory.
Returns:
A list of parsed audit entry dicts, or empty list if missing.
1034def generate_case_report(case_id: str) -> dict[str, Any]: 1035 """Generate the HTML forensic report for a case and save it to disk. 1036 1037 Performs hash verification, assembles analysis context, renders the 1038 report via :class:`ReportGenerator`, and logs the result to the audit 1039 trail. This function can be called from both the download route and 1040 from background tasks (e.g. auto-generation after analysis). 1041 1042 Args: 1043 case_id: UUID of the case. 1044 1045 Returns: 1046 A result dict with keys ``success`` (bool), and on success: 1047 ``report_path`` (:class:`~pathlib.Path`), ``hash_ok`` (bool). 1048 On failure: ``error`` (str). 1049 """ 1050 case = get_case(case_id) 1051 if case is None: 1052 return {"success": False, "error": f"Case not found: {case_id}"} 1053 1054 with STATE_LOCK: 1055 case_snapshot = dict(case) 1056 audit_logger = case["audit"] 1057 1058 hashes = dict(case_snapshot.get("evidence_hashes", {})) 1059 intake_sha256 = str(hashes.get("sha256", "")).strip() 1060 file_hash_entries = list(case_snapshot.get("evidence_file_hashes", [])) 1061 1062 hashing_skipped = intake_sha256 == "N/A (skipped)" 1063 1064 if hashing_skipped: 1065 hash_ok = True 1066 computed_sha256 = intake_sha256 1067 verify_details: list[dict[str, object]] = [] 1068 elif intake_sha256.startswith("N/A"): 1069 hash_ok = True 1070 computed_sha256 = intake_sha256 1071 verify_details = [] 1072 elif file_hash_entries: 1073 # Verify every file that was hashed at intake. 1074 hash_ok = True 1075 verify_details = [] 1076 for entry in file_hash_entries: 1077 fpath = Path(str(entry["path"])) 1078 expected = str(entry["sha256"]).strip().lower() 1079 if not fpath.exists(): 1080 verify_details.append({ 1081 "path": str(fpath), "match": False, 1082 "expected": expected, "computed": "FILE_MISSING", 1083 }) 1084 hash_ok = False 1085 continue 1086 ok, computed = verify_hash(fpath, expected, return_computed=True) 1087 verify_details.append({ 1088 "path": str(fpath), "match": ok, 1089 "expected": expected, "computed": computed, 1090 }) 1091 if not ok: 1092 hash_ok = False 1093 computed_sha256 = ( 1094 str(verify_details[0]["computed"]) if len(verify_details) == 1 1095 else "; ".join(str(d["computed"]) for d in verify_details) 1096 ) 1097 else: 1098 # Fallback for cases created before evidence_file_hashes existed. 1099 verification_path = resolve_hash_verification_path(case_snapshot) 1100 if verification_path is None or not intake_sha256: 1101 return {"success": False, "error": "Evidence hash context is missing for this case."} 1102 if not verification_path.exists(): 1103 return {"success": False, "error": "Evidence file is no longer available for hash verification."} 1104 hash_ok, computed_sha256 = verify_hash( 1105 verification_path, intake_sha256, return_computed=True, 1106 ) 1107 verify_details = [{ 1108 "path": str(verification_path), 1109 "match": hash_ok, 1110 "expected": intake_sha256, 1111 "computed": computed_sha256, 1112 }] 1113 1114 audit_logger.log( 1115 "hash_verification", 1116 { 1117 "expected_sha256": intake_sha256, 1118 "computed_sha256": computed_sha256, 1119 "match": hash_ok, 1120 "skipped": hashing_skipped, 1121 "verified_files": verify_details, 1122 }, 1123 ) 1124 1125 hashes["case_id"] = case_id 1126 hashes["expected_sha256"] = intake_sha256 1127 hashes["hash_verified"] = "skipped" if hashing_skipped else hash_ok 1128 1129 analysis_results = dict(case_snapshot.get("analysis_results", {})) 1130 1131 has_per_artifact = bool(analysis_results.get("per_artifact") or analysis_results.get("per_artifact_findings")) 1132 has_summary = bool( 1133 str(analysis_results.get("summary", "")).strip() 1134 or str(analysis_results.get("executive_summary", "")).strip() 1135 ) 1136 if not has_per_artifact and not has_summary: 1137 return { 1138 "success": False, 1139 "error": "Analysis has not been completed for this case.", 1140 } 1141 1142 analysis_results.setdefault("case_id", case_id) 1143 analysis_results.setdefault("case_name", str(case_snapshot.get("case_name", ""))) 1144 analysis_results.setdefault("per_artifact", []) 1145 analysis_results.setdefault("summary", "") 1146 1147 case_dir = case_snapshot["case_dir"] 1148 investigation_context = str(case_snapshot.get("investigation_context", "")) 1149 if not investigation_context: 1150 prompt_path = Path(case_dir) / "prompt.txt" 1151 if prompt_path.exists(): 1152 investigation_context = prompt_path.read_text(encoding="utf-8") 1153 1154 report_generator = ReportGenerator(cases_root=CASES_ROOT) 1155 report_path = report_generator.generate( 1156 analysis_results=analysis_results, 1157 image_metadata=dict(case_snapshot.get("image_metadata", {})), 1158 evidence_hashes=hashes, 1159 investigation_context=investigation_context, 1160 audit_log_entries=read_audit_entries(Path(case_dir)), 1161 ) 1162 audit_logger.log( 1163 "report_generated", 1164 {"report_filename": report_path.name, "hash_verified": hash_ok}, 1165 ) 1166 mark_case_status(case_id, "completed") 1167 1168 return {"success": True, "report_path": report_path, "hash_ok": hash_ok}
Generate the HTML forensic report for a case and save it to disk.
Performs hash verification, assembles analysis context, renders the
report via ReportGenerator, and logs the result to the audit
trail. This function can be called from both the download route and
from background tasks (e.g. auto-generation after analysis).
Arguments:
- case_id: UUID of the case.
Returns:
A result dict with keys
success(bool), and on success:report_path(~pathlib.Path),hash_ok(bool). On failure:error(str).