app.audit

Append-only forensic audit trail logging.

Every significant action during an AIFT session (evidence intake, parsing, AI analysis, report generation, etc.) is recorded as a single-line JSON object in audit.jsonl inside the case directory. This module provides the AuditLogger that writes those entries and enforces a closed set of allowed action types via ACTION_TYPES.

The audit log is designed for forensic defensibility:

  • Entries are append-only and never overwritten.
  • Timestamps use UTC ISO 8601 with millisecond precision.
  • Each session receives a unique UUID so concurrent sessions are distinguishable.
  • Tool and Dissect versions are embedded in every record.
Attributes:
  • ACTION_TYPES: Closed set of valid action strings accepted by AuditLogger.log().
  • DEFAULT_TOOL_VERSION: Version string embedded in audit records when none is explicitly provided.
  1"""Append-only forensic audit trail logging.
  2
  3Every significant action during an AIFT session (evidence intake, parsing,
  4AI analysis, report generation, etc.) is recorded as a single-line JSON
  5object in ``audit.jsonl`` inside the case directory.  This module provides
  6the :class:`AuditLogger` that writes those entries and enforces a
  7closed set of allowed action types via :data:`ACTION_TYPES`.
  8
  9The audit log is designed for forensic defensibility:
 10
 11* Entries are append-only and never overwritten.
 12* Timestamps use UTC ISO 8601 with millisecond precision.
 13* Each session receives a unique UUID so concurrent sessions are
 14  distinguishable.
 15* Tool and Dissect versions are embedded in every record.
 16
 17Attributes:
 18    ACTION_TYPES: Closed set of valid action strings accepted by
 19        :meth:`AuditLogger.log`.
 20    DEFAULT_TOOL_VERSION: Version string embedded in audit records when
 21        none is explicitly provided.
 22"""
 23
 24from __future__ import annotations
 25
 26from datetime import date, datetime, time, timezone
 27from importlib import metadata
 28import json
 29import os
 30from pathlib import Path
 31from typing import Any
 32from uuid import uuid4
 33
 34from .version import TOOL_VERSION
 35
 36__all__ = ["AuditLogger"]
 37
 38ACTION_TYPES = frozenset(
 39    {
 40        "case_created",
 41        "evidence_intake",
 42        "image_opened",
 43        "parsing_started",
 44        "parsing_completed",
 45        "parsing_failed",
 46        "parsing_capped",
 47        "analysis_started",
 48        "analysis_completed",
 49        "citation_validation",
 50        "artifact_ai_projection",
 51        "artifact_deduplicated",
 52        "inline_csv_truncated",
 53        "chunked_analysis_started",
 54        "artifact_ai_projection_warning",
 55        "prompt_submitted",
 56        "chat_message_sent",
 57        "chat_response_received",
 58        "chat_data_retrieval",
 59        "chat_history_cleared",
 60        "report_generated",
 61        "hash_verification",
 62        "config_changed",
 63    }
 64)
 65DEFAULT_TOOL_VERSION = TOOL_VERSION
 66
 67
 68def _utc_now_iso8601_ms() -> str:
 69    """Return UTC timestamp in ISO 8601 format with millisecond precision."""
 70    return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
 71
 72
 73def _resolve_dissect_version() -> str:
 74    """Best-effort detection of the installed Dissect version."""
 75    for pkg in ("dissect", "dissect.target"):
 76        try:
 77            return metadata.version(pkg)
 78        except metadata.PackageNotFoundError:
 79            continue
 80    return "unknown"
 81
 82
 83def _json_default(value: Any) -> str:
 84    """Best-effort conversion for non-JSON-native audit detail values."""
 85    if isinstance(value, (datetime, date, time)):
 86        return value.isoformat()
 87    if isinstance(value, Path):
 88        return str(value)
 89    if isinstance(value, (bytes, bytearray, memoryview)):
 90        return bytes(value).hex()
 91    return str(value)
 92
 93
 94class AuditLogger:
 95    """Append-only forensic audit logger writing JSONL entries per action.
 96
 97    Each instance is bound to a single case directory and creates (or
 98    appends to) ``audit.jsonl`` in that directory.  A unique session ID
 99    is generated on construction and embedded in every record, allowing
100    multiple runs against the same case to be differentiated.
101
102    Attributes:
103        case_directory: Resolved path to the case directory.
104        audit_file: Path to the ``audit.jsonl`` file.
105        session_id: UUID string identifying this logger session.
106        tool_version: AIFT version recorded in every audit entry.
107        dissect_version: Dissect framework version string.
108    """
109
110    def __init__(
111        self,
112        case_directory: str | Path,
113        tool_version: str = DEFAULT_TOOL_VERSION,
114        dissect_version: str | None = None,
115    ) -> None:
116        """Initialise the audit logger for a case directory.
117
118        Args:
119            case_directory: Path to the case directory.  Created if it
120                does not exist.
121            tool_version: AIFT version string to embed in records.
122            dissect_version: Explicit Dissect version.  Auto-detected
123                from installed packages when *None*.
124        """
125        self.case_directory = Path(case_directory)
126        self.case_directory.mkdir(parents=True, exist_ok=True)
127
128        self.audit_file = self.case_directory / "audit.jsonl"
129        self.session_id = str(uuid4())
130        self.tool_version = tool_version
131        self.dissect_version = dissect_version or _resolve_dissect_version()
132
133        # Ensure the audit file exists immediately when the logger is created.
134        with self.audit_file.open("ab", buffering=0) as audit_stream:
135            audit_stream.flush()
136
137    def log(self, action: str, details: dict[str, Any]) -> None:
138        """Append one JSON-line audit record for *action*.
139
140        The record includes a UTC timestamp, session ID, tool and Dissect
141        versions, and the caller-supplied *details* dictionary.  The file
142        is opened, written, and flushed for each call to minimise data
143        loss on unexpected termination.
144
145        Args:
146            action: One of the strings in :data:`ACTION_TYPES`.
147            details: Arbitrary dictionary of action-specific metadata.
148
149        Raises:
150            ValueError: If *action* is not in :data:`ACTION_TYPES`.
151            TypeError: If *details* is not a dictionary.
152        """
153        if action not in ACTION_TYPES:
154            allowed = ", ".join(sorted(ACTION_TYPES))
155            raise ValueError(f"Unsupported action '{action}'. Allowed values: {allowed}.")
156
157        if not isinstance(details, dict):
158            raise TypeError("details must be a dictionary.")
159
160        record = {
161            "timestamp": _utc_now_iso8601_ms(),
162            "action": action,
163            "details": details,
164            "session_id": self.session_id,
165            "tool_version": self.tool_version,
166            "dissect_version": self.dissect_version,
167        }
168
169        line = json.dumps(record, separators=(",", ":"), default=_json_default) + "\n"
170        with self.audit_file.open("ab", buffering=0) as audit_stream:
171            audit_stream.write(line.encode("utf-8"))
172            audit_stream.flush()
173            os.fsync(audit_stream.fileno())
class AuditLogger:
 95class AuditLogger:
 96    """Append-only forensic audit logger writing JSONL entries per action.
 97
 98    Each instance is bound to a single case directory and creates (or
 99    appends to) ``audit.jsonl`` in that directory.  A unique session ID
100    is generated on construction and embedded in every record, allowing
101    multiple runs against the same case to be differentiated.
102
103    Attributes:
104        case_directory: Resolved path to the case directory.
105        audit_file: Path to the ``audit.jsonl`` file.
106        session_id: UUID string identifying this logger session.
107        tool_version: AIFT version recorded in every audit entry.
108        dissect_version: Dissect framework version string.
109    """
110
111    def __init__(
112        self,
113        case_directory: str | Path,
114        tool_version: str = DEFAULT_TOOL_VERSION,
115        dissect_version: str | None = None,
116    ) -> None:
117        """Initialise the audit logger for a case directory.
118
119        Args:
120            case_directory: Path to the case directory.  Created if it
121                does not exist.
122            tool_version: AIFT version string to embed in records.
123            dissect_version: Explicit Dissect version.  Auto-detected
124                from installed packages when *None*.
125        """
126        self.case_directory = Path(case_directory)
127        self.case_directory.mkdir(parents=True, exist_ok=True)
128
129        self.audit_file = self.case_directory / "audit.jsonl"
130        self.session_id = str(uuid4())
131        self.tool_version = tool_version
132        self.dissect_version = dissect_version or _resolve_dissect_version()
133
134        # Ensure the audit file exists immediately when the logger is created.
135        with self.audit_file.open("ab", buffering=0) as audit_stream:
136            audit_stream.flush()
137
138    def log(self, action: str, details: dict[str, Any]) -> None:
139        """Append one JSON-line audit record for *action*.
140
141        The record includes a UTC timestamp, session ID, tool and Dissect
142        versions, and the caller-supplied *details* dictionary.  The file
143        is opened, written, and flushed for each call to minimise data
144        loss on unexpected termination.
145
146        Args:
147            action: One of the strings in :data:`ACTION_TYPES`.
148            details: Arbitrary dictionary of action-specific metadata.
149
150        Raises:
151            ValueError: If *action* is not in :data:`ACTION_TYPES`.
152            TypeError: If *details* is not a dictionary.
153        """
154        if action not in ACTION_TYPES:
155            allowed = ", ".join(sorted(ACTION_TYPES))
156            raise ValueError(f"Unsupported action '{action}'. Allowed values: {allowed}.")
157
158        if not isinstance(details, dict):
159            raise TypeError("details must be a dictionary.")
160
161        record = {
162            "timestamp": _utc_now_iso8601_ms(),
163            "action": action,
164            "details": details,
165            "session_id": self.session_id,
166            "tool_version": self.tool_version,
167            "dissect_version": self.dissect_version,
168        }
169
170        line = json.dumps(record, separators=(",", ":"), default=_json_default) + "\n"
171        with self.audit_file.open("ab", buffering=0) as audit_stream:
172            audit_stream.write(line.encode("utf-8"))
173            audit_stream.flush()
174            os.fsync(audit_stream.fileno())

Append-only forensic audit logger writing JSONL entries per action.

Each instance is bound to a single case directory and creates (or appends to) audit.jsonl in that directory. A unique session ID is generated on construction and embedded in every record, allowing multiple runs against the same case to be differentiated.

Attributes:
  • case_directory: Path to the case directory as supplied (wrapped in pathlib.Path, not resolved to an absolute path).
  • audit_file: Path to the audit.jsonl file.
  • session_id: UUID string identifying this logger session.
  • tool_version: AIFT version recorded in every audit entry.
  • dissect_version: Dissect framework version string.
AuditLogger( case_directory: str | pathlib.Path, tool_version: str = '1.4.1', dissect_version: str | None = None)
111    def __init__(
112        self,
113        case_directory: str | Path,
114        tool_version: str = DEFAULT_TOOL_VERSION,
115        dissect_version: str | None = None,
116    ) -> None:
117        """Initialise the audit logger for a case directory.
118
119        Args:
120            case_directory: Path to the case directory.  Created if it
121                does not exist.
122            tool_version: AIFT version string to embed in records.
123            dissect_version: Explicit Dissect version.  Auto-detected
124                from installed packages when *None*.
125        """
126        self.case_directory = Path(case_directory)
127        self.case_directory.mkdir(parents=True, exist_ok=True)
128
129        self.audit_file = self.case_directory / "audit.jsonl"
130        self.session_id = str(uuid4())
131        self.tool_version = tool_version
132        self.dissect_version = dissect_version or _resolve_dissect_version()
133
134        # Ensure the audit file exists immediately when the logger is created.
135        with self.audit_file.open("ab", buffering=0) as audit_stream:
136            audit_stream.flush()

Initialise the audit logger for a case directory.

Arguments:
  • case_directory: Path to the case directory. Created if it does not exist.
  • tool_version: AIFT version string to embed in records.
  • dissect_version: Explicit Dissect version. Auto-detected from installed packages when None.
case_directory
audit_file
session_id
tool_version
dissect_version
def log(self, action: str, details: dict[str, typing.Any]) -> None:
138    def log(self, action: str, details: dict[str, Any]) -> None:
139        """Append one JSON-line audit record for *action*.
140
141        The record includes a UTC timestamp, session ID, tool and Dissect
142        versions, and the caller-supplied *details* dictionary.  The file
143        is opened, written, and flushed for each call to minimise data
144        loss on unexpected termination.
145
146        Args:
147            action: One of the strings in :data:`ACTION_TYPES`.
148            details: Arbitrary dictionary of action-specific metadata.
149
150        Raises:
151            ValueError: If *action* is not in :data:`ACTION_TYPES`.
152            TypeError: If *details* is not a dictionary.
153        """
154        if action not in ACTION_TYPES:
155            allowed = ", ".join(sorted(ACTION_TYPES))
156            raise ValueError(f"Unsupported action '{action}'. Allowed values: {allowed}.")
157
158        if not isinstance(details, dict):
159            raise TypeError("details must be a dictionary.")
160
161        record = {
162            "timestamp": _utc_now_iso8601_ms(),
163            "action": action,
164            "details": details,
165            "session_id": self.session_id,
166            "tool_version": self.tool_version,
167            "dissect_version": self.dissect_version,
168        }
169
170        line = json.dumps(record, separators=(",", ":"), default=_json_default) + "\n"
171        with self.audit_file.open("ab", buffering=0) as audit_stream:
172            audit_stream.write(line.encode("utf-8"))
173            audit_stream.flush()
174            os.fsync(audit_stream.fileno())

Append one JSON-line audit record for action.

The record includes a UTC timestamp, session ID, tool and Dissect versions, and the caller-supplied details dictionary. The file is opened, written, and flushed for each call to minimise data loss on unexpected termination.

Arguments:
  • action: One of the strings in ACTION_TYPES.
  • details: Arbitrary dictionary of action-specific metadata.
Raises:
  • ValueError: If action is not in ACTION_TYPES.
  • TypeError: If details is not a dictionary.