app.analyzer
AI analysis orchestration package for forensic triage.
Provides the ForensicAnalyzer class and supporting sub-modules for
token budgeting, date filtering, column projection, deduplication,
chunked analysis, citation validation, IOC extraction, and audit logging.
Sub-modules:
- constants: Compile-time constants, regex, prompt templates.
- utils: Pure utility functions (string, datetime, CSV, config readers).
- ioc: IOC extraction and prompt-building helpers.
- citations: Citation validation against source CSV.
- data_prep: Date filtering, dedup, statistics, prompt assembly.
- chunking: Chunked analysis and hierarchical merge.
- prompts: Prompt template loading and construction.
- core: The ForensicAnalyzer class itself.
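The token budgeting mentioned above is driven, in the core module, by a single `ai_max_tokens` setting: the response budget is 20% of the context window, and the per-chunk CSV budget is 60% of the window converted to characters. A minimal standalone sketch of that arithmetic; `TOKEN_CHAR_RATIO` is an assumed characters-per-token placeholder here, not the package's actual constant:

```python
# Sketch of the budget derivation performed in _load_analysis_settings.
# TOKEN_CHAR_RATIO is an assumed chars-per-token ratio; the real value
# lives in app.analyzer.constants and may differ.
TOKEN_CHAR_RATIO = 4


def derive_budgets(ai_max_tokens: int) -> dict[str, int]:
    """Derive response and chunk budgets from the context window size."""
    return {
        # The model may use up to 20% of the window for its answer.
        "ai_response_max_tokens": max(1, int(ai_max_tokens * 0.2)),
        # Character budget for raw CSV rows inside one chunk prompt.
        "chunk_csv_budget": int(ai_max_tokens * TOKEN_CHAR_RATIO * 0.6),
    }


budgets = derive_budgets(128_000)
```

The `max(1, ...)` clamp mirrors the core module's guarantee that the response budget is never zero, even for degenerate window sizes.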
"""AI analysis orchestration package for forensic triage.

Provides the ``ForensicAnalyzer`` class and supporting sub-modules for
token budgeting, date filtering, column projection, deduplication,
chunked analysis, citation validation, IOC extraction, and audit logging.

Sub-modules:

- ``constants``: Compile-time constants, regex, prompt templates.
- ``utils``: Pure utility functions (string, datetime, CSV, config readers).
- ``ioc``: IOC extraction and prompt-building helpers.
- ``citations``: Citation validation against source CSV.
- ``data_prep``: Date filtering, dedup, statistics, prompt assembly.
- ``chunking``: Chunked analysis and hierarchical merge.
- ``prompts``: Prompt template loading and construction.
- ``core``: The ``ForensicAnalyzer`` class itself.
"""

from __future__ import annotations

from .constants import PROJECT_ROOT
from .core import ForensicAnalyzer

__all__ = [
    "ForensicAnalyzer",
    "PROJECT_ROOT",
    "chunking",
    "citations",
    "constants",
    "core",
    "data_prep",
    "ioc",
    "prompts",
    "utils",
]
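Transient provider failures are handled in ``core`` by `ForensicAnalyzer._call_ai_with_retry`, which retries with exponential backoff but re-raises permanent `AIProviderError`s immediately. A self-contained sketch of that retry pattern; the retry constants and the `flaky` callable are illustrative stand-ins, not the package's real values:

```python
import time

# Assumed values; the real AI_RETRY_ATTEMPTS / AI_RETRY_BASE_DELAY live
# in app.analyzer.constants and may differ. A zero base delay keeps this
# sketch instant to run.
AI_RETRY_ATTEMPTS = 3
AI_RETRY_BASE_DELAY = 0.0


def call_with_retry(call):
    """Retry *call* on any exception, doubling the delay each attempt."""
    last_error = None
    for attempt in range(AI_RETRY_ATTEMPTS):
        try:
            return call()
        except Exception as error:
            last_error = error
            if attempt < AI_RETRY_ATTEMPTS - 1:
                # Exponential backoff: base, 2x base, 4x base, ...
                time.sleep(AI_RETRY_BASE_DELAY * (2 ** attempt))
    raise last_error


attempts = []


def flaky():
    """Hypothetical provider call that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient")
    return "ok"


result = call_with_retry(flaky)
```

The real method adds one refinement this sketch omits: a dedicated `except AIProviderError: raise` arm so permanent provider errors skip the retry loop entirely.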
class ForensicAnalyzer:
    """Orchestrates AI-powered forensic analysis of parsed artifact CSV data.

    Central analysis engine for AIFT: reads parsed artifact CSV files, applies
    column projection and deduplication, builds token-budgeted prompts, sends
    them to a configured AI provider, and validates citations.

    Attributes:
        case_dir: Path to the case directory, or ``None``.
        config: Merged configuration dictionary.
        ai_provider: The configured AI provider instance.
        model_info: Dict with ``provider`` and ``model`` keys.
    """

    # Expose extracted functions as static methods for backward compatibility
    # with tests and callers that use ForensicAnalyzer._method_name().
    _stringify_value = staticmethod(stringify_value)
    _build_datetime = staticmethod(build_datetime)
    _normalize_artifact_key = staticmethod(normalize_artifact_key)
    _sanitize_filename = staticmethod(sanitize_filename)
    _split_csv_into_chunks = staticmethod(split_csv_into_chunks)
    _split_csv_and_suffix = staticmethod(split_csv_and_suffix)
    _coerce_projection_columns = staticmethod(coerce_projection_columns)
    _is_dedup_safe_identifier_column = staticmethod(is_dedup_safe_identifier_column)
    _timestamp_lookup_keys = staticmethod(timestamp_lookup_keys)
    _timestamp_found_in_csv = staticmethod(timestamp_found_in_csv)
    _match_column_name = staticmethod(match_column_name)
    _emit_analysis_progress = staticmethod(emit_analysis_progress)

    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.). Controls which artifact instruction
                prompts are loaded.
        """
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()

    # ------------------------------------------------------------------
    # Configuration loading
    # ------------------------------------------------------------------

    def _load_analysis_settings(self) -> None:
        """Load and validate analysis tuning parameters from the config dict."""
        analysis_config = self.config.get("analysis")
        if not isinstance(analysis_config, Mapping):
            analysis_config = {}

        self.ai_max_tokens = read_int_setting(analysis_config, "ai_max_tokens", AI_MAX_TOKENS, minimum=1)
        self.ai_response_max_tokens = max(1, int(self.ai_max_tokens * 0.2))
        legacy_shortened = read_int_setting(
            analysis_config, "statistics_section_cutoff_tokens", DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS, minimum=1,
        )
        self.shortened_prompt_cutoff_tokens = read_int_setting(
            analysis_config, "shortened_prompt_cutoff_tokens", legacy_shortened, minimum=1,
        )
        self.chunk_csv_budget = int(self.ai_max_tokens * TOKEN_CHAR_RATIO * 0.6)
        self.citation_spot_check_limit = read_int_setting(
            analysis_config, "citation_spot_check_limit", CITATION_SPOT_CHECK_LIMIT, minimum=1,
        )
        self.max_merge_rounds = read_int_setting(analysis_config, "max_merge_rounds", MAX_MERGE_ROUNDS, minimum=1)
        self.artifact_deduplication_enabled = read_bool_setting(
            analysis_config, "artifact_deduplication_enabled",
            ARTIFACT_DEDUPLICATION_ENABLED,
        )
        self.artifact_ai_columns_config_path = read_path_setting(
            analysis_config, "artifact_ai_columns_config_path", str(DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH),
        )

    # Backward-compatible aliases for the extracted config readers.
    _read_int_setting = staticmethod(read_int_setting)
    _read_bool_setting = staticmethod(read_bool_setting)
    _read_path_setting = staticmethod(read_path_setting)

    def _resolve_artifact_ai_columns_config_path(self) -> Path:
        """Resolve the artifact AI columns config path to an absolute Path.

        Delegates to :func:`prompts.resolve_artifact_ai_columns_config_path`.

        Returns:
            Resolved absolute ``Path`` to the YAML config file.
        """
        return resolve_artifact_ai_columns_config_path(
            self.artifact_ai_columns_config_path, self.case_dir,
        )

    def _load_artifact_ai_column_projections(self) -> dict[str, tuple[str, ...]]:
        """Load per-artifact column projection configuration from YAML.

        Delegates to :func:`prompts.load_artifact_ai_column_projections`.

        Returns:
            A dict mapping normalized artifact keys to tuples of column names.
        """
        config_path = self._resolve_artifact_ai_columns_config_path()
        return load_artifact_ai_column_projections(config_path, os_type=self.os_type)

    def _load_prompt_template(self, filename: str, default: str) -> str:
        """Read a prompt template file from the prompts directory.

        Delegates to :func:`prompts.load_prompt_template`.

        Args:
            filename: Name of the template file.
            default: Fallback template string.

        Returns:
            The template text.
        """
        return load_prompt_template(self.prompts_dir, filename, default)

    def _load_artifact_instruction_prompts(self) -> dict[str, str]:
        """Load per-artifact analysis instruction prompts.

        Delegates to :func:`prompts.load_artifact_instruction_prompts`,
        passing :attr:`os_type` so the correct OS-specific instruction
        directory is selected.

        Returns:
            A dict mapping artifact keys to instruction prompt text.
        """
        return load_artifact_instruction_prompts(self.prompts_dir, os_type=self.os_type)

    # ------------------------------------------------------------------
    # AI provider
    # ------------------------------------------------------------------

    def _create_ai_provider(self) -> Any:
        """Instantiate the configured AI provider, or a fallback on failure.

        Returns:
            An AI provider instance, or an ``UnavailableProvider``.
        """
        provider_config: Mapping[str, Any]
        if self.config:
            provider_config = self.config
        else:
            provider_config = {
                "ai": {
                    "provider": "local",
                    "local": {
                        "base_url": "http://localhost:11434/v1",
                        "model": "llama3.1:70b",
                        "api_key": "not-needed",
                    },
                }
            }
        try:
            return create_provider(dict(provider_config))
        except Exception as error:
            return UnavailableProvider(str(error))

    def _read_model_info(self) -> dict[str, str]:
        """Read provider and model metadata from the AI provider.

        Returns:
            A dict with at least ``provider`` and ``model`` keys.
        """
        try:
            model_info = self.ai_provider.get_model_info()
        except Exception:
            return {"provider": "unknown", "model": "unknown"}

        if not isinstance(model_info, Mapping):
            return {"provider": "unknown", "model": "unknown"}

        return {str(key): str(value) for key, value in model_info.items()}

    def _call_ai_with_retry(self, call: Callable[[], str]) -> str:
        """Call the AI provider with retry on transient failures.

        Args:
            call: A zero-argument callable that invokes the AI provider.

        Returns:
            The AI provider's response string.

        Raises:
            AIProviderError: If the provider raises a permanent error.
            Exception: The last transient error after all retries.
        """
        last_error: Exception | None = None
        for attempt in range(AI_RETRY_ATTEMPTS):
            try:
                return call()
            except AIProviderError:
                raise
            except Exception as error:
                last_error = error
                if attempt < AI_RETRY_ATTEMPTS - 1:
                    delay = AI_RETRY_BASE_DELAY * (2 ** attempt)
                    self.logger.warning(
                        "AI provider call failed (attempt %d/%d), retrying in %.1fs: %s",
                        attempt + 1, AI_RETRY_ATTEMPTS, delay, error,
                    )
                    sleep(delay)
        raise last_error  # type: ignore[misc]

    # ------------------------------------------------------------------
    # Audit / prompt saving
    # ------------------------------------------------------------------

    def _audit_log(self, action: str, details: dict[str, Any]) -> None:
        """Write an entry to the forensic audit trail.

        Args:
            action: The audit action name.
            details: Key-value details for the audit entry.
        """
        if self.audit_logger is None:
            return
        logger = getattr(self.audit_logger, "log", None)
        if not callable(logger):
            return
        try:
            logger(action, details)
        except Exception:
            return

    def _save_case_prompt(self, filename: str, system_prompt: str, user_prompt: str) -> None:
        """Save a prompt to the case prompts directory for audit.

        Args:
            filename: Output filename.
            system_prompt: The system prompt text.
            user_prompt: The user prompt text.
        """
        if self.case_dir is None:
            return
        prompts_dir = self.case_dir / "prompts"
        try:
            prompts_dir.mkdir(parents=True, exist_ok=True)
            prompt_path = prompts_dir / filename
            prompt_path.write_text(
                f"# System Prompt\n\n{system_prompt}\n\n---\n\n# User Prompt\n\n{user_prompt}\n",
                encoding="utf-8",
            )
        except OSError:
            self.logger.warning("Failed to save prompt to %s", prompts_dir / filename)

    # ------------------------------------------------------------------
    # Delegation methods — thin wrappers for backward compatibility.
    # Methods that don't use self are exposed as staticmethod assignments
    # at the class level (see above). The methods below need self.
    # ------------------------------------------------------------------

    def _estimate_tokens(self, text: str) -> int:
        """Estimate the token count of *text* using model-specific info."""
        return estimate_tokens(text, model_info=self.model_info)

    # These are also exposed as staticmethods on the class (see above)
    # but tests may call them on instances, so they work either way.
    _extract_ioc_targets = staticmethod(extract_ioc_targets)
    _format_ioc_targets = staticmethod(format_ioc_targets)
    _build_priority_directives = staticmethod(build_priority_directives)
    _compute_statistics = staticmethod(compute_statistics)
    _build_full_data_csv = staticmethod(build_full_data_csv)
    _deduplicate_rows_for_analysis = staticmethod(deduplicate_rows_for_analysis)

    def _validate_citations(self, artifact_key: str, analysis_text: str) -> list[str]:
        """Spot-check AI-cited values against source CSV.

        Args:
            artifact_key: Artifact identifier.
            analysis_text: The AI's analysis text.

        Returns:
            List of warning strings.
        """
        if analysis_text.startswith("Analysis failed:"):
            return []
        try:
            original_path = self._resolve_artifact_csv_path(artifact_key)
        except FileNotFoundError:
            return []
        csv_path = self._resolve_analysis_input_csv_path(
            artifact_key, fallback=original_path,
        )
        return validate_citations(
            artifact_key=artifact_key,
            analysis_text=analysis_text,
            csv_path=csv_path,
            citation_spot_check_limit=self.citation_spot_check_limit,
            audit_log_fn=self._audit_log,
        )

    # ------------------------------------------------------------------
    # Path resolution
    # ------------------------------------------------------------------

    def _resolve_artifact_csv_path(self, artifact_key: str) -> Path:
        """Resolve the CSV file path for a given artifact key.

        For split artifacts with multiple CSV files, returns the first
        path. Use :meth:`_resolve_all_artifact_csv_paths` to get every
        path for a split artifact.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A ``Path`` to the artifact's CSV file.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        mapped = self.artifact_csv_paths.get(artifact_key)
        if mapped is not None:
            if isinstance(mapped, list):
                return mapped[0]
            return mapped

        normalized = normalize_artifact_key(artifact_key)
        mapped_normalized = self.artifact_csv_paths.get(normalized)
        if mapped_normalized is not None:
            if isinstance(mapped_normalized, list):
                return mapped_normalized[0]
            return mapped_normalized

        candidate_path = Path(artifact_key)
        if candidate_path.exists():
            return candidate_path

        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    if direct_csv_path.exists():
                        return direct_csv_path
                for file_stub in file_stubs:
                    prefixed_paths = sorted(parsed_dir.glob(f"{file_stub}_*.csv"))
                    if prefixed_paths:
                        return prefixed_paths[0]

        raise FileNotFoundError(
            f"No CSV path mapped for artifact '{artifact_key}'. "
            "Provide it in ForensicAnalyzer(artifact_csv_paths=...) or use case_dir/parsed CSV paths."
        )

    def _resolve_all_artifact_csv_paths(self, artifact_key: str) -> list[Path]:
        """Resolve all CSV file paths for a given artifact key.

        For single-file artifacts returns a one-element list. For split
        artifacts (e.g. EVTX) returns all constituent CSV paths.

        Args:
            artifact_key: Artifact identifier to resolve.

        Returns:
            A non-empty list of ``Path`` objects.

        Raises:
            FileNotFoundError: If no CSV path can be found.
        """
        for key in (artifact_key, normalize_artifact_key(artifact_key)):
            mapped = self.artifact_csv_paths.get(key)
            if mapped is not None:
                if isinstance(mapped, list):
                    return list(mapped)
                return [mapped]

        # Filesystem fallback: search case_dir/parsed for all matching parts.
        if self.case_dir is not None:
            parsed_dir = self.case_dir / "parsed"
            if parsed_dir.exists():
                normalized = normalize_artifact_key(artifact_key)
                file_stubs = {
                    artifact_key, normalized,
                    sanitize_filename(artifact_key),
                    sanitize_filename(normalized),
                }
                for file_stub in file_stubs:
                    direct_csv_path = parsed_dir / f"{file_stub}.csv"
                    combined_csv_path = parsed_dir / f"{file_stub}_combined.csv"
                    prefixed_paths = sorted(
                        path
                        for path in parsed_dir.glob(f"{file_stub}_*.csv")
                        if path != combined_csv_path
                    )
                    if direct_csv_path.exists() and prefixed_paths:
                        return sorted([direct_csv_path] + prefixed_paths)
                    if prefixed_paths:
                        return prefixed_paths
                    if direct_csv_path.exists():
                        return [direct_csv_path]

        # Final fallback: delegate to single-path resolver.
        return [self._resolve_artifact_csv_path(artifact_key)]

    def _combine_csv_files(self, artifact_key: str, csv_paths: list[Path]) -> Path:
        """Concatenate multiple CSV files into a single combined CSV.

        All input files are assumed to share the same schema (column names).
        The combined file is written to the case's ``parsed/`` directory (or
        next to the first input file) with a ``_combined`` suffix.

        Args:
            artifact_key: Artifact identifier (used for the output filename).
            csv_paths: List of CSV file paths to combine.

        Returns:
            Path to the combined CSV file.
        """
        import csv as csv_mod

        output_dir = csv_paths[0].parent
        safe_key = sanitize_filename(artifact_key)
        combined_path = output_dir / f"{safe_key}_combined.csv"

        fieldnames: list[str] = []
        fieldnames_set: set[str] = set()

        for csv_path in csv_paths:
            if not csv_path.exists():
                continue
            with csv_path.open("r", newline="", encoding="utf-8") as fh:
                reader = csv_mod.DictReader(fh)
                if reader.fieldnames:
                    for fn in reader.fieldnames:
                        if fn not in fieldnames_set:
                            fieldnames.append(fn)
                            fieldnames_set.add(fn)

        with combined_path.open("w", newline="", encoding="utf-8") as out:
            writer = csv_mod.DictWriter(out, fieldnames=fieldnames, restval="", extrasaction="ignore")
            writer.writeheader()
            for csv_path in csv_paths:
                if not csv_path.exists():
                    continue
                with csv_path.open("r", newline="", encoding="utf-8") as fh:
                    reader = csv_mod.DictReader(fh)
                    for row in reader:
                        writer.writerow(row)

        return combined_path

    def _set_analysis_input_csv_path(self, artifact_key: str, csv_path: Path) -> None:
        """Store the analysis-input CSV path for an artifact.

        Args:
            artifact_key: Artifact identifier.
            csv_path: Path to the analysis-input CSV.
        """
        self._analysis_input_csv_paths[artifact_key] = csv_path
        normalized = normalize_artifact_key(artifact_key)
        self._analysis_input_csv_paths[normalized] = csv_path

    def _resolve_analysis_input_csv_path(self, artifact_key: str, fallback: Path) -> Path:
        """Retrieve the analysis-input CSV path, with fallback.

        Args:
            artifact_key: Artifact identifier.
            fallback: Default path if not stored.

        Returns:
            The stored analysis-input CSV path, or *fallback*.
        """
        mapped = self._analysis_input_csv_paths.get(artifact_key)
        if mapped is not None:
            return mapped
        normalized = normalize_artifact_key(artifact_key)
        mapped = self._analysis_input_csv_paths.get(normalized)
        if mapped is not None:
            return mapped
        return fallback

    def _resolve_artifact_metadata(self, artifact_key: str) -> dict[str, str]:
        """Look up artifact metadata from the OS-appropriate registry first.

        Searches the registry matching :attr:`os_type` first so that
        shared keys like ``services`` resolve to the correct OS-specific
        entry. Falls back to the other registry for cross-OS lookups.

        Args:
            artifact_key: Artifact identifier.

        Returns:
            A dict with at least ``name``, ``description``, and
            ``analysis_hint`` keys.
        """
        if self.os_type == "linux":
            registries = (LINUX_ARTIFACT_REGISTRY, WINDOWS_ARTIFACT_REGISTRY)
        else:
            registries = (WINDOWS_ARTIFACT_REGISTRY, LINUX_ARTIFACT_REGISTRY)

        for registry in registries:
            if artifact_key in registry:
                metadata = registry[artifact_key]
                return {str(key): str(value) for key, value in metadata.items()}

        normalized = normalize_artifact_key(artifact_key)
        for registry in registries:
            if normalized in registry:
                metadata = registry[normalized]
                return {str(key): str(value) for key, value in metadata.items()}

        return {
            "name": artifact_key,
            "description": "No artifact description available.",
            "analysis_hint": "No specific analysis guidance is available for this artifact.",
        }

    # ------------------------------------------------------------------
    # Metadata registration
    # ------------------------------------------------------------------

    def _register_artifact_paths_from_metadata(self, metadata: Mapping[str, Any] | None) -> None:
        """Extract and register artifact CSV paths from run metadata.

        Args:
            metadata: Optional metadata mapping.
        """
        if not isinstance(metadata, Mapping):
            return

        artifact_csv_paths = metadata.get("artifact_csv_paths")
        if isinstance(artifact_csv_paths, Mapping):
            for artifact_key, csv_path in artifact_csv_paths.items():
                if isinstance(csv_path, list) and len(csv_path) > 1:
                    self.artifact_csv_paths[str(artifact_key)] = [
                        Path(str(p)) for p in csv_path
                    ]
                elif isinstance(csv_path, list) and csv_path:
                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path[0]))
                else:
                    self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))

        for container_key in ("artifacts", "artifact_results", "parse_results", "parsed_artifacts"):
            container = metadata.get(container_key)
            if isinstance(container, Mapping):
                for artifact_key, value in container.items():
                    self._register_artifact_path_entry(artifact_key=artifact_key, value=value)
            elif isinstance(container, list):
                for item in container:
                    if isinstance(item, Mapping):
                        artifact_key = item.get("artifact_key") or item.get("key")
                        if artifact_key:
                            self._register_artifact_path_entry(artifact_key=str(artifact_key), value=item)

    def _register_artifact_path_entry(self, artifact_key: Any, value: Any) -> None:
        """Register a single artifact CSV path from a metadata entry.

        Args:
            artifact_key: Artifact identifier.
            value: Metadata entry (mapping, string, or Path).
        """
        if artifact_key in (None, ""):
            return

        if isinstance(value, Mapping):
            csv_path = value.get("csv_path")
            csv_paths = value.get("csv_paths")
            if isinstance(csv_paths, list) and len(csv_paths) > 1:
                self.artifact_csv_paths[str(artifact_key)] = [
                    Path(str(p)) for p in csv_paths
                ]
                return
            if csv_path:
                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_path))
                return
            if isinstance(csv_paths, list) and csv_paths:
                self.artifact_csv_paths[str(artifact_key)] = Path(str(csv_paths[0]))
            return

        if isinstance(value, (str, Path)):
            self.artifact_csv_paths[str(artifact_key)] = Path(str(value))

    # ------------------------------------------------------------------
    # Core analysis pipeline
    # ------------------------------------------------------------------

    def _prepare_artifact_data(
        self, artifact_key: str, investigation_context: str, csv_path: Path | None = None,
    ) -> str:
        """Prepare one artifact CSV as a bounded, analysis-ready prompt.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            csv_path: Explicit path to the artifact CSV.

        Returns:
            The fully rendered prompt string.

        Raises:
            FileNotFoundError: If the artifact CSV cannot be located.
        """
        resolved_csv_path = csv_path if csv_path is not None else self._resolve_artifact_csv_path(artifact_key)
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)

        prompt_text, analysis_csv_path, _ = prepare_artifact_data(
            artifact_key=artifact_key,
            investigation_context=investigation_context,
            csv_path=resolved_csv_path,
            artifact_metadata=artifact_metadata,
            artifact_prompt_template=self.artifact_prompt_template,
            artifact_prompt_template_small_context=self.artifact_prompt_template_small_context,
            artifact_instruction_prompts=self.artifact_instruction_prompts,
            artifact_ai_column_projections=self.artifact_ai_column_projections,
            artifact_deduplication_enabled=self.artifact_deduplication_enabled,
            ai_max_tokens=self.ai_max_tokens,
            shortened_prompt_cutoff_tokens=self.shortened_prompt_cutoff_tokens,
            case_dir=self.case_dir,
            audit_log_fn=self._audit_log,
        )
        self._set_analysis_input_csv_path(artifact_key=artifact_key, csv_path=analysis_csv_path)
        return prompt_text

    def analyze_artifact(
        self,
        artifact_key: str,
        investigation_context: str,
        progress_callback: Any | None = None,
    ) -> dict[str, Any]:
        """Analyze a single artifact's CSV data and return AI findings.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            progress_callback: Optional callable for streaming progress.

        Returns:
            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
            ``model``, and optionally ``citation_warnings``.
        """
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
        artifact_name = artifact_metadata.get("name", artifact_key)
        model = self.model_info.get("model", "unknown")
        provider = self.model_info.get("provider", "unknown")

        self._audit_log("analysis_started", {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "provider": provider, "model": model,
        })

        start_time = perf_counter()
        try:
            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
            if len(all_csv_paths) > 1:
                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
            else:
                csv_path = all_csv_paths[0]
            artifact_prompt = self._prepare_artifact_data(
                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
            )
            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]

            safe_key = sanitize_filename(artifact_key)
            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)

            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
            if prompt_tokens_estimate > self.ai_max_tokens:
                self.logger.info(
                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
                )
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analysis_text = analyze_artifact_chunked(
                    artifact_prompt=artifact_prompt,
                    artifact_key=artifact_key,
                    artifact_name=artifact_name,
                    investigation_context=investigation_context,
                    model=model,
                    system_prompt=self.system_prompt,
                    ai_response_max_tokens=self.ai_response_max_tokens,
                    chunk_csv_budget=self.chunk_csv_budget,
                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
                    max_merge_rounds=self.max_merge_rounds,
                    call_ai_with_retry_fn=self._call_ai_with_retry,
                    ai_provider=self.ai_provider,
                    audit_log_fn=self._audit_log,
                    save_case_prompt_fn=self._save_case_prompt,
                    progress_callback=progress_callback,
                )
                duration_seconds = perf_counter() - start_time
                self._audit_log("analysis_completed", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "token_count": self._estimate_tokens(analysis_text),
                    "duration_seconds": round(duration_seconds, 6),
                    "status": "success", "chunked": True,
                })
                citation_warnings = self._validate_citations(artifact_key, analysis_text)
                result: dict[str, Any] = {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "analysis": analysis_text, "model": model,
                }
                if citation_warnings:
                    result["citation_warnings"] = citation_warnings
                return result

            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
            if callable(analyze_with_progress) and progress_callback is not None:
                emit_analysis_progress(progress_callback, artifact_key, "started", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                })

                def _provider_progress(payload: Mapping[str, Any]) -> None:
                    """Forward provider progress to the frontend."""
                    if not isinstance(payload, Mapping):
                        return
                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name,
                        "thinking_text": str(payload.get("thinking_text", "")),
                        "partial_text": str(payload.get("partial_text", "")),
                        "model": model,
                    })

                try:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, attachments=attachments,
                        max_tokens=self.ai_response_max_tokens,
                    )
                except TypeError:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
                    )
            else:
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
                if callable(analyze_with_attachments):
                    analysis_text = self._call_ai_with_retry(
                        lambda: analyze_with_attachments(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
                        )
                    )
                else:
                    analysis_text = self._call_ai_with_retry(
                        lambda: self.ai_provider.analyze(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            max_tokens=self.ai_response_max_tokens,
                        )
                    )
            duration_seconds = perf_counter() - start_time
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": self._estimate_tokens(analysis_text),
                "duration_seconds": round(duration_seconds, 6), "status": "success",
            })
        except Exception as error:
            duration_seconds = perf_counter() - start_time
            analysis_text = f"Analysis failed: {error}"
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
                "status": "failed", "error": str(error),
            })

        citation_warnings = self._validate_citations(artifact_key, analysis_text)

        result = {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "analysis": analysis_text, "model": model,
906 } 907 if citation_warnings: 908 result["citation_warnings"] = citation_warnings 909 return result 910 911 def generate_summary( 912 self, 913 per_artifact_results: list[Mapping[str, Any]], 914 investigation_context: str, 915 metadata: Mapping[str, Any] | None, 916 ) -> str: 917 """Generate a cross-artifact summary by correlating findings. 918 919 Args: 920 per_artifact_results: List of per-artifact result dicts. 921 investigation_context: The user's investigation context. 922 metadata: Optional host metadata mapping. 923 924 Returns: 925 The AI-generated summary text, or an error message. 926 """ 927 metadata_map = metadata if isinstance(metadata, Mapping) else {} 928 summary_prompt = build_summary_prompt( 929 summary_prompt_template=self.summary_prompt_template, 930 investigation_context=investigation_context, 931 per_artifact_results=per_artifact_results, 932 metadata_map=metadata_map, 933 ) 934 935 model = self.model_info.get("model", "unknown") 936 provider = self.model_info.get("provider", "unknown") 937 summary_artifact_key = "cross_artifact_summary" 938 summary_artifact_name = "Cross-Artifact Summary" 939 summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md" 940 941 self._audit_log("analysis_started", { 942 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 943 "provider": provider, "model": model, 944 }) 945 self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt) 946 947 start_time = perf_counter() 948 try: 949 summary = self._call_ai_with_retry( 950 lambda: self.ai_provider.analyze( 951 system_prompt=self.system_prompt, user_prompt=summary_prompt, 952 max_tokens=self.ai_response_max_tokens, 953 ) 954 ) 955 duration_seconds = perf_counter() - start_time 956 self._audit_log("analysis_completed", { 957 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 958 "token_count": self._estimate_tokens(summary), 959 "duration_seconds": round(duration_seconds, 6), 
"status": "success", 960 }) 961 return summary 962 except Exception as error: 963 duration_seconds = perf_counter() - start_time 964 summary = f"Analysis failed: {error}" 965 self._audit_log("analysis_completed", { 966 "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name, 967 "token_count": 0, "duration_seconds": round(duration_seconds, 6), 968 "status": "failed", "error": str(error), 969 }) 970 return summary 971 972 def run_full_analysis( 973 self, 974 artifact_keys: Iterable[str], 975 investigation_context: str, 976 metadata: Mapping[str, Any] | None, 977 progress_callback: Any | None = None, 978 cancel_check: Any | None = None, 979 ) -> dict[str, Any]: 980 """Run the complete analysis pipeline: per-artifact then summary. 981 982 Args: 983 artifact_keys: Iterable of artifact key strings. 984 investigation_context: The user's investigation context. 985 metadata: Optional metadata mapping. 986 progress_callback: Optional callable for streaming progress. 987 cancel_check: Optional callable returning ``True`` when the 988 analysis should be aborted early. 989 990 Returns: 991 A dict with ``per_artifact``, ``summary``, and ``model_info``. 992 993 Raises: 994 AnalysisCancelledError: If *cancel_check* returns ``True``. 
995 """ 996 if isinstance(self.ai_provider, UnavailableProvider): 997 raise AIProviderError(self.ai_provider._error_message) 998 999 self._register_artifact_paths_from_metadata(metadata) 1000 per_artifact_results: list[dict[str, Any]] = [] 1001 for artifact_key in artifact_keys: 1002 if cancel_check is not None and cancel_check(): 1003 raise AnalysisCancelledError("Analysis cancelled by user.") 1004 result = self.analyze_artifact( 1005 artifact_key=str(artifact_key), 1006 investigation_context=investigation_context, 1007 progress_callback=progress_callback, 1008 ) 1009 per_artifact_results.append(result) 1010 if progress_callback is not None: 1011 emit_analysis_progress(progress_callback, str(artifact_key), "complete", result) 1012 1013 summary = self.generate_summary( 1014 per_artifact_results=per_artifact_results, 1015 investigation_context=investigation_context, 1016 metadata=metadata, 1017 ) 1018 return { 1019 "per_artifact": per_artifact_results, 1020 "summary": summary, 1021 "model_info": dict(self.model_info), 1022 }
Orchestrates AI-powered forensic analysis of parsed artifact CSV data.
Central analysis engine for AIFT: reads parsed artifact CSV files, applies column projection and deduplication, builds token-budgeted prompts, sends them to a configured AI provider, and validates citations.
Attributes:
- case_dir: Path to the case directory, or `None`.
- config: Merged configuration dictionary.
- ai_provider: The configured AI provider instance.
- model_info: Dict with `provider` and `model` keys.
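The class also re-exposes module-level helpers as underscore-prefixed static methods so that older callers written against `ForensicAnalyzer._method_name()` keep working. A minimal sketch of that aliasing pattern, using a hypothetical `Analyzer` class and a simplified `sanitize_filename` (not the package's actual implementation):

```python
import re


def sanitize_filename(name: str) -> str:
    """Collapse characters that are unsafe in filenames into underscores."""
    return re.sub(r"[^A-Za-z0-9._-]+", "_", name).strip("_")


class Analyzer:
    # staticmethod() aliases the free function without binding `self`,
    # so Analyzer._sanitize_filename(...) behaves exactly like the
    # module-level function it wraps.
    _sanitize_filename = staticmethod(sanitize_filename)
```

Because `staticmethod` returns the underlying function on attribute access, the alias is the same object as the original, and both old-style and new-style call sites stay in sync automatically.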
    def __init__(
        self,
        case_dir: str | Path | Mapping[str, str | Path] | None = None,
        config: Mapping[str, Any] | None = None,
        audit_logger: Any | None = None,
        artifact_csv_paths: Mapping[str, str | Path] | None = None,
        prompts_dir: str | Path | None = None,
        random_seed: int | None = None,
        os_type: str = "windows",
    ) -> None:
        """Initialize the forensic analyzer with case context and configuration.

        Args:
            case_dir: Path to the case directory, or a mapping of artifact
                keys to CSV paths (convenience shorthand).
            config: Application configuration dictionary.
            audit_logger: Optional object with a ``log(action, details)``
                method.
            artifact_csv_paths: Mapping of artifact keys to CSV paths.
            prompts_dir: Directory containing prompt template files.
            random_seed: Optional seed for the internal RNG.
            os_type: Detected operating system type (``"windows"``,
                ``"linux"``, etc.). Controls which artifact instruction
                prompts are loaded.
        """
        if (
            isinstance(case_dir, Mapping)
            and config is None
            and audit_logger is None
            and artifact_csv_paths is None
        ):
            artifact_csv_paths = case_dir
            case_dir = None

        self.case_dir = Path(case_dir) if case_dir is not None and not isinstance(case_dir, Mapping) else None
        self.logger = LOGGER
        self.config = dict(config) if isinstance(config, Mapping) else {}
        self.audit_logger = audit_logger
        self.artifact_csv_paths: dict[str, Path | list[Path]] = {}
        for artifact_key, csv_path in (artifact_csv_paths or {}).items():
            key = str(artifact_key)
            if isinstance(csv_path, list):
                self.artifact_csv_paths[key] = [Path(str(p)) for p in csv_path]
            else:
                self.artifact_csv_paths[key] = Path(str(csv_path))
        self._analysis_input_csv_paths: dict[str, Path] = {}
        self.prompts_dir = Path(prompts_dir) if prompts_dir is not None else PROJECT_ROOT / "prompts"
        self.os_type = normalize_os_type(os_type)
        import random
        self._random = random.Random(random_seed)
        self._load_analysis_settings()
        self.artifact_ai_column_projections = self._load_artifact_ai_column_projections()
        self.system_prompt = self._load_prompt_template("system_prompt.md", default=DEFAULT_SYSTEM_PROMPT)
        self.artifact_prompt_template = self._load_prompt_template(
            "artifact_analysis.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE,
        )
        self.artifact_prompt_template_small_context = self._load_prompt_template(
            "artifact_analysis_small_context.md", default=DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT,
        )
        self.artifact_instruction_prompts = self._load_artifact_instruction_prompts()
        self.summary_prompt_template = self._load_prompt_template(
            "summary_prompt.md", default=DEFAULT_SUMMARY_PROMPT_TEMPLATE,
        )
        self.chunk_merge_prompt_template = self._load_prompt_template(
            "chunk_merge.md", default=DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE,
        )
        self.ai_provider = self._create_ai_provider()
        self.model_info = self._read_model_info()
Initialize the forensic analyzer with case context and configuration.
Arguments:
- case_dir: Path to the case directory, or a mapping of artifact keys to CSV paths (convenience shorthand).
- config: Application configuration dictionary.
- audit_logger: Optional object with a `log(action, details)` method.
- artifact_csv_paths: Mapping of artifact keys to CSV paths.
- prompts_dir: Directory containing prompt template files.
- random_seed: Optional seed for the internal RNG.
- os_type: Detected operating system type (`"windows"`, `"linux"`, etc.). Controls which artifact instruction prompts are loaded.
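The convenience shorthand means `ForensicAnalyzer({"evtx": "evtx.csv"})` treats a mapping passed as the first positional argument as `artifact_csv_paths` rather than a case directory. The dispatch can be sketched with a hypothetical cut-down `MiniAnalyzer` (only the two relevant parameters, not the real class):

```python
from collections.abc import Mapping
from pathlib import Path


class MiniAnalyzer:
    def __init__(self, case_dir=None, artifact_csv_paths=None):
        # Shorthand: a Mapping in the case_dir slot is really a path map.
        if isinstance(case_dir, Mapping) and artifact_csv_paths is None:
            artifact_csv_paths = case_dir
            case_dir = None
        self.case_dir = Path(case_dir) if case_dir is not None else None
        # Normalize keys to str and values to Path, as __init__ does.
        self.artifact_csv_paths = {
            str(k): Path(str(v)) for k, v in (artifact_csv_paths or {}).items()
        }


shorthand = MiniAnalyzer({"prefetch": "cases/demo/prefetch.csv"})
explicit = MiniAnalyzer("cases/demo")
```

With the shorthand form, `case_dir` ends up `None` and the mapping lands in `artifact_csv_paths`; with a plain path, `case_dir` is set and the path map stays empty.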
    def analyze_artifact(
        self,
        artifact_key: str,
        investigation_context: str,
        progress_callback: Any | None = None,
    ) -> dict[str, Any]:
        """Analyze a single artifact's CSV data and return AI findings.

        Args:
            artifact_key: Unique identifier for the artifact.
            investigation_context: Free-text investigation context.
            progress_callback: Optional callable for streaming progress.

        Returns:
            A dict with ``artifact_key``, ``artifact_name``, ``analysis``,
            ``model``, and optionally ``citation_warnings``.
        """
        artifact_metadata = self._resolve_artifact_metadata(artifact_key)
        artifact_name = artifact_metadata.get("name", artifact_key)
        model = self.model_info.get("model", "unknown")
        provider = self.model_info.get("provider", "unknown")

        self._audit_log("analysis_started", {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "provider": provider, "model": model,
        })

        start_time = perf_counter()
        try:
            all_csv_paths = self._resolve_all_artifact_csv_paths(artifact_key)
            if len(all_csv_paths) > 1:
                csv_path = self._combine_csv_files(artifact_key, all_csv_paths)
            else:
                csv_path = all_csv_paths[0]
            artifact_prompt = self._prepare_artifact_data(
                artifact_key=artifact_key, investigation_context=investigation_context, csv_path=csv_path,
            )
            analysis_csv_path = self._resolve_analysis_input_csv_path(artifact_key=artifact_key, fallback=csv_path)
            attachments = [build_artifact_csv_attachment(artifact_key=artifact_key, csv_path=analysis_csv_path)]

            safe_key = sanitize_filename(artifact_key)
            self._save_case_prompt(f"artifact_{safe_key}.md", self.system_prompt, artifact_prompt)

            prompt_tokens_estimate = self._estimate_tokens(artifact_prompt) + self._estimate_tokens(self.system_prompt)
            if prompt_tokens_estimate > self.ai_max_tokens:
                self.logger.info(
                    "Prompt for %s (~%d tokens) exceeds ai_max_tokens (%d); using chunked analysis.",
                    artifact_key, prompt_tokens_estimate, self.ai_max_tokens,
                )
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analysis_text = analyze_artifact_chunked(
                    artifact_prompt=artifact_prompt,
                    artifact_key=artifact_key,
                    artifact_name=artifact_name,
                    investigation_context=investigation_context,
                    model=model,
                    system_prompt=self.system_prompt,
                    ai_response_max_tokens=self.ai_response_max_tokens,
                    chunk_csv_budget=self.chunk_csv_budget,
                    chunk_merge_prompt_template=self.chunk_merge_prompt_template,
                    max_merge_rounds=self.max_merge_rounds,
                    call_ai_with_retry_fn=self._call_ai_with_retry,
                    ai_provider=self.ai_provider,
                    audit_log_fn=self._audit_log,
                    save_case_prompt_fn=self._save_case_prompt,
                    progress_callback=progress_callback,
                )
                duration_seconds = perf_counter() - start_time
                self._audit_log("analysis_completed", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "token_count": self._estimate_tokens(analysis_text),
                    "duration_seconds": round(duration_seconds, 6),
                    "status": "success", "chunked": True,
                })
                citation_warnings = self._validate_citations(artifact_key, analysis_text)
                result: dict[str, Any] = {
                    "artifact_key": artifact_key, "artifact_name": artifact_name,
                    "analysis": analysis_text, "model": model,
                }
                if citation_warnings:
                    result["citation_warnings"] = citation_warnings
                return result

            analyze_with_progress = getattr(self.ai_provider, "analyze_with_progress", None)
            if callable(analyze_with_progress) and progress_callback is not None:
                emit_analysis_progress(progress_callback, artifact_key, "started", {
                    "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                })

                def _provider_progress(payload: Mapping[str, Any]) -> None:
                    """Forward provider progress to the frontend."""
                    if not isinstance(payload, Mapping):
                        return
                    emit_analysis_progress(progress_callback, artifact_key, "thinking", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name,
                        "thinking_text": str(payload.get("thinking_text", "")),
                        "partial_text": str(payload.get("partial_text", "")),
                        "model": model,
                    })

                try:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, attachments=attachments,
                        max_tokens=self.ai_response_max_tokens,
                    )
                except TypeError:
                    analysis_text = analyze_with_progress(
                        system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                        progress_callback=_provider_progress, max_tokens=self.ai_response_max_tokens,
                    )
            else:
                if progress_callback is not None:
                    emit_analysis_progress(progress_callback, artifact_key, "started", {
                        "artifact_key": artifact_key, "artifact_name": artifact_name, "model": model,
                    })
                analyze_with_attachments = getattr(self.ai_provider, "analyze_with_attachments", None)
                if callable(analyze_with_attachments):
                    analysis_text = self._call_ai_with_retry(
                        lambda: analyze_with_attachments(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            attachments=attachments, max_tokens=self.ai_response_max_tokens,
                        )
                    )
                else:
                    analysis_text = self._call_ai_with_retry(
                        lambda: self.ai_provider.analyze(
                            system_prompt=self.system_prompt, user_prompt=artifact_prompt,
                            max_tokens=self.ai_response_max_tokens,
                        )
                    )
            duration_seconds = perf_counter() - start_time
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": self._estimate_tokens(analysis_text),
                "duration_seconds": round(duration_seconds, 6), "status": "success",
            })
        except Exception as error:
            duration_seconds = perf_counter() - start_time
            analysis_text = f"Analysis failed: {error}"
            self._audit_log("analysis_completed", {
                "artifact_key": artifact_key, "artifact_name": artifact_name,
                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
                "status": "failed", "error": str(error),
            })

        citation_warnings = self._validate_citations(artifact_key, analysis_text)

        result = {
            "artifact_key": artifact_key, "artifact_name": artifact_name,
            "analysis": analysis_text, "model": model,
        }
        if citation_warnings:
            result["citation_warnings"] = citation_warnings
        return result
Analyze a single artifact's CSV data and return AI findings.
Arguments:
- artifact_key: Unique identifier for the artifact.
- investigation_context: Free-text investigation context.
- progress_callback: Optional callable for streaming progress.
Returns:
A dict with `artifact_key`, `artifact_name`, `analysis`, `model`, and optionally `citation_warnings`.
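Internally, the method probes the provider for richer capabilities with `getattr(..., None)` and falls back when a keyword is unsupported: a `TypeError` from passing `attachments` triggers a retry without it. A self-contained sketch of that probing pattern, where `LegacyProvider` and `call_provider` are illustrative stand-ins rather than package APIs:

```python
from typing import Any


class LegacyProvider:
    """Hypothetical provider that predates the `attachments` keyword."""

    def analyze_with_progress(self, *, system_prompt: str, user_prompt: str,
                              progress_callback: Any, max_tokens: int) -> str:
        progress_callback({"thinking_text": "working", "partial_text": ""})
        return "legacy result"


def call_provider(provider: Any, prompt: str, attachments: list) -> str:
    # Probe for the richer streaming API first.
    fn = getattr(provider, "analyze_with_progress", None)
    if not callable(fn):
        # Minimal baseline every provider supports.
        return provider.analyze(system_prompt="sys", user_prompt=prompt, max_tokens=1024)
    try:
        # Newer providers accept attachments...
        return fn(system_prompt="sys", user_prompt=prompt,
                  progress_callback=print, max_tokens=1024, attachments=attachments)
    except TypeError:
        # ...older ones raise TypeError on the unknown keyword; retry without it.
        return fn(system_prompt="sys", user_prompt=prompt,
                  progress_callback=print, max_tokens=1024)
```

One caveat of this pattern: a `TypeError` raised *inside* the provider is indistinguishable from an unsupported keyword, so the fallback may occasionally retry for the wrong reason.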
    def generate_summary(
        self,
        per_artifact_results: list[Mapping[str, Any]],
        investigation_context: str,
        metadata: Mapping[str, Any] | None,
    ) -> str:
        """Generate a cross-artifact summary by correlating findings.

        Args:
            per_artifact_results: List of per-artifact result dicts.
            investigation_context: The user's investigation context.
            metadata: Optional host metadata mapping.

        Returns:
            The AI-generated summary text, or an error message.
        """
        metadata_map = metadata if isinstance(metadata, Mapping) else {}
        summary_prompt = build_summary_prompt(
            summary_prompt_template=self.summary_prompt_template,
            investigation_context=investigation_context,
            per_artifact_results=per_artifact_results,
            metadata_map=metadata_map,
        )

        model = self.model_info.get("model", "unknown")
        provider = self.model_info.get("provider", "unknown")
        summary_artifact_key = "cross_artifact_summary"
        summary_artifact_name = "Cross-Artifact Summary"
        summary_prompt_filename = f"{sanitize_filename(summary_artifact_key)}.md"

        self._audit_log("analysis_started", {
            "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
            "provider": provider, "model": model,
        })
        self._save_case_prompt(summary_prompt_filename, self.system_prompt, summary_prompt)

        start_time = perf_counter()
        try:
            summary = self._call_ai_with_retry(
                lambda: self.ai_provider.analyze(
                    system_prompt=self.system_prompt, user_prompt=summary_prompt,
                    max_tokens=self.ai_response_max_tokens,
                )
            )
            duration_seconds = perf_counter() - start_time
            self._audit_log("analysis_completed", {
                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
                "token_count": self._estimate_tokens(summary),
                "duration_seconds": round(duration_seconds, 6), "status": "success",
            })
            return summary
        except Exception as error:
            duration_seconds = perf_counter() - start_time
            summary = f"Analysis failed: {error}"
            self._audit_log("analysis_completed", {
                "artifact_key": summary_artifact_key, "artifact_name": summary_artifact_name,
                "token_count": 0, "duration_seconds": round(duration_seconds, 6),
                "status": "failed", "error": str(error),
            })
            return summary
Generate a cross-artifact summary by correlating findings.
Arguments:
- per_artifact_results: List of per-artifact result dicts.
- investigation_context: The user's investigation context.
- metadata: Optional host metadata mapping.
Returns:
The AI-generated summary text, or an error message.
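The "or an error message" return contract comes from the method's failure handling: any exception is converted into an `Analysis failed: ...` string and a failed audit event, instead of propagating. That timing-and-audit wrapper can be sketched in isolation (`guarded_call` is a hypothetical helper, not a package function):

```python
from time import perf_counter


def guarded_call(fn, audit_log):
    """On success, log duration; on any exception, return the error as text
    and log a failed event, mirroring generate_summary's try/except."""
    start = perf_counter()
    try:
        result = fn()
        audit_log({"status": "success",
                   "duration_seconds": round(perf_counter() - start, 6)})
    except Exception as error:
        result = f"Analysis failed: {error}"
        audit_log({"status": "failed", "error": str(error),
                   "duration_seconds": round(perf_counter() - start, 6)})
    return result


events = []
ok = guarded_call(lambda: "summary text", events.append)
bad = guarded_call(lambda: 1 / 0, events.append)
```

Swallowing the exception here is a deliberate design choice: a failed summary still yields a renderable result and an audit trail, rather than aborting the whole report.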
    def run_full_analysis(
        self,
        artifact_keys: Iterable[str],
        investigation_context: str,
        metadata: Mapping[str, Any] | None,
        progress_callback: Any | None = None,
        cancel_check: Any | None = None,
    ) -> dict[str, Any]:
        """Run the complete analysis pipeline: per-artifact then summary.

        Args:
            artifact_keys: Iterable of artifact key strings.
            investigation_context: The user's investigation context.
            metadata: Optional metadata mapping.
            progress_callback: Optional callable for streaming progress.
            cancel_check: Optional callable returning ``True`` when the
                analysis should be aborted early.

        Returns:
            A dict with ``per_artifact``, ``summary``, and ``model_info``.

        Raises:
            AnalysisCancelledError: If *cancel_check* returns ``True``.
        """
        if isinstance(self.ai_provider, UnavailableProvider):
            raise AIProviderError(self.ai_provider._error_message)

        self._register_artifact_paths_from_metadata(metadata)
        per_artifact_results: list[dict[str, Any]] = []
        for artifact_key in artifact_keys:
            if cancel_check is not None and cancel_check():
                raise AnalysisCancelledError("Analysis cancelled by user.")
            result = self.analyze_artifact(
                artifact_key=str(artifact_key),
                investigation_context=investigation_context,
                progress_callback=progress_callback,
            )
            per_artifact_results.append(result)
            if progress_callback is not None:
                emit_analysis_progress(progress_callback, str(artifact_key), "complete", result)

        summary = self.generate_summary(
            per_artifact_results=per_artifact_results,
            investigation_context=investigation_context,
            metadata=metadata,
        )
        return {
            "per_artifact": per_artifact_results,
            "summary": summary,
            "model_info": dict(self.model_info),
        }
Run the complete analysis pipeline: per-artifact then summary.
Arguments:
- artifact_keys: Iterable of artifact key strings.
- investigation_context: The user's investigation context.
- metadata: Optional metadata mapping.
- progress_callback: Optional callable for streaming progress.
- cancel_check: Optional callable returning `True` when the analysis should be aborted early.
Returns:
A dict with `per_artifact`, `summary`, and `model_info`.
Raises:
- AnalysisCancelledError: If cancel_check returns `True`.
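Cancellation is cooperative: `cancel_check` is polled once per artifact at the top of the loop, so a cancel takes effect before the *next* artifact starts, never mid-call. The loop shape can be sketched on its own (`run_keys` is an illustrative stand-in, and `AnalysisCancelledError` here is a local stub for the package's exception):

```python
class AnalysisCancelledError(RuntimeError):
    """Stand-in for the package's cancellation exception."""


def run_keys(keys, analyze, cancel_check=None):
    results = []
    for key in keys:
        # Poll the flag before each artifact, matching run_full_analysis.
        if cancel_check is not None and cancel_check():
            raise AnalysisCancelledError("Analysis cancelled by user.")
        results.append(analyze(key))
    return results


done = []
flags = iter([False, True])  # allow the first artifact, cancel before the second
try:
    run_keys(["evtx", "prefetch", "amcache"], done.append, lambda: next(flags))
except AnalysisCancelledError:
    pass
```

Results already produced before the cancel are kept by the caller's list; only the remaining artifacts (and the summary) are skipped.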