app.analyzer.data_prep

Data preparation pipeline for forensic artifact analysis.

Handles date extraction from investigation context, CSV reading, column projection, deduplication, statistics computation, and final prompt assembly for AI analysis.

  1"""Data preparation pipeline for forensic artifact analysis.
  2
  3Handles date extraction from investigation context, CSV reading,
  4column projection, deduplication, statistics computation, and
  5final prompt assembly for AI analysis.
  6"""
  7
  8from __future__ import annotations
  9
 10import csv
 11import io
 12import logging
 13from collections import Counter
 14from datetime import datetime
 15from pathlib import Path
 16from typing import Any, Mapping
 17
 18from .constants import (
 19    DEDUP_COMMENT_COLUMN,
 20    DEDUPLICATED_PARSED_DIRNAME,
 21    LOW_SIGNAL_VALUES,
 22    METADATA_COLUMNS,
 23)
 24from .ioc import (
 25    build_artifact_final_context_reminder,
 26    build_priority_directives,
 27    format_ioc_targets,
 28)
 29from .utils import (
 30    format_datetime,
 31    is_dedup_safe_identifier_column,
 32    looks_like_timestamp_column,
 33    normalize_artifact_key,
 34    normalize_csv_row,
 35    normalize_table_cell,
 36    sanitize_filename,
 37    stringify_value,
 38    time_range_for_rows,
 39)
 40
 41LOGGER = logging.getLogger(__name__)
 42
 43__all__ = [
 44    "prepare_artifact_data",
 45    "write_analysis_input_csv",
 46    "resolve_analysis_input_output_dir",
 47    "build_artifact_csv_attachment",
 48    "counter_normalize",
 49    "compute_statistics",
 50    "select_ai_columns",
 51    "project_rows_for_analysis",
 52    "deduplicate_rows_for_analysis",
 53    "build_full_data_csv",
 54]
 55
 56
 57# ---------------------------------------------------------------------------
 58# Statistics and normalisation
 59# ---------------------------------------------------------------------------
 60
 61def counter_normalize(value: str) -> str:
 62    """Normalize a cell value for frequency counting in statistics.
 63
 64    Args:
 65        value: Raw cell value string.
 66
 67    Returns:
 68        The normalized value, or empty string if low-signal.
 69    """
 70    cleaned = normalize_table_cell(value=value, cell_limit=120)
 71    if cleaned.lower() in LOW_SIGNAL_VALUES:
 72        return ""
 73    return cleaned
 74
 75
 76def compute_statistics(
 77    rows: list[dict[str, str]],
 78    columns: list[str],
 79) -> tuple[str, datetime | None, datetime | None]:
 80    """Compute record count, time range, and top-20 frequent values per column.
 81
 82    Args:
 83        rows: List of normalized CSV row dicts.
 84        columns: Column names for frequency statistics.
 85
 86    Returns:
 87        A 3-tuple of ``(statistics_text, min_time, max_time)``.
 88    """
 89    total_records = len(rows)
 90    min_time, max_time = time_range_for_rows(rows)
 91    counters: dict[str, Counter[str]] = {column: Counter() for column in columns}
 92
 93    for row in rows:
 94        for column in columns:
 95            value = counter_normalize(row.get(column, ""))
 96            if value:
 97                counters[column][value] += 1
 98
 99    lines = [
100        f"Record count: {total_records}",
101        f"Time range start: {format_datetime(min_time)}",
102        f"Time range end: {format_datetime(max_time)}",
103    ]
104
105    if columns:
106        lines.append("Top values (up to 20 per key column):")
107        for column in columns:
108            lines.append(f"- {column}:")
109            top_values = counters[column].most_common(20)
110            if not top_values:
111                lines.append("  (no non-empty values)")
112                continue
113            for value, count in top_values:
114                lines.append(f"  {count}x {value}")
115    else:
116        lines.append("Top values: no key columns selected.")
117
118    return "\n".join(lines), min_time, max_time
119
120
121# ---------------------------------------------------------------------------
122# Column selection and projection
123# ---------------------------------------------------------------------------
124
def select_ai_columns(
    artifact_key: str,
    available_columns: list[str],
    column_projections: dict[str, tuple[str, ...]],
    audit_log_fn: Any = None,
) -> tuple[list[str], bool]:
    """Choose which CSV columns are forwarded to the AI prompt.

    Configured column names are matched against the available ones after
    stripping whitespace and lower-casing both sides.

    Args:
        artifact_key: Artifact identifier for projection lookup.
        available_columns: Column names present in the source CSV.
        column_projections: Mapping of artifact keys to column tuples.
        audit_log_fn: Optional callable ``(action, details)``; invoked with
            ``"artifact_ai_projection_warning"`` when configured columns
            are absent from the CSV.

    Returns:
        A 2-tuple of ``(selected_columns, projection_applied)``.  When no
        projection is configured, or none of the configured columns match,
        all available columns are returned with ``False``.
    """
    wanted = column_projections.get(normalize_artifact_key(artifact_key))
    if not wanted:
        return list(available_columns), False

    by_folded_name = {name.strip().lower(): name for name in available_columns}
    selected: list[str] = []
    absent: list[str] = []
    wildcard_requested = False

    for entry in wanted:
        token = entry.strip()
        if token == "*":
            wildcard_requested = True
            continue
        actual = by_folded_name.get(token.lower())
        if actual is None:
            absent.append(entry)
        else:
            selected.append(actual)

    # ``*`` means "also pass through every remaining column that was not
    # listed explicitly" -- used for artifacts with dynamic/variable fields
    # (e.g. systemd service records whose fields vary per unit).
    if wildcard_requested:
        taken = {name.strip().lower() for name in selected}
        selected.extend(
            name for name in available_columns
            if name.strip().lower() not in taken
        )

    if absent and audit_log_fn is not None:
        audit_log_fn(
            "artifact_ai_projection_warning",
            {
                "artifact_key": artifact_key,
                "missing_columns": absent,
                "available_columns": available_columns,
            },
        )

    if selected:
        return selected, True
    return list(available_columns), False
180
181
def project_rows_for_analysis(
    rows: list[dict[str, str]],
    columns: list[str],
) -> list[dict[str, str]]:
    """Reduce each row to the selected columns, keeping its row reference.

    Args:
        rows: List of normalized row dicts.
        columns: Column names to retain.

    Returns:
        A new list of new row dicts.  Every dict has one entry per name in
        ``columns`` (missing cells become the stringified empty value) and,
        when the source row carries a non-empty ``_row_ref``, that
        reference as well.
    """
    result: list[dict[str, str]] = []
    for source in rows:
        slim: dict[str, str] = {}
        for name in columns:
            slim[name] = stringify_value(source.get(name, ""))
        reference = stringify_value(source.get("_row_ref", ""))
        if reference:
            slim["_row_ref"] = reference
        result.append(slim)
    return result
207
208
209# ---------------------------------------------------------------------------
210# Deduplication
211# ---------------------------------------------------------------------------
212
def deduplicate_rows_for_analysis(
    rows: list[dict[str, str]],
    columns: list[str],
) -> tuple[list[dict[str, str]], list[str], int, int, list[str]]:
    """Collapse rows that differ only in timestamp or record-ID columns.

    The first row of each group of duplicates is kept; later ones are
    dropped and the kept row is annotated with how many were removed.

    Args:
        rows: List of projected row dicts.
        columns: Column names present in the rows.

    Returns:
        A 5-tuple of ``(kept_rows, output_columns, removed_count,
        annotated_count, variant_columns)``.  ``output_columns`` gains
        ``DEDUP_COMMENT_COLUMN`` only when at least one row was removed.
    """
    if not rows or not columns:
        return list(rows), list(columns), 0, 0, []

    variant_columns = [
        name for name in columns
        if looks_like_timestamp_column(name) or is_dedup_safe_identifier_column(name)
    ]
    if not variant_columns:
        return [dict(row) for row in rows], list(columns), 0, 0, []

    # Columns that define row identity: everything except the variant
    # columns, metadata columns and the dedup comment column itself.
    ignorable = set(variant_columns)
    identity_columns = [
        name for name in columns
        if not (
            name in ignorable
            or name.lower() in METADATA_COLUMNS
            or name == DEDUP_COMMENT_COLUMN
        )
    ]
    if not identity_columns:
        return [dict(row) for row in rows], list(columns), 0, 0, variant_columns

    survivors: list[dict[str, str]] = []
    first_seen: dict[tuple[tuple[str, str], ...], int] = {}
    absorbed: Counter[int] = Counter()

    for row in rows:
        flat = {str(key): stringify_value(value) for key, value in row.items()}
        fingerprint = tuple((name, flat.get(name, "")) for name in identity_columns)
        slot = first_seen.get(fingerprint)
        if slot is not None:
            # Same identity as an earlier row: count it against that row.
            absorbed[slot] += 1
            continue
        first_seen[fingerprint] = len(survivors)
        survivors.append(flat)

    output_columns = list(columns)
    annotated = 0
    if absorbed:
        if DEDUP_COMMENT_COLUMN not in output_columns:
            output_columns.append(DEDUP_COMMENT_COLUMN)
        for slot, extra in absorbed.items():
            survivors[slot][DEDUP_COMMENT_COLUMN] = (
                f"Deduplicated {extra} records with matching event data and different timestamp/ID."
            )
        annotated = len(absorbed)

    return survivors, output_columns, sum(absorbed.values()), annotated, variant_columns
272
273
274# ---------------------------------------------------------------------------
275# CSV serialisation
276# ---------------------------------------------------------------------------
277
def build_full_data_csv(
    rows: list[dict[str, str]],
    columns: list[str],
) -> str:
    """Render rows as inline CSV text for inclusion in a prompt.

    Args:
        rows: List of row dicts to serialize.
        columns: Column names for the CSV header.

    Returns:
        ``"No columns available."`` when ``columns`` is empty.  Otherwise
        CSV text whose first column is ``row_ref`` (taken from each row's
        ``_row_ref`` entry), with surrounding whitespace stripped.
    """
    if not columns:
        return "No columns available."

    sink = io.StringIO(newline="")
    sheet = csv.writer(sink)
    sheet.writerow(["row_ref", *columns])
    sheet.writerows(
        [record.get("_row_ref", ""), *(record.get(name, "") for name in columns)]
        for record in rows
    )

    text = sink.getvalue().strip()
    # The header row is always written, so the fallback is a safety net only.
    return text or "No rows available for analysis."
304
305
306# ---------------------------------------------------------------------------
307# Analysis-input CSV output
308# ---------------------------------------------------------------------------
309
def resolve_analysis_input_output_dir(case_dir: Path | None, source_csv_path: Path) -> Path:
    """Pick the directory that receives deduplicated/projected CSV files.

    Args:
        case_dir: Optional case directory path.
        source_csv_path: Path to the original parsed CSV file.

    Returns:
        ``DEDUPLICATED_PARSED_DIRNAME`` under the case directory when one
        is given.  Otherwise the same directory name placed next to the
        source CSV's folder if that folder is called ``parsed`` (compared
        case-insensitively, whitespace stripped), or inside the source
        CSV's folder if it is not.
    """
    if case_dir is not None:
        base = case_dir
    else:
        base = source_csv_path.parent
        if base.name.strip().lower() == "parsed":
            # Place the output beside "parsed" rather than inside it.
            base = base.parent
    return base / DEDUPLICATED_PARSED_DIRNAME
326
327
def write_analysis_input_csv(
    source_csv_path: Path,
    rows: list[dict[str, str]],
    columns: list[str],
    case_dir: Path | None = None,
) -> Path:
    """Persist the deduplicated/projected rows as a CSV file for audit.

    The file keeps the source CSV's file name and is placed in the
    directory chosen by ``resolve_analysis_input_output_dir``, which is
    created if needed.

    Args:
        source_csv_path: Path to the original parsed CSV.
        rows: Row dicts to write.
        columns: Column names for the CSV header.
        case_dir: Optional case directory.

    Returns:
        Path to the newly written analysis-input CSV file.

    Raises:
        OSError: If the write fails.
    """
    target_dir = resolve_analysis_input_output_dir(case_dir=case_dir, source_csv_path=source_csv_path)
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / source_csv_path.name

    header = list(columns)
    # Prepend a ``row_ref`` column only when rows carry ``_row_ref`` and the
    # caller did not already request ``_row_ref`` as a regular column.
    add_row_ref = False
    if "_row_ref" not in header:
        add_row_ref = any("_row_ref" in record for record in rows)
    if add_row_ref:
        header.insert(0, "row_ref")

    with target.open("w", newline="", encoding="utf-8") as stream:
        sheet = csv.DictWriter(stream, fieldnames=header, extrasaction="ignore")
        sheet.writeheader()
        for record in rows:
            line: dict[str, str] = {name: record.get(name, "") for name in columns}
            if add_row_ref:
                line["row_ref"] = record.get("_row_ref", "")
            sheet.writerow(line)

    return target
367
368
def build_artifact_csv_attachment(artifact_key: str, csv_path: Path) -> dict[str, str]:
    """Describe an artifact CSV file as an attachment.

    Args:
        artifact_key: Artifact identifier; passed through
            ``sanitize_filename`` to form the attachment name.
        csv_path: Path to the CSV file on disk.

    Returns:
        A dict with ``path``, ``name``, and ``mime_type`` keys.  ``name``
        always ends in ``.csv`` and ``mime_type`` is ``text/csv``.
    """
    name = sanitize_filename(artifact_key)
    if not name.lower().endswith(".csv"):
        name = f"{name}.csv"
    return {"path": str(csv_path), "name": name, "mime_type": "text/csv"}
382
383
384# ---------------------------------------------------------------------------
385# Full artifact data preparation
386# ---------------------------------------------------------------------------
387
def prepare_artifact_data(
    artifact_key: str,
    investigation_context: str,
    csv_path: Path,
    *,
    artifact_metadata: dict[str, str],
    artifact_prompt_template: str,
    artifact_prompt_template_small_context: str,
    artifact_instruction_prompts: dict[str, str],
    artifact_ai_column_projections: dict[str, tuple[str, ...]],
    artifact_deduplication_enabled: bool,
    ai_max_tokens: int,
    shortened_prompt_cutoff_tokens: int,
    case_dir: Path | None,
    audit_log_fn: Any = None,
) -> tuple[str, Path, list[str]]:
    """Prepare one artifact CSV as a bounded, analysis-ready prompt.

    Reads the full artifact CSV (all rows), applies column projection,
    optional deduplication, and statistics computation, then fills the
    appropriate prompt template with all gathered data.  The full template
    (with statistics) is used when ``ai_max_tokens`` is at least
    ``shortened_prompt_cutoff_tokens``; otherwise the small-context
    template is used and the statistics text is left empty.

    When projection was applied or deduplication is enabled, the rows sent
    to the AI are also written to a separate CSV.  A failure of that write
    is not raised: the source CSV path is returned instead and the error
    text is attached to the audit details.

    Args:
        artifact_key: Unique identifier for the artifact.
        investigation_context: Free-text investigation context.
        csv_path: Path to the artifact CSV file.
        artifact_metadata: Metadata dict for the artifact.
        artifact_prompt_template: Full prompt template.
        artifact_prompt_template_small_context: Shortened prompt template.
        artifact_instruction_prompts: Per-artifact instruction overrides.
        artifact_ai_column_projections: Column projection config.
        artifact_deduplication_enabled: Whether to deduplicate rows.
        ai_max_tokens: Configured AI context window size.
        shortened_prompt_cutoff_tokens: Token threshold for small template.
        case_dir: Optional case directory path.
        audit_log_fn: Optional callable ``(action, details)`` for audit.

    Returns:
        A 3-tuple of ``(prompt_text, analysis_csv_path, analysis_columns)``.

    Raises:
        OSError: If the source CSV at ``csv_path`` cannot be opened or read.
    """
    import re  # local import: only needed for the single-pass template fill

    include_statistics = ai_max_tokens >= shortened_prompt_cutoff_tokens
    template = artifact_prompt_template if include_statistics else artifact_prompt_template_small_context

    rows: list[dict[str, str]] = []
    columns: list[str] = []

    # utf-8-sig drops a leading BOM; undecodable bytes are replaced rather
    # than aborting the whole artifact.
    with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as handle:
        reader = csv.DictReader(handle)
        columns = [str(c) for c in (reader.fieldnames or []) if c not in (None, "")]

        for source_row_number, raw_row in enumerate(reader, start=1):
            row = normalize_csv_row(raw_row, columns=columns)
            # 1-based position in the source CSV, kept through projection
            # and deduplication.
            row["_row_ref"] = str(source_row_number)
            rows.append(row)

    analysis_columns, projection_applied = select_ai_columns(
        artifact_key=artifact_key, available_columns=columns,
        column_projections=artifact_ai_column_projections, audit_log_fn=audit_log_fn,
    )
    analysis_rows = project_rows_for_analysis(rows=rows, columns=analysis_columns)
    deduplicated_records = 0
    dedup_annotated_rows = 0
    dedup_variant_columns: list[str] = []
    analysis_csv_path = csv_path
    dedup_write_error = ""

    if artifact_deduplication_enabled:
        analysis_rows, analysis_columns, deduplicated_records, dedup_annotated_rows, dedup_variant_columns = (
            deduplicate_rows_for_analysis(rows=analysis_rows, columns=analysis_columns)
        )

    if projection_applied or artifact_deduplication_enabled:
        try:
            analysis_csv_path = write_analysis_input_csv(
                source_csv_path=csv_path, rows=analysis_rows, columns=analysis_columns, case_dir=case_dir,
            )
        except OSError as error:
            # Best effort: fall back to the source CSV and report the
            # failure through the audit log below.
            analysis_csv_path = csv_path
            dedup_write_error = str(error)

    if projection_applied and audit_log_fn is not None:
        projection_details: dict[str, Any] = {
            "artifact_key": artifact_key, "source_csv": str(csv_path),
            "analysis_csv": str(analysis_csv_path), "projection_columns": list(analysis_columns),
        }
        # With deduplication enabled the write error is reported once, on
        # the deduplication event instead.
        if dedup_write_error and not artifact_deduplication_enabled:
            projection_details["write_error"] = dedup_write_error
        audit_log_fn("artifact_ai_projection", projection_details)

    if artifact_deduplication_enabled and audit_log_fn is not None:
        dedup_audit_details: dict[str, Any] = {
            "artifact_key": artifact_key, "source_csv": str(csv_path),
            "analysis_csv": str(analysis_csv_path), "removed_records": deduplicated_records,
            "annotated_rows": dedup_annotated_rows, "variant_columns": list(dedup_variant_columns),
        }
        if dedup_write_error:
            dedup_audit_details["write_error"] = dedup_write_error
        audit_log_fn("artifact_deduplicated", dedup_audit_details)

    statistics = ""
    if include_statistics:
        statistics, min_time, max_time = compute_statistics(rows=analysis_rows, columns=analysis_columns)
        stats_prefix: list[str] = []

        if artifact_deduplication_enabled:
            dedup_details = [
                "Artifact deduplication enabled.",
                f"Rows removed as timestamp/ID-only duplicates: {deduplicated_records}.",
                f"Rows annotated with deduplication comment: {dedup_annotated_rows}.",
            ]
            if dedup_variant_columns:
                dedup_details.append("Dedup variant columns: " + ", ".join(dedup_variant_columns) + ".")
            stats_prefix.append("\n".join(dedup_details))

        if projection_applied:
            stats_prefix.append("AI column projection applied: " + ", ".join(analysis_columns) + ".")

        if stats_prefix:
            statistics = "\n".join(stats_prefix) + "\n" + statistics
    else:
        min_time, max_time = time_range_for_rows(analysis_rows)

    full_data_csv = build_full_data_csv(rows=analysis_rows, columns=analysis_columns)
    priority_directives = build_priority_directives(investigation_context)
    ioc_targets = format_ioc_targets(investigation_context)
    artifact_guidance = _resolve_analysis_instructions(
        artifact_key=artifact_key, artifact_metadata=artifact_metadata,
        artifact_instruction_prompts=artifact_instruction_prompts,
    )

    replacements = {
        "priority_directives": priority_directives,
        "investigation_context": investigation_context.strip() or "No investigation context provided.",
        "ioc_targets": ioc_targets,
        "artifact_key": artifact_key,
        "artifact_name": artifact_metadata.get("name", artifact_key),
        "artifact_description": artifact_metadata.get("description", "No artifact description available."),
        "total_records": str(len(analysis_rows)),
        "time_range_start": format_datetime(min_time),
        "time_range_end": format_datetime(max_time),
        "statistics": statistics,
        "analysis_instructions": artifact_guidance,
        "artifact_guidance": artifact_guidance,
        "data_csv": full_data_csv,
    }

    # Fill every placeholder in ONE pass over the template.  Chained
    # ``str.replace`` calls re-scan text that was just substituted in, so a
    # literal ``{{data_csv}}`` (or any other placeholder) occurring in the
    # investigation context, the metadata or a CSV-derived statistics value
    # would itself be expanded.
    placeholder_pattern = re.compile(
        "|".join(re.escape("{{" + name + "}}") for name in replacements)
    )
    filled = placeholder_pattern.sub(
        lambda match: replacements[match.group(0)[2:-2]],
        template,
    )

    final_context_reminder = build_artifact_final_context_reminder(
        artifact_key=artifact_key,
        artifact_name=artifact_metadata.get("name", artifact_key),
        investigation_context=investigation_context,
    )
    if final_context_reminder:
        filled = f"{filled.rstrip()}\n\n{final_context_reminder}\n"

    return filled, analysis_csv_path, analysis_columns
548
549
def _resolve_analysis_instructions(
    artifact_key: str,
    artifact_metadata: Mapping[str, str],
    artifact_instruction_prompts: dict[str, str],
) -> str:
    """Find the analysis instructions to show for an artifact.

    Lookup order: per-artifact instruction prompts first, then the
    ``artifact_guidance``, ``analysis_instructions`` and ``analysis_hint``
    metadata fields, then a generic fallback sentence.

    Args:
        artifact_key: Artifact identifier.
        artifact_metadata: Metadata dict for the artifact.
        artifact_instruction_prompts: Per-artifact instruction overrides.

    Returns:
        The analysis instruction text.
    """
    # Try the raw key, the normalised key, and dot/underscore variants so
    # that "ssh.authorized_keys" matches "ssh_authorized_keys.md".
    lookup_keys: list[str] = [artifact_key, normalize_artifact_key(artifact_key)]
    underscored = artifact_key.replace(".", "_")
    dotted = artifact_key.replace("_", ".")
    for alternative in (underscored, dotted):
        if alternative not in lookup_keys:
            lookup_keys.append(alternative)

    for candidate in lookup_keys:
        text = artifact_instruction_prompts.get(candidate.strip().lower(), "").strip()
        if text:
            return text

    for field_name in ("artifact_guidance", "analysis_instructions", "analysis_hint"):
        text = stringify_value(artifact_metadata.get(field_name, ""))
        if text:
            return text

    return "No specific analysis instructions are available for this artifact."
def prepare_artifact_data( artifact_key: str, investigation_context: str, csv_path: pathlib.Path, *, artifact_metadata: dict[str, str], artifact_prompt_template: str, artifact_prompt_template_small_context: str, artifact_instruction_prompts: dict[str, str], artifact_ai_column_projections: dict[str, tuple[str, ...]], artifact_deduplication_enabled: bool, ai_max_tokens: int, shortened_prompt_cutoff_tokens: int, case_dir: pathlib.Path | None, audit_log_fn: Any = None) -> tuple[str, pathlib.Path, list[str]]:
def prepare_artifact_data(
    artifact_key: str,
    investigation_context: str,
    csv_path: Path,
    *,
    artifact_metadata: dict[str, str],
    artifact_prompt_template: str,
    artifact_prompt_template_small_context: str,
    artifact_instruction_prompts: dict[str, str],
    artifact_ai_column_projections: dict[str, tuple[str, ...]],
    artifact_deduplication_enabled: bool,
    ai_max_tokens: int,
    shortened_prompt_cutoff_tokens: int,
    case_dir: Path | None,
    audit_log_fn: Any = None,
) -> tuple[str, Path, list[str]]:
    """Prepare one artifact CSV as a bounded, analysis-ready prompt.

    Reads the full artifact CSV (all rows), applies column projection,
    optional deduplication, and statistics computation, then fills the
    appropriate prompt template with all gathered data.  The full template
    (with statistics) is used when ``ai_max_tokens`` is at least
    ``shortened_prompt_cutoff_tokens``; otherwise the small-context
    template is used and the statistics text is left empty.

    When projection was applied or deduplication is enabled, the rows sent
    to the AI are also written to a separate CSV.  A failure of that write
    is not raised: the source CSV path is returned instead and the error
    text is attached to the audit details.

    Args:
        artifact_key: Unique identifier for the artifact.
        investigation_context: Free-text investigation context.
        csv_path: Path to the artifact CSV file.
        artifact_metadata: Metadata dict for the artifact.
        artifact_prompt_template: Full prompt template.
        artifact_prompt_template_small_context: Shortened prompt template.
        artifact_instruction_prompts: Per-artifact instruction overrides.
        artifact_ai_column_projections: Column projection config.
        artifact_deduplication_enabled: Whether to deduplicate rows.
        ai_max_tokens: Configured AI context window size.
        shortened_prompt_cutoff_tokens: Token threshold for small template.
        case_dir: Optional case directory path.
        audit_log_fn: Optional callable ``(action, details)`` for audit.

    Returns:
        A 3-tuple of ``(prompt_text, analysis_csv_path, analysis_columns)``.

    Raises:
        OSError: If the source CSV at ``csv_path`` cannot be opened or read.
    """
    import re  # local import: only needed for the single-pass template fill

    include_statistics = ai_max_tokens >= shortened_prompt_cutoff_tokens
    template = artifact_prompt_template if include_statistics else artifact_prompt_template_small_context

    rows: list[dict[str, str]] = []
    columns: list[str] = []

    # utf-8-sig drops a leading BOM; undecodable bytes are replaced rather
    # than aborting the whole artifact.
    with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as handle:
        reader = csv.DictReader(handle)
        columns = [str(c) for c in (reader.fieldnames or []) if c not in (None, "")]

        for source_row_number, raw_row in enumerate(reader, start=1):
            row = normalize_csv_row(raw_row, columns=columns)
            # 1-based position in the source CSV, kept through projection
            # and deduplication.
            row["_row_ref"] = str(source_row_number)
            rows.append(row)

    analysis_columns, projection_applied = select_ai_columns(
        artifact_key=artifact_key, available_columns=columns,
        column_projections=artifact_ai_column_projections, audit_log_fn=audit_log_fn,
    )
    analysis_rows = project_rows_for_analysis(rows=rows, columns=analysis_columns)
    deduplicated_records = 0
    dedup_annotated_rows = 0
    dedup_variant_columns: list[str] = []
    analysis_csv_path = csv_path
    dedup_write_error = ""

    if artifact_deduplication_enabled:
        analysis_rows, analysis_columns, deduplicated_records, dedup_annotated_rows, dedup_variant_columns = (
            deduplicate_rows_for_analysis(rows=analysis_rows, columns=analysis_columns)
        )

    if projection_applied or artifact_deduplication_enabled:
        try:
            analysis_csv_path = write_analysis_input_csv(
                source_csv_path=csv_path, rows=analysis_rows, columns=analysis_columns, case_dir=case_dir,
            )
        except OSError as error:
            # Best effort: fall back to the source CSV and report the
            # failure through the audit log below.
            analysis_csv_path = csv_path
            dedup_write_error = str(error)

    if projection_applied and audit_log_fn is not None:
        projection_details: dict[str, Any] = {
            "artifact_key": artifact_key, "source_csv": str(csv_path),
            "analysis_csv": str(analysis_csv_path), "projection_columns": list(analysis_columns),
        }
        # With deduplication enabled the write error is reported once, on
        # the deduplication event instead.
        if dedup_write_error and not artifact_deduplication_enabled:
            projection_details["write_error"] = dedup_write_error
        audit_log_fn("artifact_ai_projection", projection_details)

    if artifact_deduplication_enabled and audit_log_fn is not None:
        dedup_audit_details: dict[str, Any] = {
            "artifact_key": artifact_key, "source_csv": str(csv_path),
            "analysis_csv": str(analysis_csv_path), "removed_records": deduplicated_records,
            "annotated_rows": dedup_annotated_rows, "variant_columns": list(dedup_variant_columns),
        }
        if dedup_write_error:
            dedup_audit_details["write_error"] = dedup_write_error
        audit_log_fn("artifact_deduplicated", dedup_audit_details)

    statistics = ""
    if include_statistics:
        statistics, min_time, max_time = compute_statistics(rows=analysis_rows, columns=analysis_columns)
        stats_prefix: list[str] = []

        if artifact_deduplication_enabled:
            dedup_details = [
                "Artifact deduplication enabled.",
                f"Rows removed as timestamp/ID-only duplicates: {deduplicated_records}.",
                f"Rows annotated with deduplication comment: {dedup_annotated_rows}.",
            ]
            if dedup_variant_columns:
                dedup_details.append("Dedup variant columns: " + ", ".join(dedup_variant_columns) + ".")
            stats_prefix.append("\n".join(dedup_details))

        if projection_applied:
            stats_prefix.append("AI column projection applied: " + ", ".join(analysis_columns) + ".")

        if stats_prefix:
            statistics = "\n".join(stats_prefix) + "\n" + statistics
    else:
        min_time, max_time = time_range_for_rows(analysis_rows)

    full_data_csv = build_full_data_csv(rows=analysis_rows, columns=analysis_columns)
    priority_directives = build_priority_directives(investigation_context)
    ioc_targets = format_ioc_targets(investigation_context)
    artifact_guidance = _resolve_analysis_instructions(
        artifact_key=artifact_key, artifact_metadata=artifact_metadata,
        artifact_instruction_prompts=artifact_instruction_prompts,
    )

    replacements = {
        "priority_directives": priority_directives,
        "investigation_context": investigation_context.strip() or "No investigation context provided.",
        "ioc_targets": ioc_targets,
        "artifact_key": artifact_key,
        "artifact_name": artifact_metadata.get("name", artifact_key),
        "artifact_description": artifact_metadata.get("description", "No artifact description available."),
        "total_records": str(len(analysis_rows)),
        "time_range_start": format_datetime(min_time),
        "time_range_end": format_datetime(max_time),
        "statistics": statistics,
        "analysis_instructions": artifact_guidance,
        "artifact_guidance": artifact_guidance,
        "data_csv": full_data_csv,
    }

    # Fill every placeholder in ONE pass over the template.  Chained
    # ``str.replace`` calls re-scan text that was just substituted in, so a
    # literal ``{{data_csv}}`` (or any other placeholder) occurring in the
    # investigation context, the metadata or a CSV-derived statistics value
    # would itself be expanded.
    placeholder_pattern = re.compile(
        "|".join(re.escape("{{" + name + "}}") for name in replacements)
    )
    filled = placeholder_pattern.sub(
        lambda match: replacements[match.group(0)[2:-2]],
        template,
    )

    final_context_reminder = build_artifact_final_context_reminder(
        artifact_key=artifact_key,
        artifact_name=artifact_metadata.get("name", artifact_key),
        investigation_context=investigation_context,
    )
    if final_context_reminder:
        filled = f"{filled.rstrip()}\n\n{final_context_reminder}\n"

    return filled, analysis_csv_path, analysis_columns

Prepare one artifact CSV as a bounded, analysis-ready prompt.

Reads the full artifact CSV (all rows), applies column projection, deduplication, and statistics computation. Fills the appropriate prompt template with all gathered data.

Arguments:
  • artifact_key: Unique identifier for the artifact.
  • investigation_context: Free-text investigation context.
  • csv_path: Path to the artifact CSV file.
  • artifact_metadata: Metadata dict for the artifact.
  • artifact_prompt_template: Full prompt template.
  • artifact_prompt_template_small_context: Shortened prompt template.
  • artifact_instruction_prompts: Per-artifact instruction overrides.
  • artifact_ai_column_projections: Column projection config.
  • artifact_deduplication_enabled: Whether to deduplicate rows.
  • ai_max_tokens: Configured AI context window size.
  • shortened_prompt_cutoff_tokens: Token threshold for small template.
  • case_dir: Optional case directory path.
  • audit_log_fn: Optional callable (action, details) for audit.
Returns:

A 3-tuple of (prompt_text, analysis_csv_path, analysis_columns).

def write_analysis_input_csv( source_csv_path: pathlib.Path, rows: list[dict[str, str]], columns: list[str], case_dir: pathlib.Path | None = None) -> pathlib.Path:
def write_analysis_input_csv(
    source_csv_path: Path,
    rows: list[dict[str, str]],
    columns: list[str],
    case_dir: Path | None = None,
) -> Path:
    """Persist the rows handed to the AI as a standalone audit CSV.

    The file is written under the directory chosen by
    ``resolve_analysis_input_output_dir`` (created if missing) and reuses
    the file name of ``source_csv_path``.

    Args:
        source_csv_path: Path to the original parsed CSV; supplies the
            output file name and, without ``case_dir``, the location.
        rows: Row dicts to write. Keys missing from a row are written as
            empty strings; keys not listed in the header are ignored.
        columns: Column names for the CSV header.
        case_dir: Optional case directory.

    Returns:
        Path to the newly written analysis-input CSV file.

    Raises:
        OSError: If the directory cannot be created or the write fails.
    """
    destination_dir = resolve_analysis_input_output_dir(
        case_dir=case_dir,
        source_csv_path=source_csv_path,
    )
    destination_dir.mkdir(parents=True, exist_ok=True)
    destination = destination_dir / source_csv_path.name

    # A leading ``row_ref`` column is emitted only when the caller did not
    # already ask for ``_row_ref`` and at least one row carries that key.
    needs_row_ref = "_row_ref" not in columns and any("_row_ref" in row for row in rows)
    header = ["row_ref", *columns] if needs_row_ref else list(columns)

    def _records():
        """Yield one header-aligned dict per input row."""
        for row in rows:
            record: dict[str, str] = {name: row.get(name, "") for name in columns}
            if needs_row_ref:
                record["row_ref"] = row.get("_row_ref", "")
            yield record

    with destination.open("w", newline="", encoding="utf-8") as stream:
        writer = csv.DictWriter(stream, fieldnames=header, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(_records())

    return destination

Write deduplicated/projected rows to a new CSV file for audit.

Arguments:
  • source_csv_path: Path to the original parsed CSV.
  • rows: Row dicts to write.
  • columns: Column names for the CSV header.
  • case_dir: Optional case directory.
Returns:

Path to the newly written analysis-input CSV file.

Raises:
  • OSError: If the write fails.
def resolve_analysis_input_output_dir( case_dir: pathlib.Path | None, source_csv_path: pathlib.Path) -> pathlib.Path:
def resolve_analysis_input_output_dir(case_dir: Path | None, source_csv_path: Path) -> Path:
    """Pick the directory that receives deduplicated/projected CSV files.

    Resolution order:

    1. ``case_dir`` when given.
    2. Otherwise the grandparent of the source CSV when its parent
       directory is named ``parsed`` (compared case-insensitively after
       trimming whitespace), so the output sits beside ``parsed``.
    3. Otherwise the source CSV's own parent directory.

    ``DEDUPLICATED_PARSED_DIRNAME`` is appended to whichever base wins.

    Args:
        case_dir: Optional case directory path.
        source_csv_path: Path to the original parsed CSV file.

    Returns:
        A ``Path`` to the output directory (not created here).
    """
    if case_dir is not None:
        base_dir = case_dir
    else:
        source_dir = source_csv_path.parent
        inside_parsed_dir = source_dir.name.strip().lower() == "parsed"
        base_dir = source_dir.parent if inside_parsed_dir else source_dir
    return base_dir / DEDUPLICATED_PARSED_DIRNAME

Determine the output directory for deduplicated/projected CSV files.

Arguments:
  • case_dir: Optional case directory path.
  • source_csv_path: Path to the original parsed CSV file.
Returns:

A Path to the output directory.

def build_artifact_csv_attachment(artifact_key: str, csv_path: pathlib.Path) -> dict[str, str]:
def build_artifact_csv_attachment(artifact_key: str, csv_path: Path) -> dict[str, str]:
    """Describe an artifact CSV file as an attachment.

    The attachment name is the sanitized artifact key, with ``.csv``
    appended unless the sanitized key already ends with it
    (case-insensitive check).

    Args:
        artifact_key: Artifact identifier used to derive the file name.
        csv_path: Path to the CSV file on disk.

    Returns:
        A dict with ``path``, ``name``, and ``mime_type`` keys.
    """
    attachment_name = sanitize_filename(artifact_key)
    if not attachment_name.lower().endswith(".csv"):
        attachment_name = f"{attachment_name}.csv"
    return {
        "path": str(csv_path),
        "name": attachment_name,
        "mime_type": "text/csv",
    }

Build an attachment descriptor dict for an artifact CSV file.

Arguments:
  • artifact_key: Artifact identifier.
  • csv_path: Path to the CSV file on disk.
Returns:

A dict with path, name, and mime_type keys.

def counter_normalize(value: str) -> str:
def counter_normalize(value: str) -> str:
    """Clean a cell value before it is tallied in frequency statistics.

    The value is passed through ``normalize_table_cell`` with a cell limit
    of 120; results whose lowercase form appears in ``LOW_SIGNAL_VALUES``
    are collapsed to the empty string so callers can skip them.

    Args:
        value: Raw cell value string.

    Returns:
        The normalized value, or an empty string if it is low-signal.
    """
    normalized = normalize_table_cell(value=value, cell_limit=120)
    return "" if normalized.lower() in LOW_SIGNAL_VALUES else normalized

Normalize a cell value for frequency counting in statistics.

Arguments:
  • value: Raw cell value string.
Returns:

The normalized value, or empty string if low-signal.

def compute_statistics( rows: list[dict[str, str]], columns: list[str]) -> tuple[str, datetime.datetime | None, datetime.datetime | None]:
 77def compute_statistics(
 78    rows: list[dict[str, str]],
 79    columns: list[str],
 80) -> tuple[str, datetime | None, datetime | None]:
 81    """Compute record count, time range, and top-20 frequent values per column.
 82
 83    Args:
 84        rows: List of normalized CSV row dicts.
 85        columns: Column names for frequency statistics.
 86
 87    Returns:
 88        A 3-tuple of ``(statistics_text, min_time, max_time)``.
 89    """
 90    total_records = len(rows)
 91    min_time, max_time = time_range_for_rows(rows)
 92    counters: dict[str, Counter[str]] = {column: Counter() for column in columns}
 93
 94    for row in rows:
 95        for column in columns:
 96            value = counter_normalize(row.get(column, ""))
 97            if value:
 98                counters[column][value] += 1
 99
100    lines = [
101        f"Record count: {total_records}",
102        f"Time range start: {format_datetime(min_time)}",
103        f"Time range end: {format_datetime(max_time)}",
104    ]
105
106    if columns:
107        lines.append("Top values (up to 20 per key column):")
108        for column in columns:
109            lines.append(f"- {column}:")
110            top_values = counters[column].most_common(20)
111            if not top_values:
112                lines.append("  (no non-empty values)")
113                continue
114            for value, count in top_values:
115                lines.append(f"  {count}x {value}")
116    else:
117        lines.append("Top values: no key columns selected.")
118
119    return "\n".join(lines), min_time, max_time

Compute record count, time range, and top-20 frequent values per column.

Arguments:
  • rows: List of normalized CSV row dicts.
  • columns: Column names for frequency statistics.
Returns:

A 3-tuple of (statistics_text, min_time, max_time).

def select_ai_columns( artifact_key: str, available_columns: list[str], column_projections: dict[str, tuple[str, ...]], audit_log_fn: Any = None) -> tuple[list[str], bool]:
def select_ai_columns(
    artifact_key: str,
    available_columns: list[str],
    column_projections: dict[str, tuple[str, ...]],
    audit_log_fn: Any = None,
) -> tuple[list[str], bool]:
    """Choose which CSV columns are forwarded to the AI prompt.

    Configured names are matched against the available columns after
    trimming whitespace and lowercasing both sides; the spelling from
    ``available_columns`` is what ends up in the result.

    Args:
        artifact_key: Artifact identifier; normalized before the lookup
            in ``column_projections``.
        available_columns: Column names present in the source CSV.
        column_projections: Mapping of artifact keys to column tuples.
            A ``*`` entry passes through every column not already chosen.
        audit_log_fn: Optional callable ``(action, details)`` invoked when
            configured columns are missing from the CSV.

    Returns:
        A 2-tuple of ``(selected_columns, projection_applied)``. When no
        projection is configured, or nothing matches, all available
        columns are returned with ``projection_applied`` set to ``False``.
    """
    configured = column_projections.get(normalize_artifact_key(artifact_key))
    if not configured:
        return list(available_columns), False

    by_folded_name = {name.strip().lower(): name for name in available_columns}
    selected: list[str] = []
    missing: list[str] = []
    wildcard_requested = False

    for requested in configured:
        token = requested.strip()
        if token == "*":
            wildcard_requested = True
            continue
        actual = by_folded_name.get(token.lower())
        if actual is None:
            missing.append(requested)
        else:
            selected.append(actual)

    # ``*`` means "pass through any remaining columns not already listed".
    # This serves artifacts with dynamic/variable fields (e.g. systemd
    # service records whose fields vary per unit).
    if wildcard_requested:
        taken = {name.strip().lower() for name in selected}
        selected.extend(
            name for name in available_columns
            if name.strip().lower() not in taken
        )

    if audit_log_fn is not None and missing:
        details = {
            "artifact_key": artifact_key,
            "missing_columns": missing,
            "available_columns": available_columns,
        }
        audit_log_fn("artifact_ai_projection_warning", details)

    if selected:
        return selected, True
    return list(available_columns), False

Select the subset of CSV columns to include in the AI prompt.

Arguments:
  • artifact_key: Artifact identifier for projection lookup.
  • available_columns: Column names present in the source CSV.
  • column_projections: Mapping of artifact keys to column tuples.
  • audit_log_fn: Optional callable for logging missing columns.
Returns:

A 2-tuple of (selected_columns, projection_applied).

def project_rows_for_analysis(rows: list[dict[str, str]], columns: list[str]) -> list[dict[str, str]]:
def project_rows_for_analysis(
    rows: list[dict[str, str]],
    columns: list[str],
) -> list[dict[str, str]]:
    """Reduce each row to the selected columns, keeping its row reference.

    Args:
        rows: List of normalized row dicts.
        columns: Column names to retain. Columns absent from a row are
            filled from an empty-string default.

    Returns:
        A new list of row dicts holding only the requested columns, with
        every value passed through ``stringify_value``. ``_row_ref`` is
        carried over whenever its stringified value is non-empty.
    """

    def _project(row: dict[str, str]) -> dict[str, str]:
        """Build the slimmed-down copy of a single row."""
        slim: dict[str, str] = {
            name: stringify_value(row.get(name, "")) for name in columns
        }
        reference = stringify_value(row.get("_row_ref", ""))
        if reference:
            slim["_row_ref"] = reference
        return slim

    return [_project(row) for row in rows]

Project rows to only the selected columns for analysis.

Arguments:
  • rows: List of normalized row dicts.
  • columns: Column names to retain.
Returns:

A new list of row dicts with only the specified columns and _row_ref.

def deduplicate_rows_for_analysis( rows: list[dict[str, str]], columns: list[str]) -> tuple[list[dict[str, str]], list[str], int, int, list[str]]:
def deduplicate_rows_for_analysis(
    rows: list[dict[str, str]],
    columns: list[str],
) -> tuple[list[dict[str, str]], list[str], int, int, list[str]]:
    """Collapse rows that differ only in timestamp or record-ID columns.

    Columns recognised by ``looks_like_timestamp_column`` or
    ``is_dedup_safe_identifier_column`` are "variant" columns. Rows are
    grouped by the values of the remaining columns, leaving out metadata
    columns and the dedup comment column. The first row of each group is
    kept and annotated with the number of rows that were dropped.

    Args:
        rows: List of projected row dicts.
        columns: Column names present in the rows.

    Returns:
        A 5-tuple of ``(kept_rows, output_columns, removed_count,
        annotated_count, variant_columns)``. ``output_columns`` gains
        ``DEDUP_COMMENT_COLUMN`` only when at least one row was removed.
    """
    if not (rows and columns):
        return list(rows), list(columns), 0, 0, []

    variant_columns = [
        name for name in columns
        if looks_like_timestamp_column(name) or is_dedup_safe_identifier_column(name)
    ]
    if not variant_columns:
        # Nothing may vary between duplicates, so nothing can be merged.
        return [dict(row) for row in rows], list(columns), 0, 0, []

    variant_names = set(variant_columns)
    identity_columns = [
        name for name in columns
        if not (
            name in variant_names
            or name.lower() in METADATA_COLUMNS
            or name == DEDUP_COMMENT_COLUMN
        )
    ]
    if not identity_columns:
        # Every column is variant/metadata: no stable content to compare.
        return [dict(row) for row in rows], list(columns), 0, 0, variant_columns

    survivors: list[dict[str, str]] = []
    survivor_index_by_fingerprint: dict[tuple[str, ...], int] = {}
    dropped_per_survivor: dict[int, int] = {}

    for row in rows:
        stringified = {str(key): stringify_value(value) for key, value in row.items()}
        fingerprint = tuple(stringified.get(name, "") for name in identity_columns)
        next_slot = len(survivors)
        slot = survivor_index_by_fingerprint.setdefault(fingerprint, next_slot)
        if slot == next_slot:
            # First time this fingerprint is seen: the row becomes the survivor.
            survivors.append(stringified)
        else:
            dropped_per_survivor[slot] = dropped_per_survivor.get(slot, 0) + 1

    output_columns = list(columns)
    if dropped_per_survivor:
        if DEDUP_COMMENT_COLUMN not in output_columns:
            output_columns.append(DEDUP_COMMENT_COLUMN)
        for slot, dedup_count in dropped_per_survivor.items():
            survivors[slot][DEDUP_COMMENT_COLUMN] = (
                f"Deduplicated {dedup_count} records with matching event data and different timestamp/ID."
            )

    removed_count = sum(dropped_per_survivor.values())
    annotated_count = len(dropped_per_survivor)
    return survivors, output_columns, removed_count, annotated_count, variant_columns

Deduplicate rows that differ only in timestamp or record-ID columns.

Arguments:
  • rows: List of projected row dicts.
  • columns: Column names present in the rows.
Returns:

A 5-tuple of (kept_rows, output_columns, removed_count, annotated_count, variant_columns).

def build_full_data_csv(rows: list[dict[str, str]], columns: list[str]) -> str:
def build_full_data_csv(
    rows: list[dict[str, str]],
    columns: list[str],
) -> str:
    """Serialize rows to inline CSV text for prompt inclusion.

    Args:
        rows: List of row dicts to serialize. A row's ``_row_ref`` value
            (empty string when absent) fills the leading ``row_ref`` cell;
            columns missing from a row are written as empty strings.
        columns: Column names for the CSV header.

    Returns:
        ``"No columns available."`` when ``columns`` is empty,
        ``"No rows available for analysis."`` when ``rows`` is empty,
        otherwise a CSV-formatted string with a ``row_ref`` column
        prepended. Records are separated by the ``csv`` module's default
        ``\\r\\n`` terminator and surrounding whitespace is stripped.
    """
    if not columns:
        return "No columns available."
    # The header row is always written, so the serialized text can never be
    # empty; the "no rows" case has to be detected from the input instead.
    if not rows:
        return "No rows available for analysis."

    buffer = io.StringIO(newline="")
    writer = csv.writer(buffer)
    writer.writerow(["row_ref", *columns])
    writer.writerows(
        [row.get("_row_ref", ""), *(row.get(column, "") for column in columns)]
        for row in rows
    )
    return buffer.getvalue().strip()

Serialize rows to inline CSV text for prompt inclusion.

Arguments:
  • rows: List of row dicts to serialize.
  • columns: Column names for the CSV header.
Returns:

A CSV-formatted string with a row_ref column prepended.