app.analyzer.citations

Citation validation for AI-generated forensic analysis.

Spot-checks timestamps, row references, and column names cited by the AI against source CSV data to detect potential hallucinations.

Attributes:
  • LOGGER: Module-level logger instance.
  1"""Citation validation for AI-generated forensic analysis.
  2
  3Spot-checks timestamps, row references, and column names cited by the AI
  4against source CSV data to detect potential hallucinations.
  5
  6Attributes:
  7    LOGGER: Module-level logger instance.
  8"""
  9
 10from __future__ import annotations
 11
 12import csv
 13import logging
 14from datetime import datetime, timezone
 15from pathlib import Path
 16from typing import Any
 17
 18from .constants import CITED_ISO_TIMESTAMP_RE, CITED_ROW_REF_RE, CITED_COLUMN_REF_RE
 19from .utils import looks_like_timestamp_column, stringify_value
 20
 21LOGGER = logging.getLogger(__name__)
 22
 23__all__ = [
 24    "validate_citations",
 25    "timestamp_lookup_keys",
 26    "timestamp_found_in_csv",
 27    "match_column_name",
 28]
 29
 30
 31def timestamp_lookup_keys(value: str) -> set[str]:
 32    """Build comparable lookup keys for a timestamp string.
 33
 34    Generates multiple normalized representations of the input timestamp
 35    (with/without timezone, with/without fractional seconds, space vs ``T``
 36    separator) so that citation checks can match regardless of formatting.
 37
 38    Args:
 39        value: Raw timestamp string from the CSV data.
 40
 41    Returns:
 42        A set of non-empty string keys suitable for membership testing.
 43    """
 44    text = value.strip()
 45    if not text:
 46        return set()
 47
 48    normalized = text.replace(" ", "T")
 49    keys: set[str] = {text, normalized}
 50
 51    match = CITED_ISO_TIMESTAMP_RE.search(normalized)
 52    if match:
 53        token = match.group()
 54        keys.add(token)
 55        normalized_token = token.replace(" ", "T")
 56        keys.add(normalized_token)
 57
 58        if normalized_token.endswith("Z"):
 59            keys.add(f"{normalized_token[:-1]}+00:00")
 60
 61        token_without_tz = normalized_token
 62        suffix = ""
 63        if token_without_tz.endswith("Z"):
 64            suffix = "Z"
 65            token_without_tz = token_without_tz[:-1]
 66        elif len(token_without_tz) >= 6 and token_without_tz[-6] in {"+", "-"} and token_without_tz[-3] == ":":
 67            suffix = token_without_tz[-6:]
 68            token_without_tz = token_without_tz[:-6]
 69
 70        if "." in token_without_tz:
 71            base_seconds = token_without_tz.split(".", 1)[0]
 72            keys.add(base_seconds)
 73            if suffix:
 74                keys.add(f"{base_seconds}{suffix}")
 75        else:
 76            keys.add(token_without_tz)
 77
 78    try:
 79        parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00"))
 80    except ValueError:
 81        parsed = None
 82
 83    if parsed is not None:
 84        if parsed.tzinfo is not None:
 85            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
 86        keys.add(parsed.isoformat(timespec="seconds"))
 87        keys.add(parsed.isoformat(timespec="microseconds"))
 88
 89    return {key for key in keys if key}
 90
 91
 92def timestamp_found_in_csv(cited: str, csv_timestamp_lookup: set[str]) -> bool:
 93    """Check whether a cited timestamp matches preloaded CSV timestamp lookup keys.
 94
 95    Args:
 96        cited: Timestamp string cited by the AI in its analysis text.
 97        csv_timestamp_lookup: Pre-built set of normalized timestamp keys
 98            from the source CSV.
 99
100    Returns:
101        ``True`` if any normalized form of *cited* is present in the
102        lookup set, ``False`` otherwise.
103    """
104    if not csv_timestamp_lookup:
105        return False
106    return any(key in csv_timestamp_lookup for key in timestamp_lookup_keys(cited))
107
108
109def match_column_name(
110    cited_column: str, csv_columns: list[str]
111) -> tuple[str, str | None]:
112    """Match an AI-cited column name against actual CSV headers.
113
114    Performs a three-tier match: exact, then case-insensitive with
115    whitespace/underscore normalization (fuzzy), then reports unverifiable.
116
117    Args:
118        cited_column: Column name string cited by the AI.
119        csv_columns: Actual CSV header column names.
120
121    Returns:
122        A 2-tuple of ``(match_status, matched_header)`` where
123        *match_status* is one of ``"exact"``, ``"fuzzy"``, or
124        ``"unverifiable"``.
125    """
126    cited_stripped = cited_column.strip()
127
128    for header in csv_columns:
129        if header == cited_stripped:
130            return "exact", header
131
132    def _normalize_col(name: str) -> str:
133        """Normalize a column name for fuzzy comparison."""
134        return name.strip().lower().replace("_", "").replace(" ", "")
135
136    cited_norm = _normalize_col(cited_stripped)
137    for header in csv_columns:
138        if _normalize_col(header) == cited_norm:
139            return "fuzzy", header
140
141    return "unverifiable", None
142
143
def validate_citations(
    artifact_key: str,
    analysis_text: str,
    csv_path: Path,
    citation_spot_check_limit: int,
    audit_log_fn: Any = None,
) -> list[str]:
    """Spot-check timestamps, row references, and column names cited by the AI.

    Extracts ISO timestamps, ``row <N>`` references, and column/field
    name references from the AI's analysis text, then verifies each
    against the source CSV data.

    Args:
        artifact_key: Artifact identifier used for logging.
        analysis_text: The AI's analysis text to scan for citations.
        csv_path: Path to the source CSV file.
        citation_spot_check_limit: Maximum citations to validate per
            category.
        audit_log_fn: Optional callable ``(action, details)`` for audit
            logging.

    Returns:
        A list of human-readable warning strings for values that could
        not be verified.
    """
    if analysis_text.startswith("Analysis failed:"):
        return []

    cited_timestamps: list[str] = CITED_ISO_TIMESTAMP_RE.findall(analysis_text)
    cited_row_refs: list[str] = CITED_ROW_REF_RE.findall(analysis_text)

    # Collect column citations, keeping first-seen order and dropping dupes.
    raw_columns = (
        (m.group(1) or m.group(2) or m.group(3) or "").strip()
        for m in CITED_COLUMN_REF_RE.finditer(analysis_text)
    )
    cited_columns: list[str] = list(dict.fromkeys(c for c in raw_columns if c))

    if not (cited_timestamps or cited_row_refs or cited_columns):
        return []

    # Load reference data (headers, row refs, timestamp keys) from the CSV.
    csv_timestamp_lookup: set[str] = set()
    csv_row_refs: set[str] = set()
    csv_columns: list[str] = []
    try:
        with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as fh:
            reader = csv.DictReader(fh)
            csv_columns = [str(c) for c in (reader.fieldnames or []) if c not in (None, "")]
            ts_columns = [c for c in csv_columns if looks_like_timestamp_column(c)]
            has_row_ref_col = "row_ref" in csv_columns
            for row_number, raw_row in enumerate(reader, start=1):
                if has_row_ref_col:
                    ref_val = stringify_value(raw_row.get("row_ref"))
                    if ref_val:
                        csv_row_refs.add(ref_val)
                else:
                    # No explicit row_ref column: fall back to 1-based position.
                    csv_row_refs.add(str(row_number))
                for col in ts_columns:
                    cell = stringify_value(raw_row.get(col))
                    if cell:
                        csv_timestamp_lookup.update(timestamp_lookup_keys(cell))
    except OSError:
        # Best-effort: if the CSV cannot be read, skip validation entirely.
        return []

    warnings: list[str] = []
    column_match_results: list[dict[str, str]] = []
    limit = citation_spot_check_limit

    warnings.extend(
        f"Note: AI cited timestamp {ts} which could not be verified in the source data."
        for ts in cited_timestamps[:limit]
        if not timestamp_found_in_csv(ts, csv_timestamp_lookup)
    )

    warnings.extend(
        f"Note: AI cited row {ref} which could not be verified in the source data."
        for ref in cited_row_refs[:limit]
        if ref not in csv_row_refs
    )

    for cited_col in cited_columns[:limit]:
        status, matched = match_column_name(cited_col, csv_columns)
        column_match_results.append({
            "cited": cited_col,
            "match_status": status,
            "matched_header": matched or "",
        })
        if status == "fuzzy":
            LOGGER.warning(
                "AI cited column '%s' is a fuzzy match for CSV header '%s' "
                "(case/whitespace difference) in artifact %s.",
                cited_col,
                matched,
                artifact_key,
            )
            warnings.append(
                f"Note: AI cited column '{cited_col}' is a fuzzy match for CSV header "
                f"'{matched}' (case or whitespace difference)."
            )
        elif status == "unverifiable":
            warnings.append(
                f"Note: AI cited column '{cited_col}' which does not match any column "
                f"in the source data — citation is unverifiable."
            )

    # Emit an audit record only when something could not be verified.
    if warnings and audit_log_fn is not None:
        audit_details: dict[str, object] = {
            "artifact_key": artifact_key,
            "citation_validation": "warnings_found",
            "warning_count": len(warnings),
            "warnings": warnings[:10],
        }
        if column_match_results:
            audit_details["column_match_results"] = column_match_results[:10]
        audit_log_fn("citation_validation", audit_details)

    return warnings
def validate_citations( artifact_key: str, analysis_text: str, csv_path: Path, citation_spot_check_limit: int, audit_log_fn: Any = None) -> list[str]:
145def validate_citations(
146    artifact_key: str,
147    analysis_text: str,
148    csv_path: Path,
149    citation_spot_check_limit: int,
150    audit_log_fn: Any = None,
151) -> list[str]:
152    """Spot-check timestamps, row references, and column names cited by the AI.
153
154    Extracts ISO timestamps, ``row <N>`` references, and column/field
155    name references from the AI's analysis text, then verifies each
156    against the source CSV data.
157
158    Args:
159        artifact_key: Artifact identifier used for logging.
160        analysis_text: The AI's analysis text to scan for citations.
161        csv_path: Path to the source CSV file.
162        citation_spot_check_limit: Maximum citations to validate per
163            category.
164        audit_log_fn: Optional callable ``(action, details)`` for audit
165            logging.
166
167    Returns:
168        A list of human-readable warning strings for values that could
169        not be verified.
170    """
171    if analysis_text.startswith("Analysis failed:"):
172        return []
173
174    cited_timestamps: list[str] = CITED_ISO_TIMESTAMP_RE.findall(analysis_text)
175    cited_row_refs: list[str] = CITED_ROW_REF_RE.findall(analysis_text)
176
177    cited_columns: list[str] = []
178    for match in CITED_COLUMN_REF_RE.finditer(analysis_text):
179        cited_col = match.group(1) or match.group(2) or match.group(3)
180        if cited_col and cited_col.strip():
181            cited_columns.append(cited_col.strip())
182    seen_cols: set[str] = set()
183    unique_cited_columns: list[str] = []
184    for col in cited_columns:
185        if col not in seen_cols:
186            seen_cols.add(col)
187            unique_cited_columns.append(col)
188    cited_columns = unique_cited_columns
189
190    if not cited_timestamps and not cited_row_refs and not cited_columns:
191        return []
192
193    csv_timestamp_lookup: set[str] = set()
194    csv_row_refs: set[str] = set()
195    csv_columns: list[str] = []
196    try:
197        with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as fh:
198            reader = csv.DictReader(fh)
199            csv_columns = [str(c) for c in (reader.fieldnames or []) if c not in (None, "")]
200            ts_columns = [c for c in csv_columns if looks_like_timestamp_column(c)]
201            has_row_ref_col = "row_ref" in csv_columns
202            for row_number, raw_row in enumerate(reader, start=1):
203                if has_row_ref_col:
204                    ref_val = stringify_value(raw_row.get("row_ref"))
205                    if ref_val:
206                        csv_row_refs.add(ref_val)
207                else:
208                    csv_row_refs.add(str(row_number))
209                for col in ts_columns:
210                    val = stringify_value(raw_row.get(col))
211                    if val:
212                        csv_timestamp_lookup.update(timestamp_lookup_keys(val))
213    except OSError:
214        return []
215
216    warnings: list[str] = []
217    column_match_results: list[dict[str, str]] = []
218
219    for ts in cited_timestamps[:citation_spot_check_limit]:
220        if not timestamp_found_in_csv(ts, csv_timestamp_lookup):
221            warnings.append(
222                f"Note: AI cited timestamp {ts} which could not be verified in the source data."
223            )
224
225    for ref in cited_row_refs[:citation_spot_check_limit]:
226        if ref not in csv_row_refs:
227            warnings.append(
228                f"Note: AI cited row {ref} which could not be verified in the source data."
229            )
230
231    for cited_col in cited_columns[:citation_spot_check_limit]:
232        match_status, matched_header = match_column_name(cited_col, csv_columns)
233        column_match_results.append({
234            "cited": cited_col,
235            "match_status": match_status,
236            "matched_header": matched_header or "",
237        })
238        if match_status == "fuzzy":
239            LOGGER.warning(
240                "AI cited column '%s' is a fuzzy match for CSV header '%s' "
241                "(case/whitespace difference) in artifact %s.",
242                cited_col,
243                matched_header,
244                artifact_key,
245            )
246            warnings.append(
247                f"Note: AI cited column '{cited_col}' is a fuzzy match for CSV header "
248                f"'{matched_header}' (case or whitespace difference)."
249            )
250        elif match_status == "unverifiable":
251            warnings.append(
252                f"Note: AI cited column '{cited_col}' which does not match any column "
253                f"in the source data — citation is unverifiable."
254            )
255
256    if warnings and audit_log_fn is not None:
257        audit_details: dict[str, object] = {
258            "artifact_key": artifact_key,
259            "citation_validation": "warnings_found",
260            "warning_count": len(warnings),
261            "warnings": warnings[:10],
262        }
263        if column_match_results:
264            audit_details["column_match_results"] = column_match_results[:10]
265        audit_log_fn("citation_validation", audit_details)
266
267    return warnings

Spot-check timestamps, row references, and column names cited by the AI.

Extracts ISO timestamps, row <N> references, and column/field name references from the AI's analysis text, then verifies each against the source CSV data.

Arguments:
  • artifact_key: Artifact identifier used for logging.
  • analysis_text: The AI's analysis text to scan for citations.
  • csv_path: Path to the source CSV file.
  • citation_spot_check_limit: Maximum citations to validate per category.
  • audit_log_fn: Optional callable (action, details) for audit logging.
Returns:

A list of human-readable warning strings for values that could not be verified.

def timestamp_lookup_keys(value: str) -> set[str]:
32def timestamp_lookup_keys(value: str) -> set[str]:
33    """Build comparable lookup keys for a timestamp string.
34
35    Generates multiple normalized representations of the input timestamp
36    (with/without timezone, with/without fractional seconds, space vs ``T``
37    separator) so that citation checks can match regardless of formatting.
38
39    Args:
40        value: Raw timestamp string from the CSV data.
41
42    Returns:
43        A set of non-empty string keys suitable for membership testing.
44    """
45    text = value.strip()
46    if not text:
47        return set()
48
49    normalized = text.replace(" ", "T")
50    keys: set[str] = {text, normalized}
51
52    match = CITED_ISO_TIMESTAMP_RE.search(normalized)
53    if match:
54        token = match.group()
55        keys.add(token)
56        normalized_token = token.replace(" ", "T")
57        keys.add(normalized_token)
58
59        if normalized_token.endswith("Z"):
60            keys.add(f"{normalized_token[:-1]}+00:00")
61
62        token_without_tz = normalized_token
63        suffix = ""
64        if token_without_tz.endswith("Z"):
65            suffix = "Z"
66            token_without_tz = token_without_tz[:-1]
67        elif len(token_without_tz) >= 6 and token_without_tz[-6] in {"+", "-"} and token_without_tz[-3] == ":":
68            suffix = token_without_tz[-6:]
69            token_without_tz = token_without_tz[:-6]
70
71        if "." in token_without_tz:
72            base_seconds = token_without_tz.split(".", 1)[0]
73            keys.add(base_seconds)
74            if suffix:
75                keys.add(f"{base_seconds}{suffix}")
76        else:
77            keys.add(token_without_tz)
78
79    try:
80        parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00"))
81    except ValueError:
82        parsed = None
83
84    if parsed is not None:
85        if parsed.tzinfo is not None:
86            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
87        keys.add(parsed.isoformat(timespec="seconds"))
88        keys.add(parsed.isoformat(timespec="microseconds"))
89
90    return {key for key in keys if key}

Build comparable lookup keys for a timestamp string.

Generates multiple normalized representations of the input timestamp (with/without timezone, with/without fractional seconds, space vs T separator) so that citation checks can match regardless of formatting.

Arguments:
  • value: Raw timestamp string from the CSV data.
Returns:

A set of non-empty string keys suitable for membership testing.

def timestamp_found_in_csv(cited: str, csv_timestamp_lookup: set[str]) -> bool:
 93def timestamp_found_in_csv(cited: str, csv_timestamp_lookup: set[str]) -> bool:
 94    """Check whether a cited timestamp matches preloaded CSV timestamp lookup keys.
 95
 96    Args:
 97        cited: Timestamp string cited by the AI in its analysis text.
 98        csv_timestamp_lookup: Pre-built set of normalized timestamp keys
 99            from the source CSV.
100
101    Returns:
102        ``True`` if any normalized form of *cited* is present in the
103        lookup set, ``False`` otherwise.
104    """
105    if not csv_timestamp_lookup:
106        return False
107    return any(key in csv_timestamp_lookup for key in timestamp_lookup_keys(cited))

Check whether a cited timestamp matches preloaded CSV timestamp lookup keys.

Arguments:
  • cited: Timestamp string cited by the AI in its analysis text.
  • csv_timestamp_lookup: Pre-built set of normalized timestamp keys from the source CSV.
Returns:

True if any normalized form of cited is present in the lookup set, False otherwise.

def match_column_name(cited_column: str, csv_columns: list[str]) -> tuple[str, str | None]:
110def match_column_name(
111    cited_column: str, csv_columns: list[str]
112) -> tuple[str, str | None]:
113    """Match an AI-cited column name against actual CSV headers.
114
115    Performs a three-tier match: exact, then case-insensitive with
116    whitespace/underscore normalization (fuzzy), then reports unverifiable.
117
118    Args:
119        cited_column: Column name string cited by the AI.
120        csv_columns: Actual CSV header column names.
121
122    Returns:
123        A 2-tuple of ``(match_status, matched_header)`` where
124        *match_status* is one of ``"exact"``, ``"fuzzy"``, or
125        ``"unverifiable"``.
126    """
127    cited_stripped = cited_column.strip()
128
129    for header in csv_columns:
130        if header == cited_stripped:
131            return "exact", header
132
133    def _normalize_col(name: str) -> str:
134        """Normalize a column name for fuzzy comparison."""
135        return name.strip().lower().replace("_", "").replace(" ", "")
136
137    cited_norm = _normalize_col(cited_stripped)
138    for header in csv_columns:
139        if _normalize_col(header) == cited_norm:
140            return "fuzzy", header
141
142    return "unverifiable", None

Match an AI-cited column name against actual CSV headers.

Performs a three-tier match: exact, then case-insensitive with whitespace/underscore normalization (fuzzy), then reports unverifiable.

Arguments:
  • cited_column: Column name string cited by the AI.
  • csv_columns: Actual CSV header column names.
Returns:

A 2-tuple of (match_status, matched_header) where match_status is one of "exact", "fuzzy", or "unverifiable".