app.parser.core

Dissect integration layer for forensic artifact parsing.

Wraps the Dissect framework's Target API to extract Windows forensic artifacts from disk images (E01, VMDK, VHD, raw, etc.) and stream them into CSV files for downstream AI analysis.

Key responsibilities:

  • Evidence opening -- ForensicParser opens a Dissect Target in read-only mode from any supported container format.
  • CSV streaming -- Records are streamed to CSV one row at a time, never materialised in memory, allowing safe handling of high-volume artifacts such as EVTX and MFT (millions of records).
  • EVTX splitting -- Event log records are automatically partitioned by channel/provider into separate CSV files, with additional part files created when a single channel exceeds EVTX_MAX_RECORDS_PER_FILE.
  • Schema evolution -- When a Dissect plugin yields records with varying schemas, CSV headers are expanded dynamically and the file is rewritten once to ensure a consistent header row.
Attributes:
  • UNKNOWN_VALUE: Sentinel string used when a target attribute cannot be read.
  • EVTX_MAX_RECORDS_PER_FILE: Maximum rows per EVTX CSV part file.
  • MAX_RECORDS_PER_ARTIFACT: Hard cap on rows written for any single artifact.
  1"""Dissect integration layer for forensic artifact parsing.
  2
  3Wraps the Dissect framework's ``Target`` API to extract Windows forensic
  4artifacts from disk images (E01, VMDK, VHD, raw, etc.) and stream them
  5into CSV files for downstream AI analysis.
  6
  7Key responsibilities:
  8
  9* **Evidence opening** -- :class:`ForensicParser` opens a Dissect
 10  ``Target`` in read-only mode from any supported container format.
 11* **CSV streaming** -- Records are streamed to CSV one row at a time,
 12  never materialised in memory, allowing safe handling of high-volume
 13  artifacts such as EVTX and MFT (millions of records).
 14* **EVTX splitting** -- Event log records are automatically partitioned
 15  by channel/provider into separate CSV files, with additional part files
 16  created when a single channel exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`.
 17* **Schema evolution** -- When a Dissect plugin yields records with
 18  varying schemas, CSV headers are expanded dynamically and the file is
 19  rewritten once to ensure a consistent header row.
 20
 21Attributes:
 22    UNKNOWN_VALUE: Sentinel string used when a target attribute cannot be read.
 23    EVTX_MAX_RECORDS_PER_FILE: Maximum rows per EVTX CSV part file.
 24    MAX_RECORDS_PER_ARTIFACT: Hard cap on rows written for any single artifact.
 25"""
 26
 27from __future__ import annotations
 28
 29import csv
 30from datetime import date, datetime, time
 31from pathlib import Path
 32import re
 33import traceback
 34from types import TracebackType
 35from time import perf_counter
 36import logging
 37from typing import Any, Callable, Iterable
 38
 39from dissect.target import Target
 40from dissect.target.exceptions import PluginError, UnsupportedPluginError
 41
 42from .registry import get_artifact_registry
 43
 44__all__ = ["ForensicParser"]
 45
 46logger = logging.getLogger(__name__)
 47
 48UNKNOWN_VALUE = "Unknown"
 49EVTX_MAX_RECORDS_PER_FILE = 500_000
 50MAX_RECORDS_PER_ARTIFACT = 1_000_000
 51
 52
 53class ForensicParser:
 54    """Parse supported forensic artifacts from a Dissect target into CSV files.
 55
 56    Opens a disk image via Dissect's ``Target.open()``, queries available
 57    artifacts, and streams their records to CSV files in the case's parsed
 58    directory.  Implements the context manager protocol for deterministic
 59    resource cleanup.
 60
 61    Attributes:
 62        evidence_path: Path to the source evidence file.
 63        case_dir: Root directory for this forensic case.
 64        audit_logger: :class:`~app.audit.AuditLogger` for recording actions.
 65        parsed_dir: Directory where output CSV files are written.
 66        target: The open Dissect ``Target`` handle.
 67    """
 68
 69    def __init__(
 70        self,
 71        evidence_path: str | Path,
 72        case_dir: str | Path,
 73        audit_logger: Any,
 74        parsed_dir: str | Path | None = None,
 75    ) -> None:
 76        """Initialise the parser and open the Dissect target.
 77
 78        Args:
 79            evidence_path: Path to the disk image or evidence container.
 80            case_dir: Case-specific directory for output and audit data.
 81            audit_logger: Logger instance for writing audit trail entries.
 82            parsed_dir: Optional override for the CSV output directory.
 83                Defaults to ``<case_dir>/parsed/``.
 84        """
 85        self.evidence_path = Path(evidence_path)
 86        self.case_dir = Path(case_dir)
 87        self.audit_logger = audit_logger
 88        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
 89        self.parsed_dir.mkdir(parents=True, exist_ok=True)
 90        self.target = Target.open(self.evidence_path)
 91        self._closed = False
 92
 93        try:
 94            self.os_type: str = str(self.target.os).strip().lower()
 95        except Exception:
 96            self.os_type = "unknown"
 97
 98    def close(self) -> None:
 99        """Close the underlying Dissect target handle."""
100        if self._closed:
101            return
102
103        try:
104            close_method = getattr(self.target, "close", None)
105        except Exception:
106            close_method = None
107        if callable(close_method):
108            close_method()
109        self._closed = True
110
111    def __enter__(self) -> ForensicParser:
112        """Enter the runtime context and return the parser instance."""
113        return self
114
115    def __exit__(
116        self,
117        exc_type: type[BaseException] | None,
118        exc_val: BaseException | None,
119        exc_tb: TracebackType | None,
120    ) -> bool:
121        """Exit the runtime context, closing the Dissect target."""
122        del exc_type, exc_val, exc_tb
123        self.close()
124        return False
125
126    def get_image_metadata(self) -> dict[str, str]:
127        """Extract key system metadata from the Dissect target.
128
129        Attempts multiple attribute name variants for each field (e.g.
130        ``hostname``, ``computer_name``, ``name``) to accommodate
131        different OS profiles.
132
133        Returns:
134            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
135            ``ips``, ``timezone``, and ``install_date``.
136        """
137        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
138        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
139        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
140        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
141        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))
142
143        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
144        if isinstance(ips_value, (list, tuple, set)):
145            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
146            if not ips:
147                ips = UNKNOWN_VALUE
148        else:
149            ips = str(ips_value)
150
151        return {
152            "hostname": hostname,
153            "os_version": os_version,
154            "domain": domain,
155            "ips": ips,
156            "timezone": timezone,
157            "install_date": install_date,
158        }
159
160    def get_available_artifacts(self) -> list[dict[str, Any]]:
161        """Return the artifact registry annotated with availability flags.
162
163        Detects the target OS via ``target.os`` and selects the
164        appropriate artifact registry (Windows or Linux).  Probes the
165        Dissect target for each registered artifact and sets an
166        ``available`` boolean on the returned metadata dictionaries.
167
168        Returns:
169            List of artifact metadata dicts, each augmented with ``key``
170            and ``available`` fields.
171        """
172        registry = get_artifact_registry(self.os_type)
173        available_artifacts: list[dict[str, Any]] = []
174        for artifact_key, artifact_details in registry.items():
175            function_name = str(artifact_details.get("function", artifact_key))
176            try:
177                available = bool(self.target.has_function(function_name))
178            except (PluginError, UnsupportedPluginError):
179                available = False
180
181            available_artifact = dict(artifact_details)
182            available_artifact["key"] = artifact_key
183            available_artifact["available"] = available
184            available_artifacts.append(available_artifact)
185
186        return available_artifacts
187
188    def _call_target_function(self, function_name: str) -> Any:
189        """Invoke a Dissect function on the target, including namespaced functions.
190
191        For simple names like ``"shimcache"`` it calls ``target.shimcache()``.
192        For dotted names like ``"browser.history"`` it traverses the namespace
193        chain (``target.browser.history()``) and calls the final attribute.
194        """
195        if "." not in function_name:
196            function = getattr(self.target, function_name)
197            return function() if callable(function) else function
198
199        current: Any = self.target
200        parts = function_name.split(".")
201        try:
202            for namespace in parts:
203                current = getattr(current, namespace)
204        except Exception:
205            logger.warning(
206                "Failed to resolve nested function '%s' (stopped at '%s')",
207                function_name,
208                namespace,
209                exc_info=True,
210            )
211            raise
212
213        return current() if callable(current) else current
214
215    def parse_artifact(
216        self,
217        artifact_key: str,
218        progress_callback: Callable[..., None] | None = None,
219    ) -> dict[str, Any]:
220        """Parse a single artifact and stream its records to one or more CSV files.
221
222        Logs ``parsing_started``, ``parsing_completed`` (or ``parsing_failed``)
223        to the audit trail.  EVTX artifacts are split by channel/provider
224        into separate CSV files.
225
226        Args:
227            artifact_key: Key from the OS-specific artifact registry identifying
228                the artifact to parse.
229            progress_callback: Optional callback invoked every 1 000 records
230                with progress information.
231
232        Returns:
233            Result dictionary with keys ``csv_path``, ``record_count``,
234            ``duration_seconds``, ``success``, and ``error``.  EVTX
235            results also include a ``csv_paths`` list.
236        """
237        registry = get_artifact_registry(self.os_type)
238        artifact = registry.get(artifact_key)
239        if artifact is None:
240            return {
241                "csv_path": "",
242                "record_count": 0,
243                "duration_seconds": 0.0,
244                "success": False,
245                "error": f"Unknown artifact key: {artifact_key}",
246            }
247
248        function_name = str(artifact.get("function", artifact_key))
249        start_time = perf_counter()
250        record_count = 0
251        csv_path = ""
252
253        self.audit_logger.log(
254            "parsing_started",
255            {
256                "artifact_key": artifact_key,
257                "artifact_name": artifact.get("name", artifact_key),
258                "function": function_name,
259            },
260        )
261
262        try:
263            records = self._call_target_function(function_name)
264            if self._is_evtx_artifact(function_name):
265                all_csv_paths, record_count = self._write_evtx_records(
266                    artifact_key=artifact_key,
267                    records=records,
268                    progress_callback=progress_callback,
269                )
270                if all_csv_paths:
271                    csv_path = str(all_csv_paths[0])
272                else:
273                    empty_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
274                    empty_output.touch(exist_ok=True)
275                    csv_path = str(empty_output)
276                    all_csv_paths = [empty_output]
277            else:
278                csv_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
279                record_count = self._write_records_to_csv(
280                    records=records,
281                    csv_output_path=csv_output,
282                    progress_callback=progress_callback,
283                    artifact_key=artifact_key,
284                )
285                csv_path = str(csv_output)
286
287            duration = perf_counter() - start_time
288            self.audit_logger.log(
289                "parsing_completed",
290                {
291                    "artifact_key": artifact_key,
292                    "artifact_name": artifact.get("name", artifact_key),
293                    "function": function_name,
294                    "record_count": record_count,
295                    "duration_seconds": round(duration, 6),
296                    "csv_path": csv_path,
297                },
298            )
299
300            result: dict[str, Any] = {
301                "csv_path": csv_path,
302                "record_count": record_count,
303                "duration_seconds": duration,
304                "success": True,
305                "error": None,
306            }
307            if self._is_evtx_artifact(function_name):
308                result["csv_paths"] = [str(p) for p in all_csv_paths]
309            return result
310        except Exception as error:
311            duration = perf_counter() - start_time
312            error_message = str(error)
313            error_traceback = traceback.format_exc()
314            self.audit_logger.log(
315                "parsing_failed",
316                {
317                    "artifact_key": artifact_key,
318                    "artifact_name": artifact.get("name", artifact_key),
319                    "function": function_name,
320                    "error": error_message,
321                    "traceback": error_traceback,
322                    "duration_seconds": round(duration, 6),
323                },
324            )
325            return {
326                "csv_path": "",
327                "record_count": record_count,
328                "duration_seconds": duration,
329                "success": False,
330                "error": error_message,
331            }
332
333    def _safe_read_target_attribute(self, attribute_names: tuple[str, ...]) -> Any:
334        """Read a target attribute by trying multiple candidate names.
335
336        Args:
337            attribute_names: Ordered tuple of attribute names to try.
338
339        Returns:
340            The first non-empty value found, or :data:`UNKNOWN_VALUE`.
341        """
342        for attribute_name in attribute_names:
343            try:
344                value = getattr(self.target, attribute_name)
345            except Exception:
346                continue
347
348            if callable(value):
349                try:
350                    value = value()
351                except Exception:
352                    continue
353
354            if value in (None, ""):
355                continue
356
357            return value
358
359        return UNKNOWN_VALUE
360
361    def _write_records_to_csv(
362        self,
363        records: Iterable[Any],
364        csv_output_path: Path,
365        progress_callback: Callable[..., None] | None,
366        artifact_key: str,
367    ) -> int:
368        """Stream Dissect records to a CSV file, handling dynamic schemas.
369
370        If the record schema expands mid-stream (new columns appear), the
371        file is rewritten at the end with the complete header row via
372        :meth:`_rewrite_csv_with_expanded_headers`.
373
374        Args:
375            records: Iterable of Dissect record objects.
376            csv_output_path: Destination CSV file path.
377            progress_callback: Optional progress callback.
378            artifact_key: Artifact key for audit/progress reporting.
379
380        Returns:
381            Total number of records written.
382        """
383        record_count = 0
384        fieldnames: list[str] = []
385        fieldnames_set: set[str] = set()
386        headers_expanded = False
387
388        with csv_output_path.open("w", newline="", encoding="utf-8") as csv_file:
389            writer: csv.DictWriter | None = None
390            for record in records:
391                record_dict = self._record_to_dict(record)
392
393                new_keys = [str(k) for k in record_dict.keys() if str(k) not in fieldnames_set]
394                if new_keys:
395                    fieldnames.extend(new_keys)
396                    fieldnames_set.update(new_keys)
397                    if writer is not None:
398                        headers_expanded = True
399                    writer = csv.DictWriter(
400                        csv_file, fieldnames=fieldnames, restval="", extrasaction="ignore",
401                    )
402                    if not headers_expanded:
403                        writer.writeheader()
404
405                row = {
406                    fn: self._stringify_csv_value(record_dict.get(fn))
407                    for fn in fieldnames
408                }
409                if writer is not None:
410                    writer.writerow(row)
411                record_count += 1
412
413                if record_count >= MAX_RECORDS_PER_ARTIFACT:
414                    self.audit_logger.log(
415                        "parsing_capped",
416                        {
417                            "artifact_key": artifact_key,
418                            "record_count": record_count,
419                            "max_records": MAX_RECORDS_PER_ARTIFACT,
420                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
421                        },
422                    )
423                    break
424
425                if progress_callback is not None and record_count % 1000 == 0:
426                    self._emit_progress(progress_callback, artifact_key, record_count)
427
428        if headers_expanded and record_count > 0:
429            self._rewrite_csv_with_expanded_headers(csv_output_path, fieldnames)
430
431        if progress_callback is not None:
432            self._emit_progress(progress_callback, artifact_key, record_count)
433
434        return record_count
435
436    def _rewrite_csv_with_expanded_headers(self, csv_path: Path, fieldnames: list[str]) -> None:
437        """Rewrite a CSV whose header is incomplete due to mid-stream schema changes.
438
439        Because fieldnames are only ever appended, row values are positionally
440        aligned: shorter rows (written before expansion) just need empty-string
441        padding for the new trailing columns.
442        """
443        temp_path = csv_path.with_suffix(".csv.tmp")
444        num_fields = len(fieldnames)
445        with csv_path.open("r", newline="", encoding="utf-8") as src, \
446             temp_path.open("w", newline="", encoding="utf-8") as dst:
447            reader = csv.reader(src)
448            csv_writer = csv.writer(dst)
449            csv_writer.writerow(fieldnames)
450            next(reader, None)  # skip original (incomplete) header
451            for row in reader:
452                if len(row) < num_fields:
453                    row.extend([""] * (num_fields - len(row)))
454                csv_writer.writerow(row)
455        temp_path.replace(csv_path)
456
457    def _write_evtx_records(
458        self,
459        artifact_key: str,
460        records: Any,
461        progress_callback: Callable[..., None] | None,
462    ) -> tuple[list[Path], int]:
463        """Stream EVTX records into per-channel CSV files with automatic splitting.
464
465        Records are grouped by their channel or provider name.  When a
466        single group exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`, a new
467        part file is created.
468
469        Args:
470            artifact_key: Artifact key for filename construction.
471            records: Iterable of Dissect EVTX record objects.
472            progress_callback: Optional progress callback.
473
474        Returns:
475            Tuple of ``(csv_paths, total_record_count)``.
476        """
477        writers: dict[str, dict[str, Any]] = {}
478        csv_paths: list[Path] = []
479        record_count = 0
480
481        try:
482            for record in records:
483                if record_count >= MAX_RECORDS_PER_ARTIFACT:
484                    self.audit_logger.log(
485                        "parsing_capped",
486                        {
487                            "artifact_key": artifact_key,
488                            "record_count": record_count,
489                            "max_records": MAX_RECORDS_PER_ARTIFACT,
490                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
491                        },
492                    )
493                    break
494
495                record_dict = self._record_to_dict(record)
496                group_name = self._extract_evtx_group_name(record_dict)
497
498                writer_state = writers.get(group_name)
499                if writer_state is None:
500                    writer_state = self._open_evtx_writer(artifact_key=artifact_key, group_name=group_name, part=1)
501                    writers[group_name] = writer_state
502                    csv_paths.append(writer_state["path"])
503                elif writer_state["records_in_file"] >= EVTX_MAX_RECORDS_PER_FILE:
504                    writer_state["handle"].close()
505                    next_part = int(writer_state["part"]) + 1
506                    writer_state = self._open_evtx_writer(
507                        artifact_key=artifact_key,
508                        group_name=group_name,
509                        part=next_part,
510                    )
511                    writers[group_name] = writer_state
512                    csv_paths.append(writer_state["path"])
513
514                if writer_state["fieldnames"] is None:
515                    fieldnames = [str(key) for key in record_dict.keys()]
516                    writer_state["fieldnames"] = fieldnames
517                    writer_state["fieldnames_set"] = set(fieldnames)
518                    writer_state["writer"] = csv.DictWriter(
519                        writer_state["handle"],
520                        fieldnames=fieldnames,
521                        extrasaction="ignore",
522                    )
523                    writer_state["writer"].writeheader()
524                else:
525                    new_keys = [
526                        str(k) for k in record_dict.keys()
527                        if str(k) not in writer_state["fieldnames_set"]
528                    ]
529                    if new_keys:
530                        writer_state["fieldnames"].extend(new_keys)
531                        writer_state["fieldnames_set"].update(new_keys)
532                        writer_state["headers_expanded"] = True
533                        writer_state["writer"] = csv.DictWriter(
534                            writer_state["handle"],
535                            fieldnames=writer_state["fieldnames"],
536                            extrasaction="ignore",
537                        )
538
539                fieldnames = writer_state["fieldnames"]
540                row = {
541                    fieldname: self._stringify_csv_value(record_dict.get(fieldname))
542                    for fieldname in fieldnames
543                }
544                writer_state["writer"].writerow(row)
545                writer_state["records_in_file"] += 1
546                record_count += 1
547
548                if progress_callback is not None and record_count % 1000 == 0:
549                    self._emit_progress(progress_callback, artifact_key, record_count)
550        finally:
551            for writer_state in writers.values():
552                writer_state["handle"].close()
553
554        for writer_state in writers.values():
555            if writer_state["headers_expanded"] and writer_state["records_in_file"] > 0:
556                self._rewrite_csv_with_expanded_headers(
557                    writer_state["path"], writer_state["fieldnames"],
558                )
559
560        if progress_callback is not None:
561            self._emit_progress(progress_callback, artifact_key, record_count)
562
563        return csv_paths, record_count
564
565    def _open_evtx_writer(self, artifact_key: str, group_name: str, part: int) -> dict[str, Any]:
566        """Open a new CSV file for an EVTX channel group and return writer state.
567
568        Args:
569            artifact_key: Parent artifact key for filename construction.
570            group_name: EVTX channel or provider name.
571            part: 1-based part number for multi-file splits.
572
573        Returns:
574            Dictionary containing ``path``, ``handle``, ``writer``,
575            ``fieldnames``, ``fieldnames_set``, ``headers_expanded``,
576            ``records_in_file``, and ``part``.
577        """
578        artifact_stub = self._sanitize_filename(artifact_key)
579        group_stub = self._sanitize_filename(group_name)
580        filename = f"{artifact_stub}_{group_stub}.csv" if part == 1 else f"{artifact_stub}_{group_stub}_part{part}.csv"
581        output_path = self.parsed_dir / filename
582
583        handle = output_path.open("w", newline="", encoding="utf-8")
584        return {
585            "path": output_path,
586            "handle": handle,
587            "writer": None,
588            "fieldnames": None,
589            "fieldnames_set": None,
590            "headers_expanded": False,
591            "records_in_file": 0,
592            "part": part,
593        }
594
595    def _extract_evtx_group_name(self, record_dict: dict[str, Any]) -> str:
596        """Determine the channel/provider group name for an EVTX record.
597
598        Checks multiple candidate keys (``channel``, ``Channel``,
599        ``provider``, etc.) and returns the first non-empty value.
600
601        Args:
602            record_dict: Dictionary representation of the EVTX record.
603
604        Returns:
605            Channel or provider name, or ``"unknown"`` if none found.
606        """
607        channel = self._find_record_value(
608            record_dict,
609            (
610                "channel",
611                "Channel",
612                "log_name",
613                "LogName",
614                "event_log",
615                "EventLog",
616            ),
617        )
618        provider = self._find_record_value(
619            record_dict,
620            (
621                "provider",
622                "Provider",
623                "provider_name",
624                "ProviderName",
625                "source",
626                "Source",
627            ),
628        )
629
630        if channel:
631            return channel
632        if provider:
633            return provider
634        return "unknown"
635
636    @staticmethod
637    def _record_to_dict(record: Any) -> dict[str, Any]:
638        """Convert a Dissect record to a plain dictionary.
639
640        Handles Dissect ``Record`` objects (via ``_asdict()``), plain
641        dicts, and objects with a ``__dict__``.
642
643        Args:
644            record: A Dissect record or dict-like object.
645
646        Returns:
647            A plain dictionary of field names to values.
648
649        Raises:
650            TypeError: If the record cannot be converted.
651        """
652        if hasattr(record, "_asdict"):
653            as_dict = record._asdict()
654            if isinstance(as_dict, dict):
655                return dict(as_dict)
656
657        if isinstance(record, dict):
658            return dict(record)
659
660        try:
661            return dict(vars(record))
662        except TypeError as exc:
663            raise TypeError("Artifact record cannot be converted to a dictionary.") from exc
664
665    @staticmethod
666    def _stringify_csv_value(value: Any) -> str:
667        """Convert a record field value to a CSV-safe string.
668
669        Handles ``datetime``, ``bytes``, ``None``, and other types that
670        Dissect records may yield.
671
672        Args:
673            value: The raw field value from a Dissect record.
674
675        Returns:
676            String representation suitable for CSV output.
677        """
678        if value is None:
679            return ""
680        if isinstance(value, (datetime, date, time)):
681            return value.isoformat()
682        if isinstance(value, (bytes, bytearray, memoryview)):
683            raw = bytes(value)
684            if len(raw) > 512:
685                return raw[:512].hex() + "..."
686            return raw.hex()
687        return str(value)
688
689    @staticmethod
690    def _find_record_value(record_dict: dict[str, Any], candidate_keys: tuple[str, ...]) -> str:
691        """Return the first non-empty value from *candidate_keys* in *record_dict*.
692
693        Args:
694            record_dict: Dictionary to search.
695            candidate_keys: Ordered tuple of keys to try.
696
697        Returns:
698            The first non-empty string value, or ``""`` if none found.
699        """
700        for key in candidate_keys:
701            if key in record_dict and record_dict[key] not in (None, ""):
702                return str(record_dict[key])
703        return ""
704
705    @staticmethod
706    def _sanitize_filename(value: str) -> str:
707        """Replace non-alphanumeric characters with underscores for safe filenames.
708
709        Args:
710            value: Raw string to sanitise.
711
712        Returns:
713            Filesystem-safe string, or ``"artifact"`` if empty after cleaning.
714        """
715        cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_")
716        return cleaned or "artifact"
717
718    @staticmethod
719    def _is_evtx_artifact(function_name: str) -> bool:
720        """Return *True* if *function_name* indicates an EVTX artifact."""
721        return function_name == "evtx" or function_name.endswith(".evtx")
722
723    @staticmethod
724    def _emit_progress(
725        progress_callback: Callable[..., None],
726        artifact_key: str,
727        record_count: int,
728    ) -> None:
729        """Invoke the progress callback, tolerating varying signatures.
730
731        Tries ``callback(dict)``, then ``callback(key, count)``, then
732        ``callback(count)`` to accommodate different caller conventions.
733
734        Args:
735            progress_callback: Callable to invoke.
736            artifact_key: Current artifact being parsed.
737            record_count: Number of records processed so far.
738        """
739        payload = {"artifact_key": artifact_key, "record_count": record_count}
740        try:
741            progress_callback(payload)
742            return
743        except TypeError:
744            pass
745
746        try:
747            progress_callback(artifact_key, record_count)  # type: ignore[misc]
748            return
749        except TypeError:
750            pass
751
752        try:
753            progress_callback(record_count)  # type: ignore[misc]
754        except Exception:
755            return
class ForensicParser:
 54class ForensicParser:
 55    """Parse supported forensic artifacts from a Dissect target into CSV files.
 56
 57    Opens a disk image via Dissect's ``Target.open()``, queries available
 58    artifacts, and streams their records to CSV files in the case's parsed
 59    directory.  Implements the context manager protocol for deterministic
 60    resource cleanup.
 61
 62    Attributes:
 63        evidence_path: Path to the source evidence file.
 64        case_dir: Root directory for this forensic case.
 65        audit_logger: :class:`~app.audit.AuditLogger` for recording actions.
 66        parsed_dir: Directory where output CSV files are written.
 67        target: The open Dissect ``Target`` handle.
 68    """
 69
 70    def __init__(
 71        self,
 72        evidence_path: str | Path,
 73        case_dir: str | Path,
 74        audit_logger: Any,
 75        parsed_dir: str | Path | None = None,
 76    ) -> None:
 77        """Initialise the parser and open the Dissect target.
 78
 79        Args:
 80            evidence_path: Path to the disk image or evidence container.
 81            case_dir: Case-specific directory for output and audit data.
 82            audit_logger: Logger instance for writing audit trail entries.
 83            parsed_dir: Optional override for the CSV output directory.
 84                Defaults to ``<case_dir>/parsed/``.
 85        """
 86        self.evidence_path = Path(evidence_path)
 87        self.case_dir = Path(case_dir)
 88        self.audit_logger = audit_logger
 89        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
 90        self.parsed_dir.mkdir(parents=True, exist_ok=True)
 91        self.target = Target.open(self.evidence_path)
 92        self._closed = False
 93
 94        try:
 95            self.os_type: str = str(self.target.os).strip().lower()
 96        except Exception:
 97            self.os_type = "unknown"
 98
 99    def close(self) -> None:
100        """Close the underlying Dissect target handle."""
101        if self._closed:
102            return
103
104        try:
105            close_method = getattr(self.target, "close", None)
106        except Exception:
107            close_method = None
108        if callable(close_method):
109            close_method()
110        self._closed = True
111
112    def __enter__(self) -> ForensicParser:
113        """Enter the runtime context and return the parser instance."""
114        return self
115
116    def __exit__(
117        self,
118        exc_type: type[BaseException] | None,
119        exc_val: BaseException | None,
120        exc_tb: TracebackType | None,
121    ) -> bool:
122        """Exit the runtime context, closing the Dissect target."""
123        del exc_type, exc_val, exc_tb
124        self.close()
125        return False
126
127    def get_image_metadata(self) -> dict[str, str]:
128        """Extract key system metadata from the Dissect target.
129
130        Attempts multiple attribute name variants for each field (e.g.
131        ``hostname``, ``computer_name``, ``name``) to accommodate
132        different OS profiles.
133
134        Returns:
135            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
136            ``ips``, ``timezone``, and ``install_date``.
137        """
138        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
139        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
140        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
141        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
142        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))
143
144        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
145        if isinstance(ips_value, (list, tuple, set)):
146            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
147            if not ips:
148                ips = UNKNOWN_VALUE
149        else:
150            ips = str(ips_value)
151
152        return {
153            "hostname": hostname,
154            "os_version": os_version,
155            "domain": domain,
156            "ips": ips,
157            "timezone": timezone,
158            "install_date": install_date,
159        }
160
161    def get_available_artifacts(self) -> list[dict[str, Any]]:
162        """Return the artifact registry annotated with availability flags.
163
164        Detects the target OS via ``target.os`` and selects the
165        appropriate artifact registry (Windows or Linux).  Probes the
166        Dissect target for each registered artifact and sets an
167        ``available`` boolean on the returned metadata dictionaries.
168
169        Returns:
170            List of artifact metadata dicts, each augmented with ``key``
171            and ``available`` fields.
172        """
173        registry = get_artifact_registry(self.os_type)
174        available_artifacts: list[dict[str, Any]] = []
175        for artifact_key, artifact_details in registry.items():
176            function_name = str(artifact_details.get("function", artifact_key))
177            try:
178                available = bool(self.target.has_function(function_name))
179            except (PluginError, UnsupportedPluginError):
180                available = False
181
182            available_artifact = dict(artifact_details)
183            available_artifact["key"] = artifact_key
184            available_artifact["available"] = available
185            available_artifacts.append(available_artifact)
186
187        return available_artifacts
188
189    def _call_target_function(self, function_name: str) -> Any:
190        """Invoke a Dissect function on the target, including namespaced functions.
191
192        For simple names like ``"shimcache"`` it calls ``target.shimcache()``.
193        For dotted names like ``"browser.history"`` it traverses the namespace
194        chain (``target.browser.history()``) and calls the final attribute.
195        """
196        if "." not in function_name:
197            function = getattr(self.target, function_name)
198            return function() if callable(function) else function
199
200        current: Any = self.target
201        parts = function_name.split(".")
202        try:
203            for namespace in parts:
204                current = getattr(current, namespace)
205        except Exception:
206            logger.warning(
207                "Failed to resolve nested function '%s' (stopped at '%s')",
208                function_name,
209                namespace,
210                exc_info=True,
211            )
212            raise
213
214        return current() if callable(current) else current
215
216    def parse_artifact(
217        self,
218        artifact_key: str,
219        progress_callback: Callable[..., None] | None = None,
220    ) -> dict[str, Any]:
221        """Parse a single artifact and stream its records to one or more CSV files.
222
223        Logs ``parsing_started``, ``parsing_completed`` (or ``parsing_failed``)
224        to the audit trail.  EVTX artifacts are split by channel/provider
225        into separate CSV files.
226
227        Args:
228            artifact_key: Key from the OS-specific artifact registry identifying
229                the artifact to parse.
230            progress_callback: Optional callback invoked every 1 000 records
231                with progress information.
232
233        Returns:
234            Result dictionary with keys ``csv_path``, ``record_count``,
235            ``duration_seconds``, ``success``, and ``error``.  EVTX
236            results also include a ``csv_paths`` list.
237        """
238        registry = get_artifact_registry(self.os_type)
239        artifact = registry.get(artifact_key)
240        if artifact is None:
241            return {
242                "csv_path": "",
243                "record_count": 0,
244                "duration_seconds": 0.0,
245                "success": False,
246                "error": f"Unknown artifact key: {artifact_key}",
247            }
248
249        function_name = str(artifact.get("function", artifact_key))
250        start_time = perf_counter()
251        record_count = 0
252        csv_path = ""
253
254        self.audit_logger.log(
255            "parsing_started",
256            {
257                "artifact_key": artifact_key,
258                "artifact_name": artifact.get("name", artifact_key),
259                "function": function_name,
260            },
261        )
262
263        try:
264            records = self._call_target_function(function_name)
265            if self._is_evtx_artifact(function_name):
266                all_csv_paths, record_count = self._write_evtx_records(
267                    artifact_key=artifact_key,
268                    records=records,
269                    progress_callback=progress_callback,
270                )
271                if all_csv_paths:
272                    csv_path = str(all_csv_paths[0])
273                else:
274                    empty_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
275                    empty_output.touch(exist_ok=True)
276                    csv_path = str(empty_output)
277                    all_csv_paths = [empty_output]
278            else:
279                csv_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
280                record_count = self._write_records_to_csv(
281                    records=records,
282                    csv_output_path=csv_output,
283                    progress_callback=progress_callback,
284                    artifact_key=artifact_key,
285                )
286                csv_path = str(csv_output)
287
288            duration = perf_counter() - start_time
289            self.audit_logger.log(
290                "parsing_completed",
291                {
292                    "artifact_key": artifact_key,
293                    "artifact_name": artifact.get("name", artifact_key),
294                    "function": function_name,
295                    "record_count": record_count,
296                    "duration_seconds": round(duration, 6),
297                    "csv_path": csv_path,
298                },
299            )
300
301            result: dict[str, Any] = {
302                "csv_path": csv_path,
303                "record_count": record_count,
304                "duration_seconds": duration,
305                "success": True,
306                "error": None,
307            }
308            if self._is_evtx_artifact(function_name):
309                result["csv_paths"] = [str(p) for p in all_csv_paths]
310            return result
311        except Exception as error:
312            duration = perf_counter() - start_time
313            error_message = str(error)
314            error_traceback = traceback.format_exc()
315            self.audit_logger.log(
316                "parsing_failed",
317                {
318                    "artifact_key": artifact_key,
319                    "artifact_name": artifact.get("name", artifact_key),
320                    "function": function_name,
321                    "error": error_message,
322                    "traceback": error_traceback,
323                    "duration_seconds": round(duration, 6),
324                },
325            )
326            return {
327                "csv_path": "",
328                "record_count": record_count,
329                "duration_seconds": duration,
330                "success": False,
331                "error": error_message,
332            }
333
334    def _safe_read_target_attribute(self, attribute_names: tuple[str, ...]) -> Any:
335        """Read a target attribute by trying multiple candidate names.
336
337        Args:
338            attribute_names: Ordered tuple of attribute names to try.
339
340        Returns:
341            The first non-empty value found, or :data:`UNKNOWN_VALUE`.
342        """
343        for attribute_name in attribute_names:
344            try:
345                value = getattr(self.target, attribute_name)
346            except Exception:
347                continue
348
349            if callable(value):
350                try:
351                    value = value()
352                except Exception:
353                    continue
354
355            if value in (None, ""):
356                continue
357
358            return value
359
360        return UNKNOWN_VALUE
361
362    def _write_records_to_csv(
363        self,
364        records: Iterable[Any],
365        csv_output_path: Path,
366        progress_callback: Callable[..., None] | None,
367        artifact_key: str,
368    ) -> int:
369        """Stream Dissect records to a CSV file, handling dynamic schemas.
370
371        If the record schema expands mid-stream (new columns appear), the
372        file is rewritten at the end with the complete header row via
373        :meth:`_rewrite_csv_with_expanded_headers`.
374
375        Args:
376            records: Iterable of Dissect record objects.
377            csv_output_path: Destination CSV file path.
378            progress_callback: Optional progress callback.
379            artifact_key: Artifact key for audit/progress reporting.
380
381        Returns:
382            Total number of records written.
383        """
384        record_count = 0
385        fieldnames: list[str] = []
386        fieldnames_set: set[str] = set()
387        headers_expanded = False
388
389        with csv_output_path.open("w", newline="", encoding="utf-8") as csv_file:
390            writer: csv.DictWriter | None = None
391            for record in records:
392                record_dict = self._record_to_dict(record)
393
394                new_keys = [str(k) for k in record_dict.keys() if str(k) not in fieldnames_set]
395                if new_keys:
396                    fieldnames.extend(new_keys)
397                    fieldnames_set.update(new_keys)
398                    if writer is not None:
399                        headers_expanded = True
400                    writer = csv.DictWriter(
401                        csv_file, fieldnames=fieldnames, restval="", extrasaction="ignore",
402                    )
403                    if not headers_expanded:
404                        writer.writeheader()
405
406                row = {
407                    fn: self._stringify_csv_value(record_dict.get(fn))
408                    for fn in fieldnames
409                }
410                if writer is not None:
411                    writer.writerow(row)
412                record_count += 1
413
414                if record_count >= MAX_RECORDS_PER_ARTIFACT:
415                    self.audit_logger.log(
416                        "parsing_capped",
417                        {
418                            "artifact_key": artifact_key,
419                            "record_count": record_count,
420                            "max_records": MAX_RECORDS_PER_ARTIFACT,
421                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
422                        },
423                    )
424                    break
425
426                if progress_callback is not None and record_count % 1000 == 0:
427                    self._emit_progress(progress_callback, artifact_key, record_count)
428
429        if headers_expanded and record_count > 0:
430            self._rewrite_csv_with_expanded_headers(csv_output_path, fieldnames)
431
432        if progress_callback is not None:
433            self._emit_progress(progress_callback, artifact_key, record_count)
434
435        return record_count
436
437    def _rewrite_csv_with_expanded_headers(self, csv_path: Path, fieldnames: list[str]) -> None:
438        """Rewrite a CSV whose header is incomplete due to mid-stream schema changes.
439
440        Because fieldnames are only ever appended, row values are positionally
441        aligned: shorter rows (written before expansion) just need empty-string
442        padding for the new trailing columns.
443        """
444        temp_path = csv_path.with_suffix(".csv.tmp")
445        num_fields = len(fieldnames)
446        with csv_path.open("r", newline="", encoding="utf-8") as src, \
447             temp_path.open("w", newline="", encoding="utf-8") as dst:
448            reader = csv.reader(src)
449            csv_writer = csv.writer(dst)
450            csv_writer.writerow(fieldnames)
451            next(reader, None)  # skip original (incomplete) header
452            for row in reader:
453                if len(row) < num_fields:
454                    row.extend([""] * (num_fields - len(row)))
455                csv_writer.writerow(row)
456        temp_path.replace(csv_path)
457
458    def _write_evtx_records(
459        self,
460        artifact_key: str,
461        records: Any,
462        progress_callback: Callable[..., None] | None,
463    ) -> tuple[list[Path], int]:
464        """Stream EVTX records into per-channel CSV files with automatic splitting.
465
466        Records are grouped by their channel or provider name.  When a
467        single group exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`, a new
468        part file is created.
469
470        Args:
471            artifact_key: Artifact key for filename construction.
472            records: Iterable of Dissect EVTX record objects.
473            progress_callback: Optional progress callback.
474
475        Returns:
476            Tuple of ``(csv_paths, total_record_count)``.
477        """
478        writers: dict[str, dict[str, Any]] = {}
479        csv_paths: list[Path] = []
480        record_count = 0
481
482        try:
483            for record in records:
484                if record_count >= MAX_RECORDS_PER_ARTIFACT:
485                    self.audit_logger.log(
486                        "parsing_capped",
487                        {
488                            "artifact_key": artifact_key,
489                            "record_count": record_count,
490                            "max_records": MAX_RECORDS_PER_ARTIFACT,
491                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
492                        },
493                    )
494                    break
495
496                record_dict = self._record_to_dict(record)
497                group_name = self._extract_evtx_group_name(record_dict)
498
499                writer_state = writers.get(group_name)
500                if writer_state is None:
501                    writer_state = self._open_evtx_writer(artifact_key=artifact_key, group_name=group_name, part=1)
502                    writers[group_name] = writer_state
503                    csv_paths.append(writer_state["path"])
504                elif writer_state["records_in_file"] >= EVTX_MAX_RECORDS_PER_FILE:
505                    writer_state["handle"].close()
506                    next_part = int(writer_state["part"]) + 1
507                    writer_state = self._open_evtx_writer(
508                        artifact_key=artifact_key,
509                        group_name=group_name,
510                        part=next_part,
511                    )
512                    writers[group_name] = writer_state
513                    csv_paths.append(writer_state["path"])
514
515                if writer_state["fieldnames"] is None:
516                    fieldnames = [str(key) for key in record_dict.keys()]
517                    writer_state["fieldnames"] = fieldnames
518                    writer_state["fieldnames_set"] = set(fieldnames)
519                    writer_state["writer"] = csv.DictWriter(
520                        writer_state["handle"],
521                        fieldnames=fieldnames,
522                        extrasaction="ignore",
523                    )
524                    writer_state["writer"].writeheader()
525                else:
526                    new_keys = [
527                        str(k) for k in record_dict.keys()
528                        if str(k) not in writer_state["fieldnames_set"]
529                    ]
530                    if new_keys:
531                        writer_state["fieldnames"].extend(new_keys)
532                        writer_state["fieldnames_set"].update(new_keys)
533                        writer_state["headers_expanded"] = True
534                        writer_state["writer"] = csv.DictWriter(
535                            writer_state["handle"],
536                            fieldnames=writer_state["fieldnames"],
537                            extrasaction="ignore",
538                        )
539
540                fieldnames = writer_state["fieldnames"]
541                row = {
542                    fieldname: self._stringify_csv_value(record_dict.get(fieldname))
543                    for fieldname in fieldnames
544                }
545                writer_state["writer"].writerow(row)
546                writer_state["records_in_file"] += 1
547                record_count += 1
548
549                if progress_callback is not None and record_count % 1000 == 0:
550                    self._emit_progress(progress_callback, artifact_key, record_count)
551        finally:
552            for writer_state in writers.values():
553                writer_state["handle"].close()
554
555        for writer_state in writers.values():
556            if writer_state["headers_expanded"] and writer_state["records_in_file"] > 0:
557                self._rewrite_csv_with_expanded_headers(
558                    writer_state["path"], writer_state["fieldnames"],
559                )
560
561        if progress_callback is not None:
562            self._emit_progress(progress_callback, artifact_key, record_count)
563
564        return csv_paths, record_count
565
566    def _open_evtx_writer(self, artifact_key: str, group_name: str, part: int) -> dict[str, Any]:
567        """Open a new CSV file for an EVTX channel group and return writer state.
568
569        Args:
570            artifact_key: Parent artifact key for filename construction.
571            group_name: EVTX channel or provider name.
572            part: 1-based part number for multi-file splits.
573
574        Returns:
575            Dictionary containing ``path``, ``handle``, ``writer``,
576            ``fieldnames``, ``fieldnames_set``, ``headers_expanded``,
577            ``records_in_file``, and ``part``.
578        """
579        artifact_stub = self._sanitize_filename(artifact_key)
580        group_stub = self._sanitize_filename(group_name)
581        filename = f"{artifact_stub}_{group_stub}.csv" if part == 1 else f"{artifact_stub}_{group_stub}_part{part}.csv"
582        output_path = self.parsed_dir / filename
583
584        handle = output_path.open("w", newline="", encoding="utf-8")
585        return {
586            "path": output_path,
587            "handle": handle,
588            "writer": None,
589            "fieldnames": None,
590            "fieldnames_set": None,
591            "headers_expanded": False,
592            "records_in_file": 0,
593            "part": part,
594        }
595
596    def _extract_evtx_group_name(self, record_dict: dict[str, Any]) -> str:
597        """Determine the channel/provider group name for an EVTX record.
598
599        Checks multiple candidate keys (``channel``, ``Channel``,
600        ``provider``, etc.) and returns the first non-empty value.
601
602        Args:
603            record_dict: Dictionary representation of the EVTX record.
604
605        Returns:
606            Channel or provider name, or ``"unknown"`` if none found.
607        """
608        channel = self._find_record_value(
609            record_dict,
610            (
611                "channel",
612                "Channel",
613                "log_name",
614                "LogName",
615                "event_log",
616                "EventLog",
617            ),
618        )
619        provider = self._find_record_value(
620            record_dict,
621            (
622                "provider",
623                "Provider",
624                "provider_name",
625                "ProviderName",
626                "source",
627                "Source",
628            ),
629        )
630
631        if channel:
632            return channel
633        if provider:
634            return provider
635        return "unknown"
636
637    @staticmethod
638    def _record_to_dict(record: Any) -> dict[str, Any]:
639        """Convert a Dissect record to a plain dictionary.
640
641        Handles Dissect ``Record`` objects (via ``_asdict()``), plain
642        dicts, and objects with a ``__dict__``.
643
644        Args:
645            record: A Dissect record or dict-like object.
646
647        Returns:
648            A plain dictionary of field names to values.
649
650        Raises:
651            TypeError: If the record cannot be converted.
652        """
653        if hasattr(record, "_asdict"):
654            as_dict = record._asdict()
655            if isinstance(as_dict, dict):
656                return dict(as_dict)
657
658        if isinstance(record, dict):
659            return dict(record)
660
661        try:
662            return dict(vars(record))
663        except TypeError as exc:
664            raise TypeError("Artifact record cannot be converted to a dictionary.") from exc
665
666    @staticmethod
667    def _stringify_csv_value(value: Any) -> str:
668        """Convert a record field value to a CSV-safe string.
669
670        Handles ``datetime``, ``bytes``, ``None``, and other types that
671        Dissect records may yield.
672
673        Args:
674            value: The raw field value from a Dissect record.
675
676        Returns:
677            String representation suitable for CSV output.
678        """
679        if value is None:
680            return ""
681        if isinstance(value, (datetime, date, time)):
682            return value.isoformat()
683        if isinstance(value, (bytes, bytearray, memoryview)):
684            raw = bytes(value)
685            if len(raw) > 512:
686                return raw[:512].hex() + "..."
687            return raw.hex()
688        return str(value)
689
690    @staticmethod
691    def _find_record_value(record_dict: dict[str, Any], candidate_keys: tuple[str, ...]) -> str:
692        """Return the first non-empty value from *candidate_keys* in *record_dict*.
693
694        Args:
695            record_dict: Dictionary to search.
696            candidate_keys: Ordered tuple of keys to try.
697
698        Returns:
699            The first non-empty string value, or ``""`` if none found.
700        """
701        for key in candidate_keys:
702            if key in record_dict and record_dict[key] not in (None, ""):
703                return str(record_dict[key])
704        return ""
705
706    @staticmethod
707    def _sanitize_filename(value: str) -> str:
708        """Replace non-alphanumeric characters with underscores for safe filenames.
709
710        Args:
711            value: Raw string to sanitise.
712
713        Returns:
714            Filesystem-safe string, or ``"artifact"`` if empty after cleaning.
715        """
716        cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_")
717        return cleaned or "artifact"
718
719    @staticmethod
720    def _is_evtx_artifact(function_name: str) -> bool:
721        """Return *True* if *function_name* indicates an EVTX artifact."""
722        return function_name == "evtx" or function_name.endswith(".evtx")
723
724    @staticmethod
725    def _emit_progress(
726        progress_callback: Callable[..., None],
727        artifact_key: str,
728        record_count: int,
729    ) -> None:
730        """Invoke the progress callback, tolerating varying signatures.
731
732        Tries ``callback(dict)``, then ``callback(key, count)``, then
733        ``callback(count)`` to accommodate different caller conventions.
734
735        Args:
736            progress_callback: Callable to invoke.
737            artifact_key: Current artifact being parsed.
738            record_count: Number of records processed so far.
739        """
740        payload = {"artifact_key": artifact_key, "record_count": record_count}
741        try:
742            progress_callback(payload)
743            return
744        except TypeError:
745            pass
746
747        try:
748            progress_callback(artifact_key, record_count)  # type: ignore[misc]
749            return
750        except TypeError:
751            pass
752
753        try:
754            progress_callback(record_count)  # type: ignore[misc]
755        except Exception:
756            return

Parse supported forensic artifacts from a Dissect target into CSV files.

Opens a disk image via Dissect's Target.open(), queries available artifacts, and streams their records to CSV files in the case's parsed directory. Implements the context manager protocol for deterministic resource cleanup.

Attributes:
  • evidence_path: Path to the source evidence file.
  • case_dir: Root directory for this forensic case.
  • audit_logger: ~app.audit.AuditLogger for recording actions.
  • parsed_dir: Directory where output CSV files are written.
  • target: The open Dissect Target handle.
ForensicParser( evidence_path: str | pathlib.Path, case_dir: str | pathlib.Path, audit_logger: Any, parsed_dir: str | pathlib.Path | None = None)
70    def __init__(
71        self,
72        evidence_path: str | Path,
73        case_dir: str | Path,
74        audit_logger: Any,
75        parsed_dir: str | Path | None = None,
76    ) -> None:
77        """Initialise the parser and open the Dissect target.
78
79        Args:
80            evidence_path: Path to the disk image or evidence container.
81            case_dir: Case-specific directory for output and audit data.
82            audit_logger: Logger instance for writing audit trail entries.
83            parsed_dir: Optional override for the CSV output directory.
84                Defaults to ``<case_dir>/parsed/``.
85        """
86        self.evidence_path = Path(evidence_path)
87        self.case_dir = Path(case_dir)
88        self.audit_logger = audit_logger
89        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
90        self.parsed_dir.mkdir(parents=True, exist_ok=True)
91        self.target = Target.open(self.evidence_path)
92        self._closed = False
93
94        try:
95            self.os_type: str = str(self.target.os).strip().lower()
96        except Exception:
97            self.os_type = "unknown"

Initialise the parser and open the Dissect target.

Arguments:
  • evidence_path: Path to the disk image or evidence container.
  • case_dir: Case-specific directory for output and audit data.
  • audit_logger: Logger instance for writing audit trail entries.
  • parsed_dir: Optional override for the CSV output directory. Defaults to <case_dir>/parsed/.
evidence_path
case_dir
audit_logger
parsed_dir
target
def close(self) -> None:
 99    def close(self) -> None:
100        """Close the underlying Dissect target handle."""
101        if self._closed:
102            return
103
104        try:
105            close_method = getattr(self.target, "close", None)
106        except Exception:
107            close_method = None
108        if callable(close_method):
109            close_method()
110        self._closed = True

Close the underlying Dissect target handle.

def get_image_metadata(self) -> dict[str, str]:
127    def get_image_metadata(self) -> dict[str, str]:
128        """Extract key system metadata from the Dissect target.
129
130        Attempts multiple attribute name variants for each field (e.g.
131        ``hostname``, ``computer_name``, ``name``) to accommodate
132        different OS profiles.
133
134        Returns:
135            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
136            ``ips``, ``timezone``, and ``install_date``.
137        """
138        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
139        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
140        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
141        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
142        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))
143
144        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
145        if isinstance(ips_value, (list, tuple, set)):
146            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
147            if not ips:
148                ips = UNKNOWN_VALUE
149        else:
150            ips = str(ips_value)
151
152        return {
153            "hostname": hostname,
154            "os_version": os_version,
155            "domain": domain,
156            "ips": ips,
157            "timezone": timezone,
158            "install_date": install_date,
159        }

Extract key system metadata from the Dissect target.

Attempts multiple attribute name variants for each field (e.g. hostname, computer_name, name) to accommodate different OS profiles.

Returns:

Dictionary with keys hostname, os_version, domain, ips, timezone, and install_date.

def get_available_artifacts(self) -> list[dict[str, typing.Any]]:
161    def get_available_artifacts(self) -> list[dict[str, Any]]:
162        """Return the artifact registry annotated with availability flags.
163
164        Detects the target OS via ``target.os`` and selects the
165        appropriate artifact registry (Windows or Linux).  Probes the
166        Dissect target for each registered artifact and sets an
167        ``available`` boolean on the returned metadata dictionaries.
168
169        Returns:
170            List of artifact metadata dicts, each augmented with ``key``
171            and ``available`` fields.
172        """
173        registry = get_artifact_registry(self.os_type)
174        available_artifacts: list[dict[str, Any]] = []
175        for artifact_key, artifact_details in registry.items():
176            function_name = str(artifact_details.get("function", artifact_key))
177            try:
178                available = bool(self.target.has_function(function_name))
179            except (PluginError, UnsupportedPluginError):
180                available = False
181
182            available_artifact = dict(artifact_details)
183            available_artifact["key"] = artifact_key
184            available_artifact["available"] = available
185            available_artifacts.append(available_artifact)
186
187        return available_artifacts

Return the artifact registry annotated with availability flags.

Detects the target OS via target.os and selects the appropriate artifact registry (Windows or Linux). Probes the Dissect target for each registered artifact and sets an available boolean on the returned metadata dictionaries.

Returns:

List of artifact metadata dicts, each augmented with key and available fields.

def parse_artifact( self, artifact_key: str, progress_callback: Optional[Callable[..., NoneType]] = None) -> dict[str, typing.Any]:
216    def parse_artifact(
217        self,
218        artifact_key: str,
219        progress_callback: Callable[..., None] | None = None,
220    ) -> dict[str, Any]:
221        """Parse a single artifact and stream its records to one or more CSV files.
222
223        Logs ``parsing_started``, ``parsing_completed`` (or ``parsing_failed``)
224        to the audit trail.  EVTX artifacts are split by channel/provider
225        into separate CSV files.
226
227        Args:
228            artifact_key: Key from the OS-specific artifact registry identifying
229                the artifact to parse.
230            progress_callback: Optional callback invoked every 1 000 records
231                with progress information.
232
233        Returns:
234            Result dictionary with keys ``csv_path``, ``record_count``,
235            ``duration_seconds``, ``success``, and ``error``.  EVTX
236            results also include a ``csv_paths`` list.
237        """
238        registry = get_artifact_registry(self.os_type)
239        artifact = registry.get(artifact_key)
240        if artifact is None:
241            return {
242                "csv_path": "",
243                "record_count": 0,
244                "duration_seconds": 0.0,
245                "success": False,
246                "error": f"Unknown artifact key: {artifact_key}",
247            }
248
249        function_name = str(artifact.get("function", artifact_key))
250        start_time = perf_counter()
251        record_count = 0
252        csv_path = ""
253
254        self.audit_logger.log(
255            "parsing_started",
256            {
257                "artifact_key": artifact_key,
258                "artifact_name": artifact.get("name", artifact_key),
259                "function": function_name,
260            },
261        )
262
263        try:
264            records = self._call_target_function(function_name)
265            if self._is_evtx_artifact(function_name):
266                all_csv_paths, record_count = self._write_evtx_records(
267                    artifact_key=artifact_key,
268                    records=records,
269                    progress_callback=progress_callback,
270                )
271                if all_csv_paths:
272                    csv_path = str(all_csv_paths[0])
273                else:
274                    empty_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
275                    empty_output.touch(exist_ok=True)
276                    csv_path = str(empty_output)
277                    all_csv_paths = [empty_output]
278            else:
279                csv_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
280                record_count = self._write_records_to_csv(
281                    records=records,
282                    csv_output_path=csv_output,
283                    progress_callback=progress_callback,
284                    artifact_key=artifact_key,
285                )
286                csv_path = str(csv_output)
287
288            duration = perf_counter() - start_time
289            self.audit_logger.log(
290                "parsing_completed",
291                {
292                    "artifact_key": artifact_key,
293                    "artifact_name": artifact.get("name", artifact_key),
294                    "function": function_name,
295                    "record_count": record_count,
296                    "duration_seconds": round(duration, 6),
297                    "csv_path": csv_path,
298                },
299            )
300
301            result: dict[str, Any] = {
302                "csv_path": csv_path,
303                "record_count": record_count,
304                "duration_seconds": duration,
305                "success": True,
306                "error": None,
307            }
308            if self._is_evtx_artifact(function_name):
309                result["csv_paths"] = [str(p) for p in all_csv_paths]
310            return result
311        except Exception as error:
312            duration = perf_counter() - start_time
313            error_message = str(error)
314            error_traceback = traceback.format_exc()
315            self.audit_logger.log(
316                "parsing_failed",
317                {
318                    "artifact_key": artifact_key,
319                    "artifact_name": artifact.get("name", artifact_key),
320                    "function": function_name,
321                    "error": error_message,
322                    "traceback": error_traceback,
323                    "duration_seconds": round(duration, 6),
324                },
325            )
326            return {
327                "csv_path": "",
328                "record_count": record_count,
329                "duration_seconds": duration,
330                "success": False,
331                "error": error_message,
332            }

Parse a single artifact and stream its records to one or more CSV files.

Logs parsing_started, parsing_completed (or parsing_failed) to the audit trail. EVTX artifacts are split by channel/provider into separate CSV files.

Arguments:
  • artifact_key: Key from the OS-specific artifact registry identifying the artifact to parse.
  • progress_callback: Optional callback invoked every 1 000 records with progress information.
Returns:

Result dictionary with keys csv_path, record_count, duration_seconds, success, and error. EVTX results also include a csv_paths list.