app.reporter.generator

HTML report generation for forensic analysis results.

Renders AI analysis findings, evidence metadata, hash verification status, and the audit trail into a self-contained HTML file using Jinja2 templates. The generated report includes all CSS inlined so it can be opened as a standalone file without a web server.

Key capabilities:

  • Flexible input normalisation -- Per-artifact findings can be supplied as a list, a dict keyed by artifact name, or a single finding mapping; the generator coerces all shapes into a uniform list.
  • Logo embedding -- The project logo is base64-encoded and embedded as a data: URI so the report is fully self-contained.

Markdown rendering and confidence highlighting are delegated to app.reporter.markdown.

Attributes:
  • DEFAULT_CASE_NAME: Fallback case name when none is provided.
  • DEFAULT_TOOL_VERSION: AIFT version from app.version.
  • DEFAULT_AI_PROVIDER: Placeholder string when the provider is unknown.
  • SAFE_CASE_ID_PATTERN: Regex for sanitising case IDs.
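The coercion rules behind the "flexible input normalisation" bullet can be sketched with plain stdlib code. The snippet below is an illustrative mini-normaliser, not the module's API: `coerce_findings` and its key choices are hypothetical stand-ins for the generator's internal coercion logic.

```python
from collections.abc import Mapping, Sequence
from typing import Any


def coerce_findings(raw: Any) -> list[dict[str, Any]]:
    """Coerce list / dict-keyed / single-finding shapes into a uniform list."""
    if isinstance(raw, Sequence) and not isinstance(raw, (str, bytes)):
        # Already a list (or tuple) of finding mappings.
        return [dict(item) for item in raw if isinstance(item, Mapping)]
    if isinstance(raw, Mapping):
        if "analysis" in raw or "artifact_name" in raw:
            # Looks like a single finding mapping.
            return [dict(raw)]
        # Otherwise treat it as a dict keyed by artifact name.
        return [
            {"artifact_name": str(name), "analysis": str(value)}
            for name, value in raw.items()
        ]
    return []


# All three input shapes normalise to the same uniform list.
as_list = coerce_findings([{"artifact_name": "MFT", "analysis": "ok"}])
as_dict = coerce_findings({"MFT": "ok"})
as_single = coerce_findings({"artifact_name": "MFT", "analysis": "ok"})
```

The real generator handles more shapes (nested time ranges, key data points, confidence tokens), but the dispatch-on-shape idea is the same.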
"""HTML report generation for forensic analysis results.

Renders AI analysis findings, evidence metadata, hash verification status,
and the audit trail into a self-contained HTML file using Jinja2 templates.
The generated report includes all CSS inlined so it can be opened as a
standalone file without a web server.

Key capabilities:

* **Flexible input normalisation** -- Per-artifact findings can be
  supplied as a list, a dict keyed by artifact name, or a single finding
  mapping; the generator coerces all shapes into a uniform list.
* **Logo embedding** -- The project logo is base64-encoded and embedded as
  a ``data:`` URI so the report is fully self-contained.

Markdown rendering and confidence highlighting are delegated to
:mod:`app.reporter.markdown`.

Attributes:
    DEFAULT_CASE_NAME: Fallback case name when none is provided.
    DEFAULT_TOOL_VERSION: AIFT version from :mod:`app.version`.
    DEFAULT_AI_PROVIDER: Placeholder string when the provider is unknown.
    SAFE_CASE_ID_PATTERN: Regex for sanitising case IDs.
"""

from __future__ import annotations

import base64
from collections.abc import Mapping, Sequence
from datetime import datetime, timezone
import json
from pathlib import Path
import re
from typing import Any

from jinja2 import Environment, FileSystemLoader, select_autoescape

from ..config import LOGO_FILE_CANDIDATES
from ..version import TOOL_VERSION
from .markdown import (
    CONFIDENCE_CLASS_MAP,
    CONFIDENCE_PATTERN,
    format_block,
    format_markdown_block,
)

__all__ = ["ReportGenerator"]

DEFAULT_CASE_NAME = "Untitled Investigation"
DEFAULT_TOOL_VERSION = TOOL_VERSION
DEFAULT_AI_PROVIDER = "unknown"

SAFE_CASE_ID_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")


class ReportGenerator:
    """Render investigation results into a standalone HTML report.

    Sets up a Jinja2 :class:`~jinja2.Environment` with custom filters for
    Markdown-to-HTML conversion and confidence token highlighting.  The
    :meth:`generate` method assembles all case data into a template context
    and writes the rendered HTML to the case's ``reports/`` directory.

    Attributes:
        templates_dir: Directory containing Jinja2 HTML templates.
        cases_root: Parent directory where case subdirectories live.
        environment: Configured Jinja2 rendering environment.
        template: The loaded report template object.
    """

    def __init__(
        self,
        templates_dir: str | Path | None = None,
        cases_root: str | Path | None = None,
        template_name: str = "report_template.html",
    ) -> None:
        """Initialise the report generator.

        Args:
            templates_dir: Path to the Jinja2 templates directory.  Defaults
                to ``<project_root>/templates/``.
            cases_root: Parent directory for case output.  Defaults to
                ``<project_root>/cases/``.
            template_name: Filename of the Jinja2 report template.
        """
        project_root = Path(__file__).resolve().parents[2]
        self.templates_dir = Path(templates_dir) if templates_dir is not None else project_root / "templates"
        self.cases_root = Path(cases_root) if cases_root is not None else project_root / "cases"

        self.environment = Environment(
            loader=FileSystemLoader(str(self.templates_dir)),
            autoescape=select_autoescape(["html", "xml"]),
            trim_blocks=True,
            lstrip_blocks=True,
        )
        self.environment.filters["format_block"] = format_block
        self.environment.filters["format_markdown_block"] = format_markdown_block
        self.template = self.environment.get_template(template_name)

    def generate(
        self,
        analysis_results: dict[str, Any],
        image_metadata: dict[str, Any],
        evidence_hashes: dict[str, Any],
        investigation_context: str,
        audit_log_entries: list[dict[str, Any]],
    ) -> Path:
        """Generate a standalone HTML report and write it to disk.

        Assembles evidence metadata, AI analysis, hash verification, and
        the audit trail into a Jinja2 template context, renders the HTML,
        and writes the output to ``cases/<case_id>/reports/``.

        Args:
            analysis_results: Dictionary containing per-artifact findings,
                executive summary, model info, and case identifiers.
            image_metadata: System metadata from the disk image (hostname,
                OS version, domain, IPs, etc.).
            evidence_hashes: Hash digests and verification status from
                evidence intake.
            investigation_context: Free-text description of the
                investigation scope and timeline.
            audit_log_entries: List of audit trail JSONL records.

        Returns:
            :class:`~pathlib.Path` to the generated HTML report file.

        Raises:
            ValueError: If a case identifier cannot be determined.
        """
        analysis = dict(analysis_results or {})
        metadata = dict(image_metadata or {})
        hashes = dict(evidence_hashes or {})
        audit_entries = self._normalize_audit_entries(audit_log_entries)

        case_id = self._resolve_case_id(analysis, metadata, hashes)
        case_name = self._resolve_case_name(analysis)
        generated_at = datetime.now(timezone.utc)
        generated_iso = generated_at.isoformat(timespec="seconds").replace("+00:00", "Z")
        report_timestamp = generated_at.strftime("%Y%m%d_%H%M%S")

        summary_text = self._stringify(analysis.get("summary"))
        executive_summary = self._stringify(analysis.get("executive_summary") or summary_text)

        per_artifact = self._normalize_per_artifact_findings(analysis)
        evidence_summary = self._build_evidence_summary(metadata, hashes)
        hash_verification = self._resolve_hash_verification(hashes)

        render_context = {
            "case_name": case_name,
            "case_id": case_id,
            "generated_at": generated_iso,
            "tool_version": self._resolve_tool_version(analysis, audit_entries),
            "ai_provider": self._resolve_ai_provider(analysis),
            "logo_data_uri": self._resolve_logo_data_uri(),
            "evidence": evidence_summary,
            "hash_verification": hash_verification,
            "investigation_context": self._stringify(investigation_context, default="No investigation context provided."),
            "executive_summary": executive_summary,
            "per_artifact_findings": per_artifact,
            "audit_entries": audit_entries,
        }

        rendered = self.template.render(**render_context)

        report_dir = self.cases_root / case_id / "reports"
        report_dir.mkdir(parents=True, exist_ok=True)
        report_path = report_dir / f"report_{report_timestamp}.html"
        report_path.write_text(rendered, encoding="utf-8")
        return report_path

    def _resolve_logo_data_uri(self) -> str:
        """Locate the project logo and return it as a base64 ``data:`` URI.

        Returns:
            A ``data:image/...;base64,...`` string, or ``""`` if no logo found.
        """
        project_root = Path(__file__).resolve().parents[2]
        images_dir = project_root / "images"
        if not images_dir.is_dir():
            return ""

        for filename in LOGO_FILE_CANDIDATES:
            candidate = images_dir / filename
            if candidate.is_file():
                return self._file_to_data_uri(candidate)

        fallback_images = sorted(
            path
            for path in images_dir.iterdir()
            if path.is_file() and path.suffix.lower() in {".png", ".jpg", ".jpeg", ".webp", ".svg"}
        )
        if fallback_images:
            return self._file_to_data_uri(fallback_images[0])

        return ""

    @staticmethod
    def _file_to_data_uri(path: Path) -> str:
        """Read a file and encode it as a base64 data URI string.

        Args:
            path: Path to the image file.

        Returns:
            A ``data:<mime>;base64,...`` URI string.
        """
        mime_types = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".webp": "image/webp",
            ".svg": "image/svg+xml",
        }
        mime = mime_types.get(path.suffix.lower(), "application/octet-stream")
        encoded = base64.b64encode(path.read_bytes()).decode("ascii")
        return f"data:{mime};base64,{encoded}"

    def _resolve_case_id(
        self,
        analysis: Mapping[str, Any],
        metadata: Mapping[str, Any],
        hashes: Mapping[str, Any],
    ) -> str:
        """Extract and sanitise a case ID from the available data sources.

        Raises:
            ValueError: If no case identifier can be determined.
        """
        candidates = [
            analysis.get("case_id"),
            analysis.get("id"),
            hashes.get("case_id"),
            metadata.get("case_id"),
        ]

        nested_case = analysis.get("case")
        if isinstance(nested_case, Mapping):
            candidates.extend([nested_case.get("id"), nested_case.get("case_id")])

        for candidate in candidates:
            value = self._stringify(candidate, default="")
            if value:
                safe = SAFE_CASE_ID_PATTERN.sub("_", value).strip("_")
                if safe:
                    return safe

        raise ValueError("Unable to determine case identifier for report generation.")

    def _resolve_case_name(self, analysis: Mapping[str, Any]) -> str:
        """Determine a human-readable case name, falling back to a default."""
        nested_case = analysis.get("case")
        if isinstance(nested_case, Mapping):
            nested_name = self._stringify(nested_case.get("name"), default="")
            if nested_name:
                return nested_name

        return self._stringify(analysis.get("case_name"), default=DEFAULT_CASE_NAME)

    def _resolve_tool_version(
        self,
        analysis: Mapping[str, Any],
        audit_entries: list[dict[str, Any]],
    ) -> str:
        """Determine the tool version from analysis data or audit entries."""
        explicit_version = self._stringify(analysis.get("tool_version"), default="")
        if explicit_version:
            return explicit_version

        for entry in reversed(audit_entries):
            version = self._stringify(entry.get("tool_version"), default="")
            if version:
                return version

        return DEFAULT_TOOL_VERSION

    def _resolve_ai_provider(self, analysis: Mapping[str, Any]) -> str:
        """Determine the AI provider label for the report header."""
        explicit = self._stringify(analysis.get("ai_provider"), default="")
        if explicit:
            return explicit

        model_info = analysis.get("model_info")
        if isinstance(model_info, Mapping):
            provider = self._stringify(model_info.get("provider"), default=DEFAULT_AI_PROVIDER)
            model = self._stringify(model_info.get("model"), default="")
            if model:
                return f"{provider} ({model})"
            return provider

        return DEFAULT_AI_PROVIDER

    def _build_evidence_summary(
        self,
        metadata: Mapping[str, Any],
        hashes: Mapping[str, Any],
    ) -> dict[str, str]:
        """Assemble evidence summary fields for the report template.

        Returns:
            Dictionary with ``filename``, ``sha256``, ``md5``, ``file_size``,
            ``hostname``, ``os_version``, ``domain``, and ``ips``.
        """
        hostname = self._stringify(metadata.get("hostname"), default="Unknown")
        os_value = self._stringify(metadata.get("os_version") or metadata.get("os"), default="Unknown")
        domain = self._stringify(metadata.get("domain"), default="Unknown")
        ips = self._stringify_ips(metadata.get("ips") or metadata.get("ip_addresses") or metadata.get("ip"))

        size_value = hashes.get("size_bytes")
        if size_value is None:
            size_value = hashes.get("file_size_bytes")

        return {
            "filename": self._stringify(
                hashes.get("filename") or hashes.get("file_name") or metadata.get("filename"),
                default="Unknown",
            ),
            "sha256": self._stringify(hashes.get("sha256"), default="N/A"),
            "md5": self._stringify(hashes.get("md5"), default="N/A"),
            "file_size": self._format_file_size(size_value),
            "hostname": hostname,
            "os_version": os_value,
            "domain": domain,
            "ips": ips,
        }

    def _resolve_hash_verification(self, hashes: Mapping[str, Any]) -> dict[str, str | bool]:
        """Determine hash verification PASS/FAIL status for the report.

        Returns:
            Dictionary with ``passed`` (bool), ``label`` (``"PASS"`` or
            ``"FAIL"``), and ``detail`` (human-readable explanation).
        """
        explicit = hashes.get("hash_verified")
        if explicit is None:
            explicit = hashes.get("verification_passed")
        if explicit is None:
            explicit = hashes.get("verified")

        if isinstance(explicit, str) and explicit.strip().lower() == "skipped":
            return {
                "passed": True,
                "skipped": True,
                "label": "SKIPPED",
                "detail": "Hash computation was skipped at user request during evidence intake.",
            }
        if isinstance(explicit, bool):
            passed = explicit
            detail = "Hash verification explicitly reported by workflow."
            return {"passed": passed, "label": "PASS" if passed else "FAIL", "detail": detail}
        if isinstance(explicit, str):
            normalized_explicit = explicit.strip().lower()
            if normalized_explicit in {"true", "pass", "passed", "ok", "yes"}:
                return {
                    "passed": True,
                    "label": "PASS",
                    "detail": "Hash verification explicitly reported by workflow.",
                }
            if normalized_explicit in {"false", "fail", "failed", "no"}:
                return {
                    "passed": False,
                    "label": "FAIL",
                    "detail": "Hash verification explicitly reported by workflow.",
                }

        expected = self._stringify(
            hashes.get("expected_sha256") or hashes.get("intake_sha256") or hashes.get("original_sha256"),
            default="",
        ).lower()
        observed = self._stringify(
            hashes.get("reverified_sha256") or hashes.get("current_sha256") or hashes.get("computed_sha256"),
            default="",
        ).lower()

        if expected and observed:
            passed = expected == observed
            detail = "Re-verified SHA-256 matches intake hash." if passed else "Re-verified SHA-256 does not match intake hash."
            return {"passed": passed, "label": "PASS" if passed else "FAIL", "detail": detail}

        return {
            "passed": False,
            "label": "FAIL",
            "detail": "Insufficient data to validate hash integrity.",
        }

    def _normalize_per_artifact_findings(self, analysis: Mapping[str, Any]) -> list[dict[str, Any]]:
        """Normalise per-artifact findings into a uniform list of dicts.

        Accepts lists, dicts keyed by artifact name, or single-finding
        mappings and coerces them into a list with consistent keys.

        Returns:
            List of dicts with ``artifact_name``, ``artifact_key``,
            ``analysis``, ``record_count``, ``time_range_start``,
            ``time_range_end``, ``key_data_points``, ``confidence_label``,
            and ``confidence_class``.
        """
        raw_findings = analysis.get("per_artifact")
        if raw_findings is None:
            raw_findings = analysis.get("per_artifact_findings")

        findings: list[dict[str, Any]] = []
        iterable = self._coerce_per_artifact_iterable(raw_findings)

        for index, finding in enumerate(iterable, start=1):
            if not isinstance(finding, Mapping):
                continue

            artifact_name = self._stringify(
                finding.get("artifact_name") or finding.get("name") or finding.get("artifact_key"),
                default=f"Artifact {index}",
            )
            artifact_key = self._stringify(finding.get("artifact_key"), default="")
            analysis_text = self._stringify(
                finding.get("analysis") or finding.get("findings") or finding.get("text"),
                default="No findings were provided.",
            )
            confidence_label, confidence_class = self._resolve_confidence(
                self._stringify(finding.get("confidence"), default=""),
                analysis_text,
            )

            time_range_start = self._stringify(
                finding.get("time_range_start") or self._nested_lookup(finding, ("time_range", "start")),
                default="N/A",
            )
            time_range_end = self._stringify(
                finding.get("time_range_end") or self._nested_lookup(finding, ("time_range", "end")),
                default="N/A",
            )
            record_count = self._stringify(finding.get("record_count"), default="N/A")
            key_data_points = self._normalize_key_data_points(
                finding.get("key_data_points") or finding.get("key_points") or finding.get("data_points")
            )

            findings.append(
                {
                    "artifact_name": artifact_name,
                    "artifact_key": artifact_key,
                    "analysis": analysis_text,
                    "record_count": record_count,
                    "time_range_start": time_range_start,
                    "time_range_end": time_range_end,
                    "key_data_points": key_data_points,
                    "confidence_label": confidence_label,
                    "confidence_class": confidence_class,
                }
            )

        return findings

    def _coerce_per_artifact_iterable(self, raw_findings: Any) -> Sequence[Any]:
        """Coerce various per-artifact finding shapes into a sequence."""
        if isinstance(raw_findings, Sequence) and not isinstance(raw_findings, (str, bytes, bytearray)):
            return raw_findings

        if isinstance(raw_findings, Mapping):
            if self._looks_like_single_finding(raw_findings):
                return [raw_findings]

            coerced: list[dict[str, Any]] = []
            for artifact_key, raw_value in raw_findings.items():
                if isinstance(raw_value, Mapping):
                    merged = dict(raw_value)
                    merged.setdefault("artifact_key", self._stringify(artifact_key, default=""))
                    if not self._stringify(merged.get("artifact_name"), default=""):
                        merged["artifact_name"] = self._stringify(artifact_key, default="Unknown Artifact")
                    coerced.append(merged)
                    continue

                analysis_text = self._stringify(raw_value, default="")
                if not analysis_text:
                    continue
                artifact_label = self._stringify(artifact_key, default="Unknown Artifact")
                coerced.append(
                    {
                        "artifact_key": artifact_label,
                        "artifact_name": artifact_label,
                        "analysis": analysis_text,
                    }
                )
            return coerced

        return []

    @staticmethod
    def _looks_like_single_finding(value: Mapping[str, Any]) -> bool:
        """Return *True* if *value* appears to be a single finding mapping."""
        finding_keys = {
            "artifact_name",
            "name",
            "artifact_key",
            "analysis",
            "findings",
            "text",
            "record_count",
            "time_range_start",
            "time_range_end",
            "time_range",
            "key_data_points",
            "key_points",
            "data_points",
            "confidence",
        }
        return any(key in value for key in finding_keys)

    def _normalize_key_data_points(self, raw_points: Any) -> list[dict[str, str]]:
        """Normalise key data points into a list of ``{timestamp, value}`` dicts."""
        if isinstance(raw_points, Sequence) and not isinstance(raw_points, (str, bytes, bytearray)):
            points: list[dict[str, str]] = []
            for point in raw_points:
                if isinstance(point, Mapping):
                    timestamp = self._stringify(
                        point.get("timestamp") or point.get("time") or point.get("date") or point.get("ts"),
                        default="",
                    )
                    value = self._stringify(
                        point.get("value") or point.get("data") or point.get("detail") or point.get("event"),
                        default="",
                    )
                    if not value:
                        value = self._mapping_to_kv_text(point)
                    points.append({"timestamp": timestamp, "value": value})
                else:
                    text_value = self._stringify(point, default="")
                    if text_value:
                        points.append({"timestamp": "", "value": text_value})
            return points

        if isinstance(raw_points, Mapping):
            return [{"timestamp": "", "value": self._mapping_to_kv_text(raw_points)}]

        if raw_points is None:
            return []

        text_value = self._stringify(raw_points, default="")
        if text_value:
            return [{"timestamp": "", "value": text_value}]
        return []

    def _normalize_audit_entries(self, entries: Sequence[Any] | None) -> list[dict[str, Any]]:
        """Normalise raw audit log entries into template-ready dicts."""
        if entries is None:
            return []

        normalized: list[dict[str, Any]] = []
        for entry in entries:
            mapping = self._coerce_mapping(entry)
            if mapping is None:
                continue

            details_value = mapping.get("details")
            if isinstance(details_value, Mapping):
                details_text = json.dumps(details_value, sort_keys=True, indent=2)
                details_is_structured = True
            elif isinstance(details_value, Sequence) and not isinstance(details_value, (str, bytes, bytearray)):
                details_text = json.dumps(list(details_value), indent=2)
                details_is_structured = True
            else:
                details_text = self._stringify(details_value, default="")
                details_is_structured = False

            normalized.append(
                {
                    "timestamp": self._stringify(mapping.get("timestamp"), default="N/A"),
                    "action": self._stringify(mapping.get("action"), default="unknown"),
                    "details": details_text,
                    "details_is_structured": details_is_structured,
                    "tool_version": self._stringify(mapping.get("tool_version"), default=""),
                }
            )

        return normalized

    @staticmethod
    def _resolve_confidence(explicit_value: str, analysis_text: str) -> tuple[str, str]:
        """Determine confidence label and CSS class from explicit value or text.

        Returns:
            Tuple of ``(label, css_class)`` -- e.g. ``("HIGH", "confidence-high")``.
        """
        if explicit_value:
            label = explicit_value.strip().upper()
            if label in CONFIDENCE_CLASS_MAP:
                return label, CONFIDENCE_CLASS_MAP[label]

        match = CONFIDENCE_PATTERN.search(analysis_text or "")
        if match:
            label = match.group(1).upper()
            return label, CONFIDENCE_CLASS_MAP[label]

        return "UNSPECIFIED", "confidence-unknown"

    @staticmethod
    def _nested_lookup(mapping: Mapping[str, Any], path: tuple[str, str]) -> Any:
        """Traverse a nested mapping using a two-element key path."""
        current: Any = mapping
        for key in path:
            if not isinstance(current, Mapping):
                return None
            current = current.get(key)
        return current

    @staticmethod
    def _coerce_mapping(value: Any) -> dict[str, Any] | None:
        """Attempt to coerce *value* into a plain dict, or return *None*."""
        if isinstance(value, Mapping):
            return dict(value)
        if isinstance(value, str):
            stripped = value.strip()
            if not stripped:
                return None
            try:
                parsed = json.loads(stripped)
            except json.JSONDecodeError:
                return None
            if isinstance(parsed, Mapping):
                return dict(parsed)
        return None

    @staticmethod
    def _format_file_size(size_value: Any) -> str:
        """Format a byte count as a human-readable size string (e.g. ``1.50 GB``)."""
        if size_value is None:
            return "N/A"

        try:
            size = int(size_value)
        except (TypeError, ValueError):
            return str(size_value)

        units = ["B", "KB", "MB", "GB", "TB"]
        working = float(size)
        unit = units[0]
        for candidate in units:
            unit = candidate
            if working < 1024.0 or candidate == units[-1]:
                break
            working /= 1024.0

        if unit == "B":
            return f"{int(working)} {unit}"
        return f"{working:.2f} {unit} ({size} bytes)"

    @staticmethod
    def _stringify_ips(value: Any) -> str:
        """Format IP addresses as a comma-separated string."""
        if isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)):
            cleaned = [str(item).strip() for item in value if str(item).strip()]
            return ", ".join(cleaned) if cleaned else "Unknown"

        text = str(value).strip() if value is not None else ""
        return text or "Unknown"

    @staticmethod
    def _mapping_to_kv_text(value: Mapping[str, Any]) -> str:
        """Convert a mapping to a ``key=value; ...`` text representation."""
        parts = [
            f"{str(key)}={str(item)}"
            for key, item in value.items()
            if item not in (None, "")
        ]
        return "; ".join(parts)

    @staticmethod
    def _stringify(value: Any, default: str = "") -> str:
        """Convert *value* to a stripped string, returning *default* if empty."""
        if value is None:
            return default
        text = str(value).strip()
        return text if text else default
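The logo-embedding step (``_resolve_logo_data_uri`` / ``_file_to_data_uri`` above) reduces to base64-encoding the image bytes and prefixing a MIME type. Below is a minimal standalone sketch with in-memory bytes standing in for a real logo file; ``to_data_uri`` and ``MIME_BY_SUFFIX`` are illustrative names, not part of the module:

```python
import base64
from pathlib import Path

# Suffix-to-MIME table mirroring the idea behind _file_to_data_uri.
MIME_BY_SUFFIX = {".png": "image/png", ".svg": "image/svg+xml"}


def to_data_uri(path: Path, payload: bytes) -> str:
    """Build a data: URI for *payload*, picking the MIME type from *path*."""
    mime = MIME_BY_SUFFIX.get(path.suffix.lower(), "application/octet-stream")
    encoded = base64.b64encode(payload).decode("ascii")
    return f"data:{mime};base64,{encoded}"


# Fake image bytes stand in for a real logo file on disk.
uri = to_data_uri(Path("logo.png"), b"\x89PNG-demo-bytes")
```

Because the resulting string carries the image bytes themselves, the rendered report needs no external image references to stay self-contained.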
144        executive_summary = self._stringify(analysis.get("executive_summary") or summary_text)
145
146        per_artifact = self._normalize_per_artifact_findings(analysis)
147        evidence_summary = self._build_evidence_summary(metadata, hashes)
148        hash_verification = self._resolve_hash_verification(hashes)
149
150        render_context = {
151            "case_name": case_name,
152            "case_id": case_id,
153            "generated_at": generated_iso,
154            "tool_version": self._resolve_tool_version(analysis, audit_entries),
155            "ai_provider": self._resolve_ai_provider(analysis),
156            "logo_data_uri": self._resolve_logo_data_uri(),
157            "evidence": evidence_summary,
158            "hash_verification": hash_verification,
159            "investigation_context": self._stringify(investigation_context, default="No investigation context provided."),
160            "executive_summary": executive_summary,
161            "per_artifact_findings": per_artifact,
162            "audit_entries": audit_entries,
163        }
164
165        rendered = self.template.render(**render_context)
166
167        report_dir = self.cases_root / case_id / "reports"
168        report_dir.mkdir(parents=True, exist_ok=True)
169        report_path = report_dir / f"report_{report_timestamp}.html"
170        report_path.write_text(rendered, encoding="utf-8")
171        return report_path
172
173    def _resolve_logo_data_uri(self) -> str:
174        """Locate the project logo and return it as a base64 ``data:`` URI.
175
176        Returns:
 177            A ``data:image/...;base64,...`` string, or ``""`` if no logo is found.
178        """
179        project_root = Path(__file__).resolve().parents[2]
180        images_dir = project_root / "images"
181        if not images_dir.is_dir():
182            return ""
183
184        for filename in LOGO_FILE_CANDIDATES:
185            candidate = images_dir / filename
186            if candidate.is_file():
187                return self._file_to_data_uri(candidate)
188
189        fallback_images = sorted(
190            path
191            for path in images_dir.iterdir()
192            if path.is_file() and path.suffix.lower() in {".png", ".jpg", ".jpeg", ".webp", ".svg"}
193        )
194        if fallback_images:
195            return self._file_to_data_uri(fallback_images[0])
196
197        return ""
198
199    @staticmethod
200    def _file_to_data_uri(path: Path) -> str:
201        """Read a file and encode it as a base64 data URI string.
202
203        Args:
204            path: Path to the image file.
205
206        Returns:
207            A ``data:<mime>;base64,...`` URI string.
208        """
209        mime_types = {
210            ".png": "image/png",
211            ".jpg": "image/jpeg",
212            ".jpeg": "image/jpeg",
213            ".webp": "image/webp",
214            ".svg": "image/svg+xml",
215        }
216        mime = mime_types.get(path.suffix.lower(), "application/octet-stream")
217        encoded = base64.b64encode(path.read_bytes()).decode("ascii")
218        return f"data:{mime};base64,{encoded}"
219
220    def _resolve_case_id(
221        self,
222        analysis: Mapping[str, Any],
223        metadata: Mapping[str, Any],
224        hashes: Mapping[str, Any],
225    ) -> str:
226        """Extract and sanitise a case ID from the available data sources.
227
228        Raises:
229            ValueError: If no case identifier can be determined.
230        """
231        candidates = [
232            analysis.get("case_id"),
233            analysis.get("id"),
234            hashes.get("case_id"),
235            metadata.get("case_id"),
236        ]
237
238        nested_case = analysis.get("case")
239        if isinstance(nested_case, Mapping):
240            candidates.extend([nested_case.get("id"), nested_case.get("case_id")])
241
242        for candidate in candidates:
243            value = self._stringify(candidate, default="")
244            if value:
245                safe = SAFE_CASE_ID_PATTERN.sub("_", value).strip("_")
246                if safe:
247                    return safe
248
249        raise ValueError("Unable to determine case identifier for report generation.")
250
251    def _resolve_case_name(self, analysis: Mapping[str, Any]) -> str:
252        """Determine a human-readable case name, falling back to a default."""
253        nested_case = analysis.get("case")
254        if isinstance(nested_case, Mapping):
255            nested_name = self._stringify(nested_case.get("name"), default="")
256            if nested_name:
257                return nested_name
258
259        return self._stringify(analysis.get("case_name"), default=DEFAULT_CASE_NAME)
260
261    def _resolve_tool_version(
262        self,
263        analysis: Mapping[str, Any],
 264        audit_entries: list[dict[str, Any]],
265    ) -> str:
266        """Determine the tool version from analysis data or audit entries."""
267        explicit_version = self._stringify(analysis.get("tool_version"), default="")
268        if explicit_version:
269            return explicit_version
270
271        for entry in reversed(audit_entries):
272            version = self._stringify(entry.get("tool_version"), default="")
273            if version:
274                return version
275
276        return DEFAULT_TOOL_VERSION
277
278    def _resolve_ai_provider(self, analysis: Mapping[str, Any]) -> str:
279        """Determine the AI provider label for the report header."""
280        explicit = self._stringify(analysis.get("ai_provider"), default="")
281        if explicit:
282            return explicit
283
284        model_info = analysis.get("model_info")
285        if isinstance(model_info, Mapping):
286            provider = self._stringify(model_info.get("provider"), default=DEFAULT_AI_PROVIDER)
287            model = self._stringify(model_info.get("model"), default="")
288            if model:
289                return f"{provider} ({model})"
290            return provider
291
292        return DEFAULT_AI_PROVIDER
293
294    def _build_evidence_summary(
295        self,
296        metadata: Mapping[str, Any],
297        hashes: Mapping[str, Any],
298    ) -> dict[str, str]:
299        """Assemble evidence summary fields for the report template.
300
301        Returns:
302            Dictionary with ``filename``, ``sha256``, ``md5``, ``file_size``,
303            ``hostname``, ``os_version``, ``domain``, and ``ips``.
304        """
305        hostname = self._stringify(metadata.get("hostname"), default="Unknown")
306        os_value = self._stringify(metadata.get("os_version") or metadata.get("os"), default="Unknown")
307        domain = self._stringify(metadata.get("domain"), default="Unknown")
308        ips = self._stringify_ips(metadata.get("ips") or metadata.get("ip_addresses") or metadata.get("ip"))
309
310        size_value = hashes.get("size_bytes")
311        if size_value is None:
312            size_value = hashes.get("file_size_bytes")
313
314        return {
315            "filename": self._stringify(
316                hashes.get("filename") or hashes.get("file_name") or metadata.get("filename"),
317                default="Unknown",
318            ),
319            "sha256": self._stringify(hashes.get("sha256"), default="N/A"),
320            "md5": self._stringify(hashes.get("md5"), default="N/A"),
321            "file_size": self._format_file_size(size_value),
322            "hostname": hostname,
323            "os_version": os_value,
324            "domain": domain,
325            "ips": ips,
326        }
327
328    def _resolve_hash_verification(self, hashes: Mapping[str, Any]) -> dict[str, str | bool]:
329        """Determine hash verification PASS/FAIL status for the report.
330
331        Returns:
332            Dictionary with ``passed`` (bool), ``label`` (``"PASS"`` or
333            ``"FAIL"``), and ``detail`` (human-readable explanation).
334        """
335        explicit = hashes.get("hash_verified")
336        if explicit is None:
337            explicit = hashes.get("verification_passed")
338        if explicit is None:
339            explicit = hashes.get("verified")
340
341        if isinstance(explicit, str) and explicit.strip().lower() == "skipped":
342            return {
343                "passed": True,
344                "skipped": True,
345                "label": "SKIPPED",
346                "detail": "Hash computation was skipped at user request during evidence intake.",
347            }
348        if isinstance(explicit, bool):
349            passed = explicit
350            detail = "Hash verification explicitly reported by workflow."
351            return {"passed": passed, "label": "PASS" if passed else "FAIL", "detail": detail}
352        if isinstance(explicit, str):
353            normalized_explicit = explicit.strip().lower()
354            if normalized_explicit in {"true", "pass", "passed", "ok", "yes"}:
355                return {
356                    "passed": True,
357                    "label": "PASS",
358                    "detail": "Hash verification explicitly reported by workflow.",
359                }
360            if normalized_explicit in {"false", "fail", "failed", "no"}:
361                return {
362                    "passed": False,
363                    "label": "FAIL",
364                    "detail": "Hash verification explicitly reported by workflow.",
365                }
366
367        expected = self._stringify(
368            hashes.get("expected_sha256") or hashes.get("intake_sha256") or hashes.get("original_sha256"),
369            default="",
370        ).lower()
371        observed = self._stringify(
372            hashes.get("reverified_sha256") or hashes.get("current_sha256") or hashes.get("computed_sha256"),
373            default="",
374        ).lower()
375
376        if expected and observed:
377            passed = expected == observed
378            detail = "Re-verified SHA-256 matches intake hash." if passed else "Re-verified SHA-256 does not match intake hash."
379            return {"passed": passed, "label": "PASS" if passed else "FAIL", "detail": detail}
380
381        return {
382            "passed": False,
383            "label": "FAIL",
384            "detail": "Insufficient data to validate hash integrity.",
385        }
386
387    def _normalize_per_artifact_findings(self, analysis: Mapping[str, Any]) -> list[dict[str, Any]]:
388        """Normalise per-artifact findings into a uniform list of dicts.
389
390        Accepts lists, dicts keyed by artifact name, or single-finding
391        mappings and coerces them into a list with consistent keys.
392
393        Returns:
394            List of dicts with ``artifact_name``, ``artifact_key``,
395            ``analysis``, ``record_count``, ``time_range_start``,
396            ``time_range_end``, ``key_data_points``, ``confidence_label``,
397            and ``confidence_class``.
398        """
399        raw_findings = analysis.get("per_artifact")
400        if raw_findings is None:
401            raw_findings = analysis.get("per_artifact_findings")
402
403        findings: list[dict[str, Any]] = []
404        iterable = self._coerce_per_artifact_iterable(raw_findings)
405
406        for index, finding in enumerate(iterable, start=1):
407            if not isinstance(finding, Mapping):
408                continue
409
410            artifact_name = self._stringify(
411                finding.get("artifact_name") or finding.get("name") or finding.get("artifact_key"),
412                default=f"Artifact {index}",
413            )
414            artifact_key = self._stringify(finding.get("artifact_key"), default="")
415            analysis_text = self._stringify(
416                finding.get("analysis") or finding.get("findings") or finding.get("text"),
417                default="No findings were provided.",
418            )
419            confidence_label, confidence_class = self._resolve_confidence(
420                self._stringify(finding.get("confidence"), default=""),
421                analysis_text,
422            )
423
424            time_range_start = self._stringify(
425                finding.get("time_range_start") or self._nested_lookup(finding, ("time_range", "start")),
426                default="N/A",
427            )
428            time_range_end = self._stringify(
429                finding.get("time_range_end") or self._nested_lookup(finding, ("time_range", "end")),
430                default="N/A",
431            )
432            record_count = self._stringify(finding.get("record_count"), default="N/A")
433            key_data_points = self._normalize_key_data_points(
434                finding.get("key_data_points") or finding.get("key_points") or finding.get("data_points")
435            )
436
437            findings.append(
438                {
439                    "artifact_name": artifact_name,
440                    "artifact_key": artifact_key,
441                    "analysis": analysis_text,
442                    "record_count": record_count,
443                    "time_range_start": time_range_start,
444                    "time_range_end": time_range_end,
445                    "key_data_points": key_data_points,
446                    "confidence_label": confidence_label,
447                    "confidence_class": confidence_class,
448                }
449            )
450
451        return findings
452
453    def _coerce_per_artifact_iterable(self, raw_findings: Any) -> Sequence[Any]:
454        """Coerce various per-artifact finding shapes into a sequence."""
455        if isinstance(raw_findings, Sequence) and not isinstance(raw_findings, (str, bytes, bytearray)):
456            return raw_findings
457
458        if isinstance(raw_findings, Mapping):
459            if self._looks_like_single_finding(raw_findings):
460                return [raw_findings]
461
462            coerced: list[dict[str, Any]] = []
463            for artifact_key, raw_value in raw_findings.items():
464                if isinstance(raw_value, Mapping):
465                    merged = dict(raw_value)
466                    merged.setdefault("artifact_key", self._stringify(artifact_key, default=""))
467                    if not self._stringify(merged.get("artifact_name"), default=""):
468                        merged["artifact_name"] = self._stringify(artifact_key, default="Unknown Artifact")
469                    coerced.append(merged)
470                    continue
471
472                analysis_text = self._stringify(raw_value, default="")
473                if not analysis_text:
474                    continue
475                artifact_label = self._stringify(artifact_key, default="Unknown Artifact")
476                coerced.append(
477                    {
478                        "artifact_key": artifact_label,
479                        "artifact_name": artifact_label,
480                        "analysis": analysis_text,
481                    }
482                )
483            return coerced
484
485        return []
486
487    @staticmethod
488    def _looks_like_single_finding(value: Mapping[str, Any]) -> bool:
489        """Return *True* if *value* appears to be a single finding mapping."""
490        finding_keys = {
491            "artifact_name",
492            "name",
493            "artifact_key",
494            "analysis",
495            "findings",
496            "text",
497            "record_count",
498            "time_range_start",
499            "time_range_end",
500            "time_range",
501            "key_data_points",
502            "key_points",
503            "data_points",
504            "confidence",
505        }
506        return any(key in value for key in finding_keys)
507
508    def _normalize_key_data_points(self, raw_points: Any) -> list[dict[str, str]]:
509        """Normalise key data points into a list of ``{timestamp, value}`` dicts."""
510        if isinstance(raw_points, Sequence) and not isinstance(raw_points, (str, bytes, bytearray)):
511            points: list[dict[str, str]] = []
512            for point in raw_points:
513                if isinstance(point, Mapping):
514                    timestamp = self._stringify(
515                        point.get("timestamp") or point.get("time") or point.get("date") or point.get("ts"),
516                        default="",
517                    )
518                    value = self._stringify(
519                        point.get("value") or point.get("data") or point.get("detail") or point.get("event"),
520                        default="",
521                    )
522                    if not value:
523                        value = self._mapping_to_kv_text(point)
524                    points.append({"timestamp": timestamp, "value": value})
525                else:
526                    text_value = self._stringify(point, default="")
527                    if text_value:
528                        points.append({"timestamp": "", "value": text_value})
529            return points
530
531        if isinstance(raw_points, Mapping):
532            return [{"timestamp": "", "value": self._mapping_to_kv_text(raw_points)}]
533
534        if raw_points is None:
535            return []
536
537        text_value = self._stringify(raw_points, default="")
538        if text_value:
539            return [{"timestamp": "", "value": text_value}]
540        return []
541
 542    def _normalize_audit_entries(self, entries: Sequence[Any] | None) -> list[dict[str, Any]]:
543        """Normalise raw audit log entries into template-ready dicts."""
544        if entries is None:
545            return []
546
 547        normalized: list[dict[str, Any]] = []
548        for entry in entries:
549            mapping = self._coerce_mapping(entry)
550            if mapping is None:
551                continue
552
553            details_value = mapping.get("details")
554            if isinstance(details_value, Mapping):
555                details_text = json.dumps(details_value, sort_keys=True, indent=2)
556                details_is_structured = True
557            elif isinstance(details_value, Sequence) and not isinstance(details_value, (str, bytes, bytearray)):
558                details_text = json.dumps(list(details_value), indent=2)
559                details_is_structured = True
560            else:
561                details_text = self._stringify(details_value, default="")
562                details_is_structured = False
563
564            normalized.append(
565                {
566                    "timestamp": self._stringify(mapping.get("timestamp"), default="N/A"),
567                    "action": self._stringify(mapping.get("action"), default="unknown"),
568                    "details": details_text,
569                    "details_is_structured": details_is_structured,
570                    "tool_version": self._stringify(mapping.get("tool_version"), default=""),
571                }
572            )
573
574        return normalized
575
576    @staticmethod
577    def _resolve_confidence(explicit_value: str, analysis_text: str) -> tuple[str, str]:
578        """Determine confidence label and CSS class from explicit value or text.
579
580        Returns:
581            Tuple of ``(label, css_class)`` -- e.g. ``("HIGH", "confidence-high")``.
582        """
583        if explicit_value:
584            label = explicit_value.strip().upper()
585            if label in CONFIDENCE_CLASS_MAP:
586                return label, CONFIDENCE_CLASS_MAP[label]
587
588        match = CONFIDENCE_PATTERN.search(analysis_text or "")
589        if match:
590            label = match.group(1).upper()
591            return label, CONFIDENCE_CLASS_MAP[label]
592
593        return "UNSPECIFIED", "confidence-unknown"
594
595    @staticmethod
596    def _nested_lookup(mapping: Mapping[str, Any], path: tuple[str, str]) -> Any:
597        """Traverse a nested mapping using a two-element key path."""
598        current: Any = mapping
599        for key in path:
600            if not isinstance(current, Mapping):
601                return None
602            current = current.get(key)
603        return current
604
605    @staticmethod
606    def _coerce_mapping(value: Any) -> dict[str, Any] | None:
607        """Attempt to coerce *value* into a plain dict, or return *None*."""
608        if isinstance(value, Mapping):
609            return dict(value)
610        if isinstance(value, str):
611            stripped = value.strip()
612            if not stripped:
613                return None
614            try:
615                parsed = json.loads(stripped)
616            except json.JSONDecodeError:
617                return None
618            if isinstance(parsed, Mapping):
619                return dict(parsed)
620        return None
621
622    @staticmethod
623    def _format_file_size(size_value: Any) -> str:
 624        """Format a byte count as a human-readable size string (e.g. ``1.50 GB (1610612736 bytes)``)."""
625        if size_value is None:
626            return "N/A"
627
628        try:
629            size = int(size_value)
630        except (TypeError, ValueError):
631            return str(size_value)
632
633        units = ["B", "KB", "MB", "GB", "TB"]
634        working = float(size)
635        unit = units[0]
636        for candidate in units:
637            unit = candidate
638            if working < 1024.0 or candidate == units[-1]:
639                break
640            working /= 1024.0
641
642        if unit == "B":
643            return f"{int(working)} {unit}"
644        return f"{working:.2f} {unit} ({size} bytes)"
645
646    @staticmethod
647    def _stringify_ips(value: Any) -> str:
648        """Format IP addresses as a comma-separated string."""
649        if isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)):
650            cleaned = [str(item).strip() for item in value if str(item).strip()]
651            return ", ".join(cleaned) if cleaned else "Unknown"
652
653        text = str(value).strip() if value is not None else ""
654        return text or "Unknown"
655
656    @staticmethod
657    def _mapping_to_kv_text(value: Mapping[str, Any]) -> str:
658        """Convert a mapping to a ``key=value; ...`` text representation."""
659        parts = [
660            f"{str(key)}={str(item)}"
661            for key, item in value.items()
662            if item not in (None, "")
663        ]
664        return "; ".join(parts)
665
666    @staticmethod
667    def _stringify(value: Any, default: str = "") -> str:
668        """Convert *value* to a stripped string, returning *default* if empty."""
669        if value is None:
670            return default
671        text = str(value).strip()
672        return text if text else default
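The "flexible input normalisation" described in the module docstring — accepting a list, a dict keyed by artifact name, or a single finding mapping — can be sketched as a standalone function. This is an illustrative reimplementation, not the module's actual export; `coerce_findings` is a hypothetical name, and the single-finding heuristic here checks only two keys where `_looks_like_single_finding` checks the full set.

```python
from collections.abc import Mapping, Sequence
from typing import Any

def coerce_findings(raw: Any) -> list[dict[str, Any]]:
    """Coerce list / keyed-dict / single-mapping finding shapes into one list.

    Simplified sketch of _coerce_per_artifact_iterable's behaviour.
    """
    # A list (or other non-string sequence) passes through; mappings only.
    if isinstance(raw, Sequence) and not isinstance(raw, (str, bytes, bytearray)):
        return [dict(f) for f in raw if isinstance(f, Mapping)]
    if isinstance(raw, Mapping):
        # A dict that itself looks like one finding is wrapped in a list.
        # (The real code checks a larger key set via _looks_like_single_finding.)
        if "analysis" in raw or "artifact_name" in raw:
            return [dict(raw)]
        # Otherwise treat it as {artifact_key: finding-or-text} and expand.
        out: list[dict[str, Any]] = []
        for key, value in raw.items():
            if isinstance(value, Mapping):
                merged = dict(value)
                merged.setdefault("artifact_key", str(key))
                out.append(merged)
            elif str(value).strip():
                out.append({"artifact_key": str(key), "analysis": str(value)})
        return out
    return []
```

Usage: `coerce_findings({"prefetch": "cmd.exe executed"})` yields a one-element list with `artifact_key` and `analysis` populated, matching the uniform shape the template expects.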

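The PASS/FAIL/SKIPPED decision order implemented by `_resolve_hash_verification` can be condensed into a small sketch: an explicit flag (string `"skipped"`, a boolean, or a truthy/falsy token) wins; failing that, the intake and re-verified SHA-256 digests are compared case-insensitively; with neither available the result is FAIL. `verify_hashes` is a hypothetical helper, and the real method also returns a detail string and consults several alternative key names (`verification_passed`, `intake_sha256`, `computed_sha256`, etc.).

```python
def verify_hashes(hashes: dict) -> str:
    """Return "PASS", "FAIL", or "SKIPPED" using the module's precedence order.

    Illustrative sketch only; key names are a subset of what the module checks.
    """
    explicit = hashes.get("hash_verified")
    # 1. Explicit "skipped" marker from intake takes priority.
    if isinstance(explicit, str) and explicit.strip().lower() == "skipped":
        return "SKIPPED"
    # 2. An explicit boolean verdict is authoritative.
    if isinstance(explicit, bool):
        return "PASS" if explicit else "FAIL"
    # 3. Common truthy/falsy string tokens.
    if isinstance(explicit, str):
        token = explicit.strip().lower()
        if token in {"true", "pass", "passed", "ok", "yes"}:
            return "PASS"
        if token in {"false", "fail", "failed", "no"}:
            return "FAIL"
    # 4. Fall back to comparing intake vs. re-verified digests.
    expected = str(hashes.get("expected_sha256") or "").lower()
    observed = str(hashes.get("reverified_sha256") or "").lower()
    if expected and observed:
        return "PASS" if expected == observed else "FAIL"
    # 5. Nothing to go on: fail closed.
    return "FAIL"
```

Failing closed in step 5 mirrors the module's conservative default: absent evidence of integrity, the report flags FAIL rather than silently passing.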