"""AI analysis orchestration module for forensic triage.

Implements the ``ForensicAnalyzer`` class that orchestrates the full analysis
pipeline: token budgeting, column projection, deduplication,
chunked analysis, citation validation, IOC extraction, and audit logging.

Sub-module organisation:

- ``constants``: Compile-time constants, regex, prompt templates.
- ``utils``: Pure utility functions (string, datetime, CSV).
- ``ioc``: IOC extraction and prompt-building helpers.
- ``citations``: Citation validation against source CSV.
- ``data_prep``: Dedup, statistics, prompt assembly.
- ``chunking``: Chunked analysis and hierarchical merge.
- ``prompts``: Prompt template loading and construction.

Attributes:
    PROJECT_ROOT (Path): Re-exported from ``constants``.
"""

from __future__ import annotations

import logging
from pathlib import Path
from time import perf_counter, sleep
from typing import Any, Callable, Iterable, Mapping

from ..ai_providers import AIProviderError, create_provider
from .chunking import analyze_artifact_chunked, split_csv_and_suffix, split_csv_into_chunks
from .citations import match_column_name, timestamp_found_in_csv, timestamp_lookup_keys, validate_citations
from .constants import (
    AI_MAX_TOKENS, AI_RETRY_ATTEMPTS, AI_RETRY_BASE_DELAY,
    ARTIFACT_DEDUPLICATION_ENABLED, CITATION_SPOT_CHECK_LIMIT,
    DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH, DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
    DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT, DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
    DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, DEFAULT_SUMMARY_PROMPT_TEMPLATE,
    DEFAULT_SYSTEM_PROMPT, MAX_MERGE_ROUNDS, PROJECT_ROOT, TOKEN_CHAR_RATIO,
    UnavailableProvider,
)
from .data_prep import (
    build_artifact_csv_attachment, build_full_data_csv, compute_statistics,
    deduplicate_rows_for_analysis, prepare_artifact_data,
)
from .ioc import build_priority_directives, extract_ioc_targets, format_ioc_targets
from .prompts import (
    build_summary_prompt, load_artifact_ai_column_projections,
    load_artifact_instruction_prompts, load_prompt_template,
    resolve_artifact_ai_columns_config_path,
)
from .utils import (
    build_datetime, coerce_projection_columns, emit_analysis_progress,
    estimate_tokens, is_dedup_safe_identifier_column, normalize_artifact_key,
    normalize_os_type, read_bool_setting, read_int_setting, read_path_setting,
    sanitize_filename, stringify_value,
)

LOGGER = logging.getLogger(__name__)

try:
    from ..parser import LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY
except Exception as error:
    LOGGER.warning(
        "Failed to import artifact registries from app.parser: %s. "
        "Artifact metadata lookups will be unavailable.",
        error,
    )
    WINDOWS_ARTIFACT_REGISTRY: dict[str, dict[str, str]] = {}
    LINUX_ARTIFACT_REGISTRY: dict[str, dict[str, str]] = {}

__all__ = ["AnalysisCancelledError", "ForensicAnalyzer"]


class AnalysisCancelledError(Exception):
    """Raised when analysis is cancelled by the user."""


class ForensicAnalyzer:
    """Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

    Central analysis engine for AIFT: reads parsed artifact CSV files, applies
    column projection and deduplication, builds token-budgeted
    prompts, sends them to a configured AI provider, and validates citations.

    Attributes:
        case_dir: Path to the case directory, or ``None``.
        config: Merged configuration dictionary.
        ai_provider: The configured AI provider instance.
        model_info: Dict with ``provider`` and ``model`` keys.
    """

    # Expose extracted functions as static methods for backward compatibility
    # with tests and callers that use ForensicAnalyzer._method_name().
    _stringify_value = staticmethod(stringify_value)
    _build_datetime = staticmethod(build_datetime)
    _normalize_artifact_key = staticmethod(normalize_artifact_key)
    _sanitize_filename = staticmethod(sanitize_filename)
    _split_csv_into_chunks = staticmethod(split_csv_into_chunks)
    _split_csv_and_suffix = staticmethod(split_csv_and_suffix)
    _coerce_projection_columns = staticmethod(coerce_projection_columns)
    _is_dedup_safe_identifier_column = staticmethod(is_dedup_safe_identifier_column)
    _timestamp_lookup_keys = staticmethod(timestamp_lookup_keys)
    _timestamp_found_in_csv = staticmethod(timestamp_found_in_csv)
    _match_column_name = staticmethod(match_column_name)
    _emit_analysis_progress = staticmethod(emit_analysis_progress)

    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.). Controls which artifact instruction
                prompts are loaded.
        """
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()

    # ------------------------------------------------------------------
    # Configuration loading
    # ------------------------------------------------------------------

    def _load_analysis_settings(self) -> None:
        """Load and validate analysis tuning parameters from the config dict."""
        analysis_config = self.config.get("analysis")
        if not isinstance(analysis_config, Mapping):
            analysis_config = {}

        self.ai_max_tokens = read_int_setting(analysis_config, "ai_max_tokens", AI_MAX_TOKENS, minimum=1)
        self.ai_response_max_tokens = max(1, int(self.ai_max_tokens * 0.2))
        legacy_shortened = read_int_setting(
            analysis_config, "statistics_section_cutoff_tokens", DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, minimum=1,
        )
        self.shortened_prompt_cutoff_tokens = read_int_setting(
            analysis_config, "shortened_prompt_cutoff_tokens", legacy_shortened, minimum=1,
        )
        self.chunk_csv_budget = int(self.ai_max_tokens * TOKEN_CHAR_RATIO * 0.6)
        self.citation_spot_check_limit = read_int_setting(
            analysis_config, "citation_spot_check_limit", CITATION_SPOT_CHECK_LIMIT, minimum=1,
        )
        self.max_merge_rounds = read_int_setting(analysis_config, "max_merge_rounds", MAX_MERGE_ROUNDS, minimum=1)
        self.artifact_deduplication_enabled = read_bool_setting(
            analysis_config, "artifact_deduplication_enabled", ARTIFACT_DEDUPLICATION_ENABLED,
        )
        self.artifact_ai_columns_config_path = read_path_setting(
            analysis_config, "artifact_ai_columns_config_path", str(DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH),
        )

    # Backward-compatible aliases for the extracted config readers.
    _read_int_setting = staticmethod(read_int_setting)
    _read_bool_setting = staticmethod(read_bool_setting)
    _read_path_setting = staticmethod(read_path_setting)

    def _resolve_artifact_ai_columns_config_path(self) -> Path:
        """Resolve the artifact AI columns config path to an absolute Path.

        Delegates to :func:`prompts.resolve_artifact_ai_columns_config_path`.

        Returns:
            Resolved absolute ``Path`` to the YAML config file.
        """
        return resolve_artifact_ai_columns_config_path(
            self.artifact_ai_columns_config_path, self.case_dir,
        )

    def _load_artifact_ai_column_projections(self) -> dict[str, tuple[str, ...]]:
        """Load per-artifact column projection configuration from YAML.

        Delegates to :func:`prompts.load_artifact_ai_column_projections`.

        Returns:
            A dict mapping normalized artifact keys to tuples of column names.
        """
        config_path = self._resolve_artifact_ai_columns_config_path()
        return load_artifact_ai_column_projections(config_path, os_type=self.os_type)

    def _load_prompt_template(self, filename: str, default: str) -> str:
        """Read a prompt template file from the prompts directory.

        Delegates to :func:`prompts.load_prompt_template`.

        Args:
            filename: Name of the template file.
            default: Fallback template string.

        Returns:
            The template text.
        """
        return load_prompt_template(self.prompts_dir, filename, default)

    def _load_artifact_instruction_prompts(self) -> dict[str, str]:
        """Load per-artifact analysis instruction prompts.

        Delegates to :func:`prompts.load_artifact_instruction_prompts`,
        passing :attr:`os_type` so the correct OS-specific instruction
        directory is selected.

        Returns:
            A dict mapping artifact keys to instruction prompt text.
        """
        return load_artifact_instruction_prompts(self.prompts_dir, os_type=self.os_type)

    # ------------------------------------------------------------------
    # AI provider
    # ------------------------------------------------------------------

    def _create_ai_provider(self) -> Any:
        """Instantiate the configured AI provider, or a fallback on failure.

        Returns:
            An AI provider instance, or an ``UnavailableProvider``.
        """
        provider_config: Mapping[str, Any]
        if self.config:
            provider_config = self.config
        else:
            provider_config = {
                "ai": {
                    "provider": "local",
                    "local": {
                        "base_url": "http://localhost:11434/v1",
                        "model": "llama3.1:70b",
                        "api_key": "not-needed",
                    },
                }
            }
        try:
            return create_provider(dict(provider_config))
        except Exception as error:
            return UnavailableProvider(str(error))

    def _read_model_info(self) -> dict[str, str]:
        """Read provider and model metadata from the AI provider.

        Returns:
            A dict with at least ``provider`` and ``model`` keys.
        """
        try:
            model_info = self.ai_provider.get_model_info()
        except Exception:
            return {"provider": "unknown", "model": "unknown"}

        if not isinstance(model_info, Mapping):
            return {"provider": "unknown", "model": "unknown"}

        return {str(key): str(value) for key, value in model_info.items()}

    def _call_ai_with_retry(self, call: Callable[[], str]) -> str:
        """Call the AI provider with retry on transient failures.

        Args:
            call: A zero-argument callable that invokes the AI provider.

        Returns:
            The AI provider's response string.

        Raises:
            AIProviderError: If the provider raises a permanent error.
            Exception: The last transient error after all retries.
        """
        last_error: Exception | None = None
        for attempt in range(AI_RETRY_ATTEMPTS):
            try:
                return call()
            except AIProviderError:
                raise
            except Exception as error:
                last_error = error
                if attempt < AI_RETRY_ATTEMPTS - 1:
                    delay = AI_RETRY_BASE_DELAY * (2 ** attempt)
                    self.logger.warning(
                        "AI provider call failed (attempt %d/%d), retrying in %.1fs: %s",
                        attempt + 1, AI_RETRY_ATTEMPTS, delay, error,
                    )
                    sleep(delay)
        raise last_error  # type: ignore[misc]

    # ------------------------------------------------------------------
    # Audit / prompt saving
    # ------------------------------------------------------------------

    def _audit_log(self, action: str, details: dict[str, Any]) -> None:
        """Write an entry to the forensic audit trail.

        Args:
            action: The audit action name.
            details: Key-value details for the audit entry.
        """
        if self.audit_logger is None:
            return
        logger = getattr(self.audit_logger, "log", None)
        if not callable(logger):
            return
        try:
            logger(action, details)
        except Exception:
            return

    def _save_case_prompt(self, filename: str, system_prompt: str, user_prompt: str) -> None:
        """Save a prompt to the case prompts directory for audit.

        Args:
            filename: Output filename.
            system_prompt: The system prompt text.
            user_prompt: The user prompt text.
        """
        if self.case_dir is None:
            return
        prompts_dir = self.case_dir / "prompts"
        try:
            prompts_dir.mkdir(parents=True, exist_ok=True)
            prompt_path = prompts_dir / filename
            prompt_path.write_text(
                f"# System Prompt\n\n{system_prompt}\n\n---\n\n# User Prompt\n\n{user_prompt}\n",
                encoding="utf-8",
            )
        except OSError:
            self.logger.warning("Failed to save prompt to %s", prompts_dir / filename)

    # ------------------------------------------------------------------
    # Delegation methods — thin wrappers for backward compatibility.
    # Methods that don't use self are exposed as staticmethod assignments
    # at the class level (see above). The methods below need self.
    # ------------------------------------------------------------------

    def _estimate_tokens(self, text: str) -> int:
        """Estimate the token count of *text* using model-specific info."""
        return estimate_tokens(text, model_info=self.model_info)

    # These are also exposed as staticmethods on the class (see above)
    # but tests may call them on instances, so they work either way.
    _extract_ioc_targets = staticmethod(extract_ioc_targets)
    _format_ioc_targets = staticmethod(format_ioc_targets)
    _build_priority_directives = staticmethod(build_priority_directives)
    _compute_statistics = staticmethod(compute_statistics)
    _build_full_data_csv = staticmethod(build_full_data_csv)
    _deduplicate_rows_for_analysis = staticmethod(deduplicate_rows_for_analysis)

    def _validate_citations(self, artifact_key: str, analysis_text: str) -> list[str]:
        """Spot-check AI-cited values against source CSV.

        Args:
            artifact_key: Artifact identifier.
            analysis_text: The AI's analysis text.

        Returns:
            List of warning strings.
        """
        if analysis_text.startswith("Analysis failed:"):
            return []
        try:
            original_path = self._resolve_artifact_csv_path(artifact_key)
        except FileNotFoundError:
            return []
        csv_path = self._resolve_analysis_input_csv_path(
            artifact_key, fallback=original_path,
        )
        return validate_citations(
            artifact_key=artifact_key,
            analysis_text=analysis_text,
            csv_path=csv_path,
            citation_spot_check_limit=self.citation_spot_check_limit,
            audit_log_fn=self._audit_log,
        )

    # ------------------------------------------------------------------
    # Path resolution
    # ------------------------------------------------------------------

    def _resolve_artifact_csv_path(self, artifact_key: str) -> Path:
        """Resolve the CSV file path for a given artifact key.

        For split artifacts with multiple CSV files, returns the first
        path. Use :meth:`_resolve_all_artifact_csv_paths` to get every
        path for a split artifact.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A ``Path`` to the artifact's CSV file.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        mapped = self.artifact_csv_paths.get(artifact_key)
        if mapped is not None:
            if isinstance(mapped, list):
                return mapped[0]
            return mapped

        normalized = normalize_artifact_key(artifact_key)
        mapped_normalized = self.artifact_csv_paths.get(normalized)
        if mapped_normalized is not None:
            if isinstance(mapped_normalized, list):
                return mapped_normalized[0]
            return mapped_normalized

        candidate_path = Path(artifact_key)
        if candidate_path.exists():
            return candidate_path

        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    if direct_csv_path.exists():
                        return direct_csv_path
                for file_stub in file_stubs:
                    prefixed_paths = sorted(parsed_dir.glob(f"{file_stub}_*.csv"))
                    if prefixed_paths:
                        return prefixed_paths[0]

        raise FileNotFoundError(
            f"No CSV path mapped for artifact '{artifact_key}'. "
            "Provide it in ForensicAnalyzer(artifact_csv_paths=...) or use case_dir/parsed CSV paths."
        )

    def _resolve_all_artifact_csv_paths(self, artifact_key: str) -> list[Path]:
        """Resolve all CSV file paths for a given artifact key.

        For single-file artifacts returns a one-element list. For split
        artifacts (e.g. EVTX) returns all constituent CSV paths.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A non-empty list of ``Path`` objects.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        for key in (artifact_key, normalize_artifact_key(artifact_key)):
            mapped = self.artifact_csv_paths.get(key)
            if mapped is not None:
                if isinstance(mapped, list):
                    return list(mapped)
                return [mapped]

        # Filesystem fallback: search case_dir/parsed for all matching parts.
        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    combined_csv_path = parsed_dir / f"{file_stub}_combined.csv"
                    prefixed_paths = sorted(
                        path
                        for path in parsed_dir.glob(f"{file_stub}_*.csv")
                        if path != combined_csv_path
                    )
                    if direct_csv_path.exists() and prefixed_paths:
                        return sorted([direct_csv_path] + prefixed_paths)
                    if prefixed_paths:
                        return prefixed_paths
                    if direct_csv_path.exists():
                        return [direct_csv_path]

        # Final fallback: delegate to single-path resolver.
        return [self._resolve_artifact_csv_path(artifact_key)]

    def _combine_csv_files(self, artifact_key: str, csv_paths: list[Path]) -> Path:
        """Concatenate multiple CSV files into a single combined CSV.

        All input files are assumed to share the same schema (column names).
        The combined file is written to the case's ``parsed/`` directory (or
        next to the first input file) with a ``_combined`` suffix.

        Args:
            artifact_key: Artifact identifier (used for the output filename).
            csv_paths: List of CSV file paths to combine.

        Returns:
            Path to the combined CSV file.
        """
        import csv as csv_mod

        output_dir = csv_paths[0].parent
        safe_key = sanitize_filename(artifact_key)
        combined_path = output_dir / f"{safe_key}_combined.csv"

        fieldnames: list[str] = []
        fieldnames_set: set[str] = set()

        for csv_path in csv_paths:
            if not csv_path.exists():
                continue
            with csv_path.open("r", newline="", encoding="utf-8") as fh:
                reader = csv_mod.DictReader(fh)
                if reader.fieldnames:
                    for fn in reader.fieldnames:
                        if fn not in fieldnames_set:
                            fieldnames.append(fn)
                            fieldnames_set.add(fn)

        with combined_path.open("w", newline="", encoding="utf-8") as out:
            writer = csv_mod.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
            writer.writeheader()
            for csv_path in csv_paths:
                if not csv_path.exists():
                    continue
                with csv_path.open("r", newline="", encoding="utf-8") as fh:
                    reader = csv_mod.DictReader(fh)
                    for row in reader:
                        writer.writerow(row)

        return combined_path

    def _set_analysis_input_csv_path(self, artifact_key: str, csv_path: Path) -> None:
        """Store the analysis-input CSV path for an artifact.

        Args:
            artifact_key: Artifact identifier.
            csv_path: Path to the analysis-input CSV.
        """
        self._analysis_input_csv_paths[artifact_key] = csv_path
        normalized = normalize_artifact_key(artifact_key)
        self._analysis_input_csv_paths[normalized] = csv_path

    def _resolve_analysis_input_csv_path(self, artifact_key: str, fallback: Path) -> Path:
        """Retrieve the analysis-input CSV path, with fallback.

        Args:
            artifact_key: Artifact identifier.
            fallback: Default path if not stored.

        Returns:
            The stored analysis-input CSV path, or *fallback*.
        """
        mapped = self._analysis_input_csv_paths.get(artifact_key)
        if mapped is not None:
            return mapped
        normalized = normalize_artifact_key(artifact_key)
        mapped = self._analysis_input_csv_paths.get(normalized)
        if mapped is not None:
            return mapped
        return fallback

    def _resolve_artifact_metadata(self, artifact_key: str) -> dict[str, str]:
        """Look up artifact metadata from the OS-appropriate registry first.

        Searches the registry matching :attr:`os_type` first so that
        shared keys like ``services`` resolve to the correct OS-specific
        entry. Falls back to the other registry for cross-OS lookups.

        Args:
            artifact_key: Artifact identifier.

        Returns:
            A dict with at least ``name``, ``description``, and
            ``analysis_hint`` keys.
        """
        if self.os_type == "linux":
            registries = (LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY)
        else:
            registries = (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY)

        for registry in registries:
            if artifact_key in registry:
                metadata = registry[artifact_key]
                return {str(key): str(value) for key, value in metadata.items()}

        normalized = normalize_artifact_key(artifact_key)
        for registry in registries:
            if normalized in registry:
                metadata = registry[normalized]
                return {str(key): str(value) for key, value in metadata.items()}

        return {
            "name": artifact_key,
            "description": "No artifact description available.",
            "analysis_hint": "No specific analysis guidance is available for this artifact.",
        }

    # ------------------------------------------------------------------
    # Metadata registration
    # ------------------------------------------------------------------

    def _register_artifact_paths_from_metadata(self, metadata: Mapping[str, Any] | None) -> None:
        """Extract and register artifact CSV paths from run metadata.

        Args:
            metadata: Optional metadata mapping.
        """
        if not isinstance(metadata, Mapping):
            return

        artifact_csv_paths = metadata.get("artifact_csv_paths")
        if isinstance(artifact_csv_paths, Mapping):
            for artifact_key, csv_path in artifact_csv_paths.items():
                if isinstance(csv_path, list) and len(csv_path) > 1:
                    self.artifact_csv_paths[str(artifact_key)] = [
                        Path(str(p)) for p in csv_path
                    ]
                elif isinstance(csv_path, list) and csv_path:
                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path[0]))
                else:
                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))

        for container_key in ("artifacts", "artifact_results", "parse_results", "parsed_artifacts"):
            container = metadata.get(container_key)
            if isinstance(container, Mapping):
                for artifact_key, value in container.items():
                    self._register_artifact_path_entry(artifact_key=artifact_key, value=value)
            elif isinstance(container, list):
                for item in container:
                    if isinstance(item, Mapping):
                        artifact_key = item.get("artifact_key") or item.get("key")
                        if artifact_key:
                            self._register_artifact_path_entry(artifact_key=str(artifact_key), value=item)

    def _register_artifact_path_entry(self, artifact_key: Any, value: Any) -> None:
        """Register a single artifact CSV path from a metadata entry.

        Args:
            artifact_key: Artifact identifier.
            value: Metadata entry (mapping, string, or Path).
        """
        if artifact_key in (None, ""):
            return

        if isinstance(value, Mapping):
            csv_path = value.get("csv_path")
            csv_paths = value.get("csv_paths")
            if isinstance(csv_paths, list) and len(csv_paths) > 1:
                self.artifact_csv_paths[str(artifact_key)] = [
                    Path(str(p)) for p in csv_paths
                ]
                return
            if csv_path:
                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
                return
            if isinstance(csv_paths, list) and csv_paths:
                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_paths[0]))
                return

        if isinstance(value, (str, Path)):
            self.artifact_csv_paths[str(artifact_key)] = Path(str(value))

    # ------------------------------------------------------------------
    # Core analysis pipeline
    # ------------------------------------------------------------------

    def _prepare_artifact_data(
        self, artifact_key: str, investigation_context: str, csv_path: Path | None = None,
    ) -> str:
        """Prepare one artifact CSV as a bounded, analysis-ready prompt.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            csv_path: Explicit path to the artifact CSV.

        Returns:
            The fully rendered prompt string.

        Raises:
            FileNotFoundError: If the artifact CSV cannot be located.
        """
        resolved_csv_path = csv_path if csv_path is not None else self._resolve_artifact_csv_path(artifact_key)
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)

        prompt_text, analysis_csv_path, _ = prepare_artifact_data(
            artifact_key=artifact_key,
            investigation_context=investigation_context,
            csv_path=resolved_csv_path,
            artifact_metadata=artifact_metadata,
            artifact_prompt_template=self.artifact_prompt_template,
            artifact_prompt_template_small_context=self.artifact_prompt_template_small_context,
            artifact_instruction_prompts=self.artifact_instruction_prompts,
            artifact_ai_column_projections=self.artifact_ai_column_projections,
            artifact_deduplication_enabled=self.artifact_deduplication_enabled,
            ai_max_tokens=self.ai_max_tokens,
            shortened_prompt_cutoff_tokens=self.shortened_prompt_cutoff_tokens,
            case_dir=self.case_dir,
            audit_log_fn=self._audit_log,
        )
        self._set_analysis_input_csv_path(artifact_key=artifact_key, csv_path=analysis_csv_path)
        return prompt_text

    def analyze_artifact(
        self,
        artifact_key: str,
        investigation_context: str,
        progress_callback: Any | None = None,
    ) -> dict[str, Any]:
        """Analyze a single artifact's CSV data and return AI findings.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            progress_callback: Optional callable for streaming progress.

        Returns:
            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
            ``model``, and optionally ``citation_warnings``.
        """
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
        artifact_name = artifact_metadata.get("name", artifact_key)
        model = self.model_info.get("model", "unknown")
        provider = self.model_info.get("provider", "unknown")

        self._audit_log("analysis_started", {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "provider": provider, "model": model,
        })

        start_time = perf_counter()
        try:
            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
            if len(all_csv_paths) > 1:
                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
            else:
                csv_path = all_csv_paths[0]
            artifact_prompt = self._prepare_artifact_data(
                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
            )
            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]

            safe_key = sanitize_filename(artifact_key)
            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)

            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
            if prompt_tokens_estimate > self.ai_max_tokens:
                self.logger.info(
                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
                )
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analysis_text = analyze_artifact_chunked(
                    artifact_prompt=artifact_prompt,
                    artifact_key=artifact_key,
                    artifact_name=artifact_name,
                    investigation_context=investigation_context,
                    model=model,
                    system_prompt=self.system_prompt,
                    ai_response_max_tokens=self.ai_response_max_tokens,
                    chunk_csv_budget=self.chunk_csv_budget,
                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
                    max_merge_rounds=self.max_merge_rounds,
                    call_ai_with_retry_fn=self._call_ai_with_retry,
                    ai_provider=self.ai_provider,
                    audit_log_fn=self._audit_log,
                    save_case_prompt_fn=self._save_case_prompt,
                    progress_callback=progress_callback,
                )
                duration_seconds = perf_counter() - start_time
                self._audit_log("analysis_completed", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "token_count": self._estimate_tokens(analysis_text),
                    "duration_seconds": round(duration_seconds, 6),
                    "status": "success", "chunked": True,
                })
                citation_warnings = self._validate_citations(artifact_key, analysis_text)
                result: dict[str, Any] = {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "analysis": analysis_text, "model": model,
                }
                if citation_warnings:
                    result["citation_warnings"] = citation_warnings
                return result

            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
            if callable(analyze_with_progress) and progress_callback is not None:
                emit_analysis_progress(progress_callback, artifact_key, "started", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                })

                def _provider_progress(payload: Mapping[str, Any]) -> None:
                    """Forward provider progress to the frontend."""
                    if not isinstance(payload, Mapping):
                        return
                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name,
                        "thinking_text": str(payload.get("thinking_text", "")),
                        "partial_text": str(payload.get("partial_text", "")),
                        "model": model,
                    })

                try:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, attachments=attachments,
                        max_tokens=self.ai_response_max_tokens,
                    )
                except TypeError:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
                    )
            else:
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
                if callable(analyze_with_attachments):
                    analysis_text = self._call_ai_with_retry(
                        lambda: analyze_with_attachments(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
                        )
                    )
                else:
                    analysis_text = self._call_ai_with_retry(
                        lambda: self.ai_provider.analyze(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            max_tokens=self.ai_response_max_tokens,
                        )
                    )
            duration_seconds = perf_counter() - start_time
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": self._estimate_tokens(analysis_text),
                "duration_seconds": round(duration_seconds, 6), "status": "success",
            })
        except Exception as error:
            duration_seconds = perf_counter() - start_time
            analysis_text = f"Analysis failed: {error}"
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
                "status": "failed", "error": str(error),
            })

        citation_warnings = self._validate_citations(artifact_key, analysis_text)

        result = {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "analysis": analysis_text, "model": model,
905 } 906 if citation_warnings: 907 result["citation_warnings"] = citation_warnings 908 return result 909 910 def generate_summary( 911 self, 912 per_artifact_results: list[Mapping[str, Any]], 913 investigation_context: str, 914 metadata: Mapping[str, Any] | None, 915 ) -> str: 916 """Generate a cross-artifact summary by correlating findings. 917 918 Args: 919 per_artifact_results: List of per-artifact result dicts. 920 investigation_context: The user's investigation context. 921 metadata: Optional host metadata mapping. 922 923 Returns: 924 The AI-generated summary text, or an error message. 925 """ 926 metadata_map = metadata if isinstance(metadata, Mapping) else {} 927 summary_prompt = build_summary_prompt( 928 summary_prompt_template=self.summary_prompt_template, 929 investigation_context=investigation_context, 930 per_artifact_results=per_artifact_results, 931 metadata_map=metadata_map, 932 ) 933 934 model = self.model_info.get("model", "unknown") 935 provider = self.model_info.get("provider", "unknown") 936 summary_artifact_key = "cross_artifact_summary" 937 summary_artifact_name = "Cross-Artifact Summary" 938 summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md" 939 940 self._audit_log("analysis_started", { 941 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 942 "provider": provider, "model": model, 943 }) 944 self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt) 945 946 start_time = perf_counter() 947 try: 948 summary = self._call_ai_with_retry( 949 lambda: self.ai_provider.analyze( 950 system_prompt=self.system_prompt, user_prompt=summary_prompt, 951 max_tokens=self.ai_response_max_tokens, 952 ) 953 ) 954 duration_seconds = perf_counter() - start_time 955 self._audit_log("analysis_completed", { 956 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 957 "token_count": self._estimate_tokens(summary), 958 "duration_seconds": round(duration_seconds, 6), 
"status": "success", 959 }) 960 return summary 961 except Exception as error: 962 duration_seconds = perf_counter() - start_time 963 summary = f"Analysis failed: {error}" 964 self._audit_log("analysis_completed", { 965 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 966 "token_count": 0, "duration_seconds": round(duration_seconds, 6), 967 "status": "failed", "error": str(error), 968 }) 969 return summary 970 971 def run_full_analysis( 972 self, 973 artifact_keys: Iterable[str], 974 investigation_context: str, 975 metadata: Mapping[str, Any] | None, 976 progress_callback: Any | None = None, 977 cancel_check: Any | None = None, 978 ) -> dict[str, Any]: 979 """Run the complete analysis pipeline: per-artifact then summary. 980 981 Args: 982 artifact_keys: Iterable of artifact key strings. 983 investigation_context: The user's investigation context. 984 metadata: Optional metadata mapping. 985 progress_callback: Optional callable for streaming progress. 986 cancel_check: Optional callable returning ``True`` when the 987 analysis should be aborted early. 988 989 Returns: 990 A dict with ``per_artifact``, ``summary``, and ``model_info``. 991 992 Raises: 993 AnalysisCancelledError: If *cancel_check* returns ``True``. 
994 """ 995 if isinstance(self.ai_provider, UnavailableProvider): 996 raise AIProviderError(self.ai_provider._error_message) 997 998 self._register_artifact_paths_from_metadata(metadata) 999 per_artifact_results: list[dict[str, Any]] = [] 1000 for artifact_key in artifact_keys: 1001 if cancel_check is not None and cancel_check(): 1002 raise AnalysisCancelledError("Analysis cancelled by user.") 1003 result = self.analyze_artifact( 1004 artifact_key=str(artifact_key), 1005 investigation_context=investigation_context, 1006 progress_callback=progress_callback, 1007 ) 1008 per_artifact_results.append(result) 1009 if progress_callback is not None: 1010 emit_analysis_progress(progress_callback, str(artifact_key), "complete", result) 1011 1012 summary = self.generate_summary( 1013 per_artifact_results=per_artifact_results, 1014 investigation_context=investigation_context, 1015 metadata=metadata, 1016 ) 1017 return { 1018 "per_artifact": per_artifact_results, 1019 "summary": summary, 1020 "model_info": dict(self.model_info), 1021 }
class AnalysisCancelledError(Exception):
    """Raised when analysis is cancelled by the user."""
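`run_full_analysis` polls its `cancel_check` callable before each artifact and aborts the batch with this exception. A self-contained sketch of that contract; the stub `run_artifacts` function and the artifact keys are illustrative, not the module's real API:

```python
class AnalysisCancelledError(Exception):
    """Raised when analysis is cancelled by the user."""

def run_artifacts(artifact_keys, analyze_fn, cancel_check=None):
    # Mirrors the run_full_analysis loop: poll for cancellation before
    # each artifact so a long batch can be aborted between items.
    results = []
    for key in artifact_keys:
        if cancel_check is not None and cancel_check():
            raise AnalysisCancelledError("Analysis cancelled by user.")
        results.append(analyze_fn(key))
    return results

completed = run_artifacts(["evtx", "prefetch"], lambda k: {"artifact_key": k})

flags = iter([False, True])  # signal cancellation before the second artifact
try:
    run_artifacts(["evtx", "prefetch"], lambda k: {"artifact_key": k},
                  cancel_check=lambda: next(flags))
    cancelled = False
except AnalysisCancelledError:
    cancelled = True
```

Because the check happens between artifacts rather than inside a provider call, an in-flight AI request still completes before the batch stops.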
class ForensicAnalyzer:
    """Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

    Central analysis engine for AIFT: reads parsed artifact CSV files, applies
    column projection and deduplication, builds token-budgeted
    prompts, sends them to a configured AI provider, and validates citations.

    Attributes:
        case_dir: Path to the case directory, or ``None``.
        config: Merged configuration dictionary.
        ai_provider: The configured AI provider instance.
        model_info: Dict with ``provider`` and ``model`` keys.
    """

    # Expose extracted functions as static methods for backward compatibility
    # with tests and callers that use ForensicAnalyzer._method_name().
    _stringify_value = staticmethod(stringify_value)
    _build_datetime = staticmethod(build_datetime)
    _normalize_artifact_key = staticmethod(normalize_artifact_key)
    _sanitize_filename = staticmethod(sanitize_filename)
    _split_csv_into_chunks = staticmethod(split_csv_into_chunks)
    _split_csv_and_suffix = staticmethod(split_csv_and_suffix)
    _coerce_projection_columns = staticmethod(coerce_projection_columns)
    _is_dedup_safe_identifier_column = staticmethod(is_dedup_safe_identifier_column)
    _timestamp_lookup_keys = staticmethod(timestamp_lookup_keys)
    _timestamp_found_in_csv = staticmethod(timestamp_found_in_csv)
    _match_column_name = staticmethod(match_column_name)
    _emit_analysis_progress = staticmethod(emit_analysis_progress)

    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.). Controls which artifact instruction
                prompts are loaded.
        """
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()

    # ------------------------------------------------------------------
    # Configuration loading
    # ------------------------------------------------------------------

    def _load_analysis_settings(self) -> None:
        """Load and validate analysis tuning parameters from the config dict."""
        analysis_config = self.config.get("analysis")
        if not isinstance(analysis_config, Mapping):
            analysis_config = {}

        self.ai_max_tokens = read_int_setting(analysis_config, "ai_max_tokens", AI_MAX_TOKENS, minimum=1)
        self.ai_response_max_tokens = max(1, int(self.ai_max_tokens * 0.2))
        legacy_shortened = read_int_setting(
            analysis_config, "statistics_section_cutoff_tokens", DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, minimum=1,
        )
        self.shortened_prompt_cutoff_tokens = read_int_setting(
            analysis_config, "shortened_prompt_cutoff_tokens", legacy_shortened, minimum=1,
        )
        self.chunk_csv_budget = int(self.ai_max_tokens * TOKEN_CHAR_RATIO * 0.6)
        self.citation_spot_check_limit = read_int_setting(
            analysis_config, "citation_spot_check_limit", CITATION_SPOT_CHECK_LIMIT, minimum=1,
        )
        self.max_merge_rounds = read_int_setting(analysis_config, "max_merge_rounds", MAX_MERGE_ROUNDS, minimum=1)
        self.artifact_deduplication_enabled = read_bool_setting(
            analysis_config, "artifact_deduplication_enabled",
            ARTIFACT_DEDUPLICATION_ENABLED,
        )
        self.artifact_ai_columns_config_path = read_path_setting(
            analysis_config, "artifact_ai_columns_config_path", str(DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH),
        )

    # Backward-compatible aliases for the extracted config readers.
    _read_int_setting = staticmethod(read_int_setting)
    _read_bool_setting = staticmethod(read_bool_setting)
    _read_path_setting = staticmethod(read_path_setting)

    def _resolve_artifact_ai_columns_config_path(self) -> Path:
        """Resolve the artifact AI columns config path to an absolute Path.

        Delegates to :func:`prompts.resolve_artifact_ai_columns_config_path`.

        Returns:
            Resolved absolute ``Path`` to the YAML config file.
        """
        return resolve_artifact_ai_columns_config_path(
            self.artifact_ai_columns_config_path, self.case_dir,
        )

    def _load_artifact_ai_column_projections(self) -> dict[str, tuple[str, ...]]:
        """Load per-artifact column projection configuration from YAML.

        Delegates to :func:`prompts.load_artifact_ai_column_projections`.

        Returns:
            A dict mapping normalized artifact keys to tuples of column names.
        """
        config_path = self._resolve_artifact_ai_columns_config_path()
        return load_artifact_ai_column_projections(config_path, os_type=self.os_type)

    def _load_prompt_template(self, filename: str, default: str) -> str:
        """Read a prompt template file from the prompts directory.

        Delegates to :func:`prompts.load_prompt_template`.

        Args:
            filename: Name of the template file.
            default: Fallback template string.

        Returns:
            The template text.
        """
        return load_prompt_template(self.prompts_dir, filename, default)

    def _load_artifact_instruction_prompts(self) -> dict[str, str]:
        """Load per-artifact analysis instruction prompts.

        Delegates to :func:`prompts.load_artifact_instruction_prompts`,
        passing :attr:`os_type` so the correct OS-specific instruction
        directory is selected.

        Returns:
            A dict mapping artifact keys to instruction prompt text.
        """
        return load_artifact_instruction_prompts(self.prompts_dir, os_type=self.os_type)

    # ------------------------------------------------------------------
    # AI provider
    # ------------------------------------------------------------------

    def _create_ai_provider(self) -> Any:
        """Instantiate the configured AI provider, or a fallback on failure.

        Returns:
            An AI provider instance, or an ``UnavailableProvider``.
        """
        provider_config: Mapping[str, Any]
        if self.config:
            provider_config = self.config
        else:
            provider_config = {
                "ai": {
                    "provider": "local",
                    "local": {
                        "base_url": "http://localhost:11434/v1",
                        "model": "llama3.1:70b",
                        "api_key": "not-needed",
                    },
                }
            }
        try:
            return create_provider(dict(provider_config))
        except Exception as error:
            return UnavailableProvider(str(error))

    def _read_model_info(self) -> dict[str, str]:
        """Read provider and model metadata from the AI provider.

        Returns:
            A dict with at least ``provider`` and ``model`` keys.
        """
        try:
            model_info = self.ai_provider.get_model_info()
        except Exception:
            return {"provider": "unknown", "model": "unknown"}

        if not isinstance(model_info, Mapping):
            return {"provider": "unknown", "model": "unknown"}

        return {str(key): str(value) for key, value in model_info.items()}

    def _call_ai_with_retry(self, call: Callable[[], str]) -> str:
        """Call the AI provider with retry on transient failures.

        Args:
            call: A zero-argument callable that invokes the AI provider.

        Returns:
            The AI provider's response string.

        Raises:
            AIProviderError: If the provider raises a permanent error.
            Exception: The last transient error after all retries.
        """
        last_error: Exception | None = None
        for attempt in range(AI_RETRY_ATTEMPTS):
            try:
                return call()
            except AIProviderError:
                raise
            except Exception as error:
                last_error = error
                if attempt < AI_RETRY_ATTEMPTS - 1:
                    delay = AI_RETRY_BASE_DELAY * (2 ** attempt)
                    self.logger.warning(
                        "AI provider call failed (attempt %d/%d), retrying in %.1fs: %s",
                        attempt + 1, AI_RETRY_ATTEMPTS, delay, error,
                    )
                    sleep(delay)
        raise last_error  # type: ignore[misc]

    # ------------------------------------------------------------------
    # Audit / prompt saving
    # ------------------------------------------------------------------

    def _audit_log(self, action: str, details: dict[str, Any]) -> None:
        """Write an entry to the forensic audit trail.

        Args:
            action: The audit action name.
            details: Key-value details for the audit entry.
        """
        if self.audit_logger is None:
            return
        logger = getattr(self.audit_logger, "log", None)
        if not callable(logger):
            return
        try:
            logger(action, details)
        except Exception:
            return

    def _save_case_prompt(self, filename: str, system_prompt: str, user_prompt: str) -> None:
        """Save a prompt to the case prompts directory for audit.

        Args:
            filename: Output filename.
            system_prompt: The system prompt text.
            user_prompt: The user prompt text.
        """
        if self.case_dir is None:
            return
        prompts_dir = self.case_dir / "prompts"
        try:
            prompts_dir.mkdir(parents=True, exist_ok=True)
            prompt_path = prompts_dir / filename
            prompt_path.write_text(
                f"# System Prompt\n\n{system_prompt}\n\n---\n\n# User Prompt\n\n{user_prompt}\n",
                encoding="utf-8",
            )
        except OSError:
            self.logger.warning("Failed to save prompt to %s", prompts_dir / filename)

    # ------------------------------------------------------------------
    # Delegation methods — thin wrappers for backward compatibility.
    # Methods that don't use self are exposed as staticmethod assignments
    # at the class level (see above). The methods below need self.
    # ------------------------------------------------------------------

    def _estimate_tokens(self, text: str) -> int:
        """Estimate the token count of *text* using model-specific info."""
        return estimate_tokens(text, model_info=self.model_info)

    # These are also exposed as staticmethods on the class (see above)
    # but tests may call them on instances, so they work either way.
    _extract_ioc_targets = staticmethod(extract_ioc_targets)
    _format_ioc_targets = staticmethod(format_ioc_targets)
    _build_priority_directives = staticmethod(build_priority_directives)
    _compute_statistics = staticmethod(compute_statistics)
    _build_full_data_csv = staticmethod(build_full_data_csv)
    _deduplicate_rows_for_analysis = staticmethod(deduplicate_rows_for_analysis)

    def _validate_citations(self, artifact_key: str, analysis_text: str) -> list[str]:
        """Spot-check AI-cited values against source CSV.

        Args:
            artifact_key: Artifact identifier.
            analysis_text: The AI's analysis text.

        Returns:
            List of warning strings.
        """
        if analysis_text.startswith("Analysis failed:"):
            return []
        try:
            original_path = self._resolve_artifact_csv_path(artifact_key)
        except FileNotFoundError:
            return []
        csv_path = self._resolve_analysis_input_csv_path(
            artifact_key, fallback=original_path,
        )
        return validate_citations(
            artifact_key=artifact_key,
            analysis_text=analysis_text,
            csv_path=csv_path,
            citation_spot_check_limit=self.citation_spot_check_limit,
            audit_log_fn=self._audit_log,
        )

    # ------------------------------------------------------------------
    # Path resolution
    # ------------------------------------------------------------------

    def _resolve_artifact_csv_path(self, artifact_key: str) -> Path:
        """Resolve the CSV file path for a given artifact key.

        For split artifacts with multiple CSV files, returns the first
        path. Use :meth:`_resolve_all_artifact_csv_paths` to get every
        path for a split artifact.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A ``Path`` to the artifact's CSV file.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        mapped = self.artifact_csv_paths.get(artifact_key)
        if mapped is not None:
            if isinstance(mapped, list):
                return mapped[0]
            return mapped

        normalized = normalize_artifact_key(artifact_key)
        mapped_normalized = self.artifact_csv_paths.get(normalized)
        if mapped_normalized is not None:
            if isinstance(mapped_normalized, list):
                return mapped_normalized[0]
            return mapped_normalized

        candidate_path = Path(artifact_key)
        if candidate_path.exists():
            return candidate_path

        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    if direct_csv_path.exists():
                        return direct_csv_path
                for file_stub in file_stubs:
                    prefixed_paths = sorted(parsed_dir.glob(f"{file_stub}_*.csv"))
                    if prefixed_paths:
                        return prefixed_paths[0]

        raise FileNotFoundError(
            f"No CSV path mapped for artifact '{artifact_key}'. "
            "Provide it in ForensicAnalyzer(artifact_csv_paths=...) or use case_dir/parsed CSV paths."
        )

    def _resolve_all_artifact_csv_paths(self, artifact_key: str) -> list[Path]:
        """Resolve all CSV file paths for a given artifact key.

        For single-file artifacts returns a one-element list. For split
        artifacts (e.g. EVTX) returns all constituent CSV paths.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A non-empty list of ``Path`` objects.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        for key in (artifact_key, normalize_artifact_key(artifact_key)):
            mapped = self.artifact_csv_paths.get(key)
            if mapped is not None:
                if isinstance(mapped, list):
                    return list(mapped)
                return [mapped]

        # Filesystem fallback: search case_dir/parsed for all matching parts.
        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    combined_csv_path = parsed_dir / f"{file_stub}_combined.csv"
                    prefixed_paths = sorted(
                        path
                        for path in parsed_dir.glob(f"{file_stub}_*.csv")
                        if path != combined_csv_path
                    )
                    if direct_csv_path.exists() and prefixed_paths:
                        return sorted([direct_csv_path] + prefixed_paths)
                    if prefixed_paths:
                        return prefixed_paths
                    if direct_csv_path.exists():
                        return [direct_csv_path]

        # Final fallback: delegate to single-path resolver.
        return [self._resolve_artifact_csv_path(artifact_key)]

    def _combine_csv_files(self, artifact_key: str, csv_paths: list[Path]) -> Path:
        """Concatenate multiple CSV files into a single combined CSV.

        All input files are assumed to share the same schema (column names).
        The combined file is written to the case's ``parsed/`` directory (or
        next to the first input file) with a ``_combined`` suffix.

        Args:
            artifact_key: Artifact identifier (used for the output filename).
            csv_paths: List of CSV file paths to combine.

        Returns:
            Path to the combined CSV file.
        """
        import csv as csv_mod

        output_dir = csv_paths[0].parent
        safe_key = sanitize_filename(artifact_key)
        combined_path = output_dir / f"{safe_key}_combined.csv"

        fieldnames: list[str] = []
        fieldnames_set: set[str] = set()

        for csv_path in csv_paths:
            if not csv_path.exists():
                continue
            with csv_path.open("r", newline="", encoding="utf-8") as fh:
                reader = csv_mod.DictReader(fh)
                if reader.fieldnames:
                    for fn in reader.fieldnames:
                        if fn not in fieldnames_set:
                            fieldnames.append(fn)
                            fieldnames_set.add(fn)

        with combined_path.open("w", newline="", encoding="utf-8") as out:
            writer = csv_mod.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
            writer.writeheader()
            for csv_path in csv_paths:
                if not csv_path.exists():
                    continue
                with csv_path.open("r", newline="", encoding="utf-8") as fh:
                    reader = csv_mod.DictReader(fh)
                    for row in reader:
                        writer.writerow(row)

        return combined_path

    def _set_analysis_input_csv_path(self, artifact_key: str, csv_path: Path) -> None:
        """Store the analysis-input CSV path for an artifact.

        Args:
            artifact_key: Artifact identifier.
            csv_path: Path to the analysis-input CSV.
        """
        self._analysis_input_csv_paths[artifact_key] = csv_path
        normalized = normalize_artifact_key(artifact_key)
        self._analysis_input_csv_paths[normalized] = csv_path

    def _resolve_analysis_input_csv_path(self, artifact_key: str, fallback: Path) -> Path:
        """Retrieve the analysis-input CSV path, with fallback.

        Args:
            artifact_key: Artifact identifier.
            fallback: Default path if not stored.

        Returns:
            The stored analysis-input CSV path, or *fallback*.
        """
        mapped = self._analysis_input_csv_paths.get(artifact_key)
        if mapped is not None:
            return mapped
        normalized = normalize_artifact_key(artifact_key)
        mapped = self._analysis_input_csv_paths.get(normalized)
        if mapped is not None:
            return mapped
        return fallback

    def _resolve_artifact_metadata(self, artifact_key: str) -> dict[str, str]:
        """Look up artifact metadata from the OS-appropriate registry first.

        Searches the registry matching :attr:`os_type` first so that
        shared keys like ``services`` resolve to the correct OS-specific
        entry. Falls back to the other registry for cross-OS lookups.

        Args:
            artifact_key: Artifact identifier.

        Returns:
            A dict with at least ``name``, ``description``, and
            ``analysis_hint`` keys.
        """
        if self.os_type == "linux":
            registries = (LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY)
        else:
            registries = (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY)

        for registry in registries:
            if artifact_key in registry:
                metadata = registry[artifact_key]
                return {str(key): str(value) for key, value in metadata.items()}

        normalized = normalize_artifact_key(artifact_key)
        for registry in registries:
            if normalized in registry:
                metadata = registry[normalized]
                return {str(key): str(value) for key, value in metadata.items()}

        return {
            "name": artifact_key,
            "description": "No artifact description available.",
            "analysis_hint": "No specific analysis guidance is available for this artifact.",
        }

    # ------------------------------------------------------------------
    # Metadata registration
    # ------------------------------------------------------------------

    def _register_artifact_paths_from_metadata(self, metadata: Mapping[str, Any] | None) -> None:
        """Extract and register artifact CSV paths from run metadata.

        Args:
            metadata: Optional metadata mapping.
        """
        if not isinstance(metadata, Mapping):
            return

        artifact_csv_paths = metadata.get("artifact_csv_paths")
        if isinstance(artifact_csv_paths, Mapping):
            for artifact_key, csv_path in artifact_csv_paths.items():
                if isinstance(csv_path, list) and len(csv_path) > 1:
                    self.artifact_csv_paths[str(artifact_key)] = [
                        Path(str(p)) for p in csv_path
                    ]
                elif isinstance(csv_path, list) and csv_path:
                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path[0]))
                else:
                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))

        for container_key in ("artifacts", "artifact_results", "parse_results", "parsed_artifacts"):
            container = metadata.get(container_key)
            if isinstance(container, Mapping):
                for artifact_key, value in container.items():
                    self._register_artifact_path_entry(artifact_key=artifact_key, value=value)
            elif isinstance(container, list):
                for item in container:
                    if isinstance(item, Mapping):
                        artifact_key = item.get("artifact_key") or item.get("key")
                        if artifact_key:
                            self._register_artifact_path_entry(artifact_key=str(artifact_key), value=item)

    def _register_artifact_path_entry(self, artifact_key: Any, value: Any) -> None:
        """Register a single artifact CSV path from a metadata entry.

        Args:
            artifact_key: Artifact identifier.
            value: Metadata entry (mapping, string, or Path).
        """
        if artifact_key in (None, ""):
            return

        if isinstance(value, Mapping):
            csv_path = value.get("csv_path")
            csv_paths = value.get("csv_paths")
            if isinstance(csv_paths, list) and len(csv_paths) > 1:
                self.artifact_csv_paths[str(artifact_key)] = [
                    Path(str(p)) for p in csv_paths
                ]
                return
            if csv_path:
                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
                return
            if isinstance(csv_paths, list) and csv_paths:
                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_paths[0]))
            return

        if isinstance(value, (str, Path)):
            self.artifact_csv_paths[str(artifact_key)] = Path(str(value))

    # ------------------------------------------------------------------
    # Core analysis pipeline
    # ------------------------------------------------------------------

    def _prepare_artifact_data(
        self, artifact_key: str, investigation_context: str, csv_path: Path | None = None,
    ) -> str:
        """Prepare one artifact CSV as a bounded, analysis-ready prompt.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            csv_path: Explicit path to the artifact CSV.

        Returns:
            The fully rendered prompt string.

        Raises:
            FileNotFoundError: If the artifact CSV cannot be located.
        """
        resolved_csv_path = csv_path if csv_path is not None else self._resolve_artifact_csv_path(artifact_key)
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)

        prompt_text, analysis_csv_path, _ = prepare_artifact_data(
            artifact_key=artifact_key,
            investigation_context=investigation_context,
            csv_path=resolved_csv_path,
            artifact_metadata=artifact_metadata,
            artifact_prompt_template=self.artifact_prompt_template,
            artifact_prompt_template_small_context=self.artifact_prompt_template_small_context,
            artifact_instruction_prompts=self.artifact_instruction_prompts,
            artifact_ai_column_projections=self.artifact_ai_column_projections,
            artifact_deduplication_enabled=self.artifact_deduplication_enabled,
            ai_max_tokens=self.ai_max_tokens,
            shortened_prompt_cutoff_tokens=self.shortened_prompt_cutoff_tokens,
            case_dir=self.case_dir,
            audit_log_fn=self._audit_log,
        )
        self._set_analysis_input_csv_path(artifact_key=artifact_key, csv_path=analysis_csv_path)
        return prompt_text

    def analyze_artifact(
        self,
        artifact_key: str,
        investigation_context: str,
        progress_callback: Any | None = None,
    ) -> dict[str, Any]:
        """Analyze a single artifact's CSV data and return AI findings.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            progress_callback: Optional callable for streaming progress.

        Returns:
            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
            ``model``, and optionally ``citation_warnings``.
768 """ 769 artifact_metadata = self._resolve_artifact_metadata(artifact_key) 770 artifact_name = artifact_metadata.get("name", artifact_key) 771 model = self.model_info.get("model", "unknown") 772 provider = self.model_info.get("provider", "unknown") 773 774 self._audit_log("analysis_started", { 775 "artifact_key": artifact_key, "artifact_name": artifact_name, 776 "provider": provider, "model": model, 777 }) 778 779 start_time = perf_counter() 780 try: 781 all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key) 782 if len(all_csv_paths) > 1: 783 csv_path = self._combine_csv_files(artifact_key, all_csv_paths) 784 else: 785 csv_path = all_csv_paths[0] 786 artifact_prompt = self._prepare_artifact_data( 787 artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path, 788 ) 789 analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path) 790 attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)] 791 792 safe_key = sanitize_filename(artifact_key) 793 self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt) 794 795 prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt) 796 if prompt_tokens_estimate > self.ai_max_tokens: 797 self.logger.info( 798 "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.", 799 artifact_key, prompt_tokens_estimate, self.ai_max_tokens, 800 ) 801 if progress_callback is not None: 802 emit_analysis_progress(progress_callback, artifact_key, "started", { 803 "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model, 804 }) 805 analysis_text = analyze_artifact_chunked( 806 artifact_prompt=artifact_prompt, 807 artifact_key=artifact_key, 808 artifact_name=artifact_name, 809 investigation_context=investigation_context, 810 model=model, 811 system_prompt=self.system_prompt, 812 
ai_response_max_tokens=self.ai_response_max_tokens, 813 chunk_csv_budget=self.chunk_csv_budget, 814 chunk_merge_prompt_template=self.chunk_merge_prompt_template, 815 max_merge_rounds=self.max_merge_rounds, 816 call_ai_with_retry_fn=self._call_ai_with_retry, 817 ai_provider=self.ai_provider, 818 audit_log_fn=self._audit_log, 819 save_case_prompt_fn=self._save_case_prompt, 820 progress_callback=progress_callback, 821 ) 822 duration_seconds = perf_counter() - start_time 823 self._audit_log("analysis_completed", { 824 "artifact_key": artifact_key, "artifact_name": artifact_name, 825 "token_count": self._estimate_tokens(analysis_text), 826 "duration_seconds": round(duration_seconds, 6), 827 "status": "success", "chunked": True, 828 }) 829 citation_warnings = self._validate_citations(artifact_key, analysis_text) 830 result: dict[str, Any] = { 831 "artifact_key": artifact_key, "artifact_name": artifact_name, 832 "analysis": analysis_text, "model": model, 833 } 834 if citation_warnings: 835 result["citation_warnings"] = citation_warnings 836 return result 837 838 analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None) 839 if callable(analyze_with_progress) and progress_callback is not None: 840 emit_analysis_progress(progress_callback, artifact_key, "started", { 841 "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model, 842 }) 843 844 def _provider_progress(payload: Mapping[str, Any]) -> None: 845 """Forward provider progress to the frontend.""" 846 if not isinstance(payload, Mapping): 847 return 848 emit_analysis_progress(progress_callback, artifact_key, "thinking", { 849 "artifact_key": artifact_key, "artifact_name": artifact_name, 850 "thinking_text": str(payload.get("thinking_text", "")), 851 "partial_text": str(payload.get("partial_text", "")), 852 "model": model, 853 }) 854 855 try: 856 analysis_text = analyze_with_progress( 857 system_prompt=self.system_prompt, user_prompt=artifact_prompt, 858 
progress_callback=_provider_progress, attachments=attachments, 859 max_tokens=self.ai_response_max_tokens, 860 ) 861 except TypeError: 862 analysis_text = analyze_with_progress( 863 system_prompt=self.system_prompt, user_prompt=artifact_prompt, 864 progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens, 865 ) 866 else: 867 if progress_callback is not None: 868 emit_analysis_progress(progress_callback, artifact_key, "started", { 869 "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model, 870 }) 871 analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None) 872 if callable(analyze_with_attachments): 873 analysis_text = self._call_ai_with_retry( 874 lambda: analyze_with_attachments( 875 system_prompt=self.system_prompt, user_prompt=artifact_prompt, 876 attachments=attachments, max_tokens=self.ai_response_max_tokens, 877 ) 878 ) 879 else: 880 analysis_text = self._call_ai_with_retry( 881 lambda: self.ai_provider.analyze( 882 system_prompt=self.system_prompt, user_prompt=artifact_prompt, 883 max_tokens=self.ai_response_max_tokens, 884 ) 885 ) 886 duration_seconds = perf_counter() - start_time 887 self._audit_log("analysis_completed", { 888 "artifact_key": artifact_key, "artifact_name": artifact_name, 889 "token_count": self._estimate_tokens(analysis_text), 890 "duration_seconds": round(duration_seconds, 6), "status": "success", 891 }) 892 except Exception as error: 893 duration_seconds = perf_counter() - start_time 894 analysis_text = f"Analysis failed: {error}" 895 self._audit_log("analysis_completed", { 896 "artifact_key": artifact_key, "artifact_name": artifact_name, 897 "token_count": 0, "duration_seconds": round(duration_seconds, 6), 898 "status": "failed", "error": str(error), 899 }) 900 901 citation_warnings = self._validate_citations(artifact_key, analysis_text) 902 903 result = { 904 "artifact_key": artifact_key, "artifact_name": artifact_name, 905 "analysis": analysis_text, "model": model, 
906 } 907 if citation_warnings: 908 result["citation_warnings"] = citation_warnings 909 return result 910 911 def generate_summary( 912 self, 913 per_artifact_results: list[Mapping[str, Any]], 914 investigation_context: str, 915 metadata: Mapping[str, Any] | None, 916 ) -> str: 917 """Generate a cross-artifact summary by correlating findings. 918 919 Args: 920 per_artifact_results: List of per-artifact result dicts. 921 investigation_context: The user's investigation context. 922 metadata: Optional host metadata mapping. 923 924 Returns: 925 The AI-generated summary text, or an error message. 926 """ 927 metadata_map = metadata if isinstance(metadata, Mapping) else {} 928 summary_prompt = build_summary_prompt( 929 summary_prompt_template=self.summary_prompt_template, 930 investigation_context=investigation_context, 931 per_artifact_results=per_artifact_results, 932 metadata_map=metadata_map, 933 ) 934 935 model = self.model_info.get("model", "unknown") 936 provider = self.model_info.get("provider", "unknown") 937 summary_artifact_key = "cross_artifact_summary" 938 summary_artifact_name = "Cross-Artifact Summary" 939 summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md" 940 941 self._audit_log("analysis_started", { 942 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 943 "provider": provider, "model": model, 944 }) 945 self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt) 946 947 start_time = perf_counter() 948 try: 949 summary = self._call_ai_with_retry( 950 lambda: self.ai_provider.analyze( 951 system_prompt=self.system_prompt, user_prompt=summary_prompt, 952 max_tokens=self.ai_response_max_tokens, 953 ) 954 ) 955 duration_seconds = perf_counter() - start_time 956 self._audit_log("analysis_completed", { 957 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 958 "token_count": self._estimate_tokens(summary), 959 "duration_seconds": round(duration_seconds, 6), 
"status": "success", 960 }) 961 return summary 962 except Exception as error: 963 duration_seconds = perf_counter() - start_time 964 summary = f"Analysis failed: {error}" 965 self._audit_log("analysis_completed", { 966 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 967 "token_count": 0, "duration_seconds": round(duration_seconds, 6), 968 "status": "failed", "error": str(error), 969 }) 970 return summary 971 972 def run_full_analysis( 973 self, 974 artifact_keys: Iterable[str], 975 investigation_context: str, 976 metadata: Mapping[str, Any] | None, 977 progress_callback: Any | None = None, 978 cancel_check: Any | None = None, 979 ) -> dict[str, Any]: 980 """Run the complete analysis pipeline: per-artifact then summary. 981 982 Args: 983 artifact_keys: Iterable of artifact key strings. 984 investigation_context: The user's investigation context. 985 metadata: Optional metadata mapping. 986 progress_callback: Optional callable for streaming progress. 987 cancel_check: Optional callable returning ``True`` when the 988 analysis should be aborted early. 989 990 Returns: 991 A dict with ``per_artifact``, ``summary``, and ``model_info``. 992 993 Raises: 994 AnalysisCancelledError: If *cancel_check* returns ``True``. 
995 """ 996 if isinstance(self.ai_provider, UnavailableProvider): 997 raise AIProviderError(self.ai_provider._error_message) 998 999 self._register_artifact_paths_from_metadata(metadata) 1000 per_artifact_results: list[dict[str, Any]] = [] 1001 for artifact_key in artifact_keys: 1002 if cancel_check is not None and cancel_check(): 1003 raise AnalysisCancelledError("Analysis cancelled by user.") 1004 result = self.analyze_artifact( 1005 artifact_key=str(artifact_key), 1006 investigation_context=investigation_context, 1007 progress_callback=progress_callback, 1008 ) 1009 per_artifact_results.append(result) 1010 if progress_callback is not None: 1011 emit_analysis_progress(progress_callback, str(artifact_key), "complete", result) 1012 1013 summary = self.generate_summary( 1014 per_artifact_results=per_artifact_results, 1015 investigation_context=investigation_context, 1016 metadata=metadata, 1017 ) 1018 return { 1019 "per_artifact": per_artifact_results, 1020 "summary": summary, 1021 "model_info": dict(self.model_info), 1022 }
Orchestrates AI-powered forensic analysis of parsed artifact CSV data.
Central analysis engine for AIFT: reads parsed artifact CSV files, applies column projection and deduplication, builds token-budgeted prompts, sends them to a configured AI provider, and validates citations.
Attributes:
- case_dir: Path to the case directory, or ``None``.
- config: Merged configuration dictionary.
- ai_provider: The configured AI provider instance.
- model_info: Dict with ``provider`` and ``model`` keys.
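The "token-budgeted prompts" mentioned above rest on a characters-per-token heuristic (``TOKEN_CHAR_RATIO`` in ``analyzer_constants``), with chunked analysis kicking in once the combined system-plus-user estimate exceeds ``ai_max_tokens``. A minimal sketch of that gate; the ratio of 4 and the budget used below are assumed values, not the module's actual constants:

```python
TOKEN_CHAR_RATIO = 4  # assumed chars-per-token ratio; the real value lives in analyzer_constants

def estimate_tokens(text: str) -> int:
    """Approximate token count from character length."""
    return len(text) // TOKEN_CHAR_RATIO

def needs_chunking(system_prompt: str, user_prompt: str, ai_max_tokens: int) -> bool:
    """Mirror the gate in analyze_artifact: chunk when the combined estimate exceeds the budget."""
    return estimate_tokens(user_prompt) + estimate_tokens(system_prompt) > ai_max_tokens
```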
    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.). Controls which artifact instruction
                prompts are loaded.
        """
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()
Initialize the forensic analyzer with case context and configuration.
Arguments:
- case_dir: Path to the case directory, or a mapping of artifact keys to CSV paths (convenience shorthand).
- config: Application configuration dictionary.
- audit_logger: Optional object with a ``log(action, details)`` method.
- artifact_csv_paths: Mapping of artifact keys to CSV paths.
- prompts_dir: Directory containing prompt template files.
- random_seed: Optional seed for the internal RNG.
- os_type: Detected operating system type (``"windows"``, ``"linux"``, etc.). Controls which artifact instruction prompts are loaded.
Analyze a single artifact's CSV data and return AI findings.
Arguments:
- artifact_key: Unique identifier for the artifact.
- investigation_context: Free-text investigation context.
- progress_callback: Optional callable for streaming progress.
Returns:
A dict with ``artifact_key``, ``artifact_name``, ``analysis``, ``model``, and optionally ``citation_warnings``.
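When a provider's ``analyze_with_progress`` does not accept an ``attachments`` keyword, ``analyze_artifact`` simply retries the call without it by catching ``TypeError``. The same duck-typing pattern in isolation, with an illustrative legacy-provider stub:

```python
def call_with_optional_attachments(fn, *, system_prompt, user_prompt, attachments, max_tokens):
    """Try the richer signature first; fall back when the kwarg is unsupported."""
    try:
        return fn(system_prompt=system_prompt, user_prompt=user_prompt,
                  attachments=attachments, max_tokens=max_tokens)
    except TypeError:
        # Older providers reject the attachments kwarg with a TypeError.
        return fn(system_prompt=system_prompt, user_prompt=user_prompt, max_tokens=max_tokens)

def legacy_analyze(system_prompt, user_prompt, max_tokens):
    """Stub provider without attachment support (hypothetical, for demonstration)."""
    return f"analyzed {len(user_prompt)} chars"
```

One caveat of this pattern: a ``TypeError`` raised *inside* the provider is indistinguishable from a signature mismatch, so the fallback call may run when it shouldn't.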
Generate a cross-artifact summary by correlating findings.
Arguments:
- per_artifact_results: List of per-artifact result dicts.
- investigation_context: The user's investigation context.
- metadata: Optional host metadata mapping.
Returns:
The AI-generated summary text, or an error message.
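Both the summary call and the per-artifact calls are wrapped in ``_call_ai_with_retry``, whose body is not shown in this excerpt. A plausible sketch, assuming exponential backoff driven by the ``AI_RETRY_ATTEMPTS`` and ``AI_RETRY_BASE_DELAY`` constants imported at the top of the module (the values and backoff shape below are assumptions):

```python
from time import sleep

AI_RETRY_ATTEMPTS = 3      # assumed; the real values live in analyzer_constants
AI_RETRY_BASE_DELAY = 0.0  # set to 0 here so the sketch runs instantly

def call_ai_with_retry(call):
    """Retry a zero-argument AI call with exponential backoff; re-raise after the last attempt."""
    for attempt in range(AI_RETRY_ATTEMPTS):
        try:
            return call()
        except Exception:
            if attempt == AI_RETRY_ATTEMPTS - 1:
                raise
            sleep(AI_RETRY_BASE_DELAY * (2 ** attempt))
```

Passing the provider call as a zero-argument lambda, as the methods above do, keeps the retry helper independent of any particular provider signature.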
Run the complete analysis pipeline: per-artifact then summary.
Arguments:
- artifact_keys: Iterable of artifact key strings.
- investigation_context: The user's investigation context.
- metadata: Optional metadata mapping.
- progress_callback: Optional callable for streaming progress.
- cancel_check: Optional callable returning ``True`` when the analysis should be aborted early.
Returns:
A dict with ``per_artifact``, ``summary``, and ``model_info``.
Raises:
- AnalysisCancelledError: If ``cancel_check`` returns ``True``.
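The cancellation contract is cooperative: ``cancel_check`` is polled once per artifact, immediately before each ``analyze_artifact`` call, so an in-flight provider request is never interrupted. A reduced sketch of that loop (the error class and analyze function here are stand-ins for the module's own):

```python
class AnalysisCancelledError(RuntimeError):
    """Stand-in for the module's cancellation error."""

def run_per_artifact(artifact_keys, analyze_fn, cancel_check=None):
    """Poll cancel_check before each artifact, mirroring run_full_analysis."""
    results = []
    for key in artifact_keys:
        if cancel_check is not None and cancel_check():
            raise AnalysisCancelledError("Analysis cancelled by user.")
        results.append(analyze_fn(str(key)))  # keys are coerced to str, as in the real loop
    return results
```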