app.audit
Append-only forensic audit trail logging.
Every significant action during an AIFT session (evidence intake, parsing,
AI analysis, report generation, etc.) is recorded as a single-line JSON
object in audit.jsonl inside the case directory. This module provides
the AuditLogger that writes those entries and enforces a
closed set of allowed action types via ACTION_TYPES.
The audit log is designed for forensic defensibility:
- Entries are append-only and never overwritten.
- Timestamps use UTC ISO 8601 with millisecond precision.
- Each session receives a unique UUID so concurrent sessions are distinguishable.
- Tool and Dissect versions are embedded in every record.
Attributes:
- ACTION_TYPES: Closed set of valid action strings accepted by AuditLogger.log().
- DEFAULT_TOOL_VERSION: Version string embedded in audit records when none is explicitly provided.
"""Append-only forensic audit trail logging.

Every significant action during an AIFT session (evidence intake, parsing,
AI analysis, report generation, etc.) is recorded as a single-line JSON
object in ``audit.jsonl`` inside the case directory. This module provides
the :class:`AuditLogger` that writes those entries and enforces a
closed set of allowed action types via :data:`ACTION_TYPES`.

The audit log is designed for forensic defensibility:

* Entries are append-only and never overwritten.
* Timestamps use UTC ISO 8601 with millisecond precision.
* Each session receives a unique UUID so concurrent sessions are
  distinguishable.
* Tool and Dissect versions are embedded in every record.

Attributes:
    ACTION_TYPES: Closed set of valid action strings accepted by
        :meth:`AuditLogger.log`.
    DEFAULT_TOOL_VERSION: Version string embedded in audit records when
        none is explicitly provided.
"""

from __future__ import annotations

from datetime import date, datetime, time, timezone
from importlib import metadata
import json
import os
from pathlib import Path
from typing import Any
from uuid import uuid4

from .version import TOOL_VERSION

__all__ = ["AuditLogger"]

ACTION_TYPES = frozenset(
    {
        "case_created",
        "evidence_intake",
        "image_opened",
        "parsing_started",
        "parsing_completed",
        "parsing_failed",
        "parsing_capped",
        "analysis_started",
        "analysis_completed",
        "citation_validation",
        "artifact_ai_projection",
        "artifact_deduplicated",
        "inline_csv_truncated",
        "chunked_analysis_started",
        "artifact_ai_projection_warning",
        "prompt_submitted",
        "chat_message_sent",
        "chat_response_received",
        "chat_data_retrieval",
        "chat_history_cleared",
        "report_generated",
        "hash_verification",
        "config_changed",
    }
)
DEFAULT_TOOL_VERSION = TOOL_VERSION


def _utc_now_iso8601_ms() -> str:
    """Return the current UTC time as ISO 8601 with millisecond precision.

    The ``+00:00`` offset produced by :meth:`datetime.isoformat` is
    replaced with the ``Z`` suffix.
    """
    return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")


def _resolve_dissect_version() -> str:
    """Best-effort detection of the installed Dissect version.

    Returns:
        The version of the first installed distribution among
        ``dissect`` and ``dissect.target``, or ``"unknown"`` when
        neither is installed.
    """
    for pkg in ("dissect", "dissect.target"):
        try:
            return metadata.version(pkg)
        except metadata.PackageNotFoundError:
            continue
    return "unknown"


def _json_default(value: Any) -> str:
    """Best-effort conversion for non-JSON-native audit detail values.

    Used as the ``default`` hook of :func:`json.dumps`, so it is only
    called for values the encoder cannot serialise itself.

    Args:
        value: The object the JSON encoder could not serialise.

    Returns:
        ISO 8601 text for date/time objects, the string form of a
        :class:`~pathlib.Path`, a hex string for bytes-like objects, and
        ``str(value)`` for anything else.
    """
    if isinstance(value, (datetime, date, time)):
        return value.isoformat()
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, (bytes, bytearray, memoryview)):
        return bytes(value).hex()
    return str(value)


class AuditLogger:
    """Append-only forensic audit logger writing JSONL entries per action.

    Each instance is bound to a single case directory and creates (or
    appends to) ``audit.jsonl`` in that directory. A unique session ID
    is generated on construction and embedded in every record, allowing
    multiple runs against the same case to be differentiated.

    Attributes:
        case_directory: Path to the case directory, as passed to the
            constructor (converted to :class:`~pathlib.Path`, not resolved).
        audit_file: Path to the ``audit.jsonl`` file.
        session_id: UUID string identifying this logger session.
        tool_version: AIFT version recorded in every audit entry.
        dissect_version: Dissect framework version string.
    """

    def __init__(
        self,
        case_directory: str | Path,
        tool_version: str = DEFAULT_TOOL_VERSION,
        dissect_version: str | None = None,
    ) -> None:
        """Initialise the audit logger for a case directory.

        Args:
            case_directory: Path to the case directory. Created if it
                does not exist.
            tool_version: AIFT version string to embed in records.
            dissect_version: Explicit Dissect version. Auto-detected
                from installed packages when *None* (or empty).
        """
        self.case_directory = Path(case_directory)
        self.case_directory.mkdir(parents=True, exist_ok=True)

        self.audit_file = self.case_directory / "audit.jsonl"
        self.session_id = str(uuid4())
        self.tool_version = tool_version
        self.dissect_version = dissect_version or _resolve_dissect_version()

        # Ensure the audit file exists immediately when the logger is created.
        # Append mode creates a missing file and never truncates an existing one.
        with self.audit_file.open("ab", buffering=0) as audit_stream:
            audit_stream.flush()

    def log(self, action: str, details: dict[str, Any]) -> None:
        """Append one JSON-line audit record for *action*.

        The record includes a UTC timestamp, session ID, tool and Dissect
        versions, and the caller-supplied *details* dictionary. The file
        is opened, written, and synced to disk for each call to minimise
        data loss on unexpected termination.

        Args:
            action: One of the strings in :data:`ACTION_TYPES`.
            details: Arbitrary dictionary of action-specific metadata.

        Raises:
            ValueError: If *action* is not in :data:`ACTION_TYPES`.
            TypeError: If *details* is not a dictionary.
            OSError: If the record cannot be written to the audit file
                in full.
        """
        if action not in ACTION_TYPES:
            allowed = ", ".join(sorted(ACTION_TYPES))
            raise ValueError(f"Unsupported action '{action}'. Allowed values: {allowed}.")

        if not isinstance(details, dict):
            raise TypeError("details must be a dictionary.")

        record = {
            "timestamp": _utc_now_iso8601_ms(),
            "action": action,
            "details": details,
            "session_id": self.session_id,
            "tool_version": self.tool_version,
            "dissect_version": self.dissect_version,
        }

        line = json.dumps(record, separators=(",", ":"), default=_json_default) + "\n"
        payload = line.encode("utf-8")
        with self.audit_file.open("ab", buffering=0) as audit_stream:
            # An unbuffered (raw) stream may accept fewer bytes than it was
            # given, so keep writing until the whole record is on the file;
            # otherwise a truncated JSON line could end up in the trail.
            sent = 0
            while sent < len(payload):
                count = audit_stream.write(payload[sent:])
                if not count:
                    raise OSError("Audit record could not be written completely.")
                sent += count
            # No flush() needed: a raw stream has no user-space buffer.
            os.fsync(audit_stream.fileno())
class AuditLogger:
    """Append-only forensic audit logger writing JSONL entries per action.

    Each instance is bound to a single case directory and creates (or
    appends to) ``audit.jsonl`` in that directory. A unique session ID
    is generated on construction and embedded in every record, allowing
    multiple runs against the same case to be differentiated.

    Attributes:
        case_directory: Path to the case directory, as passed to the
            constructor (converted to :class:`~pathlib.Path`, not resolved).
        audit_file: Path to the ``audit.jsonl`` file.
        session_id: UUID string identifying this logger session.
        tool_version: AIFT version recorded in every audit entry.
        dissect_version: Dissect framework version string.
    """

    def __init__(
        self,
        case_directory: str | Path,
        tool_version: str = DEFAULT_TOOL_VERSION,
        dissect_version: str | None = None,
    ) -> None:
        """Initialise the audit logger for a case directory.

        Args:
            case_directory: Path to the case directory. Created if it
                does not exist.
            tool_version: AIFT version string to embed in records.
            dissect_version: Explicit Dissect version. Auto-detected
                from installed packages when *None* (or empty).
        """
        self.case_directory = Path(case_directory)
        self.case_directory.mkdir(parents=True, exist_ok=True)

        self.audit_file = self.case_directory / "audit.jsonl"
        self.session_id = str(uuid4())
        self.tool_version = tool_version
        self.dissect_version = dissect_version or _resolve_dissect_version()

        # Ensure the audit file exists immediately when the logger is created.
        # Append mode creates a missing file and never truncates an existing one.
        with self.audit_file.open("ab", buffering=0) as audit_stream:
            audit_stream.flush()

    def log(self, action: str, details: dict[str, Any]) -> None:
        """Append one JSON-line audit record for *action*.

        The record includes a UTC timestamp, session ID, tool and Dissect
        versions, and the caller-supplied *details* dictionary. The file
        is opened, written, and synced to disk for each call to minimise
        data loss on unexpected termination.

        Args:
            action: One of the strings in :data:`ACTION_TYPES`.
            details: Arbitrary dictionary of action-specific metadata.

        Raises:
            ValueError: If *action* is not in :data:`ACTION_TYPES`.
            TypeError: If *details* is not a dictionary.
            OSError: If the record cannot be written to the audit file
                in full.
        """
        if action not in ACTION_TYPES:
            allowed = ", ".join(sorted(ACTION_TYPES))
            raise ValueError(f"Unsupported action '{action}'. Allowed values: {allowed}.")

        if not isinstance(details, dict):
            raise TypeError("details must be a dictionary.")

        record = {
            "timestamp": _utc_now_iso8601_ms(),
            "action": action,
            "details": details,
            "session_id": self.session_id,
            "tool_version": self.tool_version,
            "dissect_version": self.dissect_version,
        }

        line = json.dumps(record, separators=(",", ":"), default=_json_default) + "\n"
        payload = line.encode("utf-8")
        with self.audit_file.open("ab", buffering=0) as audit_stream:
            # An unbuffered (raw) stream may accept fewer bytes than it was
            # given, so keep writing until the whole record is on the file;
            # otherwise a truncated JSON line could end up in the trail.
            sent = 0
            while sent < len(payload):
                count = audit_stream.write(payload[sent:])
                if not count:
                    raise OSError("Audit record could not be written completely.")
                sent += count
            # No flush() needed: a raw stream has no user-space buffer.
            os.fsync(audit_stream.fileno())
Append-only forensic audit logger writing JSONL entries per action.
Each instance is bound to a single case directory and creates (or
appends to) audit.jsonl in that directory. A unique session ID
is generated on construction and embedded in every record, allowing
multiple runs against the same case to be differentiated.
Attributes:
- case_directory: Resolved path to the case directory.
- audit_file: Path to the audit.jsonl file.
- session_id: UUID string identifying this logger session.
- tool_version: AIFT version recorded in every audit entry.
- dissect_version: Dissect framework version string.
def __init__(
    self,
    case_directory: str | Path,
    tool_version: str = DEFAULT_TOOL_VERSION,
    dissect_version: str | None = None,
) -> None:
    """Bind a new logger session to *case_directory*.

    The directory (including missing parents) is created when absent,
    a fresh session UUID is generated, and ``audit.jsonl`` is created
    inside the directory if it is not already there.

    Args:
        case_directory: Location of the case directory.
        tool_version: AIFT version string stored on the logger for
            embedding in records.
        dissect_version: Dissect version to record. When falsy, the
            version is looked up from the installed packages instead.
    """
    self.case_directory = Path(case_directory)
    self.case_directory.mkdir(parents=True, exist_ok=True)
    self.audit_file = self.case_directory / "audit.jsonl"

    self.session_id = str(uuid4())
    self.tool_version = tool_version
    if dissect_version:
        self.dissect_version = dissect_version
    else:
        self.dissect_version = _resolve_dissect_version()

    # Opening in append mode creates the file when it is missing and
    # leaves any existing content untouched.
    with self.audit_file.open("ab", buffering=0) as handle:
        handle.flush()
Initialise the audit logger for a case directory.
Arguments:
- case_directory: Path to the case directory. Created if it does not exist.
- tool_version: AIFT version string to embed in records.
- dissect_version: Explicit Dissect version. Auto-detected from installed packages when None.
def log(self, action: str, details: dict[str, Any]) -> None:
    """Append one JSON-line audit record for *action*.

    The record includes a UTC timestamp, session ID, tool and Dissect
    versions, and the caller-supplied *details* dictionary. The file
    is opened, written, and synced to disk for each call to minimise
    data loss on unexpected termination.

    Args:
        action: One of the strings in :data:`ACTION_TYPES`.
        details: Arbitrary dictionary of action-specific metadata.

    Raises:
        ValueError: If *action* is not in :data:`ACTION_TYPES`.
        TypeError: If *details* is not a dictionary.
        OSError: If the record cannot be written to the audit file
            in full.
    """
    if action not in ACTION_TYPES:
        allowed = ", ".join(sorted(ACTION_TYPES))
        raise ValueError(f"Unsupported action '{action}'. Allowed values: {allowed}.")

    if not isinstance(details, dict):
        raise TypeError("details must be a dictionary.")

    record = {
        "timestamp": _utc_now_iso8601_ms(),
        "action": action,
        "details": details,
        "session_id": self.session_id,
        "tool_version": self.tool_version,
        "dissect_version": self.dissect_version,
    }

    line = json.dumps(record, separators=(",", ":"), default=_json_default) + "\n"
    payload = line.encode("utf-8")
    with self.audit_file.open("ab", buffering=0) as audit_stream:
        # An unbuffered (raw) stream may accept fewer bytes than it was
        # given, so keep writing until the whole record is on the file;
        # otherwise a truncated JSON line could end up in the trail.
        sent = 0
        while sent < len(payload):
            count = audit_stream.write(payload[sent:])
            if not count:
                raise OSError("Audit record could not be written completely.")
            sent += count
        # No flush() needed: a raw stream has no user-space buffer.
        os.fsync(audit_stream.fileno())
Append one JSON-line audit record for action.
The record includes a UTC timestamp, session ID, tool and Dissect versions, and the caller-supplied details dictionary. The file is opened, written, and flushed for each call to minimise data loss on unexpected termination.
Arguments:
- action: One of the strings in ACTION_TYPES.
- details: Arbitrary dictionary of action-specific metadata.
Raises:
- ValueError: If action is not in ACTION_TYPES.
- TypeError: If details is not a dictionary.