app.analyzer.data_prep
Data preparation pipeline for forensic artifact analysis.
Handles date extraction from investigation context, CSV reading, column projection, deduplication, statistics computation, and final prompt assembly for AI analysis.
1"""Data preparation pipeline for forensic artifact analysis. 2 3Handles date extraction from investigation context, CSV reading, 4column projection, deduplication, statistics computation, and 5final prompt assembly for AI analysis. 6""" 7 8from __future__ import annotations 9 10import csv 11import io 12import logging 13from collections import Counter 14from datetime import datetime 15from pathlib import Path 16from typing import Any, Mapping 17 18from .constants import ( 19 DEDUP_COMMENT_COLUMN, 20 DEDUPLICATED_PARSED_DIRNAME, 21 LOW_SIGNAL_VALUES, 22 METADATA_COLUMNS, 23) 24from .ioc import ( 25 build_artifact_final_context_reminder, 26 build_priority_directives, 27 format_ioc_targets, 28) 29from .utils import ( 30 format_datetime, 31 is_dedup_safe_identifier_column, 32 looks_like_timestamp_column, 33 normalize_artifact_key, 34 normalize_csv_row, 35 normalize_table_cell, 36 sanitize_filename, 37 stringify_value, 38 time_range_for_rows, 39) 40 41LOGGER = logging.getLogger(__name__) 42 43__all__ = [ 44 "prepare_artifact_data", 45 "write_analysis_input_csv", 46 "resolve_analysis_input_output_dir", 47 "build_artifact_csv_attachment", 48 "counter_normalize", 49 "compute_statistics", 50 "select_ai_columns", 51 "project_rows_for_analysis", 52 "deduplicate_rows_for_analysis", 53 "build_full_data_csv", 54] 55 56 57# --------------------------------------------------------------------------- 58# Statistics and normalisation 59# --------------------------------------------------------------------------- 60 61def counter_normalize(value: str) -> str: 62 """Normalize a cell value for frequency counting in statistics. 63 64 Args: 65 value: Raw cell value string. 66 67 Returns: 68 The normalized value, or empty string if low-signal. 69 """ 70 cleaned = normalize_table_cell(value=value, cell_limit=120) 71 if cleaned.lower() in LOW_SIGNAL_VALUES: 72 return "" 73 return cleaned 74 75 76def compute_statistics( 77 rows: list[dict[str, str]], 78 columns: list[str], 79) -> tuple[str, datetime | None, datetime | None]: 80 """Compute record count, time range, and top-20 frequent values per column. 81 82 Args: 83 rows: List of normalized CSV row dicts. 84 columns: Column names for frequency statistics. 85 86 Returns: 87 A 3-tuple of ``(statistics_text, min_time, max_time)``. 
88 """ 89 total_records = len(rows) 90 min_time, max_time = time_range_for_rows(rows) 91 counters: dict[str, Counter[str]] = {column: Counter() for column in columns} 92 93 for row in rows: 94 for column in columns: 95 value = counter_normalize(row.get(column, "")) 96 if value: 97 counters[column][value] += 1 98 99 lines = [ 100 f"Record count: {total_records}", 101 f"Time range start: {format_datetime(min_time)}", 102 f"Time range end: {format_datetime(max_time)}", 103 ] 104 105 if columns: 106 lines.append("Top values (up to 20 per key column):") 107 for column in columns: 108 lines.append(f"- {column}:") 109 top_values = counters[column].most_common(20) 110 if not top_values: 111 lines.append(" (no non-empty values)") 112 continue 113 for value, count in top_values: 114 lines.append(f" {count}x {value}") 115 else: 116 lines.append("Top values: no key columns selected.") 117 118 return "\n".join(lines), min_time, max_time 119 120 121# --------------------------------------------------------------------------- 122# Column selection and projection 123# --------------------------------------------------------------------------- 124 125def select_ai_columns( 126 artifact_key: str, 127 available_columns: list[str], 128 column_projections: dict[str, tuple[str, ...]], 129 audit_log_fn: Any = None, 130) -> tuple[list[str], bool]: 131 """Select the subset of CSV columns to include in the AI prompt. 132 133 Args: 134 artifact_key: Artifact identifier for projection lookup. 135 available_columns: Column names present in the source CSV. 136 column_projections: Mapping of artifact keys to column tuples. 137 audit_log_fn: Optional callable for logging missing columns. 138 139 Returns: 140 A 2-tuple of ``(selected_columns, projection_applied)``. 141 """ 142 normalized_key = normalize_artifact_key(artifact_key) 143 configured_columns = column_projections.get(normalized_key) 144 if not configured_columns: 145 return list(available_columns), False 146 147 lookup = {column.strip().lower(): column for column in available_columns} 148 projected_columns: list[str] = [] 149 missing_columns: list[str] = [] 150 has_wildcard = False 151 for column_name in configured_columns: 152 if column_name.strip() == "*": 153 has_wildcard = True 154 continue 155 matched = lookup.get(column_name.strip().lower()) 156 if matched is not None: 157 projected_columns.append(matched) 158 else: 159 missing_columns.append(column_name) 160 161 # A trailing ``*`` means "pass through any remaining columns not already 162 # listed". This is used for artifacts with dynamic/variable fields (e.g. 163 # systemd service records whose fields vary per unit). 164 if has_wildcard: 165 already_selected = {c.strip().lower() for c in projected_columns} 166 for col in available_columns: 167 if col.strip().lower() not in already_selected: 168 projected_columns.append(col) 169 170 if missing_columns and audit_log_fn is not None: 171 audit_log_fn( 172 "artifact_ai_projection_warning", 173 {"artifact_key": artifact_key, "missing_columns": missing_columns, 174 "available_columns": available_columns}, 175 ) 176 177 if not projected_columns: 178 return list(available_columns), False 179 return projected_columns, True 180 181 182def project_rows_for_analysis( 183 rows: list[dict[str, str]], 184 columns: list[str], 185) -> list[dict[str, str]]: 186 """Project rows to only the selected columns for analysis. 187 188 Args: 189 rows: List of normalized row dicts. 190 columns: Column names to retain. 
191 192 Returns: 193 A new list of row dicts with only the specified columns 194 and ``_row_ref``. 195 """ 196 projected_rows: list[dict[str, str]] = [] 197 for row in rows: 198 projected: dict[str, str] = { 199 column: stringify_value(row.get(column, "")) 200 for column in columns 201 } 202 row_ref = stringify_value(row.get("_row_ref", "")) 203 if row_ref: 204 projected["_row_ref"] = row_ref 205 projected_rows.append(projected) 206 return projected_rows 207 208 209# --------------------------------------------------------------------------- 210# Deduplication 211# --------------------------------------------------------------------------- 212 213def deduplicate_rows_for_analysis( 214 rows: list[dict[str, str]], 215 columns: list[str], 216) -> tuple[list[dict[str, str]], list[str], int, int, list[str]]: 217 """Deduplicate rows that differ only in timestamp or record-ID columns. 218 219 Args: 220 rows: List of projected row dicts. 221 columns: Column names present in the rows. 222 223 Returns: 224 A 5-tuple of ``(kept_rows, output_columns, removed_count, 225 annotated_count, variant_columns)``. 226 """ 227 if not rows or not columns: 228 return list(rows), list(columns), 0, 0, [] 229 230 variant_columns = [ 231 column for column in columns 232 if looks_like_timestamp_column(column) or is_dedup_safe_identifier_column(column) 233 ] 234 if not variant_columns: 235 return [dict(row) for row in rows], list(columns), 0, 0, [] 236 237 variant_set = set(variant_columns) 238 base_columns = [ 239 column for column in columns 240 if column not in variant_set and column.lower() not in METADATA_COLUMNS and column != DEDUP_COMMENT_COLUMN 241 ] 242 if not base_columns: 243 return [dict(row) for row in rows], list(columns), 0, 0, variant_columns 244 245 kept_rows: list[dict[str, str]] = [] 246 representative_by_key: dict[tuple[tuple[str, str], ...], int] = {} 247 dedup_counts: Counter[int] = Counter() 248 249 for row in rows: 250 normalized_row = {str(key): stringify_value(value) for key, value in row.items()} 251 key = tuple((column, normalized_row.get(column, "")) for column in base_columns) 252 representative_index = representative_by_key.get(key) 253 if representative_index is None: 254 representative_by_key[key] = len(kept_rows) 255 kept_rows.append(normalized_row) 256 continue 257 dedup_counts[representative_index] += 1 258 259 annotated_rows = 0 260 output_columns = list(columns) 261 if dedup_counts: 262 if DEDUP_COMMENT_COLUMN not in output_columns: 263 output_columns.append(DEDUP_COMMENT_COLUMN) 264 for representative_index, dedup_count in dedup_counts.items(): 265 kept_rows[representative_index][DEDUP_COMMENT_COLUMN] = ( 266 f"Deduplicated {dedup_count} records with matching event data and different timestamp/ID." 267 ) 268 annotated_rows += 1 269 270 removed_rows = sum(dedup_counts.values()) 271 return kept_rows, output_columns, removed_rows, annotated_rows, variant_columns 272 273 274# --------------------------------------------------------------------------- 275# CSV serialisation 276# --------------------------------------------------------------------------- 277 278def build_full_data_csv( 279 rows: list[dict[str, str]], 280 columns: list[str], 281) -> str: 282 """Serialize rows to inline CSV text for prompt inclusion. 283 284 Args: 285 rows: List of row dicts to serialize. 286 columns: Column names for the CSV header. 287 288 Returns: 289 A CSV-formatted string with a ``row_ref`` column prepended. 290 """ 291 if not columns: 292 return "No columns available." 
293 294 buffer = io.StringIO(newline="") 295 writer = csv.writer(buffer) 296 writer.writerow(["row_ref", *columns]) 297 for row in rows: 298 writer.writerow([row.get("_row_ref", ""), *[row.get(column, "") for column in columns]]) 299 300 full_csv = buffer.getvalue().strip() 301 if not full_csv: 302 return "No rows available for analysis." 303 return full_csv 304 305 306# --------------------------------------------------------------------------- 307# Analysis-input CSV output 308# --------------------------------------------------------------------------- 309 310def resolve_analysis_input_output_dir(case_dir: Path | None, source_csv_path: Path) -> Path: 311 """Determine the output directory for deduplicated/projected CSV files. 312 313 Args: 314 case_dir: Optional case directory path. 315 source_csv_path: Path to the original parsed CSV file. 316 317 Returns: 318 A ``Path`` to the output directory. 319 """ 320 if case_dir is not None: 321 return case_dir / DEDUPLICATED_PARSED_DIRNAME 322 parent = source_csv_path.parent 323 if parent.name.strip().lower() == "parsed": 324 return parent.parent / DEDUPLICATED_PARSED_DIRNAME 325 return parent / DEDUPLICATED_PARSED_DIRNAME 326 327 328def write_analysis_input_csv( 329 source_csv_path: Path, 330 rows: list[dict[str, str]], 331 columns: list[str], 332 case_dir: Path | None = None, 333) -> Path: 334 """Write deduplicated/projected rows to a new CSV file for audit. 335 336 Args: 337 source_csv_path: Path to the original parsed CSV. 338 rows: Row dicts to write. 339 columns: Column names for the CSV header. 340 case_dir: Optional case directory. 341 342 Returns: 343 Path to the newly written analysis-input CSV file. 344 345 Raises: 346 OSError: If the write fails. 347 """ 348 output_dir = resolve_analysis_input_output_dir(case_dir=case_dir, source_csv_path=source_csv_path) 349 output_dir.mkdir(parents=True, exist_ok=True) 350 output_path = output_dir / source_csv_path.name 351 352 write_columns = list(columns) 353 include_row_ref = "_row_ref" not in write_columns and any("_row_ref" in r for r in rows) 354 if include_row_ref: 355 write_columns = ["row_ref", *write_columns] 356 357 with output_path.open("w", newline="", encoding="utf-8") as handle: 358 writer = csv.DictWriter(handle, fieldnames=write_columns, extrasaction="ignore") 359 writer.writeheader() 360 for row in rows: 361 out: dict[str, str] = {column: row.get(column, "") for column in columns} 362 if include_row_ref: 363 out["row_ref"] = row.get("_row_ref", "") 364 writer.writerow(out) 365 366 return output_path 367 368 369def build_artifact_csv_attachment(artifact_key: str, csv_path: Path) -> dict[str, str]: 370 """Build an attachment descriptor dict for an artifact CSV file. 371 372 Args: 373 artifact_key: Artifact identifier. 374 csv_path: Path to the CSV file on disk. 375 376 Returns: 377 A dict with ``path``, ``name``, and ``mime_type`` keys. 
378 """ 379 filename_stem = sanitize_filename(artifact_key) 380 filename = f"{filename_stem}.csv" if not filename_stem.lower().endswith(".csv") else filename_stem 381 return {"path": str(csv_path), "name": filename, "mime_type": "text/csv"} 382 383 384# --------------------------------------------------------------------------- 385# Full artifact data preparation 386# --------------------------------------------------------------------------- 387 388def prepare_artifact_data( 389 artifact_key: str, 390 investigation_context: str, 391 csv_path: Path, 392 *, 393 artifact_metadata: dict[str, str], 394 artifact_prompt_template: str, 395 artifact_prompt_template_small_context: str, 396 artifact_instruction_prompts: dict[str, str], 397 artifact_ai_column_projections: dict[str, tuple[str, ...]], 398 artifact_deduplication_enabled: bool, 399 ai_max_tokens: int, 400 shortened_prompt_cutoff_tokens: int, 401 case_dir: Path | None, 402 audit_log_fn: Any = None, 403) -> tuple[str, Path, list[str]]: 404 """Prepare one artifact CSV as a bounded, analysis-ready prompt. 405 406 Reads the full artifact CSV (all rows), applies column projection, 407 deduplication, and statistics computation. Fills the appropriate 408 prompt template with all gathered data. 409 410 Args: 411 artifact_key: Unique identifier for the artifact. 412 investigation_context: Free-text investigation context. 413 csv_path: Path to the artifact CSV file. 414 artifact_metadata: Metadata dict for the artifact. 415 artifact_prompt_template: Full prompt template. 416 artifact_prompt_template_small_context: Shortened prompt template. 417 artifact_instruction_prompts: Per-artifact instruction overrides. 418 artifact_ai_column_projections: Column projection config. 419 artifact_deduplication_enabled: Whether to deduplicate rows. 420 ai_max_tokens: Configured AI context window size. 421 shortened_prompt_cutoff_tokens: Token threshold for small template. 422 case_dir: Optional case directory path. 423 audit_log_fn: Optional callable ``(action, details)`` for audit. 424 425 Returns: 426 A 3-tuple of ``(prompt_text, analysis_csv_path, analysis_columns)``. 
427 """ 428 include_statistics = ai_max_tokens >= shortened_prompt_cutoff_tokens 429 template = artifact_prompt_template if include_statistics else artifact_prompt_template_small_context 430 431 rows: list[dict[str, str]] = [] 432 source_row_count = 0 433 columns: list[str] = [] 434 435 with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as handle: 436 reader = csv.DictReader(handle) 437 columns = [str(c) for c in (reader.fieldnames or []) if c not in (None, "")] 438 439 for source_row_count, raw_row in enumerate(reader, start=1): 440 row = normalize_csv_row(raw_row, columns=columns) 441 row["_row_ref"] = str(source_row_count) 442 rows.append(row) 443 444 analysis_columns, projection_applied = select_ai_columns( 445 artifact_key=artifact_key, available_columns=columns, 446 column_projections=artifact_ai_column_projections, audit_log_fn=audit_log_fn, 447 ) 448 analysis_rows = project_rows_for_analysis(rows=rows, columns=analysis_columns) 449 deduplicated_records = 0 450 dedup_annotated_rows = 0 451 dedup_variant_columns: list[str] = [] 452 analysis_csv_path = csv_path 453 dedup_write_error = "" 454 455 if artifact_deduplication_enabled: 456 analysis_rows, analysis_columns, deduplicated_records, dedup_annotated_rows, dedup_variant_columns = ( 457 deduplicate_rows_for_analysis(rows=analysis_rows, columns=analysis_columns) 458 ) 459 460 if projection_applied or artifact_deduplication_enabled: 461 try: 462 analysis_csv_path = write_analysis_input_csv( 463 source_csv_path=csv_path, rows=analysis_rows, columns=analysis_columns, case_dir=case_dir, 464 ) 465 except OSError as error: 466 analysis_csv_path = csv_path 467 dedup_write_error = str(error) 468 469 if projection_applied and audit_log_fn is not None: 470 projection_details: dict[str, Any] = { 471 "artifact_key": artifact_key, "source_csv": str(csv_path), 472 "analysis_csv": str(analysis_csv_path), "projection_columns": list(analysis_columns), 473 } 474 if dedup_write_error and not artifact_deduplication_enabled: 475 projection_details["write_error"] = dedup_write_error 476 audit_log_fn("artifact_ai_projection", projection_details) 477 478 if artifact_deduplication_enabled and audit_log_fn is not None: 479 dedup_audit_details: dict[str, Any] = { 480 "artifact_key": artifact_key, "source_csv": str(csv_path), 481 "analysis_csv": str(analysis_csv_path), "removed_records": deduplicated_records, 482 "annotated_rows": dedup_annotated_rows, "variant_columns": list(dedup_variant_columns), 483 } 484 if dedup_write_error: 485 dedup_audit_details["write_error"] = dedup_write_error 486 audit_log_fn("artifact_deduplicated", dedup_audit_details) 487 488 statistics = "" 489 if include_statistics: 490 statistics, min_time, max_time = compute_statistics(rows=analysis_rows, columns=analysis_columns) 491 stats_prefix: list[str] = [] 492 493 if artifact_deduplication_enabled: 494 dedup_details = [ 495 "Artifact deduplication enabled.", 496 f"Rows removed as timestamp/ID-only duplicates: {deduplicated_records}.", 497 f"Rows annotated with deduplication comment: {dedup_annotated_rows}.", 498 ] 499 if dedup_variant_columns: 500 dedup_details.append("Dedup variant columns: " + ", ".join(dedup_variant_columns) + ".") 501 stats_prefix.append("\n".join(dedup_details)) 502 503 if projection_applied: 504 stats_prefix.append("AI column projection applied: " + ", ".join(analysis_columns) + ".") 505 506 if stats_prefix: 507 statistics = "\n".join(stats_prefix) + "\n" + statistics 508 else: 509 min_time, max_time = time_range_for_rows(analysis_rows) 
510 511 full_data_csv = build_full_data_csv(rows=analysis_rows, columns=analysis_columns) 512 priority_directives = build_priority_directives(investigation_context) 513 ioc_targets = format_ioc_targets(investigation_context) 514 artifact_guidance = _resolve_analysis_instructions( 515 artifact_key=artifact_key, artifact_metadata=artifact_metadata, 516 artifact_instruction_prompts=artifact_instruction_prompts, 517 ) 518 519 replacements = { 520 "priority_directives": priority_directives, 521 "investigation_context": investigation_context.strip() or "No investigation context provided.", 522 "ioc_targets": ioc_targets, 523 "artifact_key": artifact_key, 524 "artifact_name": artifact_metadata.get("name", artifact_key), 525 "artifact_description": artifact_metadata.get("description", "No artifact description available."), 526 "total_records": str(len(analysis_rows)), 527 "time_range_start": format_datetime(min_time), 528 "time_range_end": format_datetime(max_time), 529 "statistics": statistics, 530 "analysis_instructions": artifact_guidance, 531 "artifact_guidance": artifact_guidance, 532 "data_csv": full_data_csv, 533 } 534 535 filled = template 536 for placeholder, value in replacements.items(): 537 filled = filled.replace(f"{{{{{placeholder}}}}}", value) 538 539 final_context_reminder = build_artifact_final_context_reminder( 540 artifact_key=artifact_key, 541 artifact_name=artifact_metadata.get("name", artifact_key), 542 investigation_context=investigation_context, 543 ) 544 if final_context_reminder: 545 filled = f"{filled.rstrip()}\n\n{final_context_reminder}\n" 546 547 return filled, analysis_csv_path, analysis_columns 548 549 550def _resolve_analysis_instructions( 551 artifact_key: str, 552 artifact_metadata: Mapping[str, str], 553 artifact_instruction_prompts: dict[str, str], 554) -> str: 555 """Resolve artifact-specific analysis instructions for the AI prompt. 556 557 Args: 558 artifact_key: Artifact identifier. 559 artifact_metadata: Metadata dict for the artifact. 560 artifact_instruction_prompts: Per-artifact instruction overrides. 561 562 Returns: 563 The analysis instruction text. 564 """ 565 normalized_key = normalize_artifact_key(artifact_key) 566 # Try the raw key, the normalised key, and dot/underscore variants 567 # so that "ssh.authorized_keys" matches "ssh_authorized_keys.md". 568 candidates: list[str] = [artifact_key, normalized_key] 569 for variant in (artifact_key.replace(".", "_"), artifact_key.replace("_", ".")): 570 if variant not in candidates: 571 candidates.append(variant) 572 for key in candidates: 573 prompt = artifact_instruction_prompts.get(key.strip().lower(), "").strip() 574 if prompt: 575 return prompt 576 577 for field in ("artifact_guidance", "analysis_instructions", "analysis_hint"): 578 value = stringify_value(artifact_metadata.get(field, "")) 579 if value: 580 return value 581 582 return "No specific analysis instructions are available for this artifact."
```python
def prepare_artifact_data(
    artifact_key: str,
    investigation_context: str,
    csv_path: Path,
    *,
    artifact_metadata: dict[str, str],
    artifact_prompt_template: str,
    artifact_prompt_template_small_context: str,
    artifact_instruction_prompts: dict[str, str],
    artifact_ai_column_projections: dict[str, tuple[str, ...]],
    artifact_deduplication_enabled: bool,
    ai_max_tokens: int,
    shortened_prompt_cutoff_tokens: int,
    case_dir: Path | None,
    audit_log_fn: Any = None,
) -> tuple[str, Path, list[str]]
```
Prepare one artifact CSV as a bounded, analysis-ready prompt.
Reads the full artifact CSV (all rows), applies column projection, deduplication, and statistics computation. Fills the appropriate prompt template with all gathered data.
Arguments:
- artifact_key: Unique identifier for the artifact.
- investigation_context: Free-text investigation context.
- csv_path: Path to the artifact CSV file.
- artifact_metadata: Metadata dict for the artifact.
- artifact_prompt_template: Full prompt template.
- artifact_prompt_template_small_context: Shortened prompt template.
- artifact_instruction_prompts: Per-artifact instruction overrides.
- artifact_ai_column_projections: Column projection config.
- artifact_deduplication_enabled: Whether to deduplicate rows.
- ai_max_tokens: Configured AI context window size.
- shortened_prompt_cutoff_tokens: Token threshold for small template.
- case_dir: Optional case directory path.
- audit_log_fn: Optional callable `(action, details)` for audit.
Returns:
A 3-tuple of `(prompt_text, analysis_csv_path, analysis_columns)`.
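A minimal call sketch, assuming the module is importable as `app.analyzer.data_prep`. The artifact key, paths, and template text are hypothetical, but the `{{name}}` placeholder form matches the replacement loop in the source:

```python
from pathlib import Path

from app.analyzer.data_prep import prepare_artifact_data

# Hypothetical template; the pipeline substitutes {{name}} placeholders.
template = (
    "Context: {{investigation_context}}\n"
    "Artifact: {{artifact_name}} ({{total_records}} records, "
    "{{time_range_start}} - {{time_range_end}})\n\n"
    "{{statistics}}\n\n{{data_csv}}"
)

prompt, analysis_csv, analysis_columns = prepare_artifact_data(
    "linux.ssh_auth",                                   # hypothetical artifact key
    "Suspected SSH brute force from 203.0.113.7",
    Path("cases/001/parsed/ssh_auth.csv"),              # hypothetical path
    artifact_metadata={"name": "SSH auth log", "description": "Parsed sshd events"},
    artifact_prompt_template=template,
    artifact_prompt_template_small_context=template,
    artifact_instruction_prompts={},
    artifact_ai_column_projections={},
    artifact_deduplication_enabled=True,
    ai_max_tokens=200_000,           # above the cutoff, so statistics are included
    shortened_prompt_cutoff_tokens=32_000,
    case_dir=Path("cases/001"),
)
```

With no projection configured and deduplication enabled, the returned `analysis_csv` points at the deduplicated copy under the case directory, or back at the source CSV if that write fails.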
```python
def write_analysis_input_csv(
    source_csv_path: Path,
    rows: list[dict[str, str]],
    columns: list[str],
    case_dir: Path | None = None,
) -> Path
```
Write deduplicated/projected rows to a new CSV file for audit.
Arguments:
- source_csv_path: Path to the original parsed CSV.
- rows: Row dicts to write.
- columns: Column names for the CSV header.
- case_dir: Optional case directory.
Returns:
Path to the newly written analysis-input CSV file.
Raises:
- OSError: If the write fails.
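A short sketch with made-up rows; when rows carry `_row_ref` values, they are written under a leading `row_ref` header:

```python
from pathlib import Path

from app.analyzer.data_prep import write_analysis_input_csv

rows = [
    {"host": "web01", "message": "session opened", "_row_ref": "1"},
    {"host": "web01", "message": "session closed", "_row_ref": "2"},
]
out_path = write_analysis_input_csv(
    source_csv_path=Path("cases/001/parsed/auth.csv"),  # hypothetical path
    rows=rows,
    columns=["host", "message"],
    case_dir=Path("cases/001"),
)
# The file keeps the source name ("auth.csv") inside the dedup output
# directory, with header: row_ref,host,message
```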
```python
def resolve_analysis_input_output_dir(case_dir: Path | None, source_csv_path: Path) -> Path
```
Determine the output directory for deduplicated/projected CSV files.
Arguments:
- case_dir: Optional case directory path.
- source_csv_path: Path to the original parsed CSV file.
Returns:
A `Path` to the output directory.
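The resolution rules, illustrated; the actual directory name comes from the `DEDUPLICATED_PARSED_DIRNAME` constant, whose value is not shown here:

```python
from pathlib import Path

from app.analyzer.data_prep import resolve_analysis_input_output_dir

# With a case directory, the output always lands directly under it.
resolve_analysis_input_output_dir(Path("cases/001"), Path("elsewhere/auth.csv"))
# -> cases/001/<DEDUPLICATED_PARSED_DIRNAME>

# Without one, a "parsed" parent is swapped for a sibling dedup directory.
resolve_analysis_input_output_dir(None, Path("cases/001/parsed/auth.csv"))
# -> cases/001/<DEDUPLICATED_PARSED_DIRNAME>

# Otherwise the dedup directory is created inside the CSV's own parent.
resolve_analysis_input_output_dir(None, Path("exports/auth.csv"))
# -> exports/<DEDUPLICATED_PARSED_DIRNAME>
```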
```python
def build_artifact_csv_attachment(artifact_key: str, csv_path: Path) -> dict[str, str]
```
Build an attachment descriptor dict for an artifact CSV file.
Arguments:
- artifact_key: Artifact identifier.
- csv_path: Path to the CSV file on disk.
Returns:
A dict with `path`, `name`, and `mime_type` keys.
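A usage sketch; the exact attachment name depends on what `sanitize_filename` does to the key, so the value shown is an assumption:

```python
from pathlib import Path

from app.analyzer.data_prep import build_artifact_csv_attachment

build_artifact_csv_attachment("linux.ssh_auth", Path("cases/001/parsed/ssh_auth.csv"))
# Assuming sanitize_filename leaves this key intact:
# {"path": "cases/001/parsed/ssh_auth.csv", "name": "linux.ssh_auth.csv", "mime_type": "text/csv"}
```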
```python
def counter_normalize(value: str) -> str
```
Normalize a cell value for frequency counting in statistics.
Arguments:
- value: Raw cell value string.
Returns:
The normalized value, or empty string if low-signal.
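A quick sketch; which values count as low-signal depends on the `LOW_SIGNAL_VALUES` constant, so the second result is an assumption:

```python
from app.analyzer.data_prep import counter_normalize

counter_normalize("cmd.exe /c whoami")  # passed through (trimmed to 120 chars by normalize_table_cell)
counter_normalize("n/a")                # -> "" if "n/a" is listed in LOW_SIGNAL_VALUES
```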
```python
def compute_statistics(
    rows: list[dict[str, str]],
    columns: list[str],
) -> tuple[str, datetime | None, datetime | None]
```
Compute record count, time range, and top-20 frequent values per column.
Arguments:
- rows: List of normalized CSV row dicts.
- columns: Column names for frequency statistics.
Returns:
A 3-tuple of `(statistics_text, min_time, max_time)`.
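A sketch on toy rows, assuming none of these values are classified as low-signal; the time range comes from `time_range_for_rows`, so with no timestamp columns the start/end lines show whatever `format_datetime` renders for missing values:

```python
from app.analyzer.data_prep import compute_statistics

rows = [
    {"user": "root", "source_ip": "203.0.113.7"},
    {"user": "root", "source_ip": "203.0.113.7"},
    {"user": "alice", "source_ip": "198.51.100.2"},
]
text, min_time, max_time = compute_statistics(rows=rows, columns=["user", "source_ip"])
# text contains, among the header lines:
# Record count: 3
# Top values (up to 20 per key column):
# - user:
#   2x root
#   1x alice
# - source_ip:
#   2x 203.0.113.7
#   1x 198.51.100.2
```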
```python
def select_ai_columns(
    artifact_key: str,
    available_columns: list[str],
    column_projections: dict[str, tuple[str, ...]],
    audit_log_fn: Any = None,
) -> tuple[list[str], bool]
```
Select the subset of CSV columns to include in the AI prompt.
Arguments:
- artifact_key: Artifact identifier for projection lookup.
- available_columns: Column names present in the source CSV.
- column_projections: Mapping of artifact keys to column tuples.
- audit_log_fn: Optional callable for logging missing columns.
Returns:
A 2-tuple of `(selected_columns, projection_applied)`.
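A sketch of the wildcard behaviour, assuming `normalize_artifact_key` maps the key onto the projection key used below:

```python
from app.analyzer.data_prep import select_ai_columns

selected, applied = select_ai_columns(
    artifact_key="systemd_services",
    available_columns=["Unit", "ExecStart", "Description", "CustomField"],
    column_projections={"systemd_services": ("unit", "execstart", "*")},
)
# selected == ["Unit", "ExecStart", "Description", "CustomField"], applied == True:
# the named columns match case-insensitively, then "*" passes the rest through.
```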
```python
def project_rows_for_analysis(
    rows: list[dict[str, str]],
    columns: list[str],
) -> list[dict[str, str]]
```
Project rows to only the selected columns for analysis.
Arguments:
- rows: List of normalized row dicts.
- columns: Column names to retain.
Returns:
A new list of row dicts with only the specified columns and `_row_ref`.
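For example (assuming `stringify_value` passes plain strings through unchanged):

```python
from app.analyzer.data_prep import project_rows_for_analysis

rows = [{"user": "root", "tty": "pts/0", "shell": "/bin/bash", "_row_ref": "1"}]
project_rows_for_analysis(rows=rows, columns=["user", "shell"])
# -> [{"user": "root", "shell": "/bin/bash", "_row_ref": "1"}]
```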
```python
def deduplicate_rows_for_analysis(
    rows: list[dict[str, str]],
    columns: list[str],
) -> tuple[list[dict[str, str]], list[str], int, int, list[str]]
```
Deduplicate rows that differ only in timestamp or record-ID columns.
Arguments:
- rows: List of projected row dicts.
- columns: Column names present in the rows.
Returns:
A 5-tuple of `(kept_rows, output_columns, removed_count, annotated_count, variant_columns)`.
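A sketch of the collapse behaviour; whether a column counts as a variant column is decided by the `looks_like_timestamp_column` and `is_dedup_safe_identifier_column` helpers, so the classification below is an assumption:

```python
from app.analyzer.data_prep import deduplicate_rows_for_analysis

rows = [
    {"timestamp": "2024-05-01 10:00:00", "event_id": "101", "message": "Service started"},
    {"timestamp": "2024-05-01 10:05:00", "event_id": "102", "message": "Service started"},
    {"timestamp": "2024-05-01 10:06:00", "event_id": "103", "message": "Service stopped"},
]
kept, out_columns, removed, annotated, variants = deduplicate_rows_for_analysis(
    rows=rows, columns=["timestamp", "event_id", "message"]
)
# Assuming "timestamp" and "event_id" are classified as variant columns, the two
# "Service started" rows collapse into the first: removed == 1, annotated == 1,
# and out_columns gains DEDUP_COMMENT_COLUMN carrying the deduplication note.
```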
```python
def build_full_data_csv(
    rows: list[dict[str, str]],
    columns: list[str],
) -> str
```
Serialize rows to inline CSV text for prompt inclusion.
Arguments:
- rows: List of row dicts to serialize.
- columns: Column names for the CSV header.
Returns:
A CSV-formatted string with a `row_ref` column prepended.
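For example:

```python
from app.analyzer.data_prep import build_full_data_csv

rows = [{"user": "root", "message": "session opened", "_row_ref": "1"}]
print(build_full_data_csv(rows=rows, columns=["user", "message"]))
# row_ref,user,message
# 1,root,session opened
```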