"""AI analysis orchestration module for forensic triage.

Implements the ``ForensicAnalyzer`` class that orchestrates the full analysis
pipeline: token budgeting, column projection, deduplication, chunked analysis,
citation validation, IOC extraction, and audit logging.

Sub-module organisation:

- ``constants``: Compile-time constants, regex, prompt templates.
- ``utils``: Pure utility functions (string, datetime, CSV).
- ``ioc``: IOC extraction and prompt-building helpers.
- ``citations``: Citation validation against source CSV.
- ``data_prep``: Dedup, statistics, prompt assembly.
- ``chunking``: Chunked analysis and hierarchical merge.
- ``prompts``: Prompt template loading and construction.

Attributes:
    PROJECT_ROOT (Path): Re-exported from ``constants``.
"""

from __future__ import annotations

import logging
from pathlib import Path
from time import perf_counter, sleep
from typing import Any, Callable, Iterable, Mapping

from ..ai_providers import AIProviderError, create_provider
from .chunking import analyze_artifact_chunked, split_csv_and_suffix, split_csv_into_chunks
from .citations import match_column_name, timestamp_found_in_csv, timestamp_lookup_keys, validate_citations
from .constants import (
    AI_MAX_TOKENS, AI_RETRY_ATTEMPTS, AI_RETRY_BASE_DELAY,
    ARTIFACT_DEDUPLICATION_ENABLED, CITATION_SPOT_CHECK_LIMIT,
    DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH, DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
    DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT, DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
    DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, DEFAULT_SUMMARY_PROMPT_TEMPLATE,
    DEFAULT_SYSTEM_PROMPT, MAX_MERGE_ROUNDS, PROJECT_ROOT, TOKEN_CHAR_RATIO,
    UnavailableProvider,
)
from .data_prep import (
    build_artifact_csv_attachment, build_full_data_csv, compute_statistics,
    deduplicate_rows_for_analysis, prepare_artifact_data,
)
from .ioc import build_priority_directives, extract_ioc_targets, format_ioc_targets
from .prompts import (
    build_summary_prompt, load_artifact_ai_column_projections,
    load_artifact_instruction_prompts, load_prompt_template,
    resolve_artifact_ai_columns_config_path,
)
from .utils import (
    build_datetime, coerce_projection_columns, emit_analysis_progress,
    estimate_tokens, is_dedup_safe_identifier_column, normalize_artifact_key,
    normalize_os_type, read_bool_setting, read_int_setting, read_path_setting,
    sanitize_filename, stringify_value,
)

LOGGER = logging.getLogger(__name__)

try:
    from ..parser import LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY
except Exception as error:
    LOGGER.warning(
        "Failed to import artifact registries from app.parser: %s. "
        "Artifact metadata lookups will be unavailable.",
        error,
    )
    WINDOWS_ARTIFACT_REGISTRY: dict[str, dict[str, str]] = {}
    LINUX_ARTIFACT_REGISTRY: dict[str, dict[str, str]] = {}

__all__ = ["AnalysisCancelledError", "ForensicAnalyzer"]


class AnalysisCancelledError(Exception):
    """Raised when analysis is cancelled by the user."""


class ForensicAnalyzer:
    """Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

    Central analysis engine for AIFT: reads parsed artifact CSV files, applies
    column projection and deduplication, builds token-budgeted prompts, sends
    them to a configured AI provider, and validates citations.

    Attributes:
        case_dir: Path to the case directory, or ``None``.
        config: Merged configuration dictionary.
        ai_provider: The configured AI provider instance.
        model_info: Dict with ``provider`` and ``model`` keys.
    """

    # Expose extracted functions as static methods for backward compatibility
    # with tests and callers that use ForensicAnalyzer._method_name().
    _stringify_value = staticmethod(stringify_value)
    _build_datetime = staticmethod(build_datetime)
    _normalize_artifact_key = staticmethod(normalize_artifact_key)
    _sanitize_filename = staticmethod(sanitize_filename)
    _split_csv_into_chunks = staticmethod(split_csv_into_chunks)
    _split_csv_and_suffix = staticmethod(split_csv_and_suffix)
    _coerce_projection_columns = staticmethod(coerce_projection_columns)
    _is_dedup_safe_identifier_column = staticmethod(is_dedup_safe_identifier_column)
    _timestamp_lookup_keys = staticmethod(timestamp_lookup_keys)
    _timestamp_found_in_csv = staticmethod(timestamp_found_in_csv)
    _match_column_name = staticmethod(match_column_name)
    _emit_analysis_progress = staticmethod(emit_analysis_progress)

    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.).  Controls which artifact instruction
                prompts are loaded.
        """
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()

    # ------------------------------------------------------------------
    # Configuration loading
    # ------------------------------------------------------------------

    def _load_analysis_settings(self) -> None:
        """Load and validate analysis tuning parameters from the config dict."""
        analysis_config = self.config.get("analysis")
        if not isinstance(analysis_config, Mapping):
            analysis_config = {}

        self.ai_max_tokens = read_int_setting(analysis_config, "ai_max_tokens", AI_MAX_TOKENS, minimum=1)
        # Reserve ~20% of the context window for the model's response.
        self.ai_response_max_tokens = max(1, int(self.ai_max_tokens * 0.2))
        legacy_shortened = read_int_setting(
            analysis_config, "statistics_section_cutoff_tokens", DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, minimum=1,
        )
        self.shortened_prompt_cutoff_tokens = read_int_setting(
            analysis_config, "shortened_prompt_cutoff_tokens", legacy_shortened, minimum=1,
        )
        # Character budget for chunked CSV payloads: ~60% of the context window.
        self.chunk_csv_budget = int(self.ai_max_tokens * TOKEN_CHAR_RATIO * 0.6)
        self.citation_spot_check_limit = read_int_setting(
            analysis_config, "citation_spot_check_limit", CITATION_SPOT_CHECK_LIMIT, minimum=1,
        )
        self.max_merge_rounds = read_int_setting(analysis_config, "max_merge_rounds", MAX_MERGE_ROUNDS, minimum=1)
        self.artifact_deduplication_enabled = read_bool_setting(
            analysis_config, "artifact_deduplication_enabled", ARTIFACT_DEDUPLICATION_ENABLED,
        )
        self.artifact_ai_columns_config_path = read_path_setting(
            analysis_config, "artifact_ai_columns_config_path", str(DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH),
        )

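    # Worked example of the budget arithmetic above (illustrative numbers
    # only; the real values come from the config and constants):
    #
    #   ai_max_tokens = 8000
    #   ai_response_max_tokens = max(1, int(8000 * 0.2)) = 1600 tokens
    #   chunk_csv_budget = int(8000 * TOKEN_CHAR_RATIO * 0.6) characters
    #
    # i.e. ~20% of the context window is held back for the model's reply and
    # ~60% (converted to characters) is available for the CSV payload, with
    # the remainder absorbed by prompt scaffolding.
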
    # Backward-compatible aliases for the extracted config readers.
    _read_int_setting = staticmethod(read_int_setting)
    _read_bool_setting = staticmethod(read_bool_setting)
    _read_path_setting = staticmethod(read_path_setting)

    def _resolve_artifact_ai_columns_config_path(self) -> Path:
        """Resolve the artifact AI columns config path to an absolute Path.

        Delegates to :func:`prompts.resolve_artifact_ai_columns_config_path`.

        Returns:
            Resolved absolute ``Path`` to the YAML config file.
        """
        return resolve_artifact_ai_columns_config_path(
            self.artifact_ai_columns_config_path, self.case_dir,
        )

    def _load_artifact_ai_column_projections(self) -> dict[str, tuple[str, ...]]:
        """Load per-artifact column projection configuration from YAML.

        Delegates to :func:`prompts.load_artifact_ai_column_projections`.

        Returns:
            A dict mapping normalized artifact keys to tuples of column names.
        """
        config_path = self._resolve_artifact_ai_columns_config_path()
        return load_artifact_ai_column_projections(config_path, os_type=self.os_type)

    def _load_prompt_template(self, filename: str, default: str) -> str:
        """Read a prompt template file from the prompts directory.

        Delegates to :func:`prompts.load_prompt_template`.

        Args:
            filename: Name of the template file.
            default: Fallback template string.

        Returns:
            The template text.
        """
        return load_prompt_template(self.prompts_dir, filename, default)

    def _load_artifact_instruction_prompts(self) -> dict[str, str]:
        """Load per-artifact analysis instruction prompts.

        Delegates to :func:`prompts.load_artifact_instruction_prompts`,
        passing :attr:`os_type` so the correct OS-specific instruction
        directory is selected.

        Returns:
            A dict mapping artifact keys to instruction prompt text.
        """
        return load_artifact_instruction_prompts(self.prompts_dir, os_type=self.os_type)

    # ------------------------------------------------------------------
    # AI provider
    # ------------------------------------------------------------------

    def _create_ai_provider(self) -> Any:
        """Instantiate the configured AI provider, or a fallback on failure.

        Returns:
            An AI provider instance, or an ``UnavailableProvider``.
        """
        provider_config: Mapping[str, Any]
        if self.config:
            provider_config = self.config
        else:
            provider_config = {
                "ai": {
                    "provider": "local",
                    "local": {
                        "base_url": "http://localhost:11434/v1",
                        "model": "llama3.1:70b",
                        "api_key": "not-needed",
                    },
                }
            }
        try:
            return create_provider(dict(provider_config))
        except Exception as error:
            return UnavailableProvider(str(error))

    def _read_model_info(self) -> dict[str, str]:
        """Read provider and model metadata from the AI provider.

        Returns:
            A dict with at least ``provider`` and ``model`` keys.
        """
        try:
            model_info = self.ai_provider.get_model_info()
        except Exception:
            return {"provider": "unknown", "model": "unknown"}

        if not isinstance(model_info, Mapping):
            return {"provider": "unknown", "model": "unknown"}

        return {str(key): str(value) for key, value in model_info.items()}

    def _call_ai_with_retry(self, call: Callable[[], str]) -> str:
        """Call the AI provider with retry on transient failures.

        Args:
            call: A zero-argument callable that invokes the AI provider.

        Returns:
            The AI provider's response string.

        Raises:
            AIProviderError: If the provider raises a permanent error.
            Exception: The last transient error after all retries.
        """
        last_error: Exception | None = None
        for attempt in range(AI_RETRY_ATTEMPTS):
            try:
                return call()
            except AIProviderError:
                raise
            except Exception as error:
                last_error = error
                if attempt < AI_RETRY_ATTEMPTS - 1:
                    # Exponential backoff: the base delay doubles on each retry.
                    delay = AI_RETRY_BASE_DELAY * (2 ** attempt)
                    self.logger.warning(
                        "AI provider call failed (attempt %d/%d), retrying in %.1fs: %s",
                        attempt + 1, AI_RETRY_ATTEMPTS, delay, error,
                    )
                    sleep(delay)
        raise last_error  # type: ignore[misc]

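    # Usage sketch (illustrative only -- ``analyze`` is a stand-in for
    # whatever request method the configured provider actually exposes):
    #
    #     text = self._call_ai_with_retry(
    #         lambda: self.ai_provider.analyze(self.system_prompt, user_prompt)
    #     )
    #
    # Transient failures sleep AI_RETRY_BASE_DELAY seconds, then twice that,
    # doubling before each retry, while AIProviderError is treated as
    # permanent and re-raised immediately without retrying.
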
    # ------------------------------------------------------------------
    # Audit / prompt saving
    # ------------------------------------------------------------------

    def _audit_log(self, action: str, details: dict[str, Any]) -> None:
        """Write an entry to the forensic audit trail.

        Args:
            action: The audit action name.
            details: Key-value details for the audit entry.
        """
        if self.audit_logger is None:
            return
        logger = getattr(self.audit_logger, "log", None)
        if not callable(logger):
            return
        try:
            logger(action, details)
        except Exception:
            return

    def _save_case_prompt(self, filename: str, system_prompt: str, user_prompt: str) -> None:
        """Save a prompt to the case prompts directory for audit.

        Args:
            filename: Output filename.
            system_prompt: The system prompt text.
            user_prompt: The user prompt text.
        """
        if self.case_dir is None:
            return
        prompts_dir = self.case_dir / "prompts"
        try:
            prompts_dir.mkdir(parents=True, exist_ok=True)
            prompt_path = prompts_dir / filename
            prompt_path.write_text(
                f"# System Prompt\n\n{system_prompt}\n\n---\n\n# User Prompt\n\n{user_prompt}\n",
                encoding="utf-8",
            )
        except OSError:
            self.logger.warning("Failed to save prompt to %s", prompts_dir / filename)

    # ------------------------------------------------------------------
    # Delegation methods — thin wrappers for backward compatibility.
    # Methods that don't use self are exposed as staticmethod assignments
    # at the class level (see above).  The methods below need self.
    # ------------------------------------------------------------------

    def _estimate_tokens(self, text: str) -> int:
        """Estimate the token count of *text* using model-specific info."""
        return estimate_tokens(text, model_info=self.model_info)

    # These are also exposed as staticmethods on the class (see above)
    # but tests may call them on instances, so they work either way.
    _extract_ioc_targets = staticmethod(extract_ioc_targets)
    _format_ioc_targets = staticmethod(format_ioc_targets)
    _build_priority_directives = staticmethod(build_priority_directives)
    _compute_statistics = staticmethod(compute_statistics)
    _build_full_data_csv = staticmethod(build_full_data_csv)
    _deduplicate_rows_for_analysis = staticmethod(deduplicate_rows_for_analysis)

    def _validate_citations(self, artifact_key: str, analysis_text: str) -> list[str]:
        """Spot-check AI-cited values against source CSV.

        Args:
            artifact_key: Artifact identifier.
            analysis_text: The AI's analysis text.

        Returns:
            List of warning strings.
        """
        if analysis_text.startswith("Analysis failed:"):
            return []
        try:
            original_path = self._resolve_artifact_csv_path(artifact_key)
        except FileNotFoundError:
            return []
        csv_path = self._resolve_analysis_input_csv_path(
            artifact_key, fallback=original_path,
        )
        return validate_citations(
            artifact_key=artifact_key,
            analysis_text=analysis_text,
            csv_path=csv_path,
            citation_spot_check_limit=self.citation_spot_check_limit,
            audit_log_fn=self._audit_log,
        )

    # ------------------------------------------------------------------
    # Path resolution
    # ------------------------------------------------------------------

    def _resolve_artifact_csv_path(self, artifact_key: str) -> Path:
        """Resolve the CSV file path for a given artifact key.

        For split artifacts with multiple CSV files, returns the first
        path.  Use :meth:`_resolve_all_artifact_csv_paths` to get every
        path for a split artifact.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A ``Path`` to the artifact's CSV file.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        mapped = self.artifact_csv_paths.get(artifact_key)
        if mapped is not None:
            if isinstance(mapped, list):
                return mapped[0]
            return mapped

        normalized = normalize_artifact_key(artifact_key)
        mapped_normalized = self.artifact_csv_paths.get(normalized)
        if mapped_normalized is not None:
            if isinstance(mapped_normalized, list):
                return mapped_normalized[0]
            return mapped_normalized

        candidate_path = Path(artifact_key)
        if candidate_path.exists():
            return candidate_path

        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    if direct_csv_path.exists():
                        return direct_csv_path
                for file_stub in file_stubs:
                    prefixed_paths = sorted(parsed_dir.glob(f"{file_stub}_*.csv"))
                    if prefixed_paths:
                        return prefixed_paths[0]

        raise FileNotFoundError(
            f"No CSV path mapped for artifact '{artifact_key}'. "
            "Provide it in ForensicAnalyzer(artifact_csv_paths=...) or use case_dir/parsed CSV paths."
        )

    def _resolve_all_artifact_csv_paths(self, artifact_key: str) -> list[Path]:
        """Resolve all CSV file paths for a given artifact key.

        For single-file artifacts returns a one-element list.  For split
        artifacts (e.g. EVTX) returns all constituent CSV paths.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A non-empty list of ``Path`` objects.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        for key in (artifact_key, normalize_artifact_key(artifact_key)):
            mapped = self.artifact_csv_paths.get(key)
            if mapped is not None:
                if isinstance(mapped, list):
                    return list(mapped)
                return [mapped]

        # Filesystem fallback: search case_dir/parsed for all matching parts.
        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    combined_csv_path = parsed_dir / f"{file_stub}_combined.csv"
                    prefixed_paths = sorted(
                        path
                        for path in parsed_dir.glob(f"{file_stub}_*.csv")
                        if path != combined_csv_path
                    )
                    if direct_csv_path.exists() and prefixed_paths:
                        return sorted([direct_csv_path] + prefixed_paths)
                    if prefixed_paths:
                        return prefixed_paths
                    if direct_csv_path.exists():
                        return [direct_csv_path]

        # Final fallback: delegate to single-path resolver.
        return [self._resolve_artifact_csv_path(artifact_key)]

    def _combine_csv_files(self, artifact_key: str, csv_paths: list[Path]) -> Path:
        """Concatenate multiple CSV files into a single combined CSV.

        All input files are assumed to share the same schema (column names).
        The combined file is written to the case's ``parsed/`` directory (or
        next to the first input file) with a ``_combined`` suffix.

        Args:
            artifact_key: Artifact identifier (used for the output filename).
            csv_paths: List of CSV file paths to combine.

        Returns:
            Path to the combined CSV file.
        """
        import csv as csv_mod

        output_dir = csv_paths[0].parent
        safe_key = sanitize_filename(artifact_key)
        combined_path = output_dir / f"{safe_key}_combined.csv"

        # First pass: collect the union of fieldnames, preserving first-seen order.
        fieldnames: list[str] = []
        fieldnames_set: set[str] = set()

        for csv_path in csv_paths:
            if not csv_path.exists():
                continue
            with csv_path.open("r", newline="", encoding="utf-8") as fh:
                reader = csv_mod.DictReader(fh)
                if reader.fieldnames:
                    for fn in reader.fieldnames:
                        if fn not in fieldnames_set:
                            fieldnames.append(fn)
                            fieldnames_set.add(fn)

        # Second pass: write every row under the merged header; missing
        # columns become empty strings, extra columns are dropped.
        with combined_path.open("w", newline="", encoding="utf-8") as out:
            writer = csv_mod.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
            writer.writeheader()
            for csv_path in csv_paths:
                if not csv_path.exists():
                    continue
                with csv_path.open("r", newline="", encoding="utf-8") as fh:
                    reader = csv_mod.DictReader(fh)
                    for row in reader:
                        writer.writerow(row)

        return combined_path

    def _set_analysis_input_csv_path(self, artifact_key: str, csv_path: Path) -> None:
        """Store the analysis-input CSV path for an artifact.

        Args:
            artifact_key: Artifact identifier.
            csv_path: Path to the analysis-input CSV.
        """
        self._analysis_input_csv_paths[artifact_key] = csv_path
        normalized = normalize_artifact_key(artifact_key)
        self._analysis_input_csv_paths[normalized] = csv_path

    def _resolve_analysis_input_csv_path(self, artifact_key: str, fallback: Path) -> Path:
        """Retrieve the analysis-input CSV path, with fallback.

        Args:
            artifact_key: Artifact identifier.
            fallback: Default path if not stored.

        Returns:
            The stored analysis-input CSV path, or *fallback*.
        """
        mapped = self._analysis_input_csv_paths.get(artifact_key)
        if mapped is not None:
            return mapped
        normalized = normalize_artifact_key(artifact_key)
        mapped = self._analysis_input_csv_paths.get(normalized)
        if mapped is not None:
            return mapped
        return fallback

    def _resolve_artifact_metadata(self, artifact_key: str) -> dict[str, str]:
        """Look up artifact metadata from the OS-appropriate registry first.

        Searches the registry matching :attr:`os_type` first so that
        shared keys like ``services`` resolve to the correct OS-specific
        entry.  Falls back to the other registry for cross-OS lookups.

        Args:
            artifact_key: Artifact identifier.

        Returns:
            A dict with at least ``name``, ``description``, and
            ``analysis_hint`` keys.
        """
        if self.os_type == "linux":
            registries = (LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY)
        else:
            registries = (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY)

        for registry in registries:
            if artifact_key in registry:
                metadata = registry[artifact_key]
                return {str(key): str(value) for key, value in metadata.items()}

        normalized = normalize_artifact_key(artifact_key)
        for registry in registries:
            if normalized in registry:
                metadata = registry[normalized]
                return {str(key): str(value) for key, value in metadata.items()}

        return {
            "name": artifact_key,
            "description": "No artifact description available.",
            "analysis_hint": "No specific analysis guidance is available for this artifact.",
        }

 645    # ------------------------------------------------------------------
 646    # Metadata registration
 647    # ------------------------------------------------------------------
 648
 649    def _register_artifact_paths_from_metadata(self, metadata: Mapping[str, Any] | None) -> None:
 650        """Extract and register artifact CSV paths from run metadata.
 651
 652        Args:
 653            metadata: Optional metadata mapping.
 654        """
 655        if not isinstance(metadata, Mapping):
 656            return
 657
 658        artifact_csv_paths = metadata.get("artifact_csv_paths")
 659        if isinstance(artifact_csv_paths, Mapping):
 660            for artifact_key, csv_path in artifact_csv_paths.items():
 661                if isinstance(csv_path, list) and len(csv_path) > 1:
 662                    self.artifact_csv_paths[str(artifact_key)] = [
 663                        Path(str(p)) for p in csv_path
 664                    ]
 665                elif isinstance(csv_path, list) and csv_path:
 666                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path[0]))
 667                else:
 668                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
 669
 670        for container_key in ("artifacts", "artifact_results", "parse_results", "parsed_artifacts"):
 671            container = metadata.get(container_key)
 672            if isinstance(container, Mapping):
 673                for artifact_key, value in container.items():
 674                    self._register_artifact_path_entry(artifact_key=artifact_key, value=value)
 675            elif isinstance(container, list):
 676                for item in container:
 677                    if isinstance(item, Mapping):
 678                        artifact_key = item.get("artifact_key") or item.get("key")
 679                        if artifact_key:
 680                            self._register_artifact_path_entry(artifact_key=str(artifact_key), value=item)
 681
 682    def _register_artifact_path_entry(self, artifact_key: Any, value: Any) -> None:
 683        """Register a single artifact CSV path from a metadata entry.
 684
 685        Args:
 686            artifact_key: Artifact identifier.
 687            value: Metadata entry (mapping, string, or Path).
 688        """
 689        if artifact_key in (None, ""):
 690            return
 691
 692        if isinstance(value, Mapping):
 693            csv_path = value.get("csv_path")
 694            csv_paths = value.get("csv_paths")
 695            if isinstance(csv_paths, list) and len(csv_paths) > 1:
 696                self.artifact_csv_paths[str(artifact_key)] = [
 697                    Path(str(p)) for p in csv_paths
 698                ]
 699                return
 700            if csv_path:
 701                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
 702                return
 703            if isinstance(csv_paths, list) and csv_paths:
 704                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_paths[0]))
 705                return
 706
 707        if isinstance(value, (str, Path)):
 708            self.artifact_csv_paths[str(artifact_key)] = Path(str(value))
 709
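The two registration methods accept several metadata shapes: a mapping with `csv_path`/`csv_paths`, a bare string or `Path`, or a list of paths. A self-contained sketch of the normalization rules (the `normalize_csv_entry` name is hypothetical; it is not part of the module):

```python
from __future__ import annotations

from pathlib import Path
from typing import Any, Mapping


def normalize_csv_entry(value: Any) -> Path | list[Path] | None:
    """Mirror the registration rules: multi-path lists stay lists,
    single-path lists collapse to one Path, mappings prefer csv_path,
    and anything unusable is skipped (returns None)."""
    if isinstance(value, Mapping):
        csv_paths = value.get("csv_paths")
        if isinstance(csv_paths, list) and len(csv_paths) > 1:
            return [Path(str(p)) for p in csv_paths]
        if value.get("csv_path"):
            return Path(str(value["csv_path"]))
        if isinstance(csv_paths, list) and csv_paths:
            return Path(str(csv_paths[0]))
        return None
    if isinstance(value, (str, Path)):
        return Path(str(value))
    return None
```

Collapsing a one-element `csv_paths` list to a single `Path` keeps the common single-file case simple for downstream resolution, while multi-file artifacts are preserved for later combination.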
 710    # ------------------------------------------------------------------
 711    # Core analysis pipeline
 712    # ------------------------------------------------------------------
 713
 714    def _prepare_artifact_data(
 715        self, artifact_key: str, investigation_context: str, csv_path: Path | None = None,
 716    ) -> str:
 717        """Prepare one artifact CSV as a bounded, analysis-ready prompt.
 718
 719        Args:
 720            artifact_key: Unique identifier for the artifact.
 721            investigation_context: Free-text investigation context.
 722            csv_path: Explicit path to the artifact CSV.
 723
 724        Returns:
 725            The fully rendered prompt string.
 726
 727        Raises:
 728            FileNotFoundError: If the artifact CSV cannot be located.
 729        """
 730        resolved_csv_path = csv_path if csv_path is not None else self._resolve_artifact_csv_path(artifact_key)
 731        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
 732
 733        prompt_text, analysis_csv_path, _ = prepare_artifact_data(
 734            artifact_key=artifact_key,
 735            investigation_context=investigation_context,
 736            csv_path=resolved_csv_path,
 737            artifact_metadata=artifact_metadata,
 738            artifact_prompt_template=self.artifact_prompt_template,
 739            artifact_prompt_template_small_context=self.artifact_prompt_template_small_context,
 740            artifact_instruction_prompts=self.artifact_instruction_prompts,
 741            artifact_ai_column_projections=self.artifact_ai_column_projections,
 742            artifact_deduplication_enabled=self.artifact_deduplication_enabled,
 743            ai_max_tokens=self.ai_max_tokens,
 744            shortened_prompt_cutoff_tokens=self.shortened_prompt_cutoff_tokens,
 745            case_dir=self.case_dir,
 746            audit_log_fn=self._audit_log,
 747        )
 748        self._set_analysis_input_csv_path(artifact_key=artifact_key, csv_path=analysis_csv_path)
 749        return prompt_text
 750
 751    def analyze_artifact(
 752        self,
 753        artifact_key: str,
 754        investigation_context: str,
 755        progress_callback: Any | None = None,
 756    ) -> dict[str, Any]:
 757        """Analyze a single artifact's CSV data and return AI findings.
 758
 759        Args:
 760            artifact_key: Unique identifier for the artifact.
 761            investigation_context: Free-text investigation context.
 762            progress_callback: Optional callable for streaming progress.
 763
 764        Returns:
 765            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
 766            ``model``, and optionally ``citation_warnings``.
 767        """
 768        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
 769        artifact_name = artifact_metadata.get("name", artifact_key)
 770        model = self.model_info.get("model", "unknown")
 771        provider = self.model_info.get("provider", "unknown")
 772
 773        self._audit_log("analysis_started", {
 774            "artifact_key": artifact_key, "artifact_name": artifact_name,
 775            "provider": provider, "model": model,
 776        })
 777
 778        start_time = perf_counter()
 779        try:
 780            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
 781            if len(all_csv_paths) > 1:
 782                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
 783            else:
 784                csv_path = all_csv_paths[0]
 785            artifact_prompt = self._prepare_artifact_data(
 786                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
 787            )
 788            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
 789            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]
 790
 791            safe_key = sanitize_filename(artifact_key)
 792            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)
 793
 794            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
 795            if prompt_tokens_estimate > self.ai_max_tokens:
 796                self.logger.info(
 797                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
 798                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
 799                )
 800                if progress_callback is not None:
 801                    emit_analysis_progress(progress_callback, artifact_key, "started", {
 802                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
 803                    })
 804                analysis_text = analyze_artifact_chunked(
 805                    artifact_prompt=artifact_prompt,
 806                    artifact_key=artifact_key,
 807                    artifact_name=artifact_name,
 808                    investigation_context=investigation_context,
 809                    model=model,
 810                    system_prompt=self.system_prompt,
 811                    ai_response_max_tokens=self.ai_response_max_tokens,
 812                    chunk_csv_budget=self.chunk_csv_budget,
 813                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
 814                    max_merge_rounds=self.max_merge_rounds,
 815                    call_ai_with_retry_fn=self._call_ai_with_retry,
 816                    ai_provider=self.ai_provider,
 817                    audit_log_fn=self._audit_log,
 818                    save_case_prompt_fn=self._save_case_prompt,
 819                    progress_callback=progress_callback,
 820                )
 821                duration_seconds = perf_counter() - start_time
 822                self._audit_log("analysis_completed", {
 823                    "artifact_key": artifact_key, "artifact_name": artifact_name,
 824                    "token_count": self._estimate_tokens(analysis_text),
 825                    "duration_seconds": round(duration_seconds, 6),
 826                    "status": "success", "chunked": True,
 827                })
 828                citation_warnings = self._validate_citations(artifact_key, analysis_text)
 829                result: dict[str, Any] = {
 830                    "artifact_key": artifact_key, "artifact_name": artifact_name,
 831                    "analysis": analysis_text, "model": model,
 832                }
 833                if citation_warnings:
 834                    result["citation_warnings"] = citation_warnings
 835                return result
 836
 837            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
 838            if callable(analyze_with_progress) and progress_callback is not None:
 839                emit_analysis_progress(progress_callback, artifact_key, "started", {
 840                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
 841                })
 842
 843                def _provider_progress(payload: Mapping[str, Any]) -> None:
 844                    """Forward provider progress to the frontend."""
 845                    if not isinstance(payload, Mapping):
 846                        return
 847                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
 848                        "artifact_key": artifact_key, "artifact_name": artifact_name,
 849                        "thinking_text": str(payload.get("thinking_text", "")),
 850                        "partial_text": str(payload.get("partial_text", "")),
 851                        "model": model,
 852                    })
 853
 854                try:
 855                    analysis_text = analyze_with_progress(
 856                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 857                        progress_callback=_provider_progress, attachments=attachments,
 858                        max_tokens=self.ai_response_max_tokens,
 859                    )
 860                except TypeError:
 861                    analysis_text = analyze_with_progress(
 862                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 863                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
 864                    )
 865            else:
 866                if progress_callback is not None:
 867                    emit_analysis_progress(progress_callback, artifact_key, "started", {
 868                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
 869                    })
 870                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
 871                if callable(analyze_with_attachments):
 872                    analysis_text = self._call_ai_with_retry(
 873                        lambda: analyze_with_attachments(
 874                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 875                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
 876                        )
 877                    )
 878                else:
 879                    analysis_text = self._call_ai_with_retry(
 880                        lambda: self.ai_provider.analyze(
 881                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 882                            max_tokens=self.ai_response_max_tokens,
 883                        )
 884                    )
 885            duration_seconds = perf_counter() - start_time
 886            self._audit_log("analysis_completed", {
 887                "artifact_key": artifact_key, "artifact_name": artifact_name,
 888                "token_count": self._estimate_tokens(analysis_text),
 889                "duration_seconds": round(duration_seconds, 6), "status": "success",
 890            })
 891        except Exception as error:
 892            duration_seconds = perf_counter() - start_time
 893            analysis_text = f"Analysis failed: {error}"
 894            self._audit_log("analysis_completed", {
 895                "artifact_key": artifact_key, "artifact_name": artifact_name,
 896                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
 897                "status": "failed", "error": str(error),
 898            })
 899
 900        citation_warnings = self._validate_citations(artifact_key, analysis_text)
 901
 902        result = {
 903            "artifact_key": artifact_key, "artifact_name": artifact_name,
 904            "analysis": analysis_text, "model": model,
 905        }
 906        if citation_warnings:
 907            result["citation_warnings"] = citation_warnings
 908        return result
 909
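`analyze_artifact` probes the provider for richer entry points (`analyze_with_progress`, `analyze_with_attachments`) and falls back when a keyword is not accepted, using `TypeError` as the compatibility signal. A minimal sketch of that pattern, with hypothetical provider functions standing in for real providers:

```python
def call_with_optional_attachments(fn, *, attachments, **kwargs):
    """Try the richer signature first; fall back without attachments
    when the callable does not accept that keyword."""
    try:
        return fn(attachments=attachments, **kwargs)
    except TypeError:
        # Note: as in the method above, a TypeError raised *inside* the
        # provider also triggers the fallback call.
        return fn(**kwargs)


def old_provider(system_prompt, user_prompt):
    """Stand-in for a provider without attachment support."""
    return f"analyzed:{user_prompt}"


def new_provider(system_prompt, user_prompt, attachments):
    """Stand-in for a provider that accepts attachments."""
    return f"analyzed:{user_prompt}+{len(attachments)} files"
```

This keeps one call site working across provider generations without feature flags, at the cost of the `TypeError` ambiguity noted in the comment.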
 910    def generate_summary(
 911        self,
 912        per_artifact_results: list[Mapping[str, Any]],
 913        investigation_context: str,
 914        metadata: Mapping[str, Any] | None,
 915    ) -> str:
 916        """Generate a cross-artifact summary by correlating findings.
 917
 918        Args:
 919            per_artifact_results: List of per-artifact result dicts.
 920            investigation_context: The user's investigation context.
 921            metadata: Optional host metadata mapping.
 922
 923        Returns:
 924            The AI-generated summary text, or an error message.
 925        """
 926        metadata_map = metadata if isinstance(metadata, Mapping) else {}
 927        summary_prompt = build_summary_prompt(
 928            summary_prompt_template=self.summary_prompt_template,
 929            investigation_context=investigation_context,
 930            per_artifact_results=per_artifact_results,
 931            metadata_map=metadata_map,
 932        )
 933
 934        model = self.model_info.get("model", "unknown")
 935        provider = self.model_info.get("provider", "unknown")
 936        summary_artifact_key = "cross_artifact_summary"
 937        summary_artifact_name = "Cross-Artifact Summary"
 938        summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md"
 939
 940        self._audit_log("analysis_started", {
 941            "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 942            "provider": provider, "model": model,
 943        })
 944        self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt)
 945
 946        start_time = perf_counter()
 947        try:
 948            summary = self._call_ai_with_retry(
 949                lambda: self.ai_provider.analyze(
 950                    system_prompt=self.system_prompt, user_prompt=summary_prompt,
 951                    max_tokens=self.ai_response_max_tokens,
 952                )
 953            )
 954            duration_seconds = perf_counter() - start_time
 955            self._audit_log("analysis_completed", {
 956                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 957                "token_count": self._estimate_tokens(summary),
 958                "duration_seconds": round(duration_seconds, 6), "status": "success",
 959            })
 960            return summary
 961        except Exception as error:
 962            duration_seconds = perf_counter() - start_time
 963            summary = f"Analysis failed: {error}"
 964            self._audit_log("analysis_completed", {
 965                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 966                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
 967                "status": "failed", "error": str(error),
 968            })
 969            return summary
 970
 971    def run_full_analysis(
 972        self,
 973        artifact_keys: Iterable[str],
 974        investigation_context: str,
 975        metadata: Mapping[str, Any] | None,
 976        progress_callback: Any | None = None,
 977        cancel_check: Any | None = None,
 978    ) -> dict[str, Any]:
 979        """Run the complete analysis pipeline: per-artifact then summary.
 980
 981        Args:
 982            artifact_keys: Iterable of artifact key strings.
 983            investigation_context: The user's investigation context.
 984            metadata: Optional metadata mapping.
 985            progress_callback: Optional callable for streaming progress.
 986            cancel_check: Optional callable returning ``True`` when the
 987                analysis should be aborted early.
 988
 989        Returns:
 990            A dict with ``per_artifact``, ``summary``, and ``model_info``.
 991
 992        Raises:
 993            AnalysisCancelledError: If *cancel_check* returns ``True``.
 994        """
 995        if isinstance(self.ai_provider, UnavailableProvider):
 996            raise AIProviderError(self.ai_provider._error_message)
 997
 998        self._register_artifact_paths_from_metadata(metadata)
 999        per_artifact_results: list[dict[str, Any]] = []
1000        for artifact_key in artifact_keys:
1001            if cancel_check is not None and cancel_check():
1002                raise AnalysisCancelledError("Analysis cancelled by user.")
1003            result = self.analyze_artifact(
1004                artifact_key=str(artifact_key),
1005                investigation_context=investigation_context,
1006                progress_callback=progress_callback,
1007            )
1008            per_artifact_results.append(result)
1009            if progress_callback is not None:
1010                emit_analysis_progress(progress_callback, str(artifact_key), "complete", result)
1011
1012        summary = self.generate_summary(
1013            per_artifact_results=per_artifact_results,
1014            investigation_context=investigation_context,
1015            metadata=metadata,
1016        )
1017        return {
1018            "per_artifact": per_artifact_results,
1019            "summary": summary,
1020            "model_info": dict(self.model_info),
1021        }
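`run_full_analysis` polls `cancel_check` once before each artifact, so cancellation takes effect at artifact boundaries rather than mid-call. A minimal mimic of that loop contract (the `run_artifacts` driver and local exception class are stand-ins for illustration, not the module's API):

```python
import threading


class AnalysisCancelledError(Exception):
    """Stand-in for app.analyzer.core.AnalysisCancelledError."""


def run_artifacts(artifact_keys, cancel_check):
    """Minimal mimic of the run_full_analysis loop: check for
    cancellation before each artifact, collect per-artifact results."""
    results = []
    for key in artifact_keys:
        if cancel_check():
            raise AnalysisCancelledError("Analysis cancelled by user.")
        results.append({"artifact_key": key})
    return results


# A thread-safe flag works well as the cancel source: a UI thread sets
# it, the analysis thread polls it via cancel_check.
cancel_event = threading.Event()
cancel_event.set()  # simulate the user pressing cancel
```

Because the check happens between artifacts, an in-flight provider call still runs to completion; the trade-off is prompt responsiveness versus never abandoning a partially billed request.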
74class AnalysisCancelledError(Exception):
75    """Raised when analysis is cancelled by the user."""

  78class ForensicAnalyzer:
  79    """Orchestrates AI-powered forensic analysis of parsed artifact CSV data.
  80
  81    Central analysis engine for AIFT: reads parsed artifact CSV files, applies
  82    column projection and deduplication, builds token-budgeted prompts, sends
  83    them to a configured AI provider, and validates citations.
  84
  85    Attributes:
  86        case_dir: Path to the case directory, or ``None``.
  87        config: Merged configuration dictionary.
  88        ai_provider: The configured AI provider instance.
  89        model_info: Dict with ``provider`` and ``model`` keys.
  90    """
  91
  92    # Expose extracted functions as static methods for backward compatibility
  93    # with tests and callers that use ForensicAnalyzer._method_name().
  94    _stringify_value = staticmethod(stringify_value)
  95    _build_datetime = staticmethod(build_datetime)
  96    _normalize_artifact_key = staticmethod(normalize_artifact_key)
  97    _sanitize_filename = staticmethod(sanitize_filename)
  98    _split_csv_into_chunks = staticmethod(split_csv_into_chunks)
  99    _split_csv_and_suffix = staticmethod(split_csv_and_suffix)
 100    _coerce_projection_columns = staticmethod(coerce_projection_columns)
 101    _is_dedup_safe_identifier_column = staticmethod(is_dedup_safe_identifier_column)
 102    _timestamp_lookup_keys = staticmethod(timestamp_lookup_keys)
 103    _timestamp_found_in_csv = staticmethod(timestamp_found_in_csv)
 104    _match_column_name = staticmethod(match_column_name)
 105    _emit_analysis_progress = staticmethod(emit_analysis_progress)
 106
 107    def __init__(
 108        self,
 109        case_dir: str | Path | Mapping[str, str | Path] | None = None,
 110        config: Mapping[str, Any] | None = None,
 111        audit_logger: Any | None = None,
 112        artifact_csv_paths: Mapping[str, str | Path] | None = None,
 113        prompts_dir: str | Path | None = None,
 114        random_seed: int | None = None,
 115        os_type: str = "windows",
 116    ) -> None:
 117        """Initialize the forensic analyzer with case context and configuration.
 118
 119        Args:
 120            case_dir: Path to the case directory, or a mapping of artifact
 121                keys to CSV paths (convenience shorthand).
 122            config: Application configuration dictionary.
 123            audit_logger: Optional object with a ``log(action, details)``
 124                method.
 125            artifact_csv_paths: Mapping of artifact keys to CSV paths.
 126            prompts_dir: Directory containing prompt template files.
 127            random_seed: Optional seed for the internal RNG.
 128            os_type: Detected operating system type (``"windows"``,
 129                ``"linux"``, etc.).  Controls which artifact instruction
 130                prompts are loaded.
 131        """
 132        if (
 133            isinstance(case_dir, Mapping)
 134            and config is None
 135            and audit_logger is None
 136            and artifact_csv_paths is None
 137        ):
 138            artifact_csv_paths = case_dir
 139            case_dir = None
 140
 141        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
 142        self.logger = LOGGER
 143        self.config = dict(config) if isinstance(config, Mapping) else {}
 144        self.audit_logger = audit_logger
 145        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
 146        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
 147            key = str(artifact_key)
 148            if isinstance(csv_path, list):
 149                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
 150            else:
 151                self.artifact_csv_paths[key] = Path(str(csv_path))
 152        self._analysis_input_csv_paths: dict[str, Path] = {}
 153        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
 154        self.os_type = normalize_os_type(os_type)
 155        import random
 156        self._random = random.Random(random_seed)
 157        self._load_analysis_settings()
 158        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
 159        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
 160        self.artifact_prompt_template = self._load_prompt_template(
 161            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
 162        )
 163        self.artifact_prompt_template_small_context = self._load_prompt_template(
 164            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
 165        )
 166        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
 167        self.summary_prompt_template = self._load_prompt_template(
 168            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
 169        )
 170        self.chunk_merge_prompt_template = self._load_prompt_template(
 171            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
 172        )
 173        self.ai_provider = self._create_ai_provider()
 174        self.model_info = self._read_model_info()
 175
 176    # ------------------------------------------------------------------
 177    # Configuration loading
 178    # ------------------------------------------------------------------
 179
 180    def _load_analysis_settings(self) -> None:
 181        """Load and validate analysis tuning parameters from the config dict."""
 182        analysis_config = self.config.get("analysis")
 183        if not isinstance(analysis_config, Mapping):
 184            analysis_config = {}
 185
 186        self.ai_max_tokens = read_int_setting(analysis_config, "ai_max_tokens", AI_MAX_TOKENS, minimum=1)
 187        self.ai_response_max_tokens = max(1, int(self.ai_max_tokens * 0.2))
 188        legacy_shortened = read_int_setting(
 189            analysis_config, "statistics_section_cutoff_tokens", DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, minimum=1,
 190        )
 191        self.shortened_prompt_cutoff_tokens = read_int_setting(
 192            analysis_config, "shortened_prompt_cutoff_tokens", legacy_shortened, minimum=1,
 193        )
 194        self.chunk_csv_budget = int(self.ai_max_tokens * TOKEN_CHAR_RATIO * 0.6)
 195        self.citation_spot_check_limit = read_int_setting(
 196            analysis_config, "citation_spot_check_limit", CITATION_SPOT_CHECK_LIMIT, minimum=1,
 197        )
 198        self.max_merge_rounds = read_int_setting(analysis_config, "max_merge_rounds", MAX_MERGE_ROUNDS, minimum=1)
 199        self.artifact_deduplication_enabled = read_bool_setting(
 200            analysis_config, "artifact_deduplication_enabled", ARTIFACT_DEDUPLICATION_ENABLED,
 201        )
 202        self.artifact_ai_columns_config_path = read_path_setting(
 203            analysis_config, "artifact_ai_columns_config_path", str(DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH),
 204        )
 205
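`_load_analysis_settings` derives two budgets from `ai_max_tokens`: the response cap is 20% of the context budget (floored at 1), and the chunk CSV budget converts tokens to characters and reserves 60% for data. A sketch of that arithmetic, with `token_char_ratio` standing in for the module's `TOKEN_CHAR_RATIO` constant (its actual value is defined in `analyzer_constants`; the values below are illustrative only):

```python
def derive_budgets(ai_max_tokens: int, token_char_ratio: float) -> dict[str, int]:
    """Reproduce the derived values in _load_analysis_settings."""
    return {
        # 20% of the context window is reserved for the AI's response.
        "ai_response_max_tokens": max(1, int(ai_max_tokens * 0.2)),
        # Character budget per chunk: tokens -> chars, 60% usable for CSV data.
        "chunk_csv_budget": int(ai_max_tokens * token_char_ratio * 0.6),
    }
```

The `max(1, ...)` floor guards against degenerate configurations where a tiny `ai_max_tokens` would otherwise produce a zero-token response budget.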
 206    # Backward-compatible aliases for the extracted config readers.
 207    _read_int_setting = staticmethod(read_int_setting)
 208    _read_bool_setting = staticmethod(read_bool_setting)
 209    _read_path_setting = staticmethod(read_path_setting)
 210
 211    def _resolve_artifact_ai_columns_config_path(self) -> Path:
 212        """Resolve the artifact AI columns config path to an absolute Path.
 213
 214        Delegates to :func:`prompts.resolve_artifact_ai_columns_config_path`.
 215
 216        Returns:
 217            Resolved absolute ``Path`` to the YAML config file.
 218        """
 219        return resolve_artifact_ai_columns_config_path(
 220            self.artifact_ai_columns_config_path, self.case_dir,
 221        )
 222
 223    def _load_artifact_ai_column_projections(self) -> dict[str, tuple[str, ...]]:
 224        """Load per-artifact column projection configuration from YAML.
 225
 226        Delegates to :func:`prompts.load_artifact_ai_column_projections`.
 227
 228        Returns:
 229            A dict mapping normalized artifact keys to tuples of column names.
 230        """
 231        config_path = self._resolve_artifact_ai_columns_config_path()
 232        return load_artifact_ai_column_projections(config_path, os_type=self.os_type)
 233
 234    def _load_prompt_template(self, filename: str, default: str) -> str:
 235        """Read a prompt template file from the prompts directory.
 236
 237        Delegates to :func:`prompts.load_prompt_template`.
 238
 239        Args:
 240            filename: Name of the template file.
 241            default: Fallback template string.
 242
 243        Returns:
 244            The template text.
 245        """
 246        return load_prompt_template(self.prompts_dir, filename, default)
 247
 248    def _load_artifact_instruction_prompts(self) -> dict[str, str]:
 249        """Load per-artifact analysis instruction prompts.
 250
 251        Delegates to :func:`prompts.load_artifact_instruction_prompts`,
 252        passing :attr:`os_type` so the correct OS-specific instruction
 253        directory is selected.
 254
 255        Returns:
 256            A dict mapping artifact keys to instruction prompt text.
 257        """
 258        return load_artifact_instruction_prompts(self.prompts_dir, os_type=self.os_type)
 259
 260    # ------------------------------------------------------------------
 261    # AI provider
 262    # ------------------------------------------------------------------
 263
 264    def _create_ai_provider(self) -> Any:
 265        """Instantiate the configured AI provider, or a fallback on failure.
 266
 267        Returns:
 268            An AI provider instance, or an ``UnavailableProvider``.
 269        """
 270        provider_config: Mapping[str, Any]
 271        if self.config:
 272            provider_config = self.config
 273        else:
 274            provider_config = {
 275                "ai": {
 276                    "provider": "local",
 277                    "local": {
 278                        "base_url": "http://localhost:11434/v1",
 279                        "model": "llama3.1:70b",
 280                        "api_key": "not-needed",
 281                    },
 282                }
 283            }
 284        try:
 285            return create_provider(dict(provider_config))
 286        except Exception as error:
 287            return UnavailableProvider(str(error))
 288
 289    def _read_model_info(self) -> dict[str, str]:
 290        """Read provider and model metadata from the AI provider.
 291
 292        Returns:
 293            A dict with at least ``provider`` and ``model`` keys.
 294        """
 295        try:
 296            model_info = self.ai_provider.get_model_info()
 297        except Exception:
 298            return {"provider": "unknown", "model": "unknown"}
 299
 300        if not isinstance(model_info, Mapping):
 301            return {"provider": "unknown", "model": "unknown"}
 302
 303        return {str(key): str(value) for key, value in model_info.items()}
 304
 305    def _call_ai_with_retry(self, call: Callable[[], str]) -> str:
 306        """Call the AI provider with retry on transient failures.
 307
 308        Args:
 309            call: A zero-argument callable that invokes the AI provider.
 310
 311        Returns:
 312            The AI provider's response string.
 313
 314        Raises:
 315            AIProviderError: If the provider raises a permanent error.
 316            Exception: The last transient error after all retries.
 317        """
 318        last_error: Exception | None = None
 319        for attempt in range(AI_RETRY_ATTEMPTS):
 320            try:
 321                return call()
 322            except AIProviderError:
 323                raise
 324            except Exception as error:
 325                last_error = error
 326                if attempt < AI_RETRY_ATTEMPTS - 1:
 327                    delay = AI_RETRY_BASE_DELAY * (2 ** attempt)
 328                    self.logger.warning(
 329                        "AI provider call failed (attempt %d/%d), retrying in %.1fs: %s",
 330                        attempt + 1, AI_RETRY_ATTEMPTS, delay, error,
 331                    )
 332                    sleep(delay)
 333        raise last_error  # type: ignore[misc]
 334
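The retry loop above distinguishes permanent provider errors (re-raised immediately) from transient ones (retried with exponential backoff: `base_delay * 2**attempt`). A self-contained sketch under assumed defaults (`PermanentError`, `attempts`, and `base_delay` are stand-ins for `AIProviderError`, `AI_RETRY_ATTEMPTS`, and `AI_RETRY_BASE_DELAY`):

```python
import time


class PermanentError(Exception):
    """Stand-in for AIProviderError: never retried."""


def call_with_retry(call, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Retry transient failures with exponential backoff; re-raise
    permanent errors immediately; re-raise the last transient error
    once all attempts are exhausted."""
    last_error = None
    for attempt in range(attempts):
        try:
            return call()
        except PermanentError:
            raise
        except Exception as error:
            last_error = error
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))
    raise last_error
```

Injecting `sleep` as a parameter keeps the backoff schedule testable without real delays, which is also why the example below records delays instead of waiting.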
 335    # ------------------------------------------------------------------
 336    # Audit / prompt saving
 337    # ------------------------------------------------------------------
 338
 339    def _audit_log(self, action: str, details: dict[str, Any]) -> None:
 340        """Write an entry to the forensic audit trail.
 341
 342        Args:
 343            action: The audit action name.
 344            details: Key-value details for the audit entry.
 345        """
 346        if self.audit_logger is None:
 347            return
 348        logger = getattr(self.audit_logger, "log", None)
 349        if not callable(logger):
 350            return
 351        try:
 352            logger(action, details)
 353        except Exception:
 354            return
 355
 356    def _save_case_prompt(self, filename: str, system_prompt: str, user_prompt: str) -> None:
 357        """Save a prompt to the case prompts directory for audit.
 358
 359        Args:
 360            filename: Output filename.
 361            system_prompt: The system prompt text.
 362            user_prompt: The user prompt text.
 363        """
 364        if self.case_dir is None:
 365            return
 366        prompts_dir = self.case_dir / "prompts"
 367        try:
 368            prompts_dir.mkdir(parents=True, exist_ok=True)
 369            prompt_path = prompts_dir / filename
 370            prompt_path.write_text(
 371                f"# System Prompt\n\n{system_prompt}\n\n---\n\n# User Prompt\n\n{user_prompt}\n",
 372                encoding="utf-8",
 373            )
 374        except OSError:
 375            self.logger.warning("Failed to save prompt to %s", prompts_dir / filename)
 376
 377    # ------------------------------------------------------------------
 378    # Delegation methods — thin wrappers for backward compatibility.
 379    # Methods that don't use self are exposed as staticmethod assignments
 380    # at the class level (see above).  The methods below need self.
 381    # ------------------------------------------------------------------
 382
 383    def _estimate_tokens(self, text: str) -> int:
 384        """Estimate the token count of *text* using model-specific info."""
 385        return estimate_tokens(text, model_info=self.model_info)
 386
 387    # These are also exposed as staticmethods on the class (see above)
 388    # but tests may call them on instances, so they work either way.
 389    _extract_ioc_targets = staticmethod(extract_ioc_targets)
 390    _format_ioc_targets = staticmethod(format_ioc_targets)
 391    _build_priority_directives = staticmethod(build_priority_directives)
 392    _compute_statistics = staticmethod(compute_statistics)
 393    _build_full_data_csv = staticmethod(build_full_data_csv)
 394    _deduplicate_rows_for_analysis = staticmethod(deduplicate_rows_for_analysis)
 395
 396    def _validate_citations(self, artifact_key: str, analysis_text: str) -> list[str]:
 397        """Spot-check AI-cited values against source CSV.
 398
 399        Args:
 400            artifact_key: Artifact identifier.
 401            analysis_text: The AI's analysis text.
 402
 403        Returns:
 404            List of warning strings.
 405        """
 406        if analysis_text.startswith("Analysis failed:"):
 407            return []
 408        try:
 409            original_path = self._resolve_artifact_csv_path(artifact_key)
 410        except FileNotFoundError:
 411            return []
 412        csv_path = self._resolve_analysis_input_csv_path(
 413            artifact_key, fallback=original_path,
 414        )
 415        return validate_citations(
 416            artifact_key=artifact_key,
 417            analysis_text=analysis_text,
 418            csv_path=csv_path,
 419            citation_spot_check_limit=self.citation_spot_check_limit,
 420            audit_log_fn=self._audit_log,
 421        )
 422
 423    # ------------------------------------------------------------------
 424    # Path resolution
 425    # ------------------------------------------------------------------
 426
 427    def _resolve_artifact_csv_path(self, artifact_key: str) -> Path:
 428        """Resolve the CSV file path for a given artifact key.
 429
 430        For split artifacts with multiple CSV files, returns the first
 431        path.  Use :meth:`_resolve_all_artifact_csv_paths` to get every
 432        path for a split artifact.
 433
 434        Args:
 435            artifact_key: Artifact identifier to resolve.
 436
 437        Returns:
 438            A ``Path`` to the artifact's CSV file.
 439
 440        Raises:
 441            FileNotFoundError: If no CSV path can be found.
 442        """
 443        mapped = self.artifact_csv_paths.get(artifact_key)
 444        if mapped is not None:
 445            if isinstance(mapped, list):
 446                return mapped[0]
 447            return mapped
 448
 449        normalized = normalize_artifact_key(artifact_key)
 450        mapped_normalized = self.artifact_csv_paths.get(normalized)
 451        if mapped_normalized is not None:
 452            if isinstance(mapped_normalized, list):
 453                return mapped_normalized[0]
 454            return mapped_normalized
 455
 456        candidate_path = Path(artifact_key)
 457        if candidate_path.exists():
 458            return candidate_path
 459
 460        if self.case_dir is not None:
 461            parsed_dir = self.case_dir / "parsed"
 462            if parsed_dir.exists():
 463                normalized = normalize_artifact_key(artifact_key)
 464                file_stubs = {
 465                    artifact_key, normalized,
 466                    sanitize_filename(artifact_key),
 467                    sanitize_filename(normalized),
 468                }
 469                for file_stub in file_stubs:
 470                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
 471                    if direct_csv_path.exists():
 472                        return direct_csv_path
 473                for file_stub in file_stubs:
 474                    prefixed_paths = sorted(parsed_dir.glob(f"{file_stub}_*.csv"))
 475                    if prefixed_paths:
 476                        return prefixed_paths[0]
 477
 478        raise FileNotFoundError(
 479            f"No CSV path mapped for artifact '{artifact_key}'. "
 480            "Provide it in ForensicAnalyzer(artifact_csv_paths=...) or use case_dir/parsed CSV paths."
 481        )
 482
 483    def _resolve_all_artifact_csv_paths(self, artifact_key: str) -> list[Path]:
 484        """Resolve all CSV file paths for a given artifact key.
 485
 486        For single-file artifacts returns a one-element list.  For split
 487        artifacts (e.g. EVTX) returns all constituent CSV paths.
 488
 489        Args:
 490            artifact_key: Artifact identifier to resolve.
 491
 492        Returns:
 493            A non-empty list of ``Path`` objects.
 494
 495        Raises:
 496            FileNotFoundError: If no CSV path can be found.
 497        """
 498        for key in (artifact_key, normalize_artifact_key(artifact_key)):
 499            mapped = self.artifact_csv_paths.get(key)
 500            if mapped is not None:
 501                if isinstance(mapped, list):
 502                    return list(mapped)
 503                return [mapped]
 504
 505        # Filesystem fallback: search case_dir/parsed for all matching parts.
 506        if self.case_dir is not None:
 507            parsed_dir = self.case_dir / "parsed"
 508            if parsed_dir.exists():
 509                normalized = normalize_artifact_key(artifact_key)
 510                file_stubs = {
 511                    artifact_key, normalized,
 512                    sanitize_filename(artifact_key),
 513                    sanitize_filename(normalized),
 514                }
 515                for file_stub in file_stubs:
 516                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
 517                    combined_csv_path = parsed_dir / f"{file_stub}_combined.csv"
 518                    prefixed_paths = sorted(
 519                        path
 520                        for path in parsed_dir.glob(f"{file_stub}_*.csv")
 521                        if path != combined_csv_path
 522                    )
 523                    if direct_csv_path.exists() and prefixed_paths:
 524                        return sorted([direct_csv_path] + prefixed_paths)
 525                    if prefixed_paths:
 526                        return prefixed_paths
 527                    if direct_csv_path.exists():
 528                        return [direct_csv_path]
 529
 530        # Final fallback: delegate to single-path resolver.
 531        return [self._resolve_artifact_csv_path(artifact_key)]
 532
 533    def _combine_csv_files(self, artifact_key: str, csv_paths: list[Path]) -> Path:
 534        """Concatenate multiple CSV files into a single combined CSV.
 535
 536        All input files are assumed to share the same schema (column names).
 537        The combined file is written to the case's ``parsed/`` directory (or
 538        next to the first input file) with a ``_combined`` suffix.
 539
 540        Args:
 541            artifact_key: Artifact identifier (used for the output filename).
 542            csv_paths: List of CSV file paths to combine.
 543
 544        Returns:
 545            Path to the combined CSV file.
 546        """
 547        import csv as csv_mod
 548
 549        output_dir = csv_paths[0].parent
 550        safe_key = sanitize_filename(artifact_key)
 551        combined_path = output_dir / f"{safe_key}_combined.csv"
 552
 553        fieldnames: list[str] = []
 554        fieldnames_set: set[str] = set()
 555
 556        for csv_path in csv_paths:
 557            if not csv_path.exists():
 558                continue
 559            with csv_path.open("r", newline="", encoding="utf-8") as fh:
 560                reader = csv_mod.DictReader(fh)
 561                if reader.fieldnames:
 562                    for fn in reader.fieldnames:
 563                        if fn not in fieldnames_set:
 564                            fieldnames.append(fn)
 565                            fieldnames_set.add(fn)
 566
 567        with combined_path.open("w", newline="", encoding="utf-8") as out:
 568            writer = csv_mod.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
 569            writer.writeheader()
 570            for csv_path in csv_paths:
 571                if not csv_path.exists():
 572                    continue
 573                with csv_path.open("r", newline="", encoding="utf-8") as fh:
 574                    reader = csv_mod.DictReader(fh)
 575                    for row in reader:
 576                        writer.writerow(row)
 577
 578        return combined_path
 579
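`_combine_csv_files` tolerates schema drift between parts: the output header is the union of all fieldnames, missing cells become `""` (`restval`), and unexpected keys are dropped (`extrasaction="ignore"`). A minimal runnable sketch of the same approach:

```python
import csv
from pathlib import Path


def combine_csvs(csv_paths: list[Path], output_path: Path) -> Path:
    """Concatenate CSVs whose headers may differ slightly."""
    fieldnames: list[str] = []
    seen: set[str] = set()
    for path in csv_paths:  # first pass: union of headers, order preserved
        with path.open(newline="", encoding="utf-8") as fh:
            for name in csv.DictReader(fh).fieldnames or []:
                if name not in seen:
                    fieldnames.append(name)
                    seen.add(name)
    with output_path.open("w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
        writer.writeheader()
        for path in csv_paths:  # second pass: stream rows through the writer
            with path.open(newline="", encoding="utf-8") as fh:
                writer.writerows(csv.DictReader(fh))
    return output_path
```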
 580    def _set_analysis_input_csv_path(self, artifact_key: str, csv_path: Path) -> None:
 581        """Store the analysis-input CSV path for an artifact.
 582
 583        Args:
 584            artifact_key: Artifact identifier.
 585            csv_path: Path to the analysis-input CSV.
 586        """
 587        self._analysis_input_csv_paths[artifact_key] = csv_path
 588        normalized = normalize_artifact_key(artifact_key)
 589        self._analysis_input_csv_paths[normalized] = csv_path
 590
 591    def _resolve_analysis_input_csv_path(self, artifact_key: str, fallback: Path) -> Path:
 592        """Retrieve the analysis-input CSV path, with fallback.
 593
 594        Args:
 595            artifact_key: Artifact identifier.
 596            fallback: Default path if not stored.
 597
 598        Returns:
 599            The stored analysis-input CSV path, or *fallback*.
 600        """
 601        mapped = self._analysis_input_csv_paths.get(artifact_key)
 602        if mapped is not None:
 603            return mapped
 604        normalized = normalize_artifact_key(artifact_key)
 605        mapped = self._analysis_input_csv_paths.get(normalized)
 606        if mapped is not None:
 607            return mapped
 608        return fallback
 609
 610    def _resolve_artifact_metadata(self, artifact_key: str) -> dict[str, str]:
 611        """Look up artifact metadata, preferring the OS-appropriate registry.
 612
 613        Searches the registry matching :attr:`os_type` first so that
 614        shared keys like ``services`` resolve to the correct OS-specific
 615        entry.  Falls back to the other registry for cross-OS lookups.
 616
 617        Args:
 618            artifact_key: Artifact identifier.
 619
 620        Returns:
 621            A dict with at least ``name``, ``description``, and
 622            ``analysis_hint`` keys.
 623        """
 624        if self.os_type == "linux":
 625            registries = (LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY)
 626        else:
 627            registries = (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY)
 628
 629        for registry in registries:
 630            if artifact_key in registry:
 631                metadata = registry[artifact_key]
 632                return {str(key): str(value) for key, value in metadata.items()}
 633
 634        normalized = normalize_artifact_key(artifact_key)
 635        for registry in registries:
 636            if normalized in registry:
 637                metadata = registry[normalized]
 638                return {str(key): str(value) for key, value in metadata.items()}
 639
 640        return {
 641            "name": artifact_key,
 642            "description": "No artifact description available.",
 643            "analysis_hint": "No specific analysis guidance is available for this artifact.",
 644        }
 645
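The lookup order above (raw key across both registries, then normalized key across both, then a default) can be sketched independently; the registry contents below are made up for illustration:

```python
def lookup_metadata(key, registries, normalize, default):
    """Try the raw key in each registry in order, then the normalized key,
    before falling back to a copy of the default entry."""
    for candidate in (key, normalize(key)):
        for registry in registries:
            if candidate in registry:
                return dict(registry[candidate])
    return dict(default)
```

Ordering the registries by OS is what makes a shared key such as `"services"` resolve to the correct OS-specific entry.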
 646    # ------------------------------------------------------------------
 647    # Metadata registration
 648    # ------------------------------------------------------------------
 649
 650    def _register_artifact_paths_from_metadata(self, metadata: Mapping[str, Any] | None) -> None:
 651        """Extract and register artifact CSV paths from run metadata.
 652
 653        Args:
 654            metadata: Optional metadata mapping.
 655        """
 656        if not isinstance(metadata, Mapping):
 657            return
 658
 659        artifact_csv_paths = metadata.get("artifact_csv_paths")
 660        if isinstance(artifact_csv_paths, Mapping):
 661            for artifact_key, csv_path in artifact_csv_paths.items():
 662                if isinstance(csv_path, list) and len(csv_path) > 1:
 663                    self.artifact_csv_paths[str(artifact_key)] = [
 664                        Path(str(p)) for p in csv_path
 665                    ]
 666                elif isinstance(csv_path, list) and csv_path:
 667                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path[0]))
 668                elif csv_path:  # skip empty lists and other falsy values
 669                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
 670
 671        for container_key in ("artifacts", "artifact_results", "parse_results", "parsed_artifacts"):
 672            container = metadata.get(container_key)
 673            if isinstance(container, Mapping):
 674                for artifact_key, value in container.items():
 675                    self._register_artifact_path_entry(artifact_key=artifact_key, value=value)
 676            elif isinstance(container, list):
 677                for item in container:
 678                    if isinstance(item, Mapping):
 679                        artifact_key = item.get("artifact_key") or item.get("key")
 680                        if artifact_key:
 681                            self._register_artifact_path_entry(artifact_key=str(artifact_key), value=item)
 682
 683    def _register_artifact_path_entry(self, artifact_key: Any, value: Any) -> None:
 684        """Register a single artifact CSV path from a metadata entry.
 685
 686        Args:
 687            artifact_key: Artifact identifier.
 688            value: Metadata entry (mapping, string, or Path).
 689        """
 690        if artifact_key in (None, ""):
 691            return
 692
 693        if isinstance(value, Mapping):
 694            csv_path = value.get("csv_path")
 695            csv_paths = value.get("csv_paths")
 696            if isinstance(csv_paths, list) and len(csv_paths) > 1:
 697                self.artifact_csv_paths[str(artifact_key)] = [
 698                    Path(str(p)) for p in csv_paths
 699                ]
 700                return
 701            if csv_path:
 702                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
 703                return
 704            if isinstance(csv_paths, list) and csv_paths:
 705                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_paths[0]))
 706                return
 707
 708        if isinstance(value, (str, Path)):
 709            self.artifact_csv_paths[str(artifact_key)] = Path(str(value))
 710
 711    # ------------------------------------------------------------------
 712    # Core analysis pipeline
 713    # ------------------------------------------------------------------
 714
 715    def _prepare_artifact_data(
 716        self, artifact_key: str, investigation_context: str, csv_path: Path | None = None,
 717    ) -> str:
 718        """Prepare one artifact CSV as a bounded, analysis-ready prompt.
 719
 720        Args:
 721            artifact_key: Unique identifier for the artifact.
 722            investigation_context: Free-text investigation context.
 723            csv_path: Explicit path to the artifact CSV.
 724
 725        Returns:
 726            The fully rendered prompt string.
 727
 728        Raises:
 729            FileNotFoundError: If the artifact CSV cannot be located.
 730        """
 731        resolved_csv_path = csv_path if csv_path is not None else self._resolve_artifact_csv_path(artifact_key)
 732        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
 733
 734        prompt_text, analysis_csv_path, _ = prepare_artifact_data(
 735            artifact_key=artifact_key,
 736            investigation_context=investigation_context,
 737            csv_path=resolved_csv_path,
 738            artifact_metadata=artifact_metadata,
 739            artifact_prompt_template=self.artifact_prompt_template,
 740            artifact_prompt_template_small_context=self.artifact_prompt_template_small_context,
 741            artifact_instruction_prompts=self.artifact_instruction_prompts,
 742            artifact_ai_column_projections=self.artifact_ai_column_projections,
 743            artifact_deduplication_enabled=self.artifact_deduplication_enabled,
 744            ai_max_tokens=self.ai_max_tokens,
 745            shortened_prompt_cutoff_tokens=self.shortened_prompt_cutoff_tokens,
 746            case_dir=self.case_dir,
 747            audit_log_fn=self._audit_log,
 748        )
 749        self._set_analysis_input_csv_path(artifact_key=artifact_key, csv_path=analysis_csv_path)
 750        return prompt_text
 751
 752    def analyze_artifact(
 753        self,
 754        artifact_key: str,
 755        investigation_context: str,
 756        progress_callback: Any | None = None,
 757    ) -> dict[str, Any]:
 758        """Analyze a single artifact's CSV data and return AI findings.
 759
 760        Args:
 761            artifact_key: Unique identifier for the artifact.
 762            investigation_context: Free-text investigation context.
 763            progress_callback: Optional callable for streaming progress.
 764
 765        Returns:
 766            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
 767            ``model``, and optionally ``citation_warnings``.
 768        """
 769        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
 770        artifact_name = artifact_metadata.get("name", artifact_key)
 771        model = self.model_info.get("model", "unknown")
 772        provider = self.model_info.get("provider", "unknown")
 773
 774        self._audit_log("analysis_started", {
 775            "artifact_key": artifact_key, "artifact_name": artifact_name,
 776            "provider": provider, "model": model,
 777        })
 778
 779        start_time = perf_counter()
 780        try:
 781            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
 782            if len(all_csv_paths) > 1:
 783                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
 784            else:
 785                csv_path = all_csv_paths[0]
 786            artifact_prompt = self._prepare_artifact_data(
 787                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
 788            )
 789            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
 790            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]
 791
 792            safe_key = sanitize_filename(artifact_key)
 793            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)
 794
 795            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
 796            if prompt_tokens_estimate > self.ai_max_tokens:
 797                self.logger.info(
 798                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
 799                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
 800                )
 801                if progress_callback is not None:
 802                    emit_analysis_progress(progress_callback, artifact_key, "started", {
 803                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
 804                    })
 805                analysis_text = analyze_artifact_chunked(
 806                    artifact_prompt=artifact_prompt,
 807                    artifact_key=artifact_key,
 808                    artifact_name=artifact_name,
 809                    investigation_context=investigation_context,
 810                    model=model,
 811                    system_prompt=self.system_prompt,
 812                    ai_response_max_tokens=self.ai_response_max_tokens,
 813                    chunk_csv_budget=self.chunk_csv_budget,
 814                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
 815                    max_merge_rounds=self.max_merge_rounds,
 816                    call_ai_with_retry_fn=self._call_ai_with_retry,
 817                    ai_provider=self.ai_provider,
 818                    audit_log_fn=self._audit_log,
 819                    save_case_prompt_fn=self._save_case_prompt,
 820                    progress_callback=progress_callback,
 821                )
 822                duration_seconds = perf_counter() - start_time
 823                self._audit_log("analysis_completed", {
 824                    "artifact_key": artifact_key, "artifact_name": artifact_name,
 825                    "token_count": self._estimate_tokens(analysis_text),
 826                    "duration_seconds": round(duration_seconds, 6),
 827                    "status": "success", "chunked": True,
 828                })
 829                citation_warnings = self._validate_citations(artifact_key, analysis_text)
 830                result: dict[str, Any] = {
 831                    "artifact_key": artifact_key, "artifact_name": artifact_name,
 832                    "analysis": analysis_text, "model": model,
 833                }
 834                if citation_warnings:
 835                    result["citation_warnings"] = citation_warnings
 836                return result
 837
 838            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
 839            if callable(analyze_with_progress) and progress_callback is not None:
 840                emit_analysis_progress(progress_callback, artifact_key, "started", {
 841                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
 842                })
 843
 844                def _provider_progress(payload: Mapping[str, Any]) -> None:
 845                    """Forward provider progress to the frontend."""
 846                    if not isinstance(payload, Mapping):
 847                        return
 848                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
 849                        "artifact_key": artifact_key, "artifact_name": artifact_name,
 850                        "thinking_text": str(payload.get("thinking_text", "")),
 851                        "partial_text": str(payload.get("partial_text", "")),
 852                        "model": model,
 853                    })
 854
 855                try:
 856                    analysis_text = analyze_with_progress(
 857                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 858                        progress_callback=_provider_progress, attachments=attachments,
 859                        max_tokens=self.ai_response_max_tokens,
 860                    )
 861                except TypeError:  # provider does not accept the attachments kwarg
 862                    analysis_text = analyze_with_progress(
 863                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 864                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
 865                    )
 866            else:
 867                if progress_callback is not None:
 868                    emit_analysis_progress(progress_callback, artifact_key, "started", {
 869                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
 870                    })
 871                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
 872                if callable(analyze_with_attachments):
 873                    analysis_text = self._call_ai_with_retry(
 874                        lambda: analyze_with_attachments(
 875                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 876                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
 877                        )
 878                    )
 879                else:
 880                    analysis_text = self._call_ai_with_retry(
 881                        lambda: self.ai_provider.analyze(
 882                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
 883                            max_tokens=self.ai_response_max_tokens,
 884                        )
 885                    )
 886            duration_seconds = perf_counter() - start_time
 887            self._audit_log("analysis_completed", {
 888                "artifact_key": artifact_key, "artifact_name": artifact_name,
 889                "token_count": self._estimate_tokens(analysis_text),
 890                "duration_seconds": round(duration_seconds, 6), "status": "success",
 891            })
 892        except Exception as error:
 893            duration_seconds = perf_counter() - start_time
 894            analysis_text = f"Analysis failed: {error}"
 895            self._audit_log("analysis_completed", {
 896                "artifact_key": artifact_key, "artifact_name": artifact_name,
 897                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
 898                "status": "failed", "error": str(error),
 899            })
 900
 901        citation_warnings = self._validate_citations(artifact_key, analysis_text)
 902
 903        result = {
 904            "artifact_key": artifact_key, "artifact_name": artifact_name,
 905            "analysis": analysis_text, "model": model,
 906        }
 907        if citation_warnings:
 908            result["citation_warnings"] = citation_warnings
 909        return result
 910
 911    def generate_summary(
 912        self,
 913        per_artifact_results: list[Mapping[str, Any]],
 914        investigation_context: str,
 915        metadata: Mapping[str, Any] | None,
 916    ) -> str:
 917        """Generate a cross-artifact summary by correlating findings.
 918
 919        Args:
 920            per_artifact_results: List of per-artifact result dicts.
 921            investigation_context: The user's investigation context.
 922            metadata: Optional host metadata mapping.
 923
 924        Returns:
 925            The AI-generated summary text, or an error message.
 926        """
 927        metadata_map = metadata if isinstance(metadata, Mapping) else {}
 928        summary_prompt = build_summary_prompt(
 929            summary_prompt_template=self.summary_prompt_template,
 930            investigation_context=investigation_context,
 931            per_artifact_results=per_artifact_results,
 932            metadata_map=metadata_map,
 933        )
 934
 935        model = self.model_info.get("model", "unknown")
 936        provider = self.model_info.get("provider", "unknown")
 937        summary_artifact_key = "cross_artifact_summary"
 938        summary_artifact_name = "Cross-Artifact Summary"
 939        summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md"
 940
 941        self._audit_log("analysis_started", {
 942            "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 943            "provider": provider, "model": model,
 944        })
 945        self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt)
 946
 947        start_time = perf_counter()
 948        try:
 949            summary = self._call_ai_with_retry(
 950                lambda: self.ai_provider.analyze(
 951                    system_prompt=self.system_prompt, user_prompt=summary_prompt,
 952                    max_tokens=self.ai_response_max_tokens,
 953                )
 954            )
 955            duration_seconds = perf_counter() - start_time
 956            self._audit_log("analysis_completed", {
 957                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 958                "token_count": self._estimate_tokens(summary),
 959                "duration_seconds": round(duration_seconds, 6), "status": "success",
 960            })
 961            return summary
 962        except Exception as error:
 963            duration_seconds = perf_counter() - start_time
 964            summary = f"Analysis failed: {error}"
 965            self._audit_log("analysis_completed", {
 966                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 967                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
 968                "status": "failed", "error": str(error),
 969            })
 970            return summary
 971
 972    def run_full_analysis(
 973        self,
 974        artifact_keys: Iterable[str],
 975        investigation_context: str,
 976        metadata: Mapping[str, Any] | None,
Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

Central analysis engine for AIFT: reads parsed artifact CSV files, applies column projection and deduplication, builds token-budgeted prompts, sends them to a configured AI provider, and validates citations.

Attributes:
  • case_dir: Path to the case directory, or None.
  • config: Merged configuration dictionary.
  • ai_provider: The configured AI provider instance.
  • model_info: Dict with provider and model keys.
ForensicAnalyzer(case_dir: str | Path | Mapping[str, str | Path] | None = None, config: Mapping[str, Any] | None = None, audit_logger: Any | None = None, artifact_csv_paths: Mapping[str, str | Path] | None = None, prompts_dir: str | Path | None = None, random_seed: int | None = None, os_type: str = 'windows')
107    def __init__(
108        self,
109        case_dir: str | Path | Mapping[str, str | Path] | None = None,
110        config: Mapping[str, Any] | None = None,
111        audit_logger: Any | None = None,
112        artifact_csv_paths: Mapping[str, str | Path] | None = None,
113        prompts_dir: str | Path | None = None,
114        random_seed: int | None = None,
115        os_type: str = "windows",
116    ) -> None:
117        """Initialize the forensic analyzer with case context and configuration.
118
119        Args:
120            case_dir: Path to the case directory, or a mapping of artifact
121                keys to CSV paths (convenience shorthand).
122            config: Application configuration dictionary.
123            audit_logger: Optional object with a ``log(action, details)``
124                method.
125            artifact_csv_paths: Mapping of artifact keys to CSV paths.
126            prompts_dir: Directory containing prompt template files.
127            random_seed: Optional seed for the internal RNG.
128            os_type: Detected operating system type (``"windows"``,
129                ``"linux"``, etc.).  Controls which artifact instruction
130                prompts are loaded.
131        """
132        if (
133            isinstance(case_dir, Mapping)
134            and config is None
135            and audit_logger is None
136            and artifact_csv_paths is None
137        ):
138            artifact_csv_paths = case_dir
139            case_dir = None
140
141        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
142        self.logger = LOGGER
143        self.config = dict(config) if isinstance(config, Mapping) else {}
144        self.audit_logger = audit_logger
145        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
146        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
147            key = str(artifact_key)
148            if isinstance(csv_path, list):
149                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
150            else:
151                self.artifact_csv_paths[key] = Path(str(csv_path))
152        self._analysis_input_csv_paths: dict[str, Path] = {}
153        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
154        self.os_type = normalize_os_type(os_type)
155        import random
156        self._random = random.Random(random_seed)
157        self._load_analysis_settings()
158        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
159        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
160        self.artifact_prompt_template = self._load_prompt_template(
161            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
162        )
163        self.artifact_prompt_template_small_context = self._load_prompt_template(
164            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
165        )
166        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
167        self.summary_prompt_template = self._load_prompt_template(
168            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
169        )
170        self.chunk_merge_prompt_template = self._load_prompt_template(
171            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
172        )
173        self.ai_provider = self._create_ai_provider()
174        self.model_info = self._read_model_info()

Initialize the forensic analyzer with case context and configuration.

Arguments:
  • case_dir: Path to the case directory, or a mapping of artifact keys to CSV paths (convenience shorthand).
  • config: Application configuration dictionary.
  • audit_logger: Optional object with a log(action, details) method.
  • artifact_csv_paths: Mapping of artifact keys to CSV paths.
  • prompts_dir: Directory containing prompt template files.
  • random_seed: Optional seed for the internal RNG.
  • os_type: Detected operating system type ("windows", "linux", etc.). Controls which artifact instruction prompts are loaded.
case_dir
logger
config
audit_logger
artifact_csv_paths: dict[str, pathlib.Path | list[pathlib.Path]]
prompts_dir
os_type
artifact_ai_column_projections
system_prompt
artifact_prompt_template
artifact_prompt_template_small_context
artifact_instruction_prompts
summary_prompt_template
chunk_merge_prompt_template
ai_provider
model_info
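The convenience shorthand described for `case_dir` (a mapping passed as the first positional argument is reinterpreted as `artifact_csv_paths`) can be illustrated in isolation. The sketch below is a hypothetical stand-alone mirror of that dispatch, not part of the module, and it simplifies the real guard, which also requires `config` and `audit_logger` to be `None`:

```python
from collections.abc import Mapping
from pathlib import Path


def resolve_shorthand(case_dir=None, artifact_csv_paths=None):
    """Simplified mirror of the ForensicAnalyzer.__init__ dispatch:
    a Mapping passed as case_dir is treated as artifact_csv_paths."""
    if isinstance(case_dir, Mapping) and artifact_csv_paths is None:
        artifact_csv_paths = case_dir
        case_dir = None
    resolved_case_dir = Path(case_dir) if case_dir is not None else None
    resolved_paths = {
        str(key): Path(str(value))
        for key, value in (artifact_csv_paths or {}).items()
    }
    return resolved_case_dir, resolved_paths
```

Passing `resolve_shorthand({"prefetch": "out/prefetch.csv"})` yields no case directory and a normalized path mapping, whereas `resolve_shorthand("cases/001")` yields a `Path` case directory and an empty mapping.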
def analyze_artifact(self, artifact_key: str, investigation_context: str, progress_callback: Any | None = None) -> dict[str, Any]:
752    def analyze_artifact(
753        self,
754        artifact_key: str,
755        investigation_context: str,
756        progress_callback: Any | None = None,
757    ) -> dict[str, Any]:
758        """Analyze a single artifact's CSV data and return AI findings.
759
760        Args:
761            artifact_key: Unique identifier for the artifact.
762            investigation_context: Free-text investigation context.
763            progress_callback: Optional callable for streaming progress.
764
765        Returns:
766            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
767            ``model``, and optionally ``citation_warnings``.
768        """
769        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
770        artifact_name = artifact_metadata.get("name", artifact_key)
771        model = self.model_info.get("model", "unknown")
772        provider = self.model_info.get("provider", "unknown")
773
774        self._audit_log("analysis_started", {
775            "artifact_key": artifact_key, "artifact_name": artifact_name,
776            "provider": provider, "model": model,
777        })
778
779        start_time = perf_counter()
780        try:
781            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
782            if len(all_csv_paths) > 1:
783                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
784            else:
785                csv_path = all_csv_paths[0]
786            artifact_prompt = self._prepare_artifact_data(
787                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
788            )
789            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
790            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]
791
792            safe_key = sanitize_filename(artifact_key)
793            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)
794
795            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
796            if prompt_tokens_estimate > self.ai_max_tokens:
797                self.logger.info(
798                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
799                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
800                )
801                if progress_callback is not None:
802                    emit_analysis_progress(progress_callback, artifact_key, "started", {
803                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
804                    })
805                analysis_text = analyze_artifact_chunked(
806                    artifact_prompt=artifact_prompt,
807                    artifact_key=artifact_key,
808                    artifact_name=artifact_name,
809                    investigation_context=investigation_context,
810                    model=model,
811                    system_prompt=self.system_prompt,
812                    ai_response_max_tokens=self.ai_response_max_tokens,
813                    chunk_csv_budget=self.chunk_csv_budget,
814                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
815                    max_merge_rounds=self.max_merge_rounds,
816                    call_ai_with_retry_fn=self._call_ai_with_retry,
817                    ai_provider=self.ai_provider,
818                    audit_log_fn=self._audit_log,
819                    save_case_prompt_fn=self._save_case_prompt,
820                    progress_callback=progress_callback,
821                )
822                duration_seconds = perf_counter() - start_time
823                self._audit_log("analysis_completed", {
824                    "artifact_key": artifact_key, "artifact_name": artifact_name,
825                    "token_count": self._estimate_tokens(analysis_text),
826                    "duration_seconds": round(duration_seconds, 6),
827                    "status": "success", "chunked": True,
828                })
829                citation_warnings = self._validate_citations(artifact_key, analysis_text)
830                result: dict[str, Any] = {
831                    "artifact_key": artifact_key, "artifact_name": artifact_name,
832                    "analysis": analysis_text, "model": model,
833                }
834                if citation_warnings:
835                    result["citation_warnings"] = citation_warnings
836                return result
837
838            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
839            if callable(analyze_with_progress) and progress_callback is not None:
840                emit_analysis_progress(progress_callback, artifact_key, "started", {
841                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
842                })
843
844                def _provider_progress(payload: Mapping[str, Any]) -> None:
845                    """Forward provider progress to the frontend."""
846                    if not isinstance(payload, Mapping):
847                        return
848                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
849                        "artifact_key": artifact_key, "artifact_name": artifact_name,
850                        "thinking_text": str(payload.get("thinking_text", "")),
851                        "partial_text": str(payload.get("partial_text", "")),
852                        "model": model,
853                    })
854
855                try:
856                    analysis_text = analyze_with_progress(
857                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
858                        progress_callback=_provider_progress, attachments=attachments,
859                        max_tokens=self.ai_response_max_tokens,
860                    )
861                except TypeError:
862                    analysis_text = analyze_with_progress(
863                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
864                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
865                    )
866            else:
867                if progress_callback is not None:
868                    emit_analysis_progress(progress_callback, artifact_key, "started", {
869                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
870                    })
871                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
872                if callable(analyze_with_attachments):
873                    analysis_text = self._call_ai_with_retry(
874                        lambda: analyze_with_attachments(
875                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
876                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
877                        )
878                    )
879                else:
880                    analysis_text = self._call_ai_with_retry(
881                        lambda: self.ai_provider.analyze(
882                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
883                            max_tokens=self.ai_response_max_tokens,
884                        )
885                    )
886            duration_seconds = perf_counter() - start_time
887            self._audit_log("analysis_completed", {
888                "artifact_key": artifact_key, "artifact_name": artifact_name,
889                "token_count": self._estimate_tokens(analysis_text),
890                "duration_seconds": round(duration_seconds, 6), "status": "success",
891            })
892        except Exception as error:
893            duration_seconds = perf_counter() - start_time
894            analysis_text = f"Analysis failed: {error}"
895            self._audit_log("analysis_completed", {
896                "artifact_key": artifact_key, "artifact_name": artifact_name,
897                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
898                "status": "failed", "error": str(error),
899            })
900
901        citation_warnings = self._validate_citations(artifact_key, analysis_text)
902
903        result = {
904            "artifact_key": artifact_key, "artifact_name": artifact_name,
905            "analysis": analysis_text, "model": model,
906        }
907        if citation_warnings:
908            result["citation_warnings"] = citation_warnings
909        return result

Analyze a single artifact's CSV data and return AI findings.

Arguments:
  • artifact_key: Unique identifier for the artifact.
  • investigation_context: Free-text investigation context.
  • progress_callback: Optional callable for streaming progress.
Returns:

A dict with artifact_key, artifact_name, analysis, model, and optionally citation_warnings.
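The source listing emits progress through `emit_analysis_progress` at the `"started"`, `"thinking"`, and `"complete"` stages. The exact payload shape delivered to `progress_callback` is defined by `emit_analysis_progress` (not shown here); a minimal collector, assuming each emission reaches the callback as a single dict carrying a hypothetical `"stage"` key, might look like:

```python
class ProgressCollector:
    """Collects progress events for later inspection.

    Assumes the callback is invoked with one event object per emission;
    the real shape is set by emit_analysis_progress, not shown above.
    """

    def __init__(self):
        self.events = []

    def __call__(self, event):
        # Record every event the analyzer forwards to us.
        self.events.append(event)

    def stages(self):
        # Extract the stage field from dict-shaped events, if present.
        return [e.get("stage") for e in self.events if isinstance(e, dict)]
```

A caller would pass an instance as `progress_callback=ProgressCollector()` and inspect `.events` after `analyze_artifact` returns.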

def generate_summary(self, per_artifact_results: list[Mapping[str, Any]], investigation_context: str, metadata: Mapping[str, Any] | None) -> str:
911    def generate_summary(
912        self,
913        per_artifact_results: list[Mapping[str, Any]],
914        investigation_context: str,
915        metadata: Mapping[str, Any] | None,
916    ) -> str:
917        """Generate a cross-artifact summary by correlating findings.
918
919        Args:
920            per_artifact_results: List of per-artifact result dicts.
921            investigation_context: The user's investigation context.
922            metadata: Optional host metadata mapping.
923
924        Returns:
925            The AI-generated summary text, or an error message.
926        """
927        metadata_map = metadata if isinstance(metadata, Mapping) else {}
928        summary_prompt = build_summary_prompt(
929            summary_prompt_template=self.summary_prompt_template,
930            investigation_context=investigation_context,
931            per_artifact_results=per_artifact_results,
932            metadata_map=metadata_map,
933        )
934
935        model = self.model_info.get("model", "unknown")
936        provider = self.model_info.get("provider", "unknown")
937        summary_artifact_key = "cross_artifact_summary"
938        summary_artifact_name = "Cross-Artifact Summary"
939        summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md"
940
941        self._audit_log("analysis_started", {
942            "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
943            "provider": provider, "model": model,
944        })
945        self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt)
946
947        start_time = perf_counter()
948        try:
949            summary = self._call_ai_with_retry(
950                lambda: self.ai_provider.analyze(
951                    system_prompt=self.system_prompt, user_prompt=summary_prompt,
952                    max_tokens=self.ai_response_max_tokens,
953                )
954            )
955            duration_seconds = perf_counter() - start_time
956            self._audit_log("analysis_completed", {
957                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
958                "token_count": self._estimate_tokens(summary),
959                "duration_seconds": round(duration_seconds, 6), "status": "success",
960            })
961            return summary
962        except Exception as error:
963            duration_seconds = perf_counter() - start_time
964            summary = f"Analysis failed: {error}"
965            self._audit_log("analysis_completed", {
966                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
967                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
968                "status": "failed", "error": str(error),
969            })
970            return summary

Generate a cross-artifact summary by correlating findings.

Arguments:
  • per_artifact_results: List of per-artifact result dicts.
  • investigation_context: The user's investigation context.
  • metadata: Optional host metadata mapping.
Returns:

The AI-generated summary text, or an error message.

def run_full_analysis(self, artifact_keys: Iterable[str], investigation_context: str, metadata: Mapping[str, Any] | None, progress_callback: Any | None = None, cancel_check: Any | None = None) -> dict[str, Any]:
 972    def run_full_analysis(
 973        self,
 974        artifact_keys: Iterable[str],
 975        investigation_context: str,
 976        metadata: Mapping[str, Any] | None,
 977        progress_callback: Any | None = None,
 978        cancel_check: Any | None = None,
 979    ) -> dict[str, Any]:
 980        """Run the complete analysis pipeline: per-artifact then summary.
 981
 982        Args:
 983            artifact_keys: Iterable of artifact key strings.
 984            investigation_context: The user's investigation context.
 985            metadata: Optional metadata mapping.
 986            progress_callback: Optional callable for streaming progress.
 987            cancel_check: Optional callable returning ``True`` when the
 988                analysis should be aborted early.
 989
 990        Returns:
 991            A dict with ``per_artifact``, ``summary``, and ``model_info``.
 992
 993        Raises:
 994            AnalysisCancelledError: If *cancel_check* returns ``True``.
 995        """
 996        if isinstance(self.ai_provider, UnavailableProvider):
 997            raise AIProviderError(self.ai_provider._error_message)
 998
 999        self._register_artifact_paths_from_metadata(metadata)
1000        per_artifact_results: list[dict[str, Any]] = []
1001        for artifact_key in artifact_keys:
1002            if cancel_check is not None and cancel_check():
1003                raise AnalysisCancelledError("Analysis cancelled by user.")
1004            result = self.analyze_artifact(
1005                artifact_key=str(artifact_key),
1006                investigation_context=investigation_context,
1007                progress_callback=progress_callback,
1008            )
1009            per_artifact_results.append(result)
1010            if progress_callback is not None:
1011                emit_analysis_progress(progress_callback, str(artifact_key), "complete", result)
1012
1013        summary = self.generate_summary(
1014            per_artifact_results=per_artifact_results,
1015            investigation_context=investigation_context,
1016            metadata=metadata,
1017        )
1018        return {
1019            "per_artifact": per_artifact_results,
1020            "summary": summary,
1021            "model_info": dict(self.model_info),
1022        }

Run the complete analysis pipeline: per-artifact then summary.

Arguments:
  • artifact_keys: Iterable of artifact key strings.
  • investigation_context: The user's investigation context.
  • metadata: Optional metadata mapping.
  • progress_callback: Optional callable for streaming progress.
  • cancel_check: Optional callable returning True when the analysis should be aborted early.
Returns:

A dict with per_artifact, summary, and model_info.

Raises:
  • AnalysisCancelledError: If cancel_check returns True.
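Because `run_full_analysis` polls `cancel_check` once before each artifact, any zero-argument callable returning a boolean works. One natural sketch (an assumption about caller-side wiring, not part of the module) adapts a `threading.Event` so a UI thread can request cancellation:

```python
import threading


def make_cancel_check(stop_event: threading.Event):
    """Adapt a threading.Event into the zero-argument cancel_check
    callable that run_full_analysis polls before each artifact."""
    return stop_event.is_set


# A UI or supervisor thread calls stop_event.set() to abort;
# run_full_analysis then raises AnalysisCancelledError before the
# next artifact begins.
stop_event = threading.Event()
cancel_check = make_cancel_check(stop_event)
```

Checks happen only at artifact boundaries, so an in-flight provider call completes before cancellation takes effect.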