app.parser

Forensic artifact parsing package.

Re-exports the public API so that existing `from app.parser import ...` statements continue to work after the module was split into a package.

 1"""Forensic artifact parsing package.
 2
 3Re-exports the public API so that existing ``from app.parser import ...``
 4statements continue to work after the module was split into a package.
 5"""
 6
 7from dissect.target.exceptions import UnsupportedPluginError
 8
 9from .core import (
10    EVTX_MAX_RECORDS_PER_FILE,
11    MAX_RECORDS_PER_ARTIFACT,
12    UNKNOWN_VALUE,
13    ForensicParser,
14)
15from .registry import (
16    LINUX_ARTIFACT_REGISTRY,
17    WINDOWS_ARTIFACT_REGISTRY,
18    get_artifact_registry,
19)
20
21__all__ = [
22    "EVTX_MAX_RECORDS_PER_FILE",
23    "ForensicParser",
24    "LINUX_ARTIFACT_REGISTRY",
25    "MAX_RECORDS_PER_ARTIFACT",
26    "UNKNOWN_VALUE",
27    "UnsupportedPluginError",
28    "WINDOWS_ARTIFACT_REGISTRY",
29    "core",
30    "get_artifact_registry",
31    "registry",
32]
EVTX_MAX_RECORDS_PER_FILE = 500000
class ForensicParser:
 54class ForensicParser:
 55    """Parse supported forensic artifacts from a Dissect target into CSV files.
 56
 57    Opens a disk image via Dissect's ``Target.open()``, queries available
 58    artifacts, and streams their records to CSV files in the case's parsed
 59    directory.  Implements the context manager protocol for deterministic
 60    resource cleanup.
 61
 62    Attributes:
 63        evidence_path: Path to the source evidence file.
 64        case_dir: Root directory for this forensic case.
 65        audit_logger: :class:`~app.audit.AuditLogger` for recording actions.
 66        parsed_dir: Directory where output CSV files are written.
 67        target: The open Dissect ``Target`` handle.
 68    """
 69
 70    def __init__(
 71        self,
 72        evidence_path: str | Path,
 73        case_dir: str | Path,
 74        audit_logger: Any,
 75        parsed_dir: str | Path | None = None,
 76    ) -> None:
 77        """Initialise the parser and open the Dissect target.
 78
 79        Args:
 80            evidence_path: Path to the disk image or evidence container.
 81            case_dir: Case-specific directory for output and audit data.
 82            audit_logger: Logger instance for writing audit trail entries.
 83            parsed_dir: Optional override for the CSV output directory.
 84                Defaults to ``<case_dir>/parsed/``.
 85        """
 86        self.evidence_path = Path(evidence_path)
 87        self.case_dir = Path(case_dir)
 88        self.audit_logger = audit_logger
 89        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
 90        self.parsed_dir.mkdir(parents=True, exist_ok=True)
 91        self.target = Target.open(self.evidence_path)
 92        self._closed = False
 93
 94        try:
 95            self.os_type: str = str(self.target.os).strip().lower()
 96        except Exception:
 97            self.os_type = "unknown"
 98
 99    def close(self) -> None:
100        """Close the underlying Dissect target handle."""
101        if self._closed:
102            return
103
104        try:
105            close_method = getattr(self.target, "close", None)
106        except Exception:
107            close_method = None
108        if callable(close_method):
109            close_method()
110        self._closed = True
111
112    def __enter__(self) -> ForensicParser:
113        """Enter the runtime context and return the parser instance."""
114        return self
115
116    def __exit__(
117        self,
118        exc_type: type[BaseException] | None,
119        exc_val: BaseException | None,
120        exc_tb: TracebackType | None,
121    ) -> bool:
122        """Exit the runtime context, closing the Dissect target."""
123        del exc_type, exc_val, exc_tb
124        self.close()
125        return False
126
127    def get_image_metadata(self) -> dict[str, str]:
128        """Extract key system metadata from the Dissect target.
129
130        Attempts multiple attribute name variants for each field (e.g.
131        ``hostname``, ``computer_name``, ``name``) to accommodate
132        different OS profiles.
133
134        Returns:
135            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
136            ``ips``, ``timezone``, and ``install_date``.
137        """
138        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
139        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
140        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
141        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
142        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))
143
144        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
145        if isinstance(ips_value, (list, tuple, set)):
146            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
147            if not ips:
148                ips = UNKNOWN_VALUE
149        else:
150            ips = str(ips_value)
151
152        return {
153            "hostname": hostname,
154            "os_version": os_version,
155            "domain": domain,
156            "ips": ips,
157            "timezone": timezone,
158            "install_date": install_date,
159        }
160
161    def get_available_artifacts(self) -> list[dict[str, Any]]:
162        """Return the artifact registry annotated with availability flags.
163
164        Detects the target OS via ``target.os`` and selects the
165        appropriate artifact registry (Windows or Linux).  Probes the
166        Dissect target for each registered artifact and sets an
167        ``available`` boolean on the returned metadata dictionaries.
168
169        Returns:
170            List of artifact metadata dicts, each augmented with ``key``
171            and ``available`` fields.
172        """
173        registry = get_artifact_registry(self.os_type)
174        available_artifacts: list[dict[str, Any]] = []
175        for artifact_key, artifact_details in registry.items():
176            function_name = str(artifact_details.get("function", artifact_key))
177            try:
178                available = bool(self.target.has_function(function_name))
179            except (PluginError, UnsupportedPluginError):
180                available = False
181
182            available_artifact = dict(artifact_details)
183            available_artifact["key"] = artifact_key
184            available_artifact["available"] = available
185            available_artifacts.append(available_artifact)
186
187        return available_artifacts
188
189    def _call_target_function(self, function_name: str) -> Any:
190        """Invoke a Dissect function on the target, including namespaced functions.
191
192        For simple names like ``"shimcache"`` it calls ``target.shimcache()``.
193        For dotted names like ``"browser.history"`` it traverses the namespace
194        chain (``target.browser.history()``) and calls the final attribute.
195        """
196        if "." not in function_name:
197            function = getattr(self.target, function_name)
198            return function() if callable(function) else function
199
200        current: Any = self.target
201        parts = function_name.split(".")
202        try:
203            for namespace in parts:
204                current = getattr(current, namespace)
205        except Exception:
206            logger.warning(
207                "Failed to resolve nested function '%s' (stopped at '%s')",
208                function_name,
209                namespace,
210                exc_info=True,
211            )
212            raise
213
214        return current() if callable(current) else current
215
216    def parse_artifact(
217        self,
218        artifact_key: str,
219        progress_callback: Callable[..., None] | None = None,
220    ) -> dict[str, Any]:
221        """Parse a single artifact and stream its records to one or more CSV files.
222
223        Logs ``parsing_started``, ``parsing_completed`` (or ``parsing_failed``)
224        to the audit trail.  EVTX artifacts are split by channel/provider
225        into separate CSV files.
226
227        Args:
228            artifact_key: Key from the OS-specific artifact registry identifying
229                the artifact to parse.
230            progress_callback: Optional callback invoked every 1 000 records
231                with progress information.
232
233        Returns:
234            Result dictionary with keys ``csv_path``, ``record_count``,
235            ``duration_seconds``, ``success``, and ``error``.  EVTX
236            results also include a ``csv_paths`` list.
237        """
        # Unknown keys produce a failed result dict instead of raising, so
        # batch callers can continue with the remaining artifacts.
238        registry = get_artifact_registry(self.os_type)
239        artifact = registry.get(artifact_key)
240        if artifact is None:
241            return {
242                "csv_path": "",
243                "record_count": 0,
244                "duration_seconds": 0.0,
245                "success": False,
246                "error": f"Unknown artifact key: {artifact_key}",
247            }
248
249        function_name = str(artifact.get("function", artifact_key))
250        start_time = perf_counter()
251        record_count = 0
252        csv_path = ""
253
254        self.audit_logger.log(
255            "parsing_started",
256            {
257                "artifact_key": artifact_key,
258                "artifact_name": artifact.get("name", artifact_key),
259                "function": function_name,
260            },
261        )
262
        # Any exception from here on is logged as ``parsing_failed`` and
        # reported in the result dict instead of propagating to the caller.
263        try:
264            records = self._call_target_function(function_name)
265            if self._is_evtx_artifact(function_name):
266                all_csv_paths, record_count = self._write_evtx_records(
267                    artifact_key=artifact_key,
268                    records=records,
269                    progress_callback=progress_callback,
270                )
271                if all_csv_paths:
272                    csv_path = str(all_csv_paths[0])
273                else:
                    # No EVTX records at all: still create an empty CSV so
                    # the caller gets a concrete output path.
274                    empty_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
275                    empty_output.touch(exist_ok=True)
276                    csv_path = str(empty_output)
277                    all_csv_paths = [empty_output]
278            else:
279                csv_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
280                record_count = self._write_records_to_csv(
281                    records=records,
282                    csv_output_path=csv_output,
283                    progress_callback=progress_callback,
284                    artifact_key=artifact_key,
285                )
286                csv_path = str(csv_output)
287
288            duration = perf_counter() - start_time
289            self.audit_logger.log(
290                "parsing_completed",
291                {
292                    "artifact_key": artifact_key,
293                    "artifact_name": artifact.get("name", artifact_key),
294                    "function": function_name,
295                    "record_count": record_count,
296                    "duration_seconds": round(duration, 6),
297                    "csv_path": csv_path,
298                },
299            )
300
301            result: dict[str, Any] = {
302                "csv_path": csv_path,
303                "record_count": record_count,
304                "duration_seconds": duration,
305                "success": True,
306                "error": None,
307            }
308            if self._is_evtx_artifact(function_name):
309                result["csv_paths"] = [str(p) for p in all_csv_paths]
310            return result
311        except Exception as error:
312            duration = perf_counter() - start_time
313            error_message = str(error)
314            error_traceback = traceback.format_exc()
            # The full traceback goes to the audit log only; the result dict
            # carries just the message.
315            self.audit_logger.log(
316                "parsing_failed",
317                {
318                    "artifact_key": artifact_key,
319                    "artifact_name": artifact.get("name", artifact_key),
320                    "function": function_name,
321                    "error": error_message,
322                    "traceback": error_traceback,
323                    "duration_seconds": round(duration, 6),
324                },
325            )
326            return {
327                "csv_path": "",
328                "record_count": record_count,
329                "duration_seconds": duration,
330                "success": False,
331                "error": error_message,
332            }
333
334    def _safe_read_target_attribute(self, attribute_names: tuple[str, ...]) -> Any:
335        """Read a target attribute by trying multiple candidate names.
336
337        Args:
338            attribute_names: Ordered tuple of attribute names to try.
339
340        Returns:
341            The first non-empty value found, or :data:`UNKNOWN_VALUE`.
342        """
343        for attribute_name in attribute_names:
344            try:
345                value = getattr(self.target, attribute_name)
346            except Exception:
347                continue
348
349            if callable(value):
350                try:
351                    value = value()
352                except Exception:
353                    continue
354
355            if value in (None, ""):
356                continue
357
358            return value
359
360        return UNKNOWN_VALUE
361
362    def _write_records_to_csv(
363        self,
364        records: Iterable[Any],
365        csv_output_path: Path,
366        progress_callback: Callable[..., None] | None,
367        artifact_key: str,
368    ) -> int:
369        """Stream Dissect records to a CSV file, handling dynamic schemas.
370
371        If the record schema expands mid-stream (new columns appear), the
372        file is rewritten at the end with the complete header row via
373        :meth:`_rewrite_csv_with_expanded_headers`.
374
375        Args:
376            records: Iterable of Dissect record objects.
377            csv_output_path: Destination CSV file path.
378            progress_callback: Optional progress callback.
379            artifact_key: Artifact key for audit/progress reporting.
380
381        Returns:
382            Total number of records written.
383        """
384        record_count = 0
385        fieldnames: list[str] = []
386        fieldnames_set: set[str] = set()
387        headers_expanded = False
388
389        with csv_output_path.open("w", newline="", encoding="utf-8") as csv_file:
390            writer: csv.DictWriter | None = None
391            for record in records:
392                record_dict = self._record_to_dict(record)
393
                # New columns are only ever appended, which keeps earlier rows
                # positionally aligned for the final header rewrite.
394                new_keys = [str(k) for k in record_dict.keys() if str(k) not in fieldnames_set]
395                if new_keys:
396                    fieldnames.extend(new_keys)
397                    fieldnames_set.update(new_keys)
398                    if writer is not None:
399                        headers_expanded = True
400                    writer = csv.DictWriter(
401                        csv_file, fieldnames=fieldnames, restval="", extrasaction="ignore",
402                    )
                    # Only the very first writer emits a header row; after an
                    # expansion the header is fixed up post-stream instead.
403                    if not headers_expanded:
404                        writer.writeheader()
405
406                row = {
407                    fn: self._stringify_csv_value(record_dict.get(fn))
408                    for fn in fieldnames
409                }
410                if writer is not None:
411                    writer.writerow(row)
412                record_count += 1
413
                # Hard safety cap to bound output size; the cap is audited.
414                if record_count >= MAX_RECORDS_PER_ARTIFACT:
415                    self.audit_logger.log(
416                        "parsing_capped",
417                        {
418                            "artifact_key": artifact_key,
419                            "record_count": record_count,
420                            "max_records": MAX_RECORDS_PER_ARTIFACT,
421                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
422                        },
423                    )
424                    break
425
426                if progress_callback is not None and record_count % 1000 == 0:
427                    self._emit_progress(progress_callback, artifact_key, record_count)
428
429        if headers_expanded and record_count > 0:
430            self._rewrite_csv_with_expanded_headers(csv_output_path, fieldnames)
431
432        if progress_callback is not None:
433            self._emit_progress(progress_callback, artifact_key, record_count)
434
435        return record_count
436
437    def _rewrite_csv_with_expanded_headers(self, csv_path: Path, fieldnames: list[str]) -> None:
438        """Rewrite a CSV whose header is incomplete due to mid-stream schema changes.
439
440        Because fieldnames are only ever appended, row values are positionally
441        aligned: shorter rows (written before expansion) just need empty-string
442        padding for the new trailing columns.
443        """
444        temp_path = csv_path.with_suffix(".csv.tmp")
445        num_fields = len(fieldnames)
446        with csv_path.open("r", newline="", encoding="utf-8") as src, \
447             temp_path.open("w", newline="", encoding="utf-8") as dst:
448            reader = csv.reader(src)
449            csv_writer = csv.writer(dst)
450            csv_writer.writerow(fieldnames)
451            next(reader, None)  # skip original (incomplete) header
452            for row in reader:
453                if len(row) < num_fields:
454                    row.extend([""] * (num_fields - len(row)))
455                csv_writer.writerow(row)
456        temp_path.replace(csv_path)
457
458    def _write_evtx_records(
459        self,
460        artifact_key: str,
461        records: Any,
462        progress_callback: Callable[..., None] | None,
463    ) -> tuple[list[Path], int]:
464        """Stream EVTX records into per-channel CSV files with automatic splitting.
465
466        Records are grouped by their channel or provider name.  When a
467        single group exceeds :data:`EVTX_MAX_RECORDS_PER_FILE`, a new
468        part file is created.
469
470        Args:
471            artifact_key: Artifact key for filename construction.
472            records: Iterable of Dissect EVTX record objects.
473            progress_callback: Optional progress callback.
474
475        Returns:
476            Tuple of ``(csv_paths, total_record_count)``.
477        """
        # One open writer-state dict per channel group; every handle is
        # closed in the ``finally`` block even if iteration raises.
478        writers: dict[str, dict[str, Any]] = {}
479        csv_paths: list[Path] = []
480        record_count = 0
481
482        try:
483            for record in records:
                # Global cap across all groups, mirroring the non-EVTX path.
484                if record_count >= MAX_RECORDS_PER_ARTIFACT:
485                    self.audit_logger.log(
486                        "parsing_capped",
487                        {
488                            "artifact_key": artifact_key,
489                            "record_count": record_count,
490                            "max_records": MAX_RECORDS_PER_ARTIFACT,
491                            "message": f"Artifact capped at {MAX_RECORDS_PER_ARTIFACT:,} rows",
492                        },
493                    )
494                    break
495
496                record_dict = self._record_to_dict(record)
497                group_name = self._extract_evtx_group_name(record_dict)
498
499                writer_state = writers.get(group_name)
500                if writer_state is None:
501                    writer_state = self._open_evtx_writer(artifact_key=artifact_key, group_name=group_name, part=1)
502                    writers[group_name] = writer_state
503                    csv_paths.append(writer_state["path"])
                # Rotate to a new part file once the per-file limit is hit.
504                elif writer_state["records_in_file"] >= EVTX_MAX_RECORDS_PER_FILE:
505                    writer_state["handle"].close()
506                    next_part = int(writer_state["part"]) + 1
507                    writer_state = self._open_evtx_writer(
508                        artifact_key=artifact_key,
509                        group_name=group_name,
510                        part=next_part,
511                    )
512                    writers[group_name] = writer_state
513                    csv_paths.append(writer_state["path"])
514
                # Build the DictWriter lazily from the first record's keys,
                # then rebuild it whenever new columns appear mid-stream.
515                if writer_state["fieldnames"] is None:
516                    fieldnames = [str(key) for key in record_dict.keys()]
517                    writer_state["fieldnames"] = fieldnames
518                    writer_state["fieldnames_set"] = set(fieldnames)
519                    writer_state["writer"] = csv.DictWriter(
520                        writer_state["handle"],
521                        fieldnames=fieldnames,
522                        extrasaction="ignore",
523                    )
524                    writer_state["writer"].writeheader()
525                else:
526                    new_keys = [
527                        str(k) for k in record_dict.keys()
528                        if str(k) not in writer_state["fieldnames_set"]
529                    ]
530                    if new_keys:
531                        writer_state["fieldnames"].extend(new_keys)
532                        writer_state["fieldnames_set"].update(new_keys)
533                        writer_state["headers_expanded"] = True
534                        writer_state["writer"] = csv.DictWriter(
535                            writer_state["handle"],
536                            fieldnames=writer_state["fieldnames"],
537                            extrasaction="ignore",
538                        )
539
540                fieldnames = writer_state["fieldnames"]
541                row = {
542                    fieldname: self._stringify_csv_value(record_dict.get(fieldname))
543                    for fieldname in fieldnames
544                }
545                writer_state["writer"].writerow(row)
546                writer_state["records_in_file"] += 1
547                record_count += 1
548
549                if progress_callback is not None and record_count % 1000 == 0:
550                    self._emit_progress(progress_callback, artifact_key, record_count)
551        finally:
552            for writer_state in writers.values():
553                writer_state["handle"].close()
554
        # Fix up headers for any file whose schema grew mid-stream.  NOTE:
        # ``writers`` only tracks the latest part per group at this point.
555        for writer_state in writers.values():
556            if writer_state["headers_expanded"] and writer_state["records_in_file"] > 0:
557                self._rewrite_csv_with_expanded_headers(
558                    writer_state["path"], writer_state["fieldnames"],
559                )
560
561        if progress_callback is not None:
562            self._emit_progress(progress_callback, artifact_key, record_count)
563
564        return csv_paths, record_count
565
566    def _open_evtx_writer(self, artifact_key: str, group_name: str, part: int) -> dict[str, Any]:
567        """Open a new CSV file for an EVTX channel group and return writer state.
568
569        Args:
570            artifact_key: Parent artifact key for filename construction.
571            group_name: EVTX channel or provider name.
572            part: 1-based part number for multi-file splits.
573
574        Returns:
575            Dictionary containing ``path``, ``handle``, ``writer``,
576            ``fieldnames``, ``fieldnames_set``, ``headers_expanded``,
577            ``records_in_file``, and ``part``.
578        """
579        artifact_stub = self._sanitize_filename(artifact_key)
580        group_stub = self._sanitize_filename(group_name)
581        filename = f"{artifact_stub}_{group_stub}.csv" if part == 1 else f"{artifact_stub}_{group_stub}_part{part}.csv"
582        output_path = self.parsed_dir / filename
583
584        handle = output_path.open("w", newline="", encoding="utf-8")
585        return {
586            "path": output_path,
587            "handle": handle,
588            "writer": None,
589            "fieldnames": None,
590            "fieldnames_set": None,
591            "headers_expanded": False,
592            "records_in_file": 0,
593            "part": part,
594        }
595
596    def _extract_evtx_group_name(self, record_dict: dict[str, Any]) -> str:
597        """Determine the channel/provider group name for an EVTX record.
598
599        Checks multiple candidate keys (``channel``, ``Channel``,
600        ``provider``, etc.) and returns the first non-empty value.
601
602        Args:
603            record_dict: Dictionary representation of the EVTX record.
604
605        Returns:
606            Channel or provider name, or ``"unknown"`` if none found.
607        """
608        channel = self._find_record_value(
609            record_dict,
610            (
611                "channel",
612                "Channel",
613                "log_name",
614                "LogName",
615                "event_log",
616                "EventLog",
617            ),
618        )
619        provider = self._find_record_value(
620            record_dict,
621            (
622                "provider",
623                "Provider",
624                "provider_name",
625                "ProviderName",
626                "source",
627                "Source",
628            ),
629        )
630
631        if channel:
632            return channel
633        if provider:
634            return provider
635        return "unknown"
636
637    @staticmethod
638    def _record_to_dict(record: Any) -> dict[str, Any]:
639        """Convert a Dissect record to a plain dictionary.
640
641        Handles Dissect ``Record`` objects (via ``_asdict()``), plain
642        dicts, and objects with a ``__dict__``.
643
644        Args:
645            record: A Dissect record or dict-like object.
646
647        Returns:
648            A plain dictionary of field names to values.
649
650        Raises:
651            TypeError: If the record cannot be converted.
652        """
653        if hasattr(record, "_asdict"):
654            as_dict = record._asdict()
655            if isinstance(as_dict, dict):
656                return dict(as_dict)
657
658        if isinstance(record, dict):
659            return dict(record)
660
661        try:
662            return dict(vars(record))
663        except TypeError as exc:
664            raise TypeError("Artifact record cannot be converted to a dictionary.") from exc
665
666    @staticmethod
667    def _stringify_csv_value(value: Any) -> str:
668        """Convert a record field value to a CSV-safe string.
669
670        Handles ``datetime``, ``bytes``, ``None``, and other types that
671        Dissect records may yield.
672
673        Args:
674            value: The raw field value from a Dissect record.
675
676        Returns:
677            String representation suitable for CSV output.
678        """
679        if value is None:
680            return ""
681        if isinstance(value, (datetime, date, time)):
682            return value.isoformat()
683        if isinstance(value, (bytes, bytearray, memoryview)):
684            raw = bytes(value)
685            if len(raw) > 512:
686                return raw[:512].hex() + "..."
687            return raw.hex()
688        return str(value)
689
690    @staticmethod
691    def _find_record_value(record_dict: dict[str, Any], candidate_keys: tuple[str, ...]) -> str:
692        """Return the first non-empty value from *candidate_keys* in *record_dict*.
693
694        Args:
695            record_dict: Dictionary to search.
696            candidate_keys: Ordered tuple of keys to try.
697
698        Returns:
699            The first non-empty string value, or ``""`` if none found.
700        """
701        for key in candidate_keys:
702            if key in record_dict and record_dict[key] not in (None, ""):
703                return str(record_dict[key])
704        return ""
705
706    @staticmethod
707    def _sanitize_filename(value: str) -> str:
708        """Replace non-alphanumeric characters with underscores for safe filenames.
709
710        Args:
711            value: Raw string to sanitise.
712
713        Returns:
714            Filesystem-safe string, or ``"artifact"`` if empty after cleaning.
715        """
716        cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_")
717        return cleaned or "artifact"
718
719    @staticmethod
720    def _is_evtx_artifact(function_name: str) -> bool:
721        """Return *True* if *function_name* indicates an EVTX artifact."""
722        return function_name == "evtx" or function_name.endswith(".evtx")
723
724    @staticmethod
725    def _emit_progress(
726        progress_callback: Callable[..., None],
727        artifact_key: str,
728        record_count: int,
729    ) -> None:
730        """Invoke the progress callback, tolerating varying signatures.
731
732        Tries ``callback(dict)``, then ``callback(key, count)``, then
733        ``callback(count)`` to accommodate different caller conventions.
734
735        Args:
736            progress_callback: Callable to invoke.
737            artifact_key: Current artifact being parsed.
738            record_count: Number of records processed so far.
739        """
740        payload = {"artifact_key": artifact_key, "record_count": record_count}
741        try:
742            progress_callback(payload)
743            return
744        except TypeError:
745            pass
746
747        try:
748            progress_callback(artifact_key, record_count)  # type: ignore[misc]
749            return
750        except TypeError:
751            pass
752
753        try:
754            progress_callback(record_count)  # type: ignore[misc]
755        except Exception:
756            return

Parse supported forensic artifacts from a Dissect target into CSV files.

Opens a disk image via Dissect's Target.open(), queries available artifacts, and streams their records to CSV files in the case's parsed directory. Implements the context manager protocol for deterministic resource cleanup.

Attributes:
  • evidence_path: Path to the source evidence file.
  • case_dir: Root directory for this forensic case.
  • audit_logger: `app.audit.AuditLogger` instance for recording actions.
  • parsed_dir: Directory where output CSV files are written.
  • target: The open Dissect Target handle.
ForensicParser(evidence_path: str | pathlib.Path, case_dir: str | pathlib.Path, audit_logger: Any, parsed_dir: str | pathlib.Path | None = None)
70    def __init__(
71        self,
72        evidence_path: str | Path,
73        case_dir: str | Path,
74        audit_logger: Any,
75        parsed_dir: str | Path | None = None,
76    ) -> None:
77        """Initialise the parser and open the Dissect target.
78
79        Args:
80            evidence_path: Path to the disk image or evidence container.
81            case_dir: Case-specific directory for output and audit data.
82            audit_logger: Logger instance for writing audit trail entries.
83            parsed_dir: Optional override for the CSV output directory.
84                Defaults to ``<case_dir>/parsed/``.
85        """
86        self.evidence_path = Path(evidence_path)
87        self.case_dir = Path(case_dir)
88        self.audit_logger = audit_logger
89        self.parsed_dir = Path(parsed_dir) if parsed_dir is not None else self.case_dir / "parsed"
90        self.parsed_dir.mkdir(parents=True, exist_ok=True)
91        self.target = Target.open(self.evidence_path)
92        self._closed = False
93
94        try:
95            self.os_type: str = str(self.target.os).strip().lower()
96        except Exception:
97            self.os_type = "unknown"

Initialise the parser and open the Dissect target.

Arguments:
  • evidence_path: Path to the disk image or evidence container.
  • case_dir: Case-specific directory for output and audit data.
  • audit_logger: Logger instance for writing audit trail entries.
  • parsed_dir: Optional override for the CSV output directory. Defaults to <case_dir>/parsed/.
evidence_path
case_dir
audit_logger
parsed_dir
target
def close(self) -> None:
 99    def close(self) -> None:
100        """Close the underlying Dissect target handle."""
101        if self._closed:
102            return
103
104        try:
105            close_method = getattr(self.target, "close", None)
106        except Exception:
107            close_method = None
108        if callable(close_method):
109            close_method()
110        self._closed = True

Close the underlying Dissect target handle.

def get_image_metadata(self) -> dict[str, str]:
127    def get_image_metadata(self) -> dict[str, str]:
128        """Extract key system metadata from the Dissect target.
129
130        Attempts multiple attribute name variants for each field (e.g.
131        ``hostname``, ``computer_name``, ``name``) to accommodate
132        different OS profiles.
133
134        Returns:
135            Dictionary with keys ``hostname``, ``os_version``, ``domain``,
136            ``ips``, ``timezone``, and ``install_date``.
137        """
138        hostname = str(self._safe_read_target_attribute(("hostname", "computer_name", "name")))
139        os_version = str(self._safe_read_target_attribute(("os_version", "version")))
140        domain = str(self._safe_read_target_attribute(("domain", "dns_domain", "workgroup")))
141        timezone = str(self._safe_read_target_attribute(("timezone", "tz")))
142        install_date = str(self._safe_read_target_attribute(("install_date", "installdate")))
143
144        ips_value = self._safe_read_target_attribute(("ips", "ip_addresses", "ip"))
145        if isinstance(ips_value, (list, tuple, set)):
146            ips = ", ".join(str(value) for value in ips_value if value not in (None, ""))
147            if not ips:
148                ips = UNKNOWN_VALUE
149        else:
150            ips = str(ips_value)
151
152        return {
153            "hostname": hostname,
154            "os_version": os_version,
155            "domain": domain,
156            "ips": ips,
157            "timezone": timezone,
158            "install_date": install_date,
159        }

Extract key system metadata from the Dissect target.

Attempts multiple attribute name variants for each field (e.g. hostname, computer_name, name) to accommodate different OS profiles.

Returns:

Dictionary with keys hostname, os_version, domain, ips, timezone, and install_date.

def get_available_artifacts(self) -> list[dict[str, typing.Any]]:
161    def get_available_artifacts(self) -> list[dict[str, Any]]:
162        """Return the artifact registry annotated with availability flags.
163
164        Detects the target OS via ``target.os`` and selects the
165        appropriate artifact registry (Windows or Linux).  Probes the
166        Dissect target for each registered artifact and sets an
167        ``available`` boolean on the returned metadata dictionaries.
168
169        Returns:
170            List of artifact metadata dicts, each augmented with ``key``
171            and ``available`` fields.
172        """
173        registry = get_artifact_registry(self.os_type)
174        available_artifacts: list[dict[str, Any]] = []
175        for artifact_key, artifact_details in registry.items():
176            function_name = str(artifact_details.get("function", artifact_key))
177            try:
178                available = bool(self.target.has_function(function_name))
179            except (PluginError, UnsupportedPluginError):
180                available = False
181
182            available_artifact = dict(artifact_details)
183            available_artifact["key"] = artifact_key
184            available_artifact["available"] = available
185            available_artifacts.append(available_artifact)
186
187        return available_artifacts

Return the artifact registry annotated with availability flags.

Detects the target OS via target.os and selects the appropriate artifact registry (Windows or Linux). Probes the Dissect target for each registered artifact and sets an available boolean on the returned metadata dictionaries.

Returns:

List of artifact metadata dicts, each augmented with key and available fields.

def parse_artifact( self, artifact_key: str, progress_callback: Optional[Callable[..., NoneType]] = None) -> dict[str, typing.Any]:
    def parse_artifact(
        self,
        artifact_key: str,
        progress_callback: Callable[..., None] | None = None,
    ) -> dict[str, Any]:
        """Parse a single artifact and stream its records to one or more CSV files.

        Logs ``parsing_started``, ``parsing_completed`` (or ``parsing_failed``)
        to the audit trail.  EVTX artifacts are split by channel/provider
        into separate CSV files.

        Args:
            artifact_key: Key from the OS-specific artifact registry identifying
                the artifact to parse.
            progress_callback: Optional callback invoked every 1 000 records
                with progress information.

        Returns:
            Result dictionary with keys ``csv_path``, ``record_count``,
            ``duration_seconds``, ``success``, and ``error``.  EVTX
            results also include a ``csv_paths`` list.
        """
        # Unknown keys are reported as a failed result dict, never raised.
        registry = get_artifact_registry(self.os_type)
        artifact = registry.get(artifact_key)
        if artifact is None:
            return {
                "csv_path": "",
                "record_count": 0,
                "duration_seconds": 0.0,
                "success": False,
                "error": f"Unknown artifact key: {artifact_key}",
            }

        function_name = str(artifact.get("function", artifact_key))
        start_time = perf_counter()
        record_count = 0
        csv_path = ""

        # Log the attempt before parsing starts so failures remain traceable.
        self.audit_logger.log(
            "parsing_started",
            {
                "artifact_key": artifact_key,
                "artifact_name": artifact.get("name", artifact_key),
                "function": function_name,
            },
        )

        try:
            records = self._call_target_function(function_name)
            if self._is_evtx_artifact(function_name):
                # EVTX records are split across one CSV per channel/provider.
                all_csv_paths, record_count = self._write_evtx_records(
                    artifact_key=artifact_key,
                    records=records,
                    progress_callback=progress_callback,
                )
                if all_csv_paths:
                    csv_path = str(all_csv_paths[0])
                else:
                    # Zero records: still create an empty CSV so callers
                    # always receive a concrete output path.
                    empty_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
                    empty_output.touch(exist_ok=True)
                    csv_path = str(empty_output)
                    all_csv_paths = [empty_output]
            else:
                # Non-EVTX artifacts stream into a single CSV file.
                csv_output = self.parsed_dir / f"{self._sanitize_filename(artifact_key)}.csv"
                record_count = self._write_records_to_csv(
                    records=records,
                    csv_output_path=csv_output,
                    progress_callback=progress_callback,
                    artifact_key=artifact_key,
                )
                csv_path = str(csv_output)

            duration = perf_counter() - start_time
            self.audit_logger.log(
                "parsing_completed",
                {
                    "artifact_key": artifact_key,
                    "artifact_name": artifact.get("name", artifact_key),
                    "function": function_name,
                    "record_count": record_count,
                    "duration_seconds": round(duration, 6),
                    "csv_path": csv_path,
                },
            )

            result: dict[str, Any] = {
                "csv_path": csv_path,
                "record_count": record_count,
                "duration_seconds": duration,
                "success": True,
                "error": None,
            }
            if self._is_evtx_artifact(function_name):
                result["csv_paths"] = [str(p) for p in all_csv_paths]
            return result
        except Exception as error:
            # Deliberate catch-all: any parsing failure becomes a failed
            # result dict; the full traceback is preserved in the audit log.
            duration = perf_counter() - start_time
            error_message = str(error)
            error_traceback = traceback.format_exc()
            self.audit_logger.log(
                "parsing_failed",
                {
                    "artifact_key": artifact_key,
                    "artifact_name": artifact.get("name", artifact_key),
                    "function": function_name,
                    "error": error_message,
                    "traceback": error_traceback,
                    "duration_seconds": round(duration, 6),
                },
            )
            return {
                "csv_path": "",
                "record_count": record_count,
                "duration_seconds": duration,
                "success": False,
                "error": error_message,
            }

Parse a single artifact and stream its records to one or more CSV files.

Logs parsing_started, parsing_completed (or parsing_failed) to the audit trail. EVTX artifacts are split by channel/provider into separate CSV files.

Arguments:
  • artifact_key: Key from the OS-specific artifact registry identifying the artifact to parse.
  • progress_callback: Optional callback invoked every 1,000 records with progress information.
Returns:

Result dictionary with keys csv_path, record_count, duration_seconds, success, and error. EVTX results also include a csv_paths list.

LINUX_ARTIFACT_REGISTRY = {'cronjobs': {'name': 'Cron Jobs', 'category': 'Persistence', 'function': 'cronjobs', 'description': 'Scheduled tasks defined in user crontabs and system-wide /etc/cron.* directories. Cron is a common persistence and periodic-execution mechanism on Linux systems.', 'analysis_hint': 'Flag cron entries that download or execute from /tmp, /dev/shm, or user-writable paths. Look for base64-encoded commands, reverse shells, and entries added near the incident window.', 'artifact_guidance': 'Scheduled tasks — primary persistence mechanism on Linux.\n- Suspicious: entries running scripts from /tmp, /dev/shm, or user-writable directories; entries executing curl/wget/python/bash with URLs or encoded payloads; entries owned by unexpected users; unusual schedules (every minute, @reboot).\n- Locations: /var/spool/cron/crontabs/ (per-user), /etc/crontab, /etc/cron.d/, /etc/cron.{hourly,daily,weekly,monthly}/.\n- @reboot entries are high-priority — they survive reboots without appearing in regular cron schedules.\n- Cron execution should appear in syslog (CRON entries). Missing log entries for known cron jobs may indicate log tampering.'}, 'services': {'name': 'Systemd Services', 'category': 'Persistence', 'function': 'services', 'description': "Systemd unit files describing services, their startup configuration, and current state. Dissect's services function is OS-aware and returns Linux systemd units on Linux targets.", 'analysis_hint': 'Identify services with ExecStart pointing to unusual paths (/tmp, /var/tmp, user home dirs). 
Flag recently created or modified unit files, units set to restart on failure, and masked units.', 'artifact_guidance': "Systemd service units and init scripts — key persistence and privilege artifact on Linux.\n- Suspicious: unit files in /etc/systemd/system/ referencing unusual binaries, ExecStart pointing to /tmp, /dev/shm, or hidden directories, services with Restart=always that aren't standard, recently created unit files, services running as root with unusual ExecStart paths, Type=oneshot services running scripts.\n- Check for: masked legitimate security services (apparmor, auditd, fail2ban), ExecStartPre/ExecStartPost running additional commands, drop-in overrides in /etc/systemd/system/*.d/ directories.\n- Cross-check: service creation should correlate with systemctl commands in bash_history and file creation timestamps in filesystem artifacts.\n- Expected: standard distro services are common — focus on what doesn't fit the installed package set."}, 'bash_history': {'name': 'Bash History', 'category': 'Shell History', 'function': 'bash_history', 'description': 'Per-user .bash_history files recording interactive shell commands. Highest-value artifact on Linux for understanding attacker activity.', 'analysis_hint': 'Hunt for curl/wget downloads, base64 encoding/decoding, reverse shells (bash -i, /dev/tcp), credential access (cat /etc/shadow), reconnaissance (id, whoami, uname -a), persistence installation (crontab -e, systemctl enable), and log tampering (truncate, shred, rm /var/log). 
Sparse or empty history for active accounts may indicate clearing (history -c, HISTFILE=/dev/null).', 'artifact_guidance': 'Direct record of commands typed by users — highest-value artifact on Linux systems.\n- Suspicious: curl/wget downloads, base64 encoding/decoding, reverse shells (bash -i, /dev/tcp), compiler invocations (gcc, make) for kernel exploits, credential access (cat /etc/shadow, mimipenguin), recon sequences (id, whoami, uname -a, cat /etc/passwd, ss -tlnp, ip a), persistence installation (crontab -e, systemctl enable), log tampering (truncate, shred, rm on /var/log).\n- No timestamps. Sequence matters but timing must come from other artifacts.\n- Sparse or empty history for active accounts may indicate clearing (history -c, HISTFILE=/dev/null, unset HISTFILE).\n- Look for multi-stage patterns: recon → exploitation → persistence. Commands piped to /dev/null or with stderr redirection may indicate output suppression.'}, 'zsh_history': {'name': 'Zsh History', 'category': 'Shell History', 'function': 'zsh_history', 'description': 'Per-user .zsh_history files recording Zsh shell commands with optional timestamps. Zsh history may include timing data not present in bash history.', 'analysis_hint': 'Apply the same suspicious-command patterns as bash_history. 
Zsh extended history format includes timestamps — use them for timeline correlation.', 'artifact_guidance': 'Zsh shell history with timestamps — higher value than bash_history for timeline construction.\n- Format: `: epoch:duration;command` — use the epoch timestamp for direct correlation with other timed artifacts (wtmp, syslog, journalctl).\n- Same threat indicators as bash_history: curl/wget downloads, base64, reverse shells, credential access, recon commands, persistence installation, log tampering.\n- Zsh extended history may record multi-line commands that bash_history splits or truncates.\n- Sparse or empty history for active accounts may indicate clearing or HISTFILE manipulation.'}, 'fish_history': {'name': 'Fish History', 'category': 'Shell History', 'function': 'fish_history', 'description': 'Per-user Fish shell history stored in YAML-like format with timestamps. Less common but may capture activity missed by bash/zsh.', 'analysis_hint': 'Apply the same suspicious-command patterns as bash_history. Fish history includes timestamps per command — correlate with login records.', 'artifact_guidance': 'Fish shell history with timestamps. Same threat indicators as bash_history.\n- Fish stores history with `- cmd:` and `when:` fields — timestamps are Unix epochs, enabling direct timeline correlation.\n- Fish is uncommon on servers. Its presence on a production system may itself be notable — check if it was recently installed.\n- Stored per-user in ~/.local/share/fish/fish_history.'}, 'python_history': {'name': 'Python History', 'category': 'Shell History', 'function': 'python_history', 'description': 'Python REPL history from interactive interpreter sessions. May reveal attacker use of Python for scripting, exploitation, or data manipulation.', 'analysis_hint': 'Look for import of socket/subprocess/os modules, file read/write operations on sensitive paths, and network connection attempts. 
Python is commonly used for exploit development and post-exploitation tooling.', 'artifact_guidance': 'Python interactive REPL history — records commands typed in the Python interpreter.\n- Suspicious: import os/subprocess/socket/pty, eval/exec calls, network connections (socket.connect, urllib, requests), file operations on sensitive paths (/etc/shadow, /root/.ssh), os.system or subprocess.call with shell commands, pty.spawn for shell upgrades.\n- Often used for interactive exploitation after initial access — attacker drops into Python to avoid bash history or leverage Python capabilities.\n- Stored in ~/.python_history by default. No timestamps.\n- Cross-check: Python interpreter execution should appear in bash_history (python/python3 commands) or process logs.'}, 'wtmp': {'name': 'Login Records (wtmp)', 'category': 'Authentication', 'function': 'wtmp', 'description': 'Successful login/logout records including user, terminal, source IP, and timestamps. Linux equivalent of Windows logon events.', 'analysis_hint': 'Flag logins from unexpected IPs, logins at unusual hours, root logins via SSH, and logins from accounts that should not be interactive. Cross-check with auth logs and shell history. wtmp can be tampered with — missing records or time gaps may indicate editing.', 'artifact_guidance': "Login/logout records — Linux equivalent of Windows logon events.\n- Shows: user, terminal (tty/pts), source IP for remote sessions, login/logout timestamps.\n- Suspicious: logins from unexpected IPs, logins at unusual hours, root logins via SSH, logins from accounts that shouldn't be interactive (www-data, nobody, service accounts), logins immediately after account creation.\n- Anti-forensic: wtmp is a binary file that can be tampered with (utmpdump). Missing records or time gaps may indicate editing. Compare with syslog/journalctl auth entries for consistency.\n- Cross-check: correlate with auth logs, bash_history, and btmp to build user activity timeline. 
A successful login here preceded by many failures in btmp indicates compromised credentials."}, 'btmp': {'name': 'Failed Logins (btmp)', 'category': 'Authentication', 'function': 'btmp', 'description': 'Failed login attempt records including user, source IP, and timestamps. High volumes indicate brute-force attacks or credential stuffing.', 'analysis_hint': 'Look for high-frequency failures from single IPs (brute force), failures for non-existent accounts (enumeration), and failures immediately before a successful wtmp login (successful brute force). Correlate source IPs with successful logins.', 'artifact_guidance': 'Failed login attempts — Linux equivalent of Windows Event ID 4625.\n- Patterns: brute force (high volume against one account), password spraying (low volume across many accounts), attempts against disabled or system accounts.\n- Source IPs are key IOCs. A successful login (in wtmp) after many failures here indicates compromised credentials.\n- High volume is normal for internet-facing SSH — focus on attempts against real local accounts rather than dictionary usernames.'}, 'lastlog': {'name': 'Last Login Records', 'category': 'Authentication', 'function': 'lastlog', 'description': 'Last login timestamp and source for each user account on the system. Provides a quick overview of account usage recency.', 'analysis_hint': 'Identify accounts with recent logins that should be dormant or disabled. Compare with wtmp for consistency — discrepancies may indicate log tampering.', 'artifact_guidance': "Last login timestamp and source for each user account — quick-reference artifact.\n- Quick checks: accounts with recent logins that shouldn't be active (service accounts, disabled users), system accounts (UID < 1000) with login records, accounts that have never logged in but were recently created.\n- Only stores the most recent login per user — no history. 
Cross-check against wtmp for full login records.\n- Discrepancies between lastlog and wtmp may indicate tampering with one or both.\n- Small artifact: review all entries."}, 'users': {'name': 'User Accounts', 'category': 'Authentication', 'function': 'users', 'description': 'User account information parsed from /etc/passwd and /etc/shadow, including UIDs, shells, home directories, and password metadata.', 'analysis_hint': 'Flag accounts with UID 0 (root-equivalent), accounts with login shells that should have /sbin/nologin, recently created accounts (check shadow dates), and accounts with empty password fields.', 'artifact_guidance': "User accounts from /etc/passwd and /etc/shadow — Linux equivalent of the SAM artifact.\n- Suspicious: UID 0 accounts besides root, accounts with no password or weak hash type (DES, MD5 instead of SHA-512), recently created accounts (check shadow change dates), accounts with interactive shells (/bin/bash, /bin/sh) that shouldn't have them (www-data, nobody, service accounts), home directories in unusual locations (/tmp, /dev/shm).\n- Key fields: username, UID, GID, shell, home directory, password hash type, last password change, account expiration.\n- Cross-check: new accounts should correlate with useradd commands in bash_history and auth log entries.\n- Small artifact: review all entries. Focus on accounts that don't match the expected system profile."}, 'groups': {'name': 'Groups', 'category': 'Authentication', 'function': 'groups', 'description': 'Group definitions from /etc/group including group members. Shows privilege group membership such as sudo, wheel, and docker.', 'analysis_hint': 'Check membership of privileged groups (sudo, wheel, docker, adm, root). 
Flag unexpected users in administrative groups.', 'artifact_guidance': 'Group memberships from /etc/group — shows privilege assignments.\n- Suspicious: unexpected members of sudo, wheel, adm, docker, lxd, disk, or shadow groups.\n- Docker and lxd group membership effectively grants root access — flag non-admin users in these groups.\n- The adm group grants log file access — membership could enable log review or tampering.\n- Small artifact: review all privileged group memberships completely.'}, 'sudoers': {'name': 'Sudoers Config', 'category': 'Authentication', 'function': 'sudoers', 'description': 'Sudo configuration from /etc/sudoers and /etc/sudoers.d/, defining which users can run which commands with elevated privileges.', 'analysis_hint': 'Flag NOPASSWD entries, overly broad command allowances (ALL), and rules for unexpected users. Attackers often modify sudoers for passwordless privilege escalation.', 'artifact_guidance': 'Sudo configuration defining privilege escalation rules.\n- Suspicious: NOPASSWD entries (sudo without password), overly broad allowances (ALL=(ALL) ALL for non-admin users), entries for unexpected users or groups, entries allowing specific dangerous commands (bash, su, cp, chmod, chown), entries with !authenticate.\n- Check both /etc/sudoers and /etc/sudoers.d/ drop-in files.\n- Recently modified sudoers files are high-priority — correlate modification timestamps with other activity.\n- Attackers commonly add NOPASSWD entries for persistence or privilege escalation.\n- Cross-check: sudoers modifications should correlate with visudo usage in bash_history or file modification timestamps.'}, 'network.interfaces': {'name': 'Network Interfaces', 'category': 'Network', 'function': 'network.interfaces', 'description': "Network interface configuration including IP addresses, subnets, and interface names. 
Provides context for understanding the system's network position.", 'analysis_hint': 'Document all configured interfaces and IPs for correlation with login source IPs and network artifacts from other systems. Flag unexpected interfaces (tunnels, bridges).', 'artifact_guidance': "Network interface configuration — context artifact for understanding the system's network position.\n- Shows: interface names, IP addresses, subnet masks, gateways, DNS servers, VLAN configurations.\n- Useful for: determining reachable networks, identifying multi-homed systems, understanding blast radius of a compromise.\n- Suspicious: unexpected interfaces (tun/tap for VPN tunnels, docker/veth for containers that shouldn't exist), promiscuous mode enabled (potential sniffing), IP addresses outside expected ranges.\n- Primarily a context artifact — use it to inform analysis of other artifacts rather than as a standalone finding source."}, 'syslog': {'name': 'Syslog', 'category': 'Logs', 'function': 'syslog', 'description': 'System log entries from /var/log/syslog, /var/log/messages, and /var/log/auth.log. Central log source for authentication, service, and kernel events on Linux.', 'analysis_hint': 'Filter for sshd, sudo, su, and PAM messages to reconstruct authentication activity. Look for service start/stop events, kernel warnings, and log gaps that may indicate tampering or system downtime.', 'artifact_guidance': 'Primary system log — broadest coverage of system events on Linux.\n- High-signal entries: authentication events (sshd, sudo, su, login), service start/stop, kernel messages (especially module loading via modprobe/insmod), cron execution, package manager activity, OOM kills.\n- Suspicious: timestamp gaps (log deletion/rotation tampering), sshd accepted/failed password entries, sudo command executions, unknown or unexpected service names, kernel module loading for non-standard modules.\n- Volume warning: syslog can have millions of lines. 
Focus on the incident time window and high-signal facility/program combinations.\n- Cross-check: syslog auth entries should be consistent with wtmp/btmp records. Discrepancies indicate tampering with one or both.'}, 'journalctl': {'name': 'Systemd Journal', 'category': 'Logs', 'function': 'journalctl', 'description': 'Structured journal entries from systemd-journald, covering services, kernel, and user-session events with rich metadata.', 'analysis_hint': 'Use unit and priority fields to filter for security-relevant events. Journal entries complement syslog and may contain structured fields not present in plain-text logs.', 'artifact_guidance': "Systemd journal — richer than syslog with structured metadata (unit names, PIDs, priority levels).\n- May capture service stdout/stderr that syslog misses. Same threat indicators: authentication events, service changes, kernel messages, cron execution.\n- Suspicious: journal file truncation or missing time ranges, failed service starts for security tools, kernel module loading, coredumps for exploited processes.\n- Journal persistence depends on config — volatile journals (/run/log/journal/) are lost on reboot. Persistent journals live in /var/log/journal/.\n- If journal has entries that syslog doesn't (or vice versa), one was likely tampered with."}, 'packagemanager': {'name': 'Package History', 'category': 'Logs', 'function': 'packagemanager', 'description': 'Package installation, removal, and update history from apt, yum, dnf, or other package managers. Shows software changes over time.', 'analysis_hint': 'Flag recently installed packages, especially compilers (gcc, make), network tools (nmap, netcat, socat), and packages installed outside normal maintenance windows. 
Package removal near incident time may indicate cleanup.', 'artifact_guidance': 'Package installation and removal history — shows software changes over time.\n- Suspicious: recently installed offensive tools (nmap, netcat/ncat, tcpdump, wireshark, gcc, make, gdb, strace), removed security tools (auditd, fail2ban, rkhunter, clamav), packages from non-standard repositories or PPAs, installations correlating with incident timing.\n- Compiler toolchain installation (build-essential, gcc, make) on a production server is notable — may indicate kernel exploit compilation.\n- Sources vary by distro: dpkg.log and apt history.log (Debian/Ubuntu), yum.log or dnf.log (RHEL/Fedora), pacman.log (Arch), zypper.log (SUSE).\n- Cross-check: package installations should correlate with apt/yum/dnf commands in bash_history.'}, 'ssh.authorized_keys': {'name': 'SSH Authorized Keys', 'category': 'SSH', 'function': 'ssh.authorized_keys', 'description': 'Per-user authorized_keys files listing public keys allowed for SSH authentication. A primary persistence mechanism for SSH-based access.', 'analysis_hint': "Flag keys added recently or for unexpected accounts. Compare key fingerprints across systems to identify lateral movement. Look for command-restricted keys and keys with 'from=' options limiting source IPs.", 'artifact_guidance': 'SSH public keys granting passwordless access — critical persistence mechanism.\n- Suspicious: keys in unexpected user accounts (especially root, service accounts), recently added keys (correlate with file timestamps), keys with forced command restrictions that look like backdoors (command="..." 
prefix), multiple keys for single accounts that don\'t match known administrators, unusual comment fields.\n- Check: ~/.ssh/authorized_keys and ~/.ssh/authorized_keys2 for all users, plus /etc/ssh/sshd_config for AuthorizedKeysFile overrides pointing to non-standard locations.\n- An attacker adding their key is one of the most common Linux persistence techniques — always review thoroughly.\n- Cross-check: key additions should correlate with SSH/SCP activity in auth logs, and echo/cat commands in bash_history writing to authorized_keys files.'}, 'ssh.known_hosts': {'name': 'SSH Known Hosts', 'category': 'SSH', 'function': 'ssh.known_hosts', 'description': 'Per-user known_hosts files recording SSH server fingerprints the user has connected to. Reveals outbound SSH connections and lateral movement targets.', 'analysis_hint': 'Identify internal hosts the user SSHed to (lateral movement) and external hosts (potential C2 or data exfiltration). Hashed known_hosts entries obscure hostnames but IP-based entries may still be readable.', 'artifact_guidance': "SSH host keys for systems this machine has connected to — shows lateral movement paths outward.\n- Suspicious: internal hosts that shouldn't be SSH targets from this system, external IPs or hostnames, large number of known hosts on a system that shouldn't be initiating SSH (web servers, database servers), recently added entries.\n- Hashed known_hosts (HashKnownHosts=yes) obscures hostnames — entry count and file modification time are still useful.\n- Check both per-user (~/.ssh/known_hosts) and system-wide (/etc/ssh/ssh_known_hosts).\n- Cross-check: SSH connections should correlate with ssh commands in bash_history and auth logs on destination systems."}}
MAX_RECORDS_PER_ARTIFACT = 1000000
UNKNOWN_VALUE = 'Unknown'
class UnsupportedPluginError(dissect.target.exceptions.PluginError):
class UnsupportedPluginError(PluginError):
    """The requested plugin is not supported by the target."""

    def root_cause_str(self) -> str:
        """Return the root-cause message for this error.

        Often with this type of error, the chained cause is more
        descriptive for the user.  Falls back to this exception's own
        message when no cause is attached or the cause carries no args
        (the previous implementation raised ``AttributeError`` /
        ``IndexError`` in those cases).
        """
        cause = self.__cause__
        if cause is not None and cause.args:
            return str(cause.args[0])
        return str(self)

The requested plugin is not supported by the target.

def root_cause_str(self) -> str:
72    def root_cause_str(self) -> str:
73        """Often with this type of Error, the root cause is more descriptive for the user."""
74        return str(self.__cause__.args[0])

With this type of error, the chained root cause is often more descriptive for the user than the error itself.

# Registry of Windows artifacts the parser can extract, keyed by the Dissect
# plugin/function name.  Each value bundles display metadata ('name',
# 'category', 'function', 'description') plus analyst-facing guidance strings
# ('analysis_hint', 'artifact_guidance') — presumably consumed by downstream
# reporting/triage; confirm against callers.
WINDOWS_ARTIFACT_REGISTRY = {'runkeys': {'name': 'Run/RunOnce Keys', 'category': 'Persistence', 'function': 'runkeys', 'description': 'Registry autorun entries that launch programs at user logon or system boot. These keys commonly store malware persistence command lines and loader stubs.', 'analysis_hint': 'Prioritize entries launching from user-writable paths like AppData, Temp, or Public. Flag encoded PowerShell, LOLBins, and commands added near the suspected compromise window.', 'artifact_guidance': 'Startup persistence. Every entry is worth reviewing — these are typically few.\n- Separate HKLM (machine-wide) from HKCU (user-specific) scope.\n- Suspicious: commands from user-writable paths (AppData, Temp, Public, ProgramData), script hosts (powershell, wscript, mshta, cmd /c), encoded/obfuscated arguments, LOLBins (rundll32, regsvr32, mshta).\n- Expected: enterprise software updaters (Google, Adobe, Teams, OneDrive). If in doubt, flag it — false positives are cheap here.'}, 'tasks': {'name': 'Scheduled Tasks', 'category': 'Persistence', 'function': 'tasks', 'description': 'Windows Task Scheduler definitions including triggers, actions, principals, and timing. 
Adversaries frequently use tasks for periodic execution and delayed payload launch.', 'analysis_hint': 'Look for newly created or modified tasks with hidden settings, unusual run accounts, or actions pointing to scripts/binaries outside Program Files and Windows directories.', 'artifact_guidance': "Scheduled execution and persistence.\n- Suspicious: non-Microsoft authors, hidden tasks, tasks running script hosts or encoded commands, binaries outside trusted system paths, tasks created/modified near the incident window.\n- High-risk triggers: boot/logon triggers with no clear business purpose, high-frequency schedules.\n- Cross-check: task creation should correlate with EVTX and execution artifacts.\n- Expected: Windows maintenance tasks (defrag, diagnostics, updates) are normal — focus on what's new or unusual."}, 'services': {'name': 'Services', 'category': 'Persistence', 'function': 'services', 'description': 'Windows service configuration and startup metadata, including image paths and service accounts. Malicious services can provide boot persistence and privilege escalation.', 'analysis_hint': 'Investigate auto-start services with suspicious image paths, weakly named binaries, or unexpected accounts. 
Correlate install/start times with process creation and event log artifacts.', 'artifact_guidance': "Boot/logon persistence and privilege context.\n- Focus on auto-start and delayed-auto-start services.\n- Suspicious: image paths under user-writable directories, service names mimicking legitimate components but pointing to odd binaries, services running as LocalSystem with unusual paths, quoted-path vulnerabilities.\n- Cross-check: newly installed services should correlate with EVTX Event ID 7045.\n- Expected: vendor software services are common and usually benign — look for what doesn't fit the pattern."}, 'cim': {'name': 'WMI Persistence', 'category': 'Persistence', 'function': 'cim', 'description': 'WMI repository data such as event filters, consumers, and bindings used for event-driven execution. This is a common stealth persistence mechanism in fileless intrusions.', 'analysis_hint': 'Focus on suspicious __EventFilter, CommandLineEventConsumer, and ActiveScriptEventConsumer objects. Flag PowerShell, cmd, or script host commands triggered by system/user logon events.', 'artifact_guidance': "WMI event subscription persistence — a stealthy and often overlooked persistence mechanism.\n- Focus on the three components: EventFilter (trigger), EventConsumer (action), and FilterToConsumerBinding (link between them).\n- Suspicious: CommandLineEventConsumer or ActiveScriptEventConsumer invoking powershell, cmd, wscript, mshta, or referencing external script files. Any consumer executing from user-writable paths.\n- High-risk triggers: logon, startup, or timer-based EventFilters that re-execute payloads automatically.\n- This artifact is rarely used legitimately outside enterprise management tools (SCCM, monitoring agents). 
Any unexpected subscription is worth flagging.\n- Cross-check: execution of the consumer's target command should appear in EVTX process creation, prefetch, or shimcache."}, 'shimcache': {'name': 'Shimcache', 'category': 'Execution', 'function': 'shimcache', 'description': 'Application Compatibility Cache entries containing executable paths and file metadata observed by the OS. Entries provide execution context but do not independently prove a successful run.', 'analysis_hint': 'Use Shimcache to surface suspicious paths, then confirm execution with Prefetch, Amcache, or event logs. Pay attention to unsigned tools, archive extraction paths, and deleted binaries.', 'artifact_guidance': 'Evidence of program presence on disk, not definitive proof of execution.\n- Suspicious: executables in user profiles, temp directories, recycle bin, removable media, or archive extraction paths. Renamed system utilities. Known attacker tools (psexec, mimikatz, procdump, etc.).\n- Important: shimcache alone does not confirm execution. Flag items that need corroboration from Prefetch, Amcache, or EVTX.\n- Use timestamps and entry order to build a likely sequence, but label the uncertainty.\n- Expected: common enterprise software in standard paths is noise — skip it unless relevant to the investigation context.'}, 'amcache': {'name': 'Amcache', 'category': 'Execution', 'function': 'amcache', 'description': 'Application and file inventory from Amcache.hve, often including path, hash, compile info, and first-seen data. Useful for identifying executed or installed binaries and their provenance.', 'analysis_hint': 'Prioritize recently introduced executables with unknown publishers or rare install locations. 
Compare hashes and file names against threat intelligence and other execution artifacts.', 'artifact_guidance': "Program inventory with execution relevance and SHA-1 hashes.\n- Suspicious: newly observed executables near the incident window, uncommon install paths, unknown publishers, product name mismatches, executables without expected publisher metadata.\n- High value: SHA-1 hashes can be cross-referenced with threat intel (note this for the analyst, but don't fabricate lookups).\n- Cross-check: correlate with shimcache and prefetch for execution confirmation.\n- Expected: normal software installs and updates are common — focus on what appeared recently or doesn't belong."}, 'prefetch': {'name': 'Prefetch', 'category': 'Execution', 'function': 'prefetch', 'description': 'Windows Prefetch artifacts recording executable run metadata such as run counts, last run times, and referenced files. They are high-value evidence for userland execution on supported systems.', 'analysis_hint': 'Hunt for recently first-run utilities, script hosts, and remote administration tools. Review loaded file references for dropped DLLs and staging directories.', 'artifact_guidance': "Strong evidence of program execution with run count and timing.\n- Suspicious: low run-count executables (1-3 runs suggest recently introduced tools), script hosts and LOLBins from user-writable paths, known attacker tools, burst execution patterns.\n- Key fields: last run time and run count together tell you when something new appeared.\n- Cross-check: referenced files/directories within prefetch data can reveal staging locations or payload unpacking paths.\n- Expected: system utilities with high run counts are routine — focus on what's new or rare."}, 'bam': {'name': 'BAM/DAM', 'category': 'Execution', 'function': 'bam', 'description': 'Background Activity Moderator and Desktop Activity Moderator execution tracking tied to user SIDs. 
These entries help attribute process activity to specific user contexts.', 'analysis_hint': 'Correlate BAM/DAM timestamps with logons and process events to identify who launched suspicious binaries. Highlight administrative tools and scripts executed outside normal business patterns.', 'artifact_guidance': "Accurate last-execution timestamps per user. Lightweight but precise.\n- Provides user-to-executable mapping with reliable timestamps — useful for attribution.\n- Suspicious: execution of tools from temp/download/public folders, execution timestamps clustering around incident window.\n- Cross-check: correlate with prefetch and amcache to build a fuller execution picture.\n- Limited data: BAM only stores recent entries and lacks historical depth. Absence doesn't mean non-execution."}, 'userassist': {'name': 'UserAssist', 'category': 'Execution', 'function': 'userassist', 'description': 'Per-user Explorer-driven program execution traces stored in ROT13-encoded registry values. Includes run counts and last execution times for GUI-launched applications.', 'analysis_hint': 'Decode and review rarely used programs, renamed binaries, and LOLBins launched through Explorer. 
Use run-count deltas and last-run times to identify unusual user behavior.', 'artifact_guidance': 'GUI-driven program execution via Explorer shell, per user.\n- Shows what users launched interactively — useful for distinguishing user actions from automated/service execution.\n- Suspicious: rarely used or newly appearing applications, script hosts and LOLBins launched from Explorer, tools from atypical folders.\n- Key fields: run count and last execution time together show behavioral changes.\n- Limited scope: only captures Explorer-launched programs, not command-line or service execution.'}, 'evtx': {'name': 'Windows Event Logs', 'category': 'Event Logs', 'function': 'evtx', 'description': 'Windows event channel records covering authentication, process creation, services, policy changes, and system health. EVTX is often the backbone for timeline and intrusion reconstruction.', 'analysis_hint': 'Pivot on high-signal event IDs for logon, process creation, service installs, account changes, and log clearing. Correlate actor account, host, and parent-child process chains across Security/System channels.', 'artifact_guidance': "Primary security telemetry and event timeline. Richest artifact for incident reconstruction.\n- High-signal Event IDs to prioritize:\n - Logon: 4624 (success), 4625 (failure), 4634 (logoff), 4648 (explicit creds), 4672 (special privileges)\n - Process: 4688 (process creation — command lines are gold)\n - Services: 7045 (new service installed), 4697 (service install via Security log)\n - Accounts: 4720 (created), 4722 (enabled), 4724 (password reset), 4726 (deleted), 4732/4733 (group membership)\n - Anti-forensic: 1102 (audit log cleared)\n- Build event chains: logon → process creation → persistence change, with timestamps.\n- Flag: unusual logon types (Type 3 network, Type 10 RDP from unexpected sources), process command lines with encoding or download cradles, log gaps suggesting clearing.\n- Volume warning: EVTX can have millions of records. 
Focus on the incident time window and high-signal IDs. Don't enumerate routine system noise."}, 'defender.evtx': {'name': 'Defender Logs', 'category': 'Event Logs', 'function': 'defender.evtx', 'description': 'Microsoft Defender event logs describing detections, remediation actions, exclusions, and protection state changes. These records show what malware was seen and how protection responded.', 'analysis_hint': 'Identify detection names, severity, and action outcomes (blocked, quarantined, allowed, failed). Flag tamper protection events, exclusion changes, and repeated detections of the same path.', 'artifact_guidance': 'Endpoint protection detection and response events.\n- Key data: threat names, severity, affected file paths, action taken (blocked/quarantined/allowed/failed).\n- Suspicious: detections where remediation failed, repeated detections of the same threat (reinfection), real-time protection disabled, exclusions added near incident window, tamper protection changes.\n- Cross-check: correlate detection timestamps with execution artifacts to assess whether the malware ran before or after detection.\n- Distinguish real malware detections from PUA/adware noise — severity and threat name are the key differentiators.'}, 'mft': {'name': 'MFT', 'category': 'File System', 'function': 'mft', 'description': 'Master File Table metadata for NTFS files and directories, including timestamps, attributes, and record references. MFT helps reconstruct file lifecycle and artifact provenance at scale.', 'analysis_hint': 'Focus on executable/script creation in user profile, temp, and startup paths near incident time. Check for timestamp anomalies and suspicious rename/move patterns suggesting anti-forensics.', 'artifact_guidance': "Complete file metadata with MACB timestamps for every file on the volume.\n- Key technique: compare $STANDARD_INFORMATION timestamps against $FILE_NAME timestamps. 
Discrepancies suggest timestomping (anti-forensic timestamp manipulation).\n- Suspicious: files created in the incident window in temp/staging directories, executables in unexpected locations, files with creation times newer than modification times (copy indicator).\n- Focus on the incident time window — a full MFT can have millions of entries. Don't enumerate routine system files.\n- Cross-check: file paths found here should correlate with execution artifacts (prefetch, amcache) and persistence mechanisms (runkeys, services, tasks)."}, 'usnjrnl': {'name': 'USN Journal', 'category': 'File System', 'function': 'usnjrnl', 'description': 'NTFS change journal entries capturing create, modify, rename, and delete operations over time. USN is valuable for short-lived files that no longer exist on disk.', 'analysis_hint': 'Track rapid create-delete or rename chains involving scripts, archives, and binaries. Correlate change reasons and timestamps with execution and network artifacts for full activity flow.', 'artifact_guidance': 'NTFS change journal recording file creation, deletion, rename, and attribute changes.\n- Suspicious: file creation/rename in staging directories, batch deletions suggesting cleanup, executable files appearing in temp/download paths, rename operations disguising file types.\n- Anti-forensic value: shows files that were created then deleted (even if they no longer exist on disk).\n- Focus on the incident time window. USN journals can be very large.\n- Cross-check: file operations here should correlate with MFT timestamps, execution artifacts, and recycle bin entries.'}, 'recyclebin': {'name': 'Recycle Bin', 'category': 'File System', 'function': 'recyclebin', 'description': 'Deleted-item metadata including original paths, deletion times, and owning user context. Useful for identifying post-activity cleanup and attempted evidence removal.', 'analysis_hint': 'Prioritize deleted tools, scripts, archives, and credential files tied to suspicious users. 
Compare deletion timestamps against detection events and command history.', 'artifact_guidance': 'Intentionally deleted files with original path and deletion timestamp.\n- Suspicious: deleted executables, scripts, archives, credential material, log files — especially shortly after suspicious execution or detection events.\n- Clusters of deletions in a short window suggest deliberate evidence cleanup.\n- Key fields: original file path (reveals where the file lived) and deletion timestamp (reveals when cleanup happened).\n- Cross-check: correlate deletion timing with Defender detections, execution artifacts, and EVTX events.'}, 'browser.history': {'name': 'Browser History', 'category': 'User Activity', 'function': 'browser.history', 'description': 'Visited URL records with titles and timestamps from supported web browsers. These entries reveal user browsing intent, reconnaissance, and web-based attack paths.', 'analysis_hint': 'Look for phishing domains, file-sharing links, admin portals, and malware delivery infrastructure. Align visit times with downloads, process execution, and authentication events.', 'artifact_guidance': 'Web browsing history showing URLs visited with timestamps.\n- Suspicious: phishing domains, file-sharing/paste sites, malware delivery URLs, C2 panel access, remote access tool download pages, raw IP addresses, suspicious TLDs, search queries for hacking tools or techniques.\n- Cross-check: correlate visit timestamps with browser downloads and subsequent execution artifacts.\n- Context: browsing patterns can reveal reconnaissance, tool acquisition, or data exfiltration via web services.\n- Expected: routine business browsing is noise — focus on what stands out relative to the investigation context.'}, 'browser.downloads': {'name': 'Browser Downloads', 'category': 'User Activity', 'function': 'browser.downloads', 'description': 'Browser download records linking source URLs to local file paths and timing. 
This artifact is key for tracing initial payload ingress and user-acquired tools.', 'analysis_hint': 'Flag executable, script, archive, and disk-image downloads from untrusted domains. Correlate downloaded file names and times with Prefetch, Amcache, and Defender activity.', 'artifact_guidance': 'Files downloaded through web browsers with source URL and local save path.\n- Suspicious: downloaded executables, scripts, archives, disk images, office documents with macros — especially from unknown or suspicious URLs.\n- High-value cross-check: a downloaded file that also appears in execution artifacts (prefetch, amcache) confirms the payload was run.\n- Flag: repeated downloads of similarly named files (retry behavior), downloads from raw IP URLs, filename/extension mismatches.\n- Key fields: source URL, local path, download timestamp.'}, 'powershell_history': {'name': 'PowerShell History', 'category': 'User Activity', 'function': 'powershell_history', 'description': 'PSReadLine command history capturing interactive PowerShell commands entered by users. Often exposes attacker tradecraft such as reconnaissance, staging, and command-and-control setup.', 'analysis_hint': 'Hunt for encoded commands, download cradles, credential access, and remote execution cmdlets. Note gaps or abrupt truncation that may indicate history clearing or alternate execution methods.', 'artifact_guidance': 'Direct record of PowerShell commands typed by users. 
High-value tradecraft evidence.\n- Suspicious: encoded commands (-enc / -EncodedCommand), download cradles (IWR, Invoke-WebRequest, Net.WebClient), execution policy bypasses, AMSI bypasses, credential access cmdlets, discovery commands (whoami, net user, Get-ADUser, nltest), lateral movement (Enter-PSSession, Invoke-Command), file staging and archiving.\n- Anti-forensic: sparse or truncated history may indicate clearing (Clear-History, deletion of ConsoleHost_history.txt).\n- No timestamps: PSReadLine history is a plain text file without timestamps. Sequence matters but timing must come from other artifacts.\n- This is often the highest-signal artifact when present. Treat every line as potentially significant.'}, 'activitiescache': {'name': 'Activities Cache', 'category': 'User Activity', 'function': 'activitiescache', 'description': 'Windows Timeline activity records reflecting user interactions with apps, documents, and URLs. Provides broader behavioral context across applications and time.', 'analysis_hint': 'Use it to build user intent timelines around suspicious periods and identify staging behavior. 
Prioritize activity involving remote access tools, cloud storage, and sensitive document paths.', 'artifact_guidance': "Windows Timeline database tracking application focus time and user activity.\n- Provides a timeline of what applications the user was actively working in, with timestamps.\n- Suspicious: remote access tool usage, cloud storage clients during off-hours, admin utilities not part of the user's normal role, sensitive document access patterns.\n- Context value: establishes what the user was doing before, during, and after suspicious events detected in other artifacts.\n- Cross-check: correlate with execution artifacts and browser history to build a complete activity narrative."}, 'sru.network_data': {'name': 'SRUM Network Data', 'category': 'Network', 'function': 'sru.network_data', 'description': 'System Resource Usage Monitor network telemetry with per-application usage over time. Shows which apps consumed network bandwidth and when.', 'analysis_hint': 'Identify unusual outbound-heavy applications, especially unsigned or rarely seen executables. Correlate spikes with execution artifacts and possible data exfiltration windows.', 'artifact_guidance': "Network usage statistics per application from the SRUM database.\n- Suspicious: large data volumes from unexpected applications (potential exfiltration), network activity from known attacker tools, unusual applications making network connections.\n- Key fields: application name, bytes sent/received, timestamps.\n- Context: helps identify which processes were communicating and how much data moved, even if network logs aren't available.\n- Limitation: SRUM aggregates data over time intervals, so precise timing of individual connections isn't available."}, 'sru.application': {'name': 'SRUM Application', 'category': 'Network', 'function': 'sru.application', 'description': 'SRUM application resource usage records that provide process-level activity context across time slices. 
Helpful for spotting persistence or background abuse patterns.', 'analysis_hint': 'Surface low-prevalence applications active during the incident period or outside baseline hours. Cross-check with BAM, Prefetch, and network logs to confirm suspicious sustained activity.', 'artifact_guidance': 'Application resource usage (CPU time, foreground time) from the SRUM database.\n- Suspicious: high resource usage from unexpected or unknown processes, applications running with significant CPU time but zero foreground time (background/hidden execution).\n- Context: helps identify persistent or resource-intensive processes that may indicate crypto mining, data processing, or long-running attacker tools.\n- Cross-check: application names here should correlate with execution artifacts.\n- Limitation: SRUM data is aggregated — it shows that something ran, not exactly what it did.'}, 'shellbags': {'name': 'Shellbags', 'category': 'Registry', 'function': 'shellbags', 'description': 'Registry traces of folders viewed in Explorer, including local, removable, and network paths. Shellbags can preserve evidence even after files or folders are deleted.', 'analysis_hint': 'Look for access to hidden folders, USB volumes, network shares, and unusual archive locations. Use viewed-path chronology to support staging and collection hypotheses.', 'artifact_guidance': "Folder access history from Explorer — shows what directories users browsed.\n- Suspicious: access to network shares, USB/removable media paths, hidden/system directories, archive contents, other users' profiles, credential stores, and sensitive project directories.\n- Context: path access patterns can reveal reconnaissance (browsing through directories looking for data) and collection/staging behavior.\n- Cross-check: correlate accessed folders with file creation/deletion in MFT/USN and data movement to USB devices.\n- Limitation: shows folder access, not individual file access. 
Timestamps may reflect when the shellbag entry was updated, not necessarily first access."}, 'usb': {'name': 'USB History', 'category': 'Registry', 'function': 'usb', 'description': 'Registry evidence of connected USB devices, including identifiers and connection history metadata. Useful for tracking removable media usage and potential data transfer vectors.', 'analysis_hint': 'Identify unknown devices and compare first/last seen times with suspicious file and user activity. Focus on storage-class devices connected near possible exfiltration or staging events.', 'artifact_guidance': 'USB device connection history from the registry.\n- Key for data exfiltration investigations. Shows what removable storage was connected, when, and by which user.\n- Suspicious: USB devices connected during or shortly after the incident window, devices connected during off-hours, new/unknown devices appearing for the first time near suspicious activity.\n- Key fields: device serial number, vendor/product, first and last connection times.\n- Cross-check: correlate USB connection times with shellbag access to removable media paths and file copy operations in USN journal.'}, 'muicache': {'name': 'MUIcache', 'category': 'Registry', 'function': 'muicache', 'description': 'Cache of executable display strings written when programs are launched via the shell. Can provide residual execution clues for binaries no longer present.', 'analysis_hint': 'Hunt for suspicious executable paths and uncommon tool names absent from standard software inventories. Correlate entries with UserAssist and Shimcache for stronger execution confidence.', 'artifact_guidance': "Supplementary execution evidence — records executable descriptions from PE metadata when programs run.\n- Lower-confidence artifact on its own. 
Use primarily to corroborate findings from prefetch, amcache, and shimcache.\n- Suspicious: uncommon executables in user-writable directories, entries suggesting renamed binaries (description doesn't match filename), known attacker tool names.\n- Value: can reveal executables that ran but were later deleted, since the MUIcache entry persists in the registry.\n- Limitation: no timestamps. Only shows that something ran at some point. Always pair with other artifacts for timing."}, 'sam': {'name': 'SAM Users', 'category': 'Security', 'function': 'sam', 'description': 'Local Security Account Manager user account records and account state metadata. This artifact supports detection of unauthorized local account creation and privilege abuse.', 'analysis_hint': 'Flag newly created, enabled, or reactivated local accounts, especially admin-capable users. Correlate account changes with logon events and lateral movement artifacts.', 'artifact_guidance': "Local user accounts from the SAM registry hive.\n- Suspicious: recently created accounts (especially near the incident window), accounts added to the Administrators group, accounts with names mimicking system accounts, re-enabled previously disabled accounts, password changes on accounts that shouldn't change.\n- Key fields: account name, creation date, last password change, group memberships, account flags (enabled/disabled).\n- Cross-check: account creation/modification should correlate with EVTX Event IDs 4720, 4722, 4724, 4732.\n- Small artifact: SAM typically has few entries. Review all of them, not just flagged ones."}, 'defender.quarantine': {'name': 'Defender Quarantine', 'category': 'Security', 'function': 'defender.quarantine', 'description': 'Metadata about items quarantined by Microsoft Defender, including source path and detection context. 
Indicates which suspicious files were contained and where they originated.', 'analysis_hint': 'Confirm whether detections were successfully quarantined and whether the same paths reappear later. Use quarantine artifacts to pivot into file system, execution, and persistence traces.', 'artifact_guidance': "Files quarantined by Windows Defender — direct evidence of detected malware.\n- Every entry here is significant. This is confirmed detection, not a probabilistic indicator.\n- Key fields: original file path, threat name, detection timestamp.\n- Suspicious: quarantined files from startup/persistence locations (suggests malware achieved persistence before detection), repeated quarantine of the same threat (reinfection cycle), quarantine of attacker tools (mimikatz, cobalt strike, etc.).\n- Cross-check: correlate quarantine timestamps with Defender EVTX for remediation success/failure, and with execution artifacts to determine if the malware ran before being caught.\n- Small artifact: review all entries. Don't skip any."}}
def get_artifact_registry(os_type: str) -> dict[str, dict[str, str]]:
    """Select the artifact registry matching the target's operating system.

    Normalisation is delegated to :func:`~app.os_utils.normalize_os_type`
    so the comparison behaves consistently across the codebase.

    Args:
        os_type: Operating system identifier as returned by Dissect's
            ``target.os`` (e.g. ``"windows"``, ``"linux"``).  The value
            is normalised to lowercase before comparison.

    Returns:
        :data:`LINUX_ARTIFACT_REGISTRY` when the normalised OS is Linux;
        :data:`WINDOWS_ARTIFACT_REGISTRY` otherwise, including for any
        unrecognised OS type.
    """
    is_linux = normalize_os_type(os_type) == "linux"
    return LINUX_ARTIFACT_REGISTRY if is_linux else WINDOWS_ARTIFACT_REGISTRY

Return the artifact registry appropriate for the given OS type.

Uses app.os_utils.normalize_os_type() for consistent normalisation across the codebase.

Arguments:
  • os_type: Operating system identifier as returned by Dissect's target.os (e.g. "windows", "linux"). The value is normalised to lowercase before comparison.
Returns:

The OS-specific artifact registry dictionary. Defaults to WINDOWS_ARTIFACT_REGISTRY for unrecognised OS types.