app.analyzer

AI analysis orchestration package for forensic triage.

Provides the ForensicAnalyzer class and supporting sub-modules for token budgeting, date filtering, column projection, deduplication, chunked analysis, citation validation, IOC extraction, and audit logging.

Sub-modules:

  • constants: Compile-time constants, regex, prompt templates.
  • utils: Pure utility functions (string, datetime, CSV, config readers).
  • ioc: IOC extraction and prompt-building helpers.
  • citations: Citation validation against source CSV.
  • data_prep: Date filtering, dedup, statistics, prompt assembly.
  • chunking: Chunked analysis and hierarchical merge.
  • prompts: Prompt template loading and construction.
  • core: The ForensicAnalyzer class itself.
 1"""AI analysis orchestration package for forensic triage.
 2
 3Provides the ``ForensicAnalyzer`` class and supporting sub-modules for
 4token budgeting, date filtering, column projection, deduplication,
 5chunked analysis, citation validation, IOC extraction, and audit logging.
 6
 7Sub-modules:
 8
 9- ``constants``: Compile-time constants, regex, prompt templates.
10- ``utils``: Pure utility functions (string, datetime, CSV, config readers).
11- ``ioc``: IOC extraction and prompt-building helpers.
12- ``citations``: Citation validation against source CSV.
13- ``data_prep``: Date filtering, dedup, statistics, prompt assembly.
14- ``chunking``: Chunked analysis and hierarchical merge.
15- ``prompts``: Prompt template loading and construction.
16- ``core``: The ``ForensicAnalyzer`` class itself.
17"""
18
19from __future__ import annotations
20
21from .constants import PROJECT_ROOT
22from .core import ForensicAnalyzer
23
# Public API of the package: the analyzer class, the project root constant,
# and the sub-module names.  Listing sub-modules in a package's ``__all__``
# also makes ``from app.analyzer import *`` import them.
__all__ = [
    "ForensicAnalyzer",
    "PROJECT_ROOT",
    "chunking",
    "citations",
    "constants",
    "core",
    "data_prep",
    "ioc",
    "prompts",
    "utils",
]
class ForensicAnalyzer:
class ForensicAnalyzer:
    """Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

    Central analysis engine for AIFT: reads parsed artifact CSV files,
    applies column projection and deduplication, builds token-budgeted
    prompts, sends them to a configured AI provider, and validates
    citations.

    Attributes:
        case_dir: Path to the case directory, or ``None``.
        config: Merged configuration dictionary.
        ai_provider: The configured AI provider instance.
        model_info: Dict with ``provider`` and ``model`` keys.
    """

    # Expose extracted functions as static methods for backward compatibility
    # with tests and callers that use ForensicAnalyzer._method_name().
    # These evaluate at class-creation time, so the names must be imported
    # at module level.
    _stringify_value = staticmethod(stringify_value)
    _build_datetime = staticmethod(build_datetime)
    _normalize_artifact_key = staticmethod(normalize_artifact_key)
    _sanitize_filename = staticmethod(sanitize_filename)
    _split_csv_into_chunks = staticmethod(split_csv_into_chunks)
    _split_csv_and_suffix = staticmethod(split_csv_and_suffix)
    _coerce_projection_columns = staticmethod(coerce_projection_columns)
    _is_dedup_safe_identifier_column = staticmethod(is_dedup_safe_identifier_column)
    _timestamp_lookup_keys = staticmethod(timestamp_lookup_keys)
    _timestamp_found_in_csv = staticmethod(timestamp_found_in_csv)
    _match_column_name = staticmethod(match_column_name)
    _emit_analysis_progress = staticmethod(emit_analysis_progress)

    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.).  Controls which artifact instruction
                prompts are loaded.
        """
        # Convenience shorthand: ForensicAnalyzer({"key": "path.csv"}) —
        # a lone mapping as the first positional argument is treated as
        # artifact_csv_paths, not as a case directory.
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        # Values may be a single path or (for split artifacts) a list of paths.
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        # Filled by _set_analysis_input_csv_path(): the (possibly filtered/
        # deduplicated) CSVs that were actually fed to the AI.
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        # Dedicated RNG instance: a caller-supplied seed makes sampling
        # deterministic without disturbing the global random state.
        import random
        self._random = random.Random(random_seed)
        # Order matters below: settings must be loaded before the column
        # projections (which read artifact_ai_columns_config_path), and the
        # provider before _read_model_info().
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()
 175
 176    # ------------------------------------------------------------------
 177    # Configuration loading
 178    # ------------------------------------------------------------------
 179
    def _load_analysis_settings(self) -> None:
        """Load and validate analysis tuning parameters from the config dict.

        Reads the ``analysis`` section of :attr:`config` (tolerating a
        missing or non-mapping section) and stores each tuning value as an
        instance attribute.
        """
        analysis_config = self.config.get("analysis")
        if not isinstance(analysis_config, Mapping):
            analysis_config = {}

        self.ai_max_tokens = read_int_setting(analysis_config, "ai_max_tokens", AI_MAX_TOKENS, minimum=1)
        # Reserve 20% of the total token budget for the model's response.
        self.ai_response_max_tokens = max(1, int(self.ai_max_tokens * 0.2))
        # "statistics_section_cutoff_tokens" is the legacy key name: it is
        # read first so its value becomes the default when the newer
        # "shortened_prompt_cutoff_tokens" key is absent.
        legacy_shortened = read_int_setting(
            analysis_config, "statistics_section_cutoff_tokens", DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, minimum=1,
        )
        self.shortened_prompt_cutoff_tokens = read_int_setting(
            analysis_config, "shortened_prompt_cutoff_tokens", legacy_shortened, minimum=1,
        )
        # Character budget for CSV data per chunk: 60% of the token budget,
        # converted via TOKEN_CHAR_RATIO (presumably chars per token —
        # confirm in constants).
        self.chunk_csv_budget = int(self.ai_max_tokens * TOKEN_CHAR_RATIO * 0.6)
        self.citation_spot_check_limit = read_int_setting(
            analysis_config, "citation_spot_check_limit", CITATION_SPOT_CHECK_LIMIT, minimum=1,
        )
        self.max_merge_rounds = read_int_setting(analysis_config, "max_merge_rounds", MAX_MERGE_ROUNDS, minimum=1)
        self.artifact_deduplication_enabled = read_bool_setting(
            analysis_config, "artifact_deduplication_enabled", ARTIFACT_DEDUPLICATION_ENABLED,
        )
        self.artifact_ai_columns_config_path = read_path_setting(
            analysis_config, "artifact_ai_columns_config_path", str(DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH),
        )
 205
    # Backward-compatible aliases for the extracted config readers, exposed
    # as staticmethods so existing callers and tests can keep invoking them
    # as ForensicAnalyzer._read_int_setting(...) (or via an instance).
    _read_int_setting = staticmethod(read_int_setting)
    _read_bool_setting = staticmethod(read_bool_setting)
    _read_path_setting = staticmethod(read_path_setting)
 210
 211    def _resolve_artifact_ai_columns_config_path(self) -> Path:
 212        """Resolve the artifact AI columns config path to an absolute Path.
 213
 214        Delegates to :func:`prompts.resolve_artifact_ai_columns_config_path`.
 215
 216        Returns:
 217            Resolved absolute ``Path`` to the YAML config file.
 218        """
 219        return resolve_artifact_ai_columns_config_path(
 220            self.artifact_ai_columns_config_path, self.case_dir,
 221        )
 222
 223    def _load_artifact_ai_column_projections(self) -> dict[str, tuple[str, ...]]:
 224        """Load per-artifact column projection configuration from YAML.
 225
 226        Delegates to :func:`prompts.load_artifact_ai_column_projections`.
 227
 228        Returns:
 229            A dict mapping normalized artifact keys to tuples of column names.
 230        """
 231        config_path = self._resolve_artifact_ai_columns_config_path()
 232        return load_artifact_ai_column_projections(config_path, os_type=self.os_type)
 233
 234    def _load_prompt_template(self, filename: str, default: str) -> str:
 235        """Read a prompt template file from the prompts directory.
 236
 237        Delegates to :func:`prompts.load_prompt_template`.
 238
 239        Args:
 240            filename: Name of the template file.
 241            default: Fallback template string.
 242
 243        Returns:
 244            The template text.
 245        """
 246        return load_prompt_template(self.prompts_dir, filename, default)
 247
 248    def _load_artifact_instruction_prompts(self) -> dict[str, str]:
 249        """Load per-artifact analysis instruction prompts.
 250
 251        Delegates to :func:`prompts.load_artifact_instruction_prompts`,
 252        passing :attr:`os_type` so the correct OS-specific instruction
 253        directory is selected.
 254
 255        Returns:
 256            A dict mapping artifact keys to instruction prompt text.
 257        """
 258        return load_artifact_instruction_prompts(self.prompts_dir, os_type=self.os_type)
 259
 260    # ------------------------------------------------------------------
 261    # AI provider
 262    # ------------------------------------------------------------------
 263
 264    def _create_ai_provider(self) -> Any:
 265        """Instantiate the configured AI provider, or a fallback on failure.
 266
 267        Returns:
 268            An AI provider instance, or an ``UnavailableProvider``.
 269        """
 270        provider_config: Mapping[str, Any]
 271        if self.config:
 272            provider_config = self.config
 273        else:
 274            provider_config = {
 275                "ai": {
 276                    "provider": "local",
 277                    "local": {
 278                        "base_url": "http://localhost:11434/v1",
 279                        "model": "llama3.1:70b",
 280                        "api_key": "not-needed",
 281                    },
 282                }
 283            }
 284        try:
 285            return create_provider(dict(provider_config))
 286        except Exception as error:
 287            return UnavailableProvider(str(error))
 288
 289    def _read_model_info(self) -> dict[str, str]:
 290        """Read provider and model metadata from the AI provider.
 291
 292        Returns:
 293            A dict with at least ``provider`` and ``model`` keys.
 294        """
 295        try:
 296            model_info = self.ai_provider.get_model_info()
 297        except Exception:
 298            return {"provider": "unknown", "model": "unknown"}
 299
 300        if not isinstance(model_info, Mapping):
 301            return {"provider": "unknown", "model": "unknown"}
 302
 303        return {str(key): str(value) for key, value in model_info.items()}
 304
 305    def _call_ai_with_retry(self, call: Callable[[], str]) -> str:
 306        """Call the AI provider with retry on transient failures.
 307
 308        Args:
 309            call: A zero-argument callable that invokes the AI provider.
 310
 311        Returns:
 312            The AI provider's response string.
 313
 314        Raises:
 315            AIProviderError: If the provider raises a permanent error.
 316            Exception: The last transient error after all retries.
 317        """
 318        last_error: Exception | None = None
 319        for attempt in range(AI_RETRY_ATTEMPTS):
 320            try:
 321                return call()
 322            except AIProviderError:
 323                raise
 324            except Exception as error:
 325                last_error = error
 326                if attempt < AI_RETRY_ATTEMPTS - 1:
 327                    delay = AI_RETRY_BASE_DELAY * (2 ** attempt)
 328                    self.logger.warning(
 329                        "AI provider call failed (attempt %d/%d), retrying in %.1fs: %s",
 330                        attempt + 1, AI_RETRY_ATTEMPTS, delay, error,
 331                    )
 332                    sleep(delay)
 333        raise last_error  # type: ignore[misc]
 334
 335    # ------------------------------------------------------------------
 336    # Audit / prompt saving
 337    # ------------------------------------------------------------------
 338
 339    def _audit_log(self, action: str, details: dict[str, Any]) -> None:
 340        """Write an entry to the forensic audit trail.
 341
 342        Args:
 343            action: The audit action name.
 344            details: Key-value details for the audit entry.
 345        """
 346        if self.audit_logger is None:
 347            return
 348        logger = getattr(self.audit_logger, "log", None)
 349        if not callable(logger):
 350            return
 351        try:
 352            logger(action, details)
 353        except Exception:
 354            return
 355
 356    def _save_case_prompt(self, filename: str, system_prompt: str, user_prompt: str) -> None:
 357        """Save a prompt to the case prompts directory for audit.
 358
 359        Args:
 360            filename: Output filename.
 361            system_prompt: The system prompt text.
 362            user_prompt: The user prompt text.
 363        """
 364        if self.case_dir is None:
 365            return
 366        prompts_dir = self.case_dir / "prompts"
 367        try:
 368            prompts_dir.mkdir(parents=True, exist_ok=True)
 369            prompt_path = prompts_dir / filename
 370            prompt_path.write_text(
 371                f"# System Prompt\n\n{system_prompt}\n\n---\n\n# User Prompt\n\n{user_prompt}\n",
 372                encoding="utf-8",
 373            )
 374        except OSError:
 375            self.logger.warning("Failed to save prompt to %s", prompts_dir / filename)
 376
 377    # ------------------------------------------------------------------
 378    # Delegation methods — thin wrappers for backward compatibility.
 379    # Methods that don't use self are exposed as staticmethod assignments
 380    # at the class level (see above).  The methods below need self.
 381    # ------------------------------------------------------------------
 382
    def _estimate_tokens(self, text: str) -> int:
        """Estimate the token count of *text* using model-specific info.

        Delegates to :func:`estimate_tokens`, passing :attr:`model_info` so
        the estimate can account for the configured provider/model.
        """
        return estimate_tokens(text, model_info=self.model_info)

    # These are also exposed as staticmethods on the class (see above)
    # but tests may call them on instances, so they work either way.
    # Aliases for helpers extracted to sub-modules, kept for backward
    # compatibility with ForensicAnalyzer._name() callers.
    _extract_ioc_targets = staticmethod(extract_ioc_targets)
    _format_ioc_targets = staticmethod(format_ioc_targets)
    _build_priority_directives = staticmethod(build_priority_directives)
    _compute_statistics = staticmethod(compute_statistics)
    _build_full_data_csv = staticmethod(build_full_data_csv)
    _deduplicate_rows_for_analysis = staticmethod(deduplicate_rows_for_analysis)
 395
 396    def _validate_citations(self, artifact_key: str, analysis_text: str) -> list[str]:
 397        """Spot-check AI-cited values against source CSV.
 398
 399        Args:
 400            artifact_key: Artifact identifier.
 401            analysis_text: The AI's analysis text.
 402
 403        Returns:
 404            List of warning strings.
 405        """
 406        if analysis_text.startswith("Analysis failed:"):
 407            return []
 408        try:
 409            original_path = self._resolve_artifact_csv_path(artifact_key)
 410        except FileNotFoundError:
 411            return []
 412        csv_path = self._resolve_analysis_input_csv_path(
 413            artifact_key, fallback=original_path,
 414        )
 415        return validate_citations(
 416            artifact_key=artifact_key,
 417            analysis_text=analysis_text,
 418            csv_path=csv_path,
 419            citation_spot_check_limit=self.citation_spot_check_limit,
 420            audit_log_fn=self._audit_log,
 421        )
 422
 423    # ------------------------------------------------------------------
 424    # Path resolution
 425    # ------------------------------------------------------------------
 426
 427    def _resolve_artifact_csv_path(self, artifact_key: str) -> Path:
 428        """Resolve the CSV file path for a given artifact key.
 429
 430        For split artifacts with multiple CSV files, returns the first
 431        path.  Use :meth:`_resolve_all_artifact_csv_paths` to get every
 432        path for a split artifact.
 433
 434        Args:
 435            artifact_key: Artifact identifier to resolve.
 436
 437        Returns:
 438            A ``Path`` to the artifact's CSV file.
 439
 440        Raises:
 441            FileNotFoundError: If no CSV path can be found.
 442        """
 443        mapped = self.artifact_csv_paths.get(artifact_key)
 444        if mapped is not None:
 445            if isinstance(mapped, list):
 446                return mapped[0]
 447            return mapped
 448
 449        normalized = normalize_artifact_key(artifact_key)
 450        mapped_normalized = self.artifact_csv_paths.get(normalized)
 451        if mapped_normalized is not None:
 452            if isinstance(mapped_normalized, list):
 453                return mapped_normalized[0]
 454            return mapped_normalized
 455
 456        candidate_path = Path(artifact_key)
 457        if candidate_path.exists():
 458            return candidate_path
 459
 460        if self.case_dir is not None:
 461            parsed_dir = self.case_dir / "parsed"
 462            if parsed_dir.exists():
 463                normalized = normalize_artifact_key(artifact_key)
 464                file_stubs = {
 465                    artifact_key, normalized,
 466                    sanitize_filename(artifact_key),
 467                    sanitize_filename(normalized),
 468                }
 469                for file_stub in file_stubs:
 470                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
 471                    if direct_csv_path.exists():
 472                        return direct_csv_path
 473                for file_stub in file_stubs:
 474                    prefixed_paths = sorted(parsed_dir.glob(f"{file_stub}_*.csv"))
 475                    if prefixed_paths:
 476                        return prefixed_paths[0]
 477
 478        raise FileNotFoundError(
 479            f"No CSV path mapped for artifact '{artifact_key}'. "
 480            "Provide it in ForensicAnalyzer(artifact_csv_paths=...) or use case_dir/parsed CSV paths."
 481        )
 482
    def _resolve_all_artifact_csv_paths(self, artifact_key: str) -> list[Path]:
        """Resolve all CSV file paths for a given artifact key.

        For single-file artifacts returns a one-element list.  For split
        artifacts (e.g. EVTX) returns all constituent CSV paths.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A non-empty list of ``Path`` objects.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        # Explicit mapping wins: raw key first, then the normalized form.
        for key in (artifact_key, normalize_artifact_key(artifact_key)):
            mapped = self.artifact_csv_paths.get(key)
            if mapped is not None:
                if isinstance(mapped, list):
                    # Defensive copy so callers cannot mutate our mapping.
                    return list(mapped)
                return [mapped]

        # Filesystem fallback: search case_dir/parsed for all matching parts.
        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    combined_csv_path = parsed_dir / f"{file_stub}_combined.csv"
                    # Exclude a previously written "<stub>_combined.csv"
                    # (written by _combine_csv_files) so it is not mistaken
                    # for a constituent split part.
                    prefixed_paths = sorted(
                        path
                        for path in parsed_dir.glob(f"{file_stub}_*.csv")
                        if path != combined_csv_path
                    )
                    if direct_csv_path.exists() and prefixed_paths:
                        # Both a direct file and split parts: return all,
                        # sorted into one deterministic order.
                        return sorted([direct_csv_path] + prefixed_paths)
                    if prefixed_paths:
                        return prefixed_paths
                    if direct_csv_path.exists():
                        return [direct_csv_path]

        # Final fallback: delegate to single-path resolver.
        return [self._resolve_artifact_csv_path(artifact_key)]
 532
 533    def _combine_csv_files(self, artifact_key: str, csv_paths: list[Path]) -> Path:
 534        """Concatenate multiple CSV files into a single combined CSV.
 535
 536        All input files are assumed to share the same schema (column names).
 537        The combined file is written to the case's ``parsed/`` directory (or
 538        next to the first input file) with a ``_combined`` suffix.
 539
 540        Args:
 541            artifact_key: Artifact identifier (used for the output filename).
 542            csv_paths: List of CSV file paths to combine.
 543
 544        Returns:
 545            Path to the combined CSV file.
 546        """
 547        import csv as csv_mod
 548
 549        output_dir = csv_paths[0].parent
 550        safe_key = sanitize_filename(artifact_key)
 551        combined_path = output_dir / f"{safe_key}_combined.csv"
 552
 553        fieldnames: list[str] = []
 554        fieldnames_set: set[str] = set()
 555
 556        for csv_path in csv_paths:
 557            if not csv_path.exists():
 558                continue
 559            with csv_path.open("r", newline="", encoding="utf-8") as fh:
 560                reader = csv_mod.DictReader(fh)
 561                if reader.fieldnames:
 562                    for fn in reader.fieldnames:
 563                        if fn not in fieldnames_set:
 564                            fieldnames.append(fn)
 565                            fieldnames_set.add(fn)
 566
 567        with combined_path.open("w", newline="", encoding="utf-8") as out:
 568            writer = csv_mod.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
 569            writer.writeheader()
 570            for csv_path in csv_paths:
 571                if not csv_path.exists():
 572                    continue
 573                with csv_path.open("r", newline="", encoding="utf-8") as fh:
 574                    reader = csv_mod.DictReader(fh)
 575                    for row in reader:
 576                        writer.writerow(row)
 577
 578        return combined_path
 579
 580    def _set_analysis_input_csv_path(self, artifact_key: str, csv_path: Path) -> None:
 581        """Store the analysis-input CSV path for an artifact.
 582
 583        Args:
 584            artifact_key: Artifact identifier.
 585            csv_path: Path to the analysis-input CSV.
 586        """
 587        self._analysis_input_csv_paths[artifact_key] = csv_path
 588        normalized = normalize_artifact_key(artifact_key)
 589        self._analysis_input_csv_paths[normalized] = csv_path
 590
 591    def _resolve_analysis_input_csv_path(self, artifact_key: str, fallback: Path) -> Path:
 592        """Retrieve the analysis-input CSV path, with fallback.
 593
 594        Args:
 595            artifact_key: Artifact identifier.
 596            fallback: Default path if not stored.
 597
 598        Returns:
 599            The stored analysis-input CSV path, or *fallback*.
 600        """
 601        mapped = self._analysis_input_csv_paths.get(artifact_key)
 602        if mapped is not None:
 603            return mapped
 604        normalized = normalize_artifact_key(artifact_key)
 605        mapped = self._analysis_input_csv_paths.get(normalized)
 606        if mapped is not None:
 607            return mapped
 608        return fallback
 609
 610    def _resolve_artifact_metadata(self, artifact_key: str) -> dict[str, str]:
 611        """Look up artifact metadata from the OS-appropriate registry first.
 612
 613        Searches the registry matching :attr:`os_type` first so that
 614        shared keys like ``services`` resolve to the correct OS-specific
 615        entry.  Falls back to the other registry for cross-OS lookups.
 616
 617        Args:
 618            artifact_key: Artifact identifier.
 619
 620        Returns:
 621            A dict with at least ``name``, ``description``, and
 622            ``analysis_hint`` keys.
 623        """
 624        if self.os_type == "linux":
 625            registries = (LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY)
 626        else:
 627            registries = (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY)
 628
 629        for registry in registries:
 630            if artifact_key in registry:
 631                metadata = registry[artifact_key]
 632                return {str(key): str(value) for key, value in metadata.items()}
 633
 634        normalized = normalize_artifact_key(artifact_key)
 635        for registry in registries:
 636            if normalized in registry:
 637                metadata = registry[normalized]
 638                return {str(key): str(value) for key, value in metadata.items()}
 639
 640        return {
 641            "name": artifact_key,
 642            "description": "No artifact description available.",
 643            "analysis_hint": "No specific analysis guidance is available for this artifact.",
 644        }
 645
 646    # ------------------------------------------------------------------
 647    # Metadata registration
 648    # ------------------------------------------------------------------
 649
 650    def _register_artifact_paths_from_metadata(self, metadata: Mapping[str, Any] | None) -> None:
 651        """Extract and register artifact CSV paths from run metadata.
 652
 653        Args:
 654            metadata: Optional metadata mapping.
 655        """
 656        if not isinstance(metadata, Mapping):
 657            return
 658
 659        artifact_csv_paths = metadata.get("artifact_csv_paths")
 660        if isinstance(artifact_csv_paths, Mapping):
 661            for artifact_key, csv_path in artifact_csv_paths.items():
 662                if isinstance(csv_path, list) and len(csv_path) > 1:
 663                    self.artifact_csv_paths[str(artifact_key)] = [
 664                        Path(str(p)) for p in csv_path
 665                    ]
 666                elif isinstance(csv_path, list) and csv_path:
 667                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path[0]))
 668                else:
 669                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
 670
 671        for container_key in ("artifacts", "artifact_results", "parse_results", "parsed_artifacts"):
 672            container = metadata.get(container_key)
 673            if isinstance(container, Mapping):
 674                for artifact_key, value in container.items():
 675                    self._register_artifact_path_entry(artifact_key=artifact_key, value=value)
 676            elif isinstance(container, list):
 677                for item in container:
 678                    if isinstance(item, Mapping):
 679                        artifact_key = item.get("artifact_key") or item.get("key")
 680                        if artifact_key:
 681                            self._register_artifact_path_entry(artifact_key=str(artifact_key), value=item)
 682
 683    def _register_artifact_path_entry(self, artifact_key: Any, value: Any) -> None:
 684        """Register a single artifact CSV path from a metadata entry.
 685
 686        Args:
 687            artifact_key: Artifact identifier.
 688            value: Metadata entry (mapping, string, or Path).
 689        """
 690        if artifact_key in (None, ""):
 691            return
 692
 693        if isinstance(value, Mapping):
 694            csv_path = value.get("csv_path")
 695            csv_paths = value.get("csv_paths")
 696            if isinstance(csv_paths, list) and len(csv_paths) > 1:
 697                self.artifact_csv_paths[str(artifact_key)] = [
 698                    Path(str(p)) for p in csv_paths
 699                ]
 700                return
 701            if csv_path:
 702                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
 703                return
 704            if isinstance(csv_paths, list) and csv_paths:
 705                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_paths[0]))
 706                return
 707
 708        if isinstance(value, (str, Path)):
 709            self.artifact_csv_paths[str(artifact_key)] = Path(str(value))
 710
 711    # ------------------------------------------------------------------
 712    # Core analysis pipeline
 713    # ------------------------------------------------------------------
 714
 715    def _prepare_artifact_data(
 716        self, artifact_key: str, investigation_context: str, csv_path: Path | None = None,
 717    ) -> str:
 718        """Prepare one artifact CSV as a bounded, analysis-ready prompt.
 719
 720        Args:
 721            artifact_key: Unique identifier for the artifact.
 722            investigation_context: Free-text investigation context.
 723            csv_path: Explicit path to the artifact CSV.
 724
 725        Returns:
 726            The fully rendered prompt string.
 727
 728        Raises:
 729            FileNotFoundError: If the artifact CSV cannot be located.
 730        """
 731        resolved_csv_path = csv_path if csv_path is not None else self._resolve_artifact_csv_path(artifact_key)
 732        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
 733
 734        prompt_text, analysis_csv_path, _ = prepare_artifact_data(
 735            artifact_key=artifact_key,
 736            investigation_context=investigation_context,
 737            csv_path=resolved_csv_path,
 738            artifact_metadata=artifact_metadata,
 739            artifact_prompt_template=self.artifact_prompt_template,
 740            artifact_prompt_template_small_context=self.artifact_prompt_template_small_context,
 741            artifact_instruction_prompts=self.artifact_instruction_prompts,
 742            artifact_ai_column_projections=self.artifact_ai_column_projections,
 743            artifact_deduplication_enabled=self.artifact_deduplication_enabled,
 744            ai_max_tokens=self.ai_max_tokens,
 745            shortened_prompt_cutoff_tokens=self.shortened_prompt_cutoff_tokens,
 746            case_dir=self.case_dir,
 747            audit_log_fn=self._audit_log,
 748        )
 749        self._set_analysis_input_csv_path(artifact_key=artifact_key, csv_path=analysis_csv_path)
 750        return prompt_text
 751
    def analyze_artifact(
        self,
        artifact_key: str,
        investigation_context: str,
        progress_callback: Any | None = None,
    ) -> dict[str, Any]:
        """Analyze a single artifact's CSV data and return AI findings.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            progress_callback: Optional callable for streaming progress.

        Returns:
            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
            ``model``, and optionally ``citation_warnings``.
        """
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
        artifact_name = artifact_metadata.get("name", artifact_key)
        model = self.model_info.get("model", "unknown")
        provider = self.model_info.get("provider", "unknown")

        self._audit_log("analysis_started", {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "provider": provider, "model": model,
        })

        start_time = perf_counter()
        try:
            # An artifact may map to several CSVs; combine them into one
            # file so the prompt and the attachment see every row.
            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
            if len(all_csv_paths) > 1:
                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
            else:
                csv_path = all_csv_paths[0]
            artifact_prompt = self._prepare_artifact_data(
                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
            )
            # Attach the same CSV the prompt was prepared from, falling back
            # to the raw csv_path when no prepared copy was recorded.
            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]

            safe_key = sanitize_filename(artifact_key)
            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)

            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
            if prompt_tokens_estimate > self.ai_max_tokens:
                # Oversized prompt: analyze in chunks and merge the partial
                # results. This branch builds its result and returns early.
                self.logger.info(
                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
                )
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analysis_text = analyze_artifact_chunked(
                    artifact_prompt=artifact_prompt,
                    artifact_key=artifact_key,
                    artifact_name=artifact_name,
                    investigation_context=investigation_context,
                    model=model,
                    system_prompt=self.system_prompt,
                    ai_response_max_tokens=self.ai_response_max_tokens,
                    chunk_csv_budget=self.chunk_csv_budget,
                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
                    max_merge_rounds=self.max_merge_rounds,
                    call_ai_with_retry_fn=self._call_ai_with_retry,
                    ai_provider=self.ai_provider,
                    audit_log_fn=self._audit_log,
                    save_case_prompt_fn=self._save_case_prompt,
                    progress_callback=progress_callback,
                )
                duration_seconds = perf_counter() - start_time
                self._audit_log("analysis_completed", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "token_count": self._estimate_tokens(analysis_text),
                    "duration_seconds": round(duration_seconds, 6),
                    "status": "success", "chunked": True,
                })
                citation_warnings = self._validate_citations(artifact_key, analysis_text)
                result: dict[str, Any] = {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "analysis": analysis_text, "model": model,
                }
                if citation_warnings:
                    result["citation_warnings"] = citation_warnings
                return result

            # Provider capability fallbacks: streaming progress first, then
            # attachment support, then the plain analyze() call.
            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
            if callable(analyze_with_progress) and progress_callback is not None:
                emit_analysis_progress(progress_callback, artifact_key, "started", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                })

                def _provider_progress(payload: Mapping[str, Any]) -> None:
                    """Forward provider progress to the frontend."""
                    if not isinstance(payload, Mapping):
                        return
                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name,
                        "thinking_text": str(payload.get("thinking_text", "")),
                        "partial_text": str(payload.get("partial_text", "")),
                        "model": model,
                    })

                try:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, attachments=attachments,
                        max_tokens=self.ai_response_max_tokens,
                    )
                except TypeError:
                    # Provider signature without attachments support; retry
                    # with the prompt text only.
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
                    )
            else:
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
                if callable(analyze_with_attachments):
                    analysis_text = self._call_ai_with_retry(
                        lambda: analyze_with_attachments(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
                        )
                    )
                else:
                    analysis_text = self._call_ai_with_retry(
                        lambda: self.ai_provider.analyze(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            max_tokens=self.ai_response_max_tokens,
                        )
                    )
            duration_seconds = perf_counter() - start_time
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": self._estimate_tokens(analysis_text),
                "duration_seconds": round(duration_seconds, 6), "status": "success",
            })
        except Exception as error:
            # Deliberate best-effort: a failed artifact is reported in-band
            # as the analysis text so the overall pipeline keeps going.
            duration_seconds = perf_counter() - start_time
            analysis_text = f"Analysis failed: {error}"
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
                "status": "failed", "error": str(error),
            })

        citation_warnings = self._validate_citations(artifact_key, analysis_text)

        result = {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "analysis": analysis_text, "model": model,
        }
        if citation_warnings:
            result["citation_warnings"] = citation_warnings
        return result
 910
 911    def generate_summary(
 912        self,
 913        per_artifact_results: list[Mapping[str, Any]],
 914        investigation_context: str,
 915        metadata: Mapping[str, Any] | None,
 916    ) -> str:
 917        """Generate a cross-artifact summary by correlating findings.
 918
 919        Args:
 920            per_artifact_results: List of per-artifact result dicts.
 921            investigation_context: The user's investigation context.
 922            metadata: Optional host metadata mapping.
 923
 924        Returns:
 925            The AI-generated summary text, or an error message.
 926        """
 927        metadata_map = metadata if isinstance(metadata, Mapping) else {}
 928        summary_prompt = build_summary_prompt(
 929            summary_prompt_template=self.summary_prompt_template,
 930            investigation_context=investigation_context,
 931            per_artifact_results=per_artifact_results,
 932            metadata_map=metadata_map,
 933        )
 934
 935        model = self.model_info.get("model", "unknown")
 936        provider = self.model_info.get("provider", "unknown")
 937        summary_artifact_key = "cross_artifact_summary"
 938        summary_artifact_name = "Cross-Artifact Summary"
 939        summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md"
 940
 941        self._audit_log("analysis_started", {
 942            "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 943            "provider": provider, "model": model,
 944        })
 945        self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt)
 946
 947        start_time = perf_counter()
 948        try:
 949            summary = self._call_ai_with_retry(
 950                lambda: self.ai_provider.analyze(
 951                    system_prompt=self.system_prompt, user_prompt=summary_prompt,
 952                    max_tokens=self.ai_response_max_tokens,
 953                )
 954            )
 955            duration_seconds = perf_counter() - start_time
 956            self._audit_log("analysis_completed", {
 957                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 958                "token_count": self._estimate_tokens(summary),
 959                "duration_seconds": round(duration_seconds, 6), "status": "success",
 960            })
 961            return summary
 962        except Exception as error:
 963            duration_seconds = perf_counter() - start_time
 964            summary = f"Analysis failed: {error}"
 965            self._audit_log("analysis_completed", {
 966                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
 967                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
 968                "status": "failed", "error": str(error),
 969            })
 970            return summary
 971
 972    def run_full_analysis(
 973        self,
 974        artifact_keys: Iterable[str],
 975        investigation_context: str,
 976        metadata: Mapping[str, Any] | None,
 977        progress_callback: Any | None = None,
 978        cancel_check: Any | None = None,
 979    ) -> dict[str, Any]:
 980        """Run the complete analysis pipeline: per-artifact then summary.
 981
 982        Args:
 983            artifact_keys: Iterable of artifact key strings.
 984            investigation_context: The user's investigation context.
 985            metadata: Optional metadata mapping.
 986            progress_callback: Optional callable for streaming progress.
 987            cancel_check: Optional callable returning ``True`` when the
 988                analysis should be aborted early.
 989
 990        Returns:
 991            A dict with ``per_artifact``, ``summary``, and ``model_info``.
 992
 993        Raises:
 994            AnalysisCancelledError: If *cancel_check* returns ``True``.
 995        """
 996        if isinstance(self.ai_provider, UnavailableProvider):
 997            raise AIProviderError(self.ai_provider._error_message)
 998
 999        self._register_artifact_paths_from_metadata(metadata)
1000        per_artifact_results: list[dict[str, Any]] = []
1001        for artifact_key in artifact_keys:
1002            if cancel_check is not None and cancel_check():
1003                raise AnalysisCancelledError("Analysis cancelled by user.")
1004            result = self.analyze_artifact(
1005                artifact_key=str(artifact_key),
1006                investigation_context=investigation_context,
1007                progress_callback=progress_callback,
1008            )
1009            per_artifact_results.append(result)
1010            if progress_callback is not None:
1011                emit_analysis_progress(progress_callback, str(artifact_key), "complete", result)
1012
1013        summary = self.generate_summary(
1014            per_artifact_results=per_artifact_results,
1015            investigation_context=investigation_context,
1016            metadata=metadata,
1017        )
1018        return {
1019            "per_artifact": per_artifact_results,
1020            "summary": summary,
1021            "model_info": dict(self.model_info),
1022        }

Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

Central analysis engine for AIFT: reads parsed artifact CSV files, applies column projection and deduplication, builds token-budgeted prompts, sends them to a configured AI provider, and validates citations.

Attributes:
  • case_dir: Path to the case directory, or None.
  • config: Merged configuration dictionary.
  • ai_provider: The configured AI provider instance.
  • model_info: Dict with provider and model keys.
ForensicAnalyzer( case_dir: Union[str, pathlib.Path, Mapping[str, str | pathlib.Path], NoneType] = None, config: Optional[Mapping[str, Any]] = None, audit_logger: typing.Any | None = None, artifact_csv_paths: Optional[Mapping[str, str | pathlib.Path]] = None, prompts_dir: str | pathlib.Path | None = None, random_seed: int | None = None, os_type: str = 'windows')
    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.).  Controls which artifact instruction
                prompts are loaded.
        """
        # Convenience shorthand: a lone mapping positional is interpreted
        # as artifact_csv_paths rather than as a case directory.
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        # Normalize every supplied CSV path to pathlib.Path; a list value
        # keeps its multi-file shape for later combination.
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        # Filled during prompt preparation: artifact key -> the exact CSV
        # fed to the AI.
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random  # NOTE(review): function-level import; could live at module top
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        # Each template is loaded from prompts_dir with a built-in fallback
        # (per the ``default=`` argument to _load_prompt_template).
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()

Initialize the forensic analyzer with case context and configuration.

Arguments:
  • case_dir: Path to the case directory, or a mapping of artifact keys to CSV paths (convenience shorthand).
  • config: Application configuration dictionary.
  • audit_logger: Optional object with a log(action, details) method.
  • artifact_csv_paths: Mapping of artifact keys to CSV paths.
  • prompts_dir: Directory containing prompt template files.
  • random_seed: Optional seed for the internal RNG.
  • os_type: Detected operating system type ("windows", "linux", etc.). Controls which artifact instruction prompts are loaded.
case_dir
logger
config
audit_logger
artifact_csv_paths: dict[str, pathlib.Path | list[pathlib.Path]]
prompts_dir
os_type
artifact_ai_column_projections
system_prompt
artifact_prompt_template
artifact_prompt_template_small_context
artifact_instruction_prompts
summary_prompt_template
chunk_merge_prompt_template
ai_provider
model_info
def analyze_artifact( self, artifact_key: str, investigation_context: str, progress_callback: typing.Any | None = None) -> dict[str, typing.Any]:
    def analyze_artifact(
        self,
        artifact_key: str,
        investigation_context: str,
        progress_callback: Any | None = None,
    ) -> dict[str, Any]:
        """Analyze a single artifact's CSV data and return AI findings.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            progress_callback: Optional callable for streaming progress.

        Returns:
            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
            ``model``, and optionally ``citation_warnings``.
        """
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
        artifact_name = artifact_metadata.get("name", artifact_key)
        model = self.model_info.get("model", "unknown")
        provider = self.model_info.get("provider", "unknown")

        self._audit_log("analysis_started", {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "provider": provider, "model": model,
        })

        start_time = perf_counter()
        try:
            # An artifact may map to several CSVs; combine them into one
            # file so the prompt and the attachment see every row.
            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
            if len(all_csv_paths) > 1:
                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
            else:
                csv_path = all_csv_paths[0]
            artifact_prompt = self._prepare_artifact_data(
                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
            )
            # Attach the same CSV the prompt was prepared from, falling back
            # to the raw csv_path when no prepared copy was recorded.
            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]

            safe_key = sanitize_filename(artifact_key)
            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)

            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
            if prompt_tokens_estimate > self.ai_max_tokens:
                # Oversized prompt: analyze in chunks and merge the partial
                # results. This branch builds its result and returns early.
                self.logger.info(
                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
                )
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analysis_text = analyze_artifact_chunked(
                    artifact_prompt=artifact_prompt,
                    artifact_key=artifact_key,
                    artifact_name=artifact_name,
                    investigation_context=investigation_context,
                    model=model,
                    system_prompt=self.system_prompt,
                    ai_response_max_tokens=self.ai_response_max_tokens,
                    chunk_csv_budget=self.chunk_csv_budget,
                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
                    max_merge_rounds=self.max_merge_rounds,
                    call_ai_with_retry_fn=self._call_ai_with_retry,
                    ai_provider=self.ai_provider,
                    audit_log_fn=self._audit_log,
                    save_case_prompt_fn=self._save_case_prompt,
                    progress_callback=progress_callback,
                )
                duration_seconds = perf_counter() - start_time
                self._audit_log("analysis_completed", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "token_count": self._estimate_tokens(analysis_text),
                    "duration_seconds": round(duration_seconds, 6),
                    "status": "success", "chunked": True,
                })
                citation_warnings = self._validate_citations(artifact_key, analysis_text)
                result: dict[str, Any] = {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "analysis": analysis_text, "model": model,
                }
                if citation_warnings:
                    result["citation_warnings"] = citation_warnings
                return result

            # Provider capability fallbacks: streaming progress first, then
            # attachment support, then the plain analyze() call.
            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
            if callable(analyze_with_progress) and progress_callback is not None:
                emit_analysis_progress(progress_callback, artifact_key, "started", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                })

                def _provider_progress(payload: Mapping[str, Any]) -> None:
                    """Forward provider progress to the frontend."""
                    if not isinstance(payload, Mapping):
                        return
                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name,
                        "thinking_text": str(payload.get("thinking_text", "")),
                        "partial_text": str(payload.get("partial_text", "")),
                        "model": model,
                    })

                try:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, attachments=attachments,
                        max_tokens=self.ai_response_max_tokens,
                    )
                except TypeError:
                    # Provider signature without attachments support; retry
                    # with the prompt text only.
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
                    )
            else:
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
                if callable(analyze_with_attachments):
                    analysis_text = self._call_ai_with_retry(
                        lambda: analyze_with_attachments(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
                        )
                    )
                else:
                    analysis_text = self._call_ai_with_retry(
                        lambda: self.ai_provider.analyze(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            max_tokens=self.ai_response_max_tokens,
                        )
                    )
            duration_seconds = perf_counter() - start_time
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": self._estimate_tokens(analysis_text),
                "duration_seconds": round(duration_seconds, 6), "status": "success",
            })
        except Exception as error:
            # Deliberate best-effort: a failed artifact is reported in-band
            # as the analysis text so the overall pipeline keeps going.
            duration_seconds = perf_counter() - start_time
            analysis_text = f"Analysis failed: {error}"
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
                "status": "failed", "error": str(error),
            })

        citation_warnings = self._validate_citations(artifact_key, analysis_text)

        result = {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "analysis": analysis_text, "model": model,
        }
        if citation_warnings:
            result["citation_warnings"] = citation_warnings
        return result

Analyze a single artifact's CSV data and return AI findings.

Arguments:
  • artifact_key: Unique identifier for the artifact.
  • investigation_context: Free-text investigation context.
  • progress_callback: Optional callable for streaming progress.
Returns:

A dict with artifact_key, artifact_name, analysis, model, and optionally citation_warnings.

def generate_summary(self, per_artifact_results: list[Mapping[str, Any]], investigation_context: str, metadata: Mapping[str, Any] | None) -> str:
911    def generate_summary(
912        self,
913        per_artifact_results: list[Mapping[str, Any]],
914        investigation_context: str,
915        metadata: Mapping[str, Any] | None,
916    ) -> str:
917        """Generate a cross-artifact summary by correlating findings.
918
919        Args:
920            per_artifact_results: List of per-artifact result dicts.
921            investigation_context: The user's investigation context.
922            metadata: Optional host metadata mapping.
923
924        Returns:
925            The AI-generated summary text, or an error message.
926        """
927        metadata_map = metadata if isinstance(metadata, Mapping) else {}
928        summary_prompt = build_summary_prompt(
929            summary_prompt_template=self.summary_prompt_template,
930            investigation_context=investigation_context,
931            per_artifact_results=per_artifact_results,
932            metadata_map=metadata_map,
933        )
934
935        model = self.model_info.get("model", "unknown")
936        provider = self.model_info.get("provider", "unknown")
937        summary_artifact_key = "cross_artifact_summary"
938        summary_artifact_name = "Cross-Artifact Summary"
939        summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md"
940
941        self._audit_log("analysis_started", {
942            "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
943            "provider": provider, "model": model,
944        })
945        self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt)
946
947        start_time = perf_counter()
948        try:
949            summary = self._call_ai_with_retry(
950                lambda: self.ai_provider.analyze(
951                    system_prompt=self.system_prompt, user_prompt=summary_prompt,
952                    max_tokens=self.ai_response_max_tokens,
953                )
954            )
955            duration_seconds = perf_counter() - start_time
956            self._audit_log("analysis_completed", {
957                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
958                "token_count": self._estimate_tokens(summary),
959                "duration_seconds": round(duration_seconds, 6), "status": "success",
960            })
961            return summary
962        except Exception as error:
963            duration_seconds = perf_counter() - start_time
964            summary = f"Analysis failed: {error}"
965            self._audit_log("analysis_completed", {
966                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
967                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
968                "status": "failed", "error": str(error),
969            })
970            return summary

Generate a cross-artifact summary by correlating findings.

Arguments:
  • per_artifact_results: List of per-artifact result dicts.
  • investigation_context: The user's investigation context.
  • metadata: Optional host metadata mapping.
Returns:
  The AI-generated summary text, or an error message.

def run_full_analysis(self, artifact_keys: Iterable[str], investigation_context: str, metadata: Mapping[str, Any] | None, progress_callback: Any | None = None, cancel_check: Any | None = None) -> dict[str, Any]:
 972    def run_full_analysis(
 973        self,
 974        artifact_keys: Iterable[str],
 975        investigation_context: str,
 976        metadata: Mapping[str, Any] | None,
 977        progress_callback: Any | None = None,
 978        cancel_check: Any | None = None,
 979    ) -> dict[str, Any]:
 980        """Run the complete analysis pipeline: per-artifact then summary.
 981
 982        Args:
 983            artifact_keys: Iterable of artifact key strings.
 984            investigation_context: The user's investigation context.
 985            metadata: Optional metadata mapping.
 986            progress_callback: Optional callable for streaming progress.
 987            cancel_check: Optional callable returning ``True`` when the
 988                analysis should be aborted early.
 989
 990        Returns:
 991            A dict with ``per_artifact``, ``summary``, and ``model_info``.
 992
 993        Raises:
 994            AnalysisCancelledError: If *cancel_check* returns ``True``.
 995        """
 996        if isinstance(self.ai_provider, UnavailableProvider):
 997            raise AIProviderError(self.ai_provider._error_message)
 998
 999        self._register_artifact_paths_from_metadata(metadata)
1000        per_artifact_results: list[dict[str, Any]] = []
1001        for artifact_key in artifact_keys:
1002            if cancel_check is not None and cancel_check():
1003                raise AnalysisCancelledError("Analysis cancelled by user.")
1004            result = self.analyze_artifact(
1005                artifact_key=str(artifact_key),
1006                investigation_context=investigation_context,
1007                progress_callback=progress_callback,
1008            )
1009            per_artifact_results.append(result)
1010            if progress_callback is not None:
1011                emit_analysis_progress(progress_callback, str(artifact_key), "complete", result)
1012
1013        summary = self.generate_summary(
1014            per_artifact_results=per_artifact_results,
1015            investigation_context=investigation_context,
1016            metadata=metadata,
1017        )
1018        return {
1019            "per_artifact": per_artifact_results,
1020            "summary": summary,
1021            "model_info": dict(self.model_info),
1022        }

Run the complete analysis pipeline: per-artifact then summary.

Arguments:
  • artifact_keys: Iterable of artifact key strings.
  • investigation_context: The user's investigation context.
  • metadata: Optional metadata mapping.
  • progress_callback: Optional callable for streaming progress.
  • cancel_check: Optional callable returning True when the analysis should be aborted early.
Returns:
  A dict with per_artifact, summary, and model_info.

Raises:
  • AnalysisCancelledError: If cancel_check returns True.
PROJECT_ROOT = PosixPath('/home/runner/work/AIFT/AIFT')