app.parser.core

Dissect integration layer for forensic artifact parsing.

Wraps the Dissect framework's Target API to extract Windows forensic
artifacts from disk images (E01, VMDK, VHD, raw, etc.) and stream them
into CSV files for downstream AI analysis.

Key responsibilities:

- Evidence opening -- ForensicParser opens a Dissect Target in read-only
  mode from any supported container format.
- CSV streaming -- Records are streamed to CSV one row at a time, never
  materialised in memory, allowing safe handling of high-volume artifacts
  such as EVTX and MFT (millions of records).
- EVTX splitting -- Event log records are automatically partitioned by
  channel/provider into separate CSV files, with additional part files
  created when a single channel exceeds EVTX_MAX_RECORDS_PER_FILE.
- Schema evolution -- When a Dissect plugin yields records with varying
  schemas, CSV headers are expanded dynamically and the file is rewritten
  once to ensure a consistent header row.

Attributes:

- UNKNOWN_VALUE: Sentinel string used when a target attribute cannot be read.
- EVTX_MAX_RECORDS_PER_FILE: Maximum rows per EVTX CSV part file.
- MAX_RECORDS_PER_ARTIFACT: Hard cap on rows written for any single artifact.
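The schema-evolution behaviour described above can be sketched in isolation with only the standard library. This is a minimal, hypothetical reconstruction for illustration (the `stream_with_evolving_schema` helper and its toy records are not part of the module); the real implementation lives in `_write_records_to_csv` and `_rewrite_csv_with_expanded_headers` in the source below.

```python
import csv
import io


def stream_with_evolving_schema(records: list[dict[str, str]]) -> str:
    """Write dict records to CSV, expanding the header as new keys appear."""
    buffer = io.StringIO()
    fieldnames: list[str] = []
    writer = None
    expanded = False
    for record in records:
        new_keys = [k for k in record if k not in fieldnames]
        if new_keys:
            fieldnames.extend(new_keys)  # columns are only ever appended
            if writer is not None:
                expanded = True  # rows already written are now too short
            writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="")
            if not expanded:
                writer.writeheader()
        writer.writerow(record)
    if not expanded:
        return buffer.getvalue()
    # Second pass: emit the full header once and pad the short early rows.
    fixed = io.StringIO()
    out = csv.writer(fixed)
    out.writerow(fieldnames)
    reader = csv.reader(io.StringIO(buffer.getvalue()))
    next(reader, None)  # drop the incomplete original header
    for row in reader:
        row.extend([""] * (len(fieldnames) - len(row)))
        out.writerow(row)
    return fixed.getvalue()


print(stream_with_evolving_schema([{"a": "1"}, {"a": "2", "b": "3"}]))
```

Because columns are only appended, the rewrite is a cheap positional pad rather than a full re-parse, which is why the module can afford a single end-of-stream rewrite even for large files.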
"""Dissect integration layer for forensic artifact parsing.

Wraps the Dissect framework's ``Target`` API to extract Windows forensic
artifacts from disk images (E01, VMDK, VHD, raw, etc.) and stream them
into CSV files for downstream AI analysis.

Key responsibilities:

* **Evidence opening** -- :class:`ForensicParser` opens a Dissect
  ``Target`` in read-only mode from any supported container format.
* **CSV streaming** -- Records are streamed to CSV one row at a time,
  never materialised in memory, allowing safe handling of high-volume
  artifacts such as EVTX and MFT (millions of records).
* **EVTX splitting** -- Event log records are automatically partitioned
  by channel/provider into separate CSV files, with additional part files
  created when a single channel exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`.
* **Schema evolution** -- When a Dissect plugin yields records with
  varying schemas, CSV headers are expanded dynamically and the file is
  rewritten once to ensure a consistent header row.

Attributes:
    UNKNOWN_VALUE: Sentinel string used when a target attribute cannot be read.
    EVTX_MAX_RECORDS_PER_FILE: Maximum rows per EVTX CSV part file.
    MAX_RECORDS_PER_ARTIFACT: Hard cap on rows written for any single artifact.
"""

from __future__ import annotations

import csv
import logging
import re
import traceback
from datetime import date, datetime, time
from pathlib import Path
from time import perf_counter
from types import TracebackType
from typing import Any, Callable, Iterable

from dissect.target import Target
from dissect.target.exceptions import PluginError, UnsupportedPluginError

from .registry import get_artifact_registry

__all__ = ["ForensicParser"]

logger = logging.getLogger(__name__)

UNKNOWN_VALUE = "Unknown"
EVTX_MAX_RECORDS_PER_FILE = 500_000
MAX_RECORDS_PER_ARTIFACT = 1_000_000


class ForensicParser:
    """Parse supported forensic artifacts from a Dissect target into CSV files.

    Opens a disk image via Dissect's ``Target.open()``, queries available
    artifacts, and streams their records to CSV files in the case's parsed
    directory. Implements the context manager protocol for deterministic
    resource cleanup.

    Attributes:
        evidence_path: Path to the source evidence file.
        case_dir: Root directory for this forensic case.
        audit_logger: :class:`~app.audit.AuditLogger` for recording actions.
        parsed_dir: Directory where output CSV files are written.
        target: The open Dissect ``Target`` handle.
    """

    def __init__(
        self,
        evidence_path: str | Path,
        case_dir: str | Path,
        audit_logger: Any,
        parsed_dir: str | Path | None = None,
    ) -> None:
        """Initialise the parser and open the Dissect target.

        Args:
            evidence_path: Path to the disk image or evidence container.
            case_dir: Case-specific directory for output and audit data.
            audit_logger: Logger instance for writing audit trail entries.
            parsed_dir: Optional override for the CSV output directory.
                Defaults to ``<case_dir>/parsed/``.
        """
        self.evidence_path = Path(evidence_path)
        self.case_dir = Path(case_dir)
        self.audit_logger = audit_logger
        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
        self.parsed_dir.mkdir(parents=True, exist_ok=True)
        self.target = Target.open(self.evidence_path)
        self._closed = False

        try:
            self.os_type: str = str(self.target.os).strip().lower()
        except Exception:
            self.os_type = "unknown"

    def close(self) -> None:
        """Close the underlying Dissect target handle."""
        if self._closed:
            return

        try:
            close_method = getattr(self.target, "close", None)
        except Exception:
            close_method = None
        if callable(close_method):
            close_method()
        self._closed = True

    def __enter__(self) -> ForensicParser:
        """Enter the runtime context and return the parser instance."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Exit the runtime context, closing the Dissect target."""
        del exc_type, exc_val, exc_tb
        self.close()
        return False

    def get_image_metadata(self) -> dict[str, str]:
        """Extract key system metadata from the Dissect target.

        Attempts multiple attribute name variants for each field (e.g.
        ``hostname``, ``computer_name``, ``name``) to accommodate
        different OS profiles.

        Returns:
            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
            ``ips``, ``timezone``, and ``install_date``.
        """
        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))

        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
        if isinstance(ips_value, (list, tuple, set)):
            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
            if not ips:
                ips = UNKNOWN_VALUE
        else:
            ips = str(ips_value)

        return {
            "hostname": hostname,
            "os_version": os_version,
            "domain": domain,
            "ips": ips,
            "timezone": timezone,
            "install_date": install_date,
        }

    def get_available_artifacts(self) -> list[dict[str, Any]]:
        """Return the artifact registry annotated with availability flags.

        Detects the target OS via ``target.os`` and selects the
        appropriate artifact registry (Windows or Linux). Probes the
        Dissect target for each registered artifact and sets an
        ``available`` boolean on the returned metadata dictionaries.

        Returns:
            List of artifact metadata dicts, each augmented with ``key``
            and ``available`` fields.
        """
        registry = get_artifact_registry(self.os_type)
        available_artifacts: list[dict[str, Any]] = []
        for artifact_key, artifact_details in registry.items():
            function_name = str(artifact_details.get("function", artifact_key))
            try:
                available = bool(self.target.has_function(function_name))
            except (PluginError, UnsupportedPluginError):
                available = False

            available_artifact = dict(artifact_details)
            available_artifact["key"] = artifact_key
            available_artifact["available"] = available
            available_artifacts.append(available_artifact)

        return available_artifacts

    def _call_target_function(self, function_name: str) -> Any:
        """Invoke a Dissect function on the target, including namespaced functions.

        For simple names like ``"shimcache"`` it calls ``target.shimcache()``.
        For dotted names like ``"browser.history"`` it traverses the namespace
        chain (``target.browser.history()``) and calls the final attribute.
        """
        if "." not in function_name:
            function = getattr(self.target, function_name)
            return function() if callable(function) else function

        current: Any = self.target
        parts = function_name.split(".")
        try:
            for namespace in parts:
                current = getattr(current, namespace)
        except Exception:
            logger.warning(
                "Failed to resolve nested function '%s' (stopped at '%s')",
                function_name,
                namespace,
                exc_info=True,
            )
            raise

        return current() if callable(current) else current

    def parse_artifact(
        self,
        artifact_key: str,
        progress_callback: Callable[..., None] | None = None,
    ) -> dict[str, Any]:
        """Parse a single artifact and stream its records to one or more CSV files.

        Logs ``parsing_started``, ``parsing_completed`` (or ``parsing_failed``)
        to the audit trail. EVTX artifacts are split by channel/provider
        into separate CSV files.

        Args:
            artifact_key: Key from the OS-specific artifact registry identifying
                the artifact to parse.
            progress_callback: Optional callback invoked every 1,000 records
                with progress information.

        Returns:
            Result dictionary with keys ``csv_path``, ``record_count``,
            ``duration_seconds``, ``success``, and ``error``. EVTX
            results also include a ``csv_paths`` list.
        """
        registry = get_artifact_registry(self.os_type)
        artifact = registry.get(artifact_key)
        if artifact is None:
            return {
                "csv_path": "",
                "record_count": 0,
                "duration_seconds": 0.0,
                "success": False,
                "error": f"Unknown artifact key: {artifact_key}",
            }

        function_name = str(artifact.get("function", artifact_key))
        start_time = perf_counter()
        record_count = 0
        csv_path = ""

        self.audit_logger.log(
            "parsing_started",
            {
                "artifact_key": artifact_key,
                "artifact_name": artifact.get("name", artifact_key),
                "function": function_name,
            },
        )

        try:
            records = self._call_target_function(function_name)
            if self._is_evtx_artifact(function_name):
                all_csv_paths, record_count = self._write_evtx_records(
                    artifact_key=artifact_key,
                    records=records,
                    progress_callback=progress_callback,
                )
                if all_csv_paths:
                    csv_path = str(all_csv_paths[0])
                else:
                    empty_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
                    empty_output.touch(exist_ok=True)
                    csv_path = str(empty_output)
                    all_csv_paths = [empty_output]
            else:
                csv_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
                record_count = self._write_records_to_csv(
                    records=records,
                    csv_output_path=csv_output,
                    progress_callback=progress_callback,
                    artifact_key=artifact_key,
                )
                csv_path = str(csv_output)

            duration = perf_counter() - start_time
            self.audit_logger.log(
                "parsing_completed",
                {
                    "artifact_key": artifact_key,
                    "artifact_name": artifact.get("name", artifact_key),
                    "function": function_name,
                    "record_count": record_count,
                    "duration_seconds": round(duration, 6),
                    "csv_path": csv_path,
                },
            )

            result: dict[str, Any] = {
                "csv_path": csv_path,
                "record_count": record_count,
                "duration_seconds": duration,
                "success": True,
                "error": None,
            }
            if self._is_evtx_artifact(function_name):
                result["csv_paths"] = [str(p) for p in all_csv_paths]
            return result
        except Exception as error:
            duration = perf_counter() - start_time
            error_message = str(error)
            error_traceback = traceback.format_exc()
            self.audit_logger.log(
                "parsing_failed",
                {
                    "artifact_key": artifact_key,
                    "artifact_name": artifact.get("name", artifact_key),
                    "function": function_name,
                    "error": error_message,
                    "traceback": error_traceback,
                    "duration_seconds": round(duration, 6),
                },
            )
            return {
                "csv_path": "",
                "record_count": record_count,
                "duration_seconds": duration,
                "success": False,
                "error": error_message,
            }

    def _safe_read_target_attribute(self, attribute_names: tuple[str, ...]) -> Any:
        """Read a target attribute by trying multiple candidate names.

        Args:
            attribute_names: Ordered tuple of attribute names to try.

        Returns:
            The first non-empty value found, or :data:`UNKNOWN_VALUE`.
        """
        for attribute_name in attribute_names:
            try:
                value = getattr(self.target, attribute_name)
            except Exception:
                continue

            if callable(value):
                try:
                    value = value()
                except Exception:
                    continue

            if value in (None, ""):
                continue

            return value

        return UNKNOWN_VALUE

    def _write_records_to_csv(
        self,
        records: Iterable[Any],
        csv_output_path: Path,
        progress_callback: Callable[..., None] | None,
        artifact_key: str,
    ) -> int:
        """Stream Dissect records to a CSV file, handling dynamic schemas.

        If the record schema expands mid-stream (new columns appear), the
        file is rewritten at the end with the complete header row via
        :meth:`_rewrite_csv_with_expanded_headers`.

        Args:
            records: Iterable of Dissect record objects.
            csv_output_path: Destination CSV file path.
            progress_callback: Optional progress callback.
            artifact_key: Artifact key for audit/progress reporting.

        Returns:
            Total number of records written.
        """
        record_count = 0
        fieldnames: list[str] = []
        fieldnames_set: set[str] = set()
        headers_expanded = False

        with csv_output_path.open("w", newline="", encoding="utf-8") as csv_file:
            writer: csv.DictWriter | None = None
            for record in records:
                record_dict = self._record_to_dict(record)

                new_keys = [str(k) for k in record_dict.keys() if str(k) not in fieldnames_set]
                if new_keys:
                    fieldnames.extend(new_keys)
                    fieldnames_set.update(new_keys)
                    if writer is not None:
                        headers_expanded = True
                    writer = csv.DictWriter(
                        csv_file, fieldnames=fieldnames, restval="", extrasaction="ignore",
                    )
                    if not headers_expanded:
                        writer.writeheader()

                row = {
                    fn: self._stringify_csv_value(record_dict.get(fn))
                    for fn in fieldnames
                }
                if writer is not None:
                    writer.writerow(row)
                record_count += 1

                if record_count >= MAX_RECORDS_PER_ARTIFACT:
                    self.audit_logger.log(
                        "parsing_capped",
                        {
                            "artifact_key": artifact_key,
                            "record_count": record_count,
                            "max_records": MAX_RECORDS_PER_ARTIFACT,
                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
                        },
                    )
                    break

                if progress_callback is not None and record_count % 1000 == 0:
                    self._emit_progress(progress_callback, artifact_key, record_count)

        if headers_expanded and record_count > 0:
            self._rewrite_csv_with_expanded_headers(csv_output_path, fieldnames)

        if progress_callback is not None:
            self._emit_progress(progress_callback, artifact_key, record_count)

        return record_count

    def _rewrite_csv_with_expanded_headers(self, csv_path: Path, fieldnames: list[str]) -> None:
        """Rewrite a CSV whose header is incomplete due to mid-stream schema changes.

        Because fieldnames are only ever appended, row values are positionally
        aligned: shorter rows (written before expansion) just need empty-string
        padding for the new trailing columns.
        """
        temp_path = csv_path.with_suffix(".csv.tmp")
        num_fields = len(fieldnames)
        with csv_path.open("r", newline="", encoding="utf-8") as src, \
                temp_path.open("w", newline="", encoding="utf-8") as dst:
            reader = csv.reader(src)
            csv_writer = csv.writer(dst)
            csv_writer.writerow(fieldnames)
            next(reader, None)  # skip original (incomplete) header
            for row in reader:
                if len(row) < num_fields:
                    row.extend([""] * (num_fields - len(row)))
                csv_writer.writerow(row)
        temp_path.replace(csv_path)

    def _write_evtx_records(
        self,
        artifact_key: str,
        records: Any,
        progress_callback: Callable[..., None] | None,
    ) -> tuple[list[Path], int]:
        """Stream EVTX records into per-channel CSV files with automatic splitting.

        Records are grouped by their channel or provider name. When a
        single group exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`, a new
        part file is created.

        Args:
            artifact_key: Artifact key for filename construction.
            records: Iterable of Dissect EVTX record objects.
            progress_callback: Optional progress callback.

        Returns:
            Tuple of ``(csv_paths, total_record_count)``.
        """
        writers: dict[str, dict[str, Any]] = {}
        completed_states: list[dict[str, Any]] = []
        csv_paths: list[Path] = []
        record_count = 0

        try:
            for record in records:
                if record_count >= MAX_RECORDS_PER_ARTIFACT:
                    self.audit_logger.log(
                        "parsing_capped",
                        {
                            "artifact_key": artifact_key,
                            "record_count": record_count,
                            "max_records": MAX_RECORDS_PER_ARTIFACT,
                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
                        },
                    )
                    break

                record_dict = self._record_to_dict(record)
                group_name = self._extract_evtx_group_name(record_dict)

                writer_state = writers.get(group_name)
                if writer_state is None:
                    writer_state = self._open_evtx_writer(
                        artifact_key=artifact_key, group_name=group_name, part=1,
                    )
                    writers[group_name] = writer_state
                    csv_paths.append(writer_state["path"])
                elif writer_state["records_in_file"] >= EVTX_MAX_RECORDS_PER_FILE:
                    writer_state["handle"].close()
                    # Keep the finished part's state so its headers can still be
                    # rewritten below if they expanded mid-file.
                    completed_states.append(writer_state)
                    next_part = int(writer_state["part"]) + 1
                    writer_state = self._open_evtx_writer(
                        artifact_key=artifact_key,
                        group_name=group_name,
                        part=next_part,
                    )
                    writers[group_name] = writer_state
                    csv_paths.append(writer_state["path"])

                if writer_state["fieldnames"] is None:
                    fieldnames = [str(key) for key in record_dict.keys()]
                    writer_state["fieldnames"] = fieldnames
                    writer_state["fieldnames_set"] = set(fieldnames)
                    writer_state["writer"] = csv.DictWriter(
                        writer_state["handle"],
                        fieldnames=fieldnames,
                        extrasaction="ignore",
                    )
                    writer_state["writer"].writeheader()
                else:
                    new_keys = [
                        str(k) for k in record_dict.keys()
                        if str(k) not in writer_state["fieldnames_set"]
                    ]
                    if new_keys:
                        writer_state["fieldnames"].extend(new_keys)
                        writer_state["fieldnames_set"].update(new_keys)
                        writer_state["headers_expanded"] = True
                        writer_state["writer"] = csv.DictWriter(
                            writer_state["handle"],
                            fieldnames=writer_state["fieldnames"],
                            extrasaction="ignore",
                        )

                fieldnames = writer_state["fieldnames"]
                row = {
                    fieldname: self._stringify_csv_value(record_dict.get(fieldname))
                    for fieldname in fieldnames
                }
                writer_state["writer"].writerow(row)
                writer_state["records_in_file"] += 1
                record_count += 1

                if progress_callback is not None and record_count % 1000 == 0:
                    self._emit_progress(progress_callback, artifact_key, record_count)
        finally:
            for writer_state in writers.values():
                writer_state["handle"].close()

        for writer_state in (*completed_states, *writers.values()):
            if writer_state["headers_expanded"] and writer_state["records_in_file"] > 0:
                self._rewrite_csv_with_expanded_headers(
                    writer_state["path"], writer_state["fieldnames"],
                )

        if progress_callback is not None:
            self._emit_progress(progress_callback, artifact_key, record_count)

        return csv_paths, record_count

    def _open_evtx_writer(self, artifact_key: str, group_name: str, part: int) -> dict[str, Any]:
        """Open a new CSV file for an EVTX channel group and return writer state.

        Args:
            artifact_key: Parent artifact key for filename construction.
            group_name: EVTX channel or provider name.
            part: 1-based part number for multi-file splits.

        Returns:
            Dictionary containing ``path``, ``handle``, ``writer``,
            ``fieldnames``, ``fieldnames_set``, ``headers_expanded``,
            ``records_in_file``, and ``part``.
        """
        artifact_stub = self._sanitize_filename(artifact_key)
        group_stub = self._sanitize_filename(group_name)
        if part == 1:
            filename = f"{artifact_stub}_{group_stub}.csv"
        else:
            filename = f"{artifact_stub}_{group_stub}_part{part}.csv"
        output_path = self.parsed_dir / filename

        handle = output_path.open("w", newline="", encoding="utf-8")
        return {
            "path": output_path,
            "handle": handle,
            "writer": None,
            "fieldnames": None,
            "fieldnames_set": None,
            "headers_expanded": False,
            "records_in_file": 0,
            "part": part,
        }

    def _extract_evtx_group_name(self, record_dict: dict[str, Any]) -> str:
        """Determine the channel/provider group name for an EVTX record.

        Checks multiple candidate keys (``channel``, ``Channel``,
        ``provider``, etc.) and returns the first non-empty value.

        Args:
            record_dict: Dictionary representation of the EVTX record.

        Returns:
            Channel or provider name, or ``"unknown"`` if none found.
        """
        channel = self._find_record_value(
            record_dict,
            (
                "channel",
                "Channel",
                "log_name",
                "LogName",
                "event_log",
                "EventLog",
            ),
        )
        provider = self._find_record_value(
            record_dict,
            (
                "provider",
                "Provider",
                "provider_name",
                "ProviderName",
                "source",
                "Source",
            ),
        )

        if channel:
            return channel
        if provider:
            return provider
        return "unknown"

    @staticmethod
    def _record_to_dict(record: Any) -> dict[str, Any]:
        """Convert a Dissect record to a plain dictionary.

        Handles Dissect ``Record`` objects (via ``_asdict()``), plain
        dicts, and objects with a ``__dict__``.

        Args:
            record: A Dissect record or dict-like object.

        Returns:
            A plain dictionary of field names to values.

        Raises:
            TypeError: If the record cannot be converted.
        """
        if hasattr(record, "_asdict"):
            as_dict = record._asdict()
            if isinstance(as_dict, dict):
                return dict(as_dict)

        if isinstance(record, dict):
            return dict(record)

        try:
            return dict(vars(record))
        except TypeError as exc:
            raise TypeError("Artifact record cannot be converted to a dictionary.") from exc

    @staticmethod
    def _stringify_csv_value(value: Any) -> str:
        """Convert a record field value to a CSV-safe string.

        Handles ``datetime``, ``bytes``, ``None``, and other types that
        Dissect records may yield.

        Args:
            value: The raw field value from a Dissect record.

        Returns:
            String representation suitable for CSV output.
        """
        if value is None:
            return ""
        if isinstance(value, (datetime, date, time)):
            return value.isoformat()
        if isinstance(value, (bytes, bytearray, memoryview)):
            raw = bytes(value)
            if len(raw) > 512:
                return raw[:512].hex() + "..."
            return raw.hex()
        return str(value)

    @staticmethod
    def _find_record_value(record_dict: dict[str, Any], candidate_keys: tuple[str, ...]) -> str:
        """Return the first non-empty value from *candidate_keys* in *record_dict*.

        Args:
            record_dict: Dictionary to search.
            candidate_keys: Ordered tuple of keys to try.

        Returns:
            The first non-empty string value, or ``""`` if none found.
        """
        for key in candidate_keys:
            if key in record_dict and record_dict[key] not in (None, ""):
                return str(record_dict[key])
        return ""

    @staticmethod
    def _sanitize_filename(value: str) -> str:
        """Replace non-alphanumeric characters with underscores for safe filenames.

        Args:
            value: Raw string to sanitise.

        Returns:
            Filesystem-safe string, or ``"artifact"`` if empty after cleaning.
        """
        cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_")
        return cleaned or "artifact"

    @staticmethod
    def _is_evtx_artifact(function_name: str) -> bool:
        """Return *True* if *function_name* indicates an EVTX artifact."""
        return function_name == "evtx" or function_name.endswith(".evtx")

    @staticmethod
    def _emit_progress(
        progress_callback: Callable[..., None],
        artifact_key: str,
        record_count: int,
    ) -> None:
        """Invoke the progress callback, tolerating varying signatures.

        Tries ``callback(dict)``, then ``callback(key, count)``, then
        ``callback(count)`` to accommodate different caller conventions.

        Args:
            progress_callback: Callable to invoke.
            artifact_key: Current artifact being parsed.
            record_count: Number of records processed so far.
        """
        payload = {"artifact_key": artifact_key, "record_count": record_count}
        try:
            progress_callback(payload)
            return
        except TypeError:
            pass

        try:
            progress_callback(artifact_key, record_count)  # type: ignore[misc]
            return
        except TypeError:
            pass

        try:
            progress_callback(record_count)  # type: ignore[misc]
        except Exception:
            return
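The per-channel splitting performed by `_write_evtx_records` can be illustrated with a stripped-down planner that only decides which part file each record lands in. The channel names and the 3-row cap below are invented for the demo; the real module caps parts at `EVTX_MAX_RECORDS_PER_FILE` (500,000) and writes actual CSV files.

```python
from collections import defaultdict


def plan_evtx_files(channels: list[str], max_per_file: int = 3) -> list[str]:
    """Map a stream of channel names to the part file each record lands in."""
    counts: dict[str, int] = defaultdict(int)       # records in current part
    parts: dict[str, int] = defaultdict(lambda: 1)  # current part number per channel
    placements: list[str] = []
    for channel in channels:
        if counts[channel] >= max_per_file:
            parts[channel] += 1   # roll over to a new part file
            counts[channel] = 0
        counts[channel] += 1
        part = parts[channel]
        suffix = "" if part == 1 else f"_part{part}"
        placements.append(f"evtx_{channel}{suffix}.csv")
    return placements


stream = ["Security"] * 4 + ["System"] * 2
print(plan_evtx_files(stream))
```

As in the module, the first part of a channel gets no `_partN` suffix, so small channels produce a single, plainly named CSV.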
383 """ 384 record_count = 0 385 fieldnames: list[str] = [] 386 fieldnames_set: set[str] = set() 387 headers_expanded = False 388 389 with csv_output_path.open("w", newline="", encoding="utf-8") as csv_file: 390 writer: csv.DictWriter | None = None 391 for record in records: 392 record_dict = self._record_to_dict(record) 393 394 new_keys = [str(k) for k in record_dict.keys() if str(k) not in fieldnames_set] 395 if new_keys: 396 fieldnames.extend(new_keys) 397 fieldnames_set.update(new_keys) 398 if writer is not None: 399 headers_expanded = True 400 writer = csv.DictWriter( 401 csv_file, fieldnames=fieldnames, restval="", extrasaction="ignore", 402 ) 403 if not headers_expanded: 404 writer.writeheader() 405 406 row = { 407 fn: self._stringify_csv_value(record_dict.get(fn)) 408 for fn in fieldnames 409 } 410 if writer is not None: 411 writer.writerow(row) 412 record_count += 1 413 414 if record_count >= MAX_RECORDS_PER_ARTIFACT: 415 self.audit_logger.log( 416 "parsing_capped", 417 { 418 "artifact_key": artifact_key, 419 "record_count": record_count, 420 "max_records": MAX_RECORDS_PER_ARTIFACT, 421 "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows", 422 }, 423 ) 424 break 425 426 if progress_callback is not None and record_count % 1000 == 0: 427 self._emit_progress(progress_callback, artifact_key, record_count) 428 429 if headers_expanded and record_count > 0: 430 self._rewrite_csv_with_expanded_headers(csv_output_path, fieldnames) 431 432 if progress_callback is not None: 433 self._emit_progress(progress_callback, artifact_key, record_count) 434 435 return record_count 436 437 def _rewrite_csv_with_expanded_headers(self, csv_path: Path, fieldnames: list[str]) -> None: 438 """Rewrite a CSV whose header is incomplete due to mid-stream schema changes. 439 440 Because fieldnames are only ever appended, row values are positionally 441 aligned: shorter rows (written before expansion) just need empty-string 442 padding for the new trailing columns. 
443 """ 444 temp_path = csv_path.with_suffix(".csv.tmp") 445 num_fields = len(fieldnames) 446 with csv_path.open("r", newline="", encoding="utf-8") as src, \ 447 temp_path.open("w", newline="", encoding="utf-8") as dst: 448 reader = csv.reader(src) 449 csv_writer = csv.writer(dst) 450 csv_writer.writerow(fieldnames) 451 next(reader, None) # skip original (incomplete) header 452 for row in reader: 453 if len(row) < num_fields: 454 row.extend([""] * (num_fields - len(row))) 455 csv_writer.writerow(row) 456 temp_path.replace(csv_path) 457 458 def _write_evtx_records( 459 self, 460 artifact_key: str, 461 records: Any, 462 progress_callback: Callable[..., None] | None, 463 ) -> tuple[list[Path], int]: 464 """Stream EVTX records into per-channel CSV files with automatic splitting. 465 466 Records are grouped by their channel or provider name. When a 467 single group exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`, a new 468 part file is created. 469 470 Args: 471 artifact_key: Artifact key for filename construction. 472 records: Iterable of Dissect EVTX record objects. 473 progress_callback: Optional progress callback. 474 475 Returns: 476 Tuple of ``(csv_paths, total_record_count)``. 
477 """ 478 writers: dict[str, dict[str, Any]] = {} 479 csv_paths: list[Path] = [] 480 record_count = 0 481 482 try: 483 for record in records: 484 if record_count >= MAX_RECORDS_PER_ARTIFACT: 485 self.audit_logger.log( 486 "parsing_capped", 487 { 488 "artifact_key": artifact_key, 489 "record_count": record_count, 490 "max_records": MAX_RECORDS_PER_ARTIFACT, 491 "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows", 492 }, 493 ) 494 break 495 496 record_dict = self._record_to_dict(record) 497 group_name = self._extract_evtx_group_name(record_dict) 498 499 writer_state = writers.get(group_name) 500 if writer_state is None: 501 writer_state = self._open_evtx_writer(artifact_key=artifact_key, group_name=group_name, part=1) 502 writers[group_name] = writer_state 503 csv_paths.append(writer_state["path"]) 504 elif writer_state["records_in_file"] >= EVTX_MAX_RECORDS_PER_FILE: 505 writer_state["handle"].close() 506 next_part = int(writer_state["part"]) + 1 507 writer_state = self._open_evtx_writer( 508 artifact_key=artifact_key, 509 group_name=group_name, 510 part=next_part, 511 ) 512 writers[group_name] = writer_state 513 csv_paths.append(writer_state["path"]) 514 515 if writer_state["fieldnames"] is None: 516 fieldnames = [str(key) for key in record_dict.keys()] 517 writer_state["fieldnames"] = fieldnames 518 writer_state["fieldnames_set"] = set(fieldnames) 519 writer_state["writer"] = csv.DictWriter( 520 writer_state["handle"], 521 fieldnames=fieldnames, 522 extrasaction="ignore", 523 ) 524 writer_state["writer"].writeheader() 525 else: 526 new_keys = [ 527 str(k) for k in record_dict.keys() 528 if str(k) not in writer_state["fieldnames_set"] 529 ] 530 if new_keys: 531 writer_state["fieldnames"].extend(new_keys) 532 writer_state["fieldnames_set"].update(new_keys) 533 writer_state["headers_expanded"] = True 534 writer_state["writer"] = csv.DictWriter( 535 writer_state["handle"], 536 fieldnames=writer_state["fieldnames"], 537 extrasaction="ignore", 538 ) 539 
540 fieldnames = writer_state["fieldnames"] 541 row = { 542 fieldname: self._stringify_csv_value(record_dict.get(fieldname)) 543 for fieldname in fieldnames 544 } 545 writer_state["writer"].writerow(row) 546 writer_state["records_in_file"] += 1 547 record_count += 1 548 549 if progress_callback is not None and record_count % 1000 == 0: 550 self._emit_progress(progress_callback, artifact_key, record_count) 551 finally: 552 for writer_state in writers.values(): 553 writer_state["handle"].close() 554 555 for writer_state in writers.values(): 556 if writer_state["headers_expanded"] and writer_state["records_in_file"] > 0: 557 self._rewrite_csv_with_expanded_headers( 558 writer_state["path"], writer_state["fieldnames"], 559 ) 560 561 if progress_callback is not None: 562 self._emit_progress(progress_callback, artifact_key, record_count) 563 564 return csv_paths, record_count 565 566 def _open_evtx_writer(self, artifact_key: str, group_name: str, part: int) -> dict[str, Any]: 567 """Open a new CSV file for an EVTX channel group and return writer state. 568 569 Args: 570 artifact_key: Parent artifact key for filename construction. 571 group_name: EVTX channel or provider name. 572 part: 1-based part number for multi-file splits. 573 574 Returns: 575 Dictionary containing ``path``, ``handle``, ``writer``, 576 ``fieldnames``, ``fieldnames_set``, ``headers_expanded``, 577 ``records_in_file``, and ``part``. 
578 """ 579 artifact_stub = self._sanitize_filename(artifact_key) 580 group_stub = self._sanitize_filename(group_name) 581 filename = f"{artifact_stub}_{group_stub}.csv" if part == 1 else f"{artifact_stub}_{group_stub}_part{part}.csv" 582 output_path = self.parsed_dir / filename 583 584 handle = output_path.open("w", newline="", encoding="utf-8") 585 return { 586 "path": output_path, 587 "handle": handle, 588 "writer": None, 589 "fieldnames": None, 590 "fieldnames_set": None, 591 "headers_expanded": False, 592 "records_in_file": 0, 593 "part": part, 594 } 595 596 def _extract_evtx_group_name(self, record_dict: dict[str, Any]) -> str: 597 """Determine the channel/provider group name for an EVTX record. 598 599 Checks multiple candidate keys (``channel``, ``Channel``, 600 ``provider``, etc.) and returns the first non-empty value. 601 602 Args: 603 record_dict: Dictionary representation of the EVTX record. 604 605 Returns: 606 Channel or provider name, or ``"unknown"`` if none found. 607 """ 608 channel = self._find_record_value( 609 record_dict, 610 ( 611 "channel", 612 "Channel", 613 "log_name", 614 "LogName", 615 "event_log", 616 "EventLog", 617 ), 618 ) 619 provider = self._find_record_value( 620 record_dict, 621 ( 622 "provider", 623 "Provider", 624 "provider_name", 625 "ProviderName", 626 "source", 627 "Source", 628 ), 629 ) 630 631 if channel: 632 return channel 633 if provider: 634 return provider 635 return "unknown" 636 637 @staticmethod 638 def _record_to_dict(record: Any) -> dict[str, Any]: 639 """Convert a Dissect record to a plain dictionary. 640 641 Handles Dissect ``Record`` objects (via ``_asdict()``), plain 642 dicts, and objects with a ``__dict__``. 643 644 Args: 645 record: A Dissect record or dict-like object. 646 647 Returns: 648 A plain dictionary of field names to values. 649 650 Raises: 651 TypeError: If the record cannot be converted. 
652 """ 653 if hasattr(record, "_asdict"): 654 as_dict = record._asdict() 655 if isinstance(as_dict, dict): 656 return dict(as_dict) 657 658 if isinstance(record, dict): 659 return dict(record) 660 661 try: 662 return dict(vars(record)) 663 except TypeError as exc: 664 raise TypeError("Artifact record cannot be converted to a dictionary.") from exc 665 666 @staticmethod 667 def _stringify_csv_value(value: Any) -> str: 668 """Convert a record field value to a CSV-safe string. 669 670 Handles ``datetime``, ``bytes``, ``None``, and other types that 671 Dissect records may yield. 672 673 Args: 674 value: The raw field value from a Dissect record. 675 676 Returns: 677 String representation suitable for CSV output. 678 """ 679 if value is None: 680 return "" 681 if isinstance(value, (datetime, date, time)): 682 return value.isoformat() 683 if isinstance(value, (bytes, bytearray, memoryview)): 684 raw = bytes(value) 685 if len(raw) > 512: 686 return raw[:512].hex() + "..." 687 return raw.hex() 688 return str(value) 689 690 @staticmethod 691 def _find_record_value(record_dict: dict[str, Any], candidate_keys: tuple[str, ...]) -> str: 692 """Return the first non-empty value from *candidate_keys* in *record_dict*. 693 694 Args: 695 record_dict: Dictionary to search. 696 candidate_keys: Ordered tuple of keys to try. 697 698 Returns: 699 The first non-empty string value, or ``""`` if none found. 700 """ 701 for key in candidate_keys: 702 if key in record_dict and record_dict[key] not in (None, ""): 703 return str(record_dict[key]) 704 return "" 705 706 @staticmethod 707 def _sanitize_filename(value: str) -> str: 708 """Replace non-alphanumeric characters with underscores for safe filenames. 709 710 Args: 711 value: Raw string to sanitise. 712 713 Returns: 714 Filesystem-safe string, or ``"artifact"`` if empty after cleaning. 
715 """ 716 cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_") 717 return cleaned or "artifact" 718 719 @staticmethod 720 def _is_evtx_artifact(function_name: str) -> bool: 721 """Return *True* if *function_name* indicates an EVTX artifact.""" 722 return function_name == "evtx" or function_name.endswith(".evtx") 723 724 @staticmethod 725 def _emit_progress( 726 progress_callback: Callable[..., None], 727 artifact_key: str, 728 record_count: int, 729 ) -> None: 730 """Invoke the progress callback, tolerating varying signatures. 731 732 Tries ``callback(dict)``, then ``callback(key, count)``, then 733 ``callback(count)`` to accommodate different caller conventions. 734 735 Args: 736 progress_callback: Callable to invoke. 737 artifact_key: Current artifact being parsed. 738 record_count: Number of records processed so far. 739 """ 740 payload = {"artifact_key": artifact_key, "record_count": record_count} 741 try: 742 progress_callback(payload) 743 return 744 except TypeError: 745 pass 746 747 try: 748 progress_callback(artifact_key, record_count) # type: ignore[misc] 749 return 750 except TypeError: 751 pass 752 753 try: 754 progress_callback(record_count) # type: ignore[misc] 755 except Exception: 756 return
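The schema-evolution strategy in the source above (append-only fieldnames, a single end-of-stream rewrite to pad early rows) can be exercised in isolation with only the standard library. The function name below is illustrative, not part of the module:

```python
import csv
from pathlib import Path
from tempfile import TemporaryDirectory


def write_with_evolving_schema(records: list[dict], path: Path) -> list[str]:
    """Stream dicts to CSV, growing the header as new columns appear."""
    fieldnames: list[str] = []
    expanded = False
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = None
        for record in records:
            new_keys = [k for k in record if k not in fieldnames]
            if new_keys:
                fieldnames.extend(new_keys)
                if writer is not None:
                    expanded = True            # header already on disk is stale
                writer = csv.DictWriter(fh, fieldnames=fieldnames, restval="")
                if not expanded:
                    writer.writeheader()
            writer.writerow({k: record.get(k, "") for k in fieldnames})
    if expanded:
        # One rewrite pass: emit the full header and pad the short early rows.
        tmp = path.with_suffix(".tmp")
        with path.open(newline="") as src, tmp.open("w", newline="") as dst:
            reader, out = csv.reader(src), csv.writer(dst)
            out.writerow(fieldnames)
            next(reader, None)                 # drop the stale header
            for row in reader:
                row.extend([""] * (len(fieldnames) - len(row)))
                out.writerow(row)
        tmp.replace(path)
    return fieldnames


with TemporaryDirectory() as tmpdir:
    out_path = Path(tmpdir) / "demo.csv"
    cols = write_with_evolving_schema([{"a": 1}, {"a": 2, "b": "x"}], out_path)
    lines = out_path.read_text().splitlines()

print(cols)   # ['a', 'b']
print(lines)  # ['a,b', '1,', '2,x']
```

Because fieldnames are only ever appended, padding with trailing empty strings is positionally safe, which is exactly the invariant `_rewrite_csv_with_expanded_headers` relies on.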
Parse supported forensic artifacts from a Dissect target into CSV files.
Opens a disk image via Dissect's `Target.open()`, queries available artifacts, and streams their records to CSV files in the case's parsed directory. Implements the context manager protocol for deterministic resource cleanup.
Attributes:
- evidence_path: Path to the source evidence file.
- case_dir: Root directory for this forensic case.
- audit_logger: `app.audit.AuditLogger` for recording actions.
- parsed_dir: Directory where output CSV files are written.
- target: The open Dissect `Target` handle.
    def __init__(
        self,
        evidence_path: str | Path,
        case_dir: str | Path,
        audit_logger: Any,
        parsed_dir: str | Path | None = None,
    ) -> None:
        """Initialise the parser and open the Dissect target.

        Args:
            evidence_path: Path to the disk image or evidence container.
            case_dir: Case-specific directory for output and audit data.
            audit_logger: Logger instance for writing audit trail entries.
            parsed_dir: Optional override for the CSV output directory.
                Defaults to ``<case_dir>/parsed/``.
        """
        self.evidence_path = Path(evidence_path)
        self.case_dir = Path(case_dir)
        self.audit_logger = audit_logger
        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
        self.parsed_dir.mkdir(parents=True, exist_ok=True)
        self.target = Target.open(self.evidence_path)
        self._closed = False

        try:
            self.os_type: str = str(self.target.os).strip().lower()
        except Exception:
            self.os_type = "unknown"
Initialise the parser and open the Dissect target.
Arguments:
- evidence_path: Path to the disk image or evidence container.
- case_dir: Case-specific directory for output and audit data.
- audit_logger: Logger instance for writing audit trail entries.
- parsed_dir: Optional override for the CSV output directory. Defaults to `<case_dir>/parsed/`.
    def close(self) -> None:
        """Close the underlying Dissect target handle."""
        if self._closed:
            return

        try:
            close_method = getattr(self.target, "close", None)
        except Exception:
            close_method = None
        if callable(close_method):
            close_method()
        self._closed = True
Close the underlying Dissect target handle.
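The class docstring advertises the context manager protocol, but the `__enter__`/`__exit__` bodies are not shown in this excerpt. A plausible minimal shape of the idempotent-close pattern, using a stand-in class rather than the real `ForensicParser`:

```python
class TargetHandleStub:
    """Stand-in showing idempotent close plus context management.

    Illustrative only: mirrors the guard in ForensicParser.close(),
    not the project's actual __enter__/__exit__ implementation.
    """

    def __init__(self) -> None:
        self._closed = False
        self.close_calls = 0

    def close(self) -> None:
        if self._closed:          # already closed: no-op
            return
        self.close_calls += 1
        self._closed = True

    def __enter__(self) -> "TargetHandleStub":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


handle = TargetHandleStub()
with handle:
    pass                          # the with-block guarantees close() on exit
handle.close()                    # a second explicit close is harmless
print(handle.close_calls)         # 1
```

The idempotence guard means `close()` can safely be called both by `__exit__` and explicitly by callers, which is what "deterministic resource cleanup" requires.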
    def get_image_metadata(self) -> dict[str, str]:
        """Extract key system metadata from the Dissect target.

        Attempts multiple attribute name variants for each field (e.g.
        ``hostname``, ``computer_name``, ``name``) to accommodate
        different OS profiles.

        Returns:
            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
            ``ips``, ``timezone``, and ``install_date``.
        """
        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))

        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
        if isinstance(ips_value, (list, tuple, set)):
            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
            if not ips:
                ips = UNKNOWN_VALUE
        else:
            ips = str(ips_value)

        return {
            "hostname": hostname,
            "os_version": os_version,
            "domain": domain,
            "ips": ips,
            "timezone": timezone,
            "install_date": install_date,
        }
Extract key system metadata from the Dissect target.
Attempts multiple attribute name variants for each field (e.g. `hostname`, `computer_name`, `name`) to accommodate different OS profiles.
Returns:
Dictionary with keys `hostname`, `os_version`, `domain`, `ips`, `timezone`, and `install_date`.
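The attribute-fallback behaviour behind these fields can be sketched with a free function and a fake target. `UNKNOWN_VALUE` is assumed to be the string `"Unknown"` here, since the actual sentinel is defined outside this excerpt:

```python
UNKNOWN_VALUE = "Unknown"  # assumption: the real sentinel is defined at module level


def safe_read_attribute(obj: object, names: tuple[str, ...]) -> object:
    """Return the first non-empty attribute value, calling it if callable."""
    for name in names:
        try:
            value = getattr(obj, name)
        except Exception:
            continue
        if callable(value):
            try:
                value = value()
            except Exception:
                continue
        if value in (None, ""):
            continue
        return value
    return UNKNOWN_VALUE


class FakeTarget:
    hostname = ""                       # empty string is skipped

    def computer_name(self) -> str:     # callable variants are invoked
        return "WORKSTATION-01"


print(safe_read_attribute(FakeTarget(), ("hostname", "computer_name", "name")))
# WORKSTATION-01
print(safe_read_attribute(FakeTarget(), ("missing",)))
# Unknown
```

Treating callables and plain attributes uniformly is what lets one candidate tuple cover both Dissect properties and plugin functions.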
    def get_available_artifacts(self) -> list[dict[str, Any]]:
        """Return the artifact registry annotated with availability flags.

        Detects the target OS via ``target.os`` and selects the
        appropriate artifact registry (Windows or Linux). Probes the
        Dissect target for each registered artifact and sets an
        ``available`` boolean on the returned metadata dictionaries.

        Returns:
            List of artifact metadata dicts, each augmented with ``key``
            and ``available`` fields.
        """
        registry = get_artifact_registry(self.os_type)
        available_artifacts: list[dict[str, Any]] = []
        for artifact_key, artifact_details in registry.items():
            function_name = str(artifact_details.get("function", artifact_key))
            try:
                available = bool(self.target.has_function(function_name))
            except (PluginError, UnsupportedPluginError):
                available = False

            available_artifact = dict(artifact_details)
            available_artifact["key"] = artifact_key
            available_artifact["available"] = available
            available_artifacts.append(available_artifact)

        return available_artifacts
Return the artifact registry annotated with availability flags.
Detects the target OS via `target.os` and selects the appropriate artifact registry (Windows or Linux). Probes the Dissect target for each registered artifact and sets an `available` boolean on the returned metadata dictionaries.
Returns:
List of artifact metadata dicts, each augmented with `key` and `available` fields.
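A minimal sketch of the annotation step, assuming a toy registry and a stub target in place of `get_artifact_registry()` and a real Dissect `Target`:

```python
from typing import Any

# Hypothetical miniature registry; keys and fields are illustrative.
REGISTRY: dict[str, dict[str, Any]] = {
    "shimcache": {"name": "ShimCache", "function": "shimcache"},
    "evtx": {"name": "Event Logs", "function": "evtx"},
}


class StubTarget:
    """Stand-in exposing has_function() like a Dissect Target."""

    def __init__(self, supported: set[str]) -> None:
        self._supported = supported

    def has_function(self, name: str) -> bool:
        return name in self._supported


def annotate_availability(
    target: StubTarget, registry: dict[str, dict[str, Any]]
) -> list[dict[str, Any]]:
    annotated = []
    for key, details in registry.items():
        entry = dict(details)  # copy so the shared registry stays untouched
        entry["key"] = key
        entry["available"] = target.has_function(str(details.get("function", key)))
        annotated.append(entry)
    return annotated


flags = [(e["key"], e["available"])
         for e in annotate_availability(StubTarget({"evtx"}), REGISTRY)]
print(flags)  # [('shimcache', False), ('evtx', True)]
```

Copying each registry entry before annotating it is the detail worth noting: callers can mutate the returned dicts without corrupting the registry for later calls.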
Parse a single artifact and stream its records to one or more CSV files.
Logs `parsing_started`, `parsing_completed` (or `parsing_failed`) to the audit trail. EVTX artifacts are split by channel/provider into separate CSV files.
Arguments:
- artifact_key: Key from the OS-specific artifact registry identifying the artifact to parse.
- progress_callback: Optional callback invoked every 1,000 records with progress information.
Returns:
Result dictionary with keys `csv_path`, `record_count`, `duration_seconds`, `success`, and `error`. EVTX results also include a `csv_paths` list.
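The progress callback is invoked with whichever signature it accepts, as `_emit_progress` shows in the source above. A standalone sketch of that tolerant dispatch (helper and callback names are illustrative):

```python
from typing import Any, Callable


def emit_progress(callback: Callable[..., None], artifact_key: str, count: int) -> None:
    """Try cb(payload), then cb(key, count), then cb(count), falling back on TypeError."""
    payload = {"artifact_key": artifact_key, "record_count": count}
    for args in ((payload,), (artifact_key, count), (count,)):
        try:
            callback(*args)
            return
        except TypeError:
            continue


seen: list[Any] = []


def dict_style(info: dict) -> None:        # accepts the payload directly
    seen.append(info)


def pair_style(key: str, n: int) -> None:  # rejects a single argument
    seen.append((key, n))


emit_progress(dict_style, "evtx", 1000)    # first attempt succeeds
emit_progress(pair_style, "mft", 2000)     # payload call fails -> cb(key, count)
print(seen)
# [{'artifact_key': 'evtx', 'record_count': 1000}, ('mft', 2000)]
```

One trade-off to be aware of: a `TypeError` raised *inside* the callback body also triggers the fallback, and a single-argument callback always receives the payload dict rather than the bare count, since Python does not check annotation types at call time.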