app.analyzer.citations
Citation validation for AI-generated forensic analysis.
Spot-checks timestamps, row references, and column names cited by the AI against source CSV data to detect potential hallucinations.
Attributes:
- LOGGER: Module-level logger instance.
1"""Citation validation for AI-generated forensic analysis. 2 3Spot-checks timestamps, row references, and column names cited by the AI 4against source CSV data to detect potential hallucinations. 5 6Attributes: 7 LOGGER: Module-level logger instance. 8""" 9 10from __future__ import annotations 11 12import csv 13import logging 14from datetime import datetime, timezone 15from pathlib import Path 16from typing import Any 17 18from .constants import CITED_ISO_TIMESTAMP_RE, CITED_ROW_REF_RE, CITED_COLUMN_REF_RE 19from .utils import looks_like_timestamp_column, stringify_value 20 21LOGGER = logging.getLogger(__name__) 22 23__all__ = [ 24 "validate_citations", 25 "timestamp_lookup_keys", 26 "timestamp_found_in_csv", 27 "match_column_name", 28] 29 30 31def timestamp_lookup_keys(value: str) -> set[str]: 32 """Build comparable lookup keys for a timestamp string. 33 34 Generates multiple normalized representations of the input timestamp 35 (with/without timezone, with/without fractional seconds, space vs ``T`` 36 separator) so that citation checks can match regardless of formatting. 37 38 Args: 39 value: Raw timestamp string from the CSV data. 40 41 Returns: 42 A set of non-empty string keys suitable for membership testing. 
43 """ 44 text = value.strip() 45 if not text: 46 return set() 47 48 normalized = text.replace(" ", "T") 49 keys: set[str] = {text, normalized} 50 51 match = CITED_ISO_TIMESTAMP_RE.search(normalized) 52 if match: 53 token = match.group() 54 keys.add(token) 55 normalized_token = token.replace(" ", "T") 56 keys.add(normalized_token) 57 58 if normalized_token.endswith("Z"): 59 keys.add(f"{normalized_token[:-1]}+00:00") 60 61 token_without_tz = normalized_token 62 suffix = "" 63 if token_without_tz.endswith("Z"): 64 suffix = "Z" 65 token_without_tz = token_without_tz[:-1] 66 elif len(token_without_tz) >= 6 and token_without_tz[-6] in {"+", "-"} and token_without_tz[-3] == ":": 67 suffix = token_without_tz[-6:] 68 token_without_tz = token_without_tz[:-6] 69 70 if "." in token_without_tz: 71 base_seconds = token_without_tz.split(".", 1)[0] 72 keys.add(base_seconds) 73 if suffix: 74 keys.add(f"{base_seconds}{suffix}") 75 else: 76 keys.add(token_without_tz) 77 78 try: 79 parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00")) 80 except ValueError: 81 parsed = None 82 83 if parsed is not None: 84 if parsed.tzinfo is not None: 85 parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None) 86 keys.add(parsed.isoformat(timespec="seconds")) 87 keys.add(parsed.isoformat(timespec="microseconds")) 88 89 return {key for key in keys if key} 90 91 92def timestamp_found_in_csv(cited: str, csv_timestamp_lookup: set[str]) -> bool: 93 """Check whether a cited timestamp matches preloaded CSV timestamp lookup keys. 94 95 Args: 96 cited: Timestamp string cited by the AI in its analysis text. 97 csv_timestamp_lookup: Pre-built set of normalized timestamp keys 98 from the source CSV. 99 100 Returns: 101 ``True`` if any normalized form of *cited* is present in the 102 lookup set, ``False`` otherwise. 
103 """ 104 if not csv_timestamp_lookup: 105 return False 106 return any(key in csv_timestamp_lookup for key in timestamp_lookup_keys(cited)) 107 108 109def match_column_name( 110 cited_column: str, csv_columns: list[str] 111) -> tuple[str, str | None]: 112 """Match an AI-cited column name against actual CSV headers. 113 114 Performs a three-tier match: exact, then case-insensitive with 115 whitespace/underscore normalization (fuzzy), then reports unverifiable. 116 117 Args: 118 cited_column: Column name string cited by the AI. 119 csv_columns: Actual CSV header column names. 120 121 Returns: 122 A 2-tuple of ``(match_status, matched_header)`` where 123 *match_status* is one of ``"exact"``, ``"fuzzy"``, or 124 ``"unverifiable"``. 125 """ 126 cited_stripped = cited_column.strip() 127 128 for header in csv_columns: 129 if header == cited_stripped: 130 return "exact", header 131 132 def _normalize_col(name: str) -> str: 133 """Normalize a column name for fuzzy comparison.""" 134 return name.strip().lower().replace("_", "").replace(" ", "") 135 136 cited_norm = _normalize_col(cited_stripped) 137 for header in csv_columns: 138 if _normalize_col(header) == cited_norm: 139 return "fuzzy", header 140 141 return "unverifiable", None 142 143 144def validate_citations( 145 artifact_key: str, 146 analysis_text: str, 147 csv_path: Path, 148 citation_spot_check_limit: int, 149 audit_log_fn: Any = None, 150) -> list[str]: 151 """Spot-check timestamps, row references, and column names cited by the AI. 152 153 Extracts ISO timestamps, ``row <N>`` references, and column/field 154 name references from the AI's analysis text, then verifies each 155 against the source CSV data. 156 157 Args: 158 artifact_key: Artifact identifier used for logging. 159 analysis_text: The AI's analysis text to scan for citations. 160 csv_path: Path to the source CSV file. 161 citation_spot_check_limit: Maximum citations to validate per 162 category. 
163 audit_log_fn: Optional callable ``(action, details)`` for audit 164 logging. 165 166 Returns: 167 A list of human-readable warning strings for values that could 168 not be verified. 169 """ 170 if analysis_text.startswith("Analysis failed:"): 171 return [] 172 173 cited_timestamps: list[str] = CITED_ISO_TIMESTAMP_RE.findall(analysis_text) 174 cited_row_refs: list[str] = CITED_ROW_REF_RE.findall(analysis_text) 175 176 cited_columns: list[str] = [] 177 for match in CITED_COLUMN_REF_RE.finditer(analysis_text): 178 cited_col = match.group(1) or match.group(2) or match.group(3) 179 if cited_col and cited_col.strip(): 180 cited_columns.append(cited_col.strip()) 181 seen_cols: set[str] = set() 182 unique_cited_columns: list[str] = [] 183 for col in cited_columns: 184 if col not in seen_cols: 185 seen_cols.add(col) 186 unique_cited_columns.append(col) 187 cited_columns = unique_cited_columns 188 189 if not cited_timestamps and not cited_row_refs and not cited_columns: 190 return [] 191 192 csv_timestamp_lookup: set[str] = set() 193 csv_row_refs: set[str] = set() 194 csv_columns: list[str] = [] 195 try: 196 with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as fh: 197 reader = csv.DictReader(fh) 198 csv_columns = [str(c) for c in (reader.fieldnames or []) if c not in (None, "")] 199 ts_columns = [c for c in csv_columns if looks_like_timestamp_column(c)] 200 has_row_ref_col = "row_ref" in csv_columns 201 for row_number, raw_row in enumerate(reader, start=1): 202 if has_row_ref_col: 203 ref_val = stringify_value(raw_row.get("row_ref")) 204 if ref_val: 205 csv_row_refs.add(ref_val) 206 else: 207 csv_row_refs.add(str(row_number)) 208 for col in ts_columns: 209 val = stringify_value(raw_row.get(col)) 210 if val: 211 csv_timestamp_lookup.update(timestamp_lookup_keys(val)) 212 except OSError: 213 return [] 214 215 warnings: list[str] = [] 216 column_match_results: list[dict[str, str]] = [] 217 218 for ts in cited_timestamps[:citation_spot_check_limit]: 
219 if not timestamp_found_in_csv(ts, csv_timestamp_lookup): 220 warnings.append( 221 f"Note: AI cited timestamp {ts} which could not be verified in the source data." 222 ) 223 224 for ref in cited_row_refs[:citation_spot_check_limit]: 225 if ref not in csv_row_refs: 226 warnings.append( 227 f"Note: AI cited row {ref} which could not be verified in the source data." 228 ) 229 230 for cited_col in cited_columns[:citation_spot_check_limit]: 231 match_status, matched_header = match_column_name(cited_col, csv_columns) 232 column_match_results.append({ 233 "cited": cited_col, 234 "match_status": match_status, 235 "matched_header": matched_header or "", 236 }) 237 if match_status == "fuzzy": 238 LOGGER.warning( 239 "AI cited column '%s' is a fuzzy match for CSV header '%s' " 240 "(case/whitespace difference) in artifact %s.", 241 cited_col, 242 matched_header, 243 artifact_key, 244 ) 245 warnings.append( 246 f"Note: AI cited column '{cited_col}' is a fuzzy match for CSV header " 247 f"'{matched_header}' (case or whitespace difference)." 248 ) 249 elif match_status == "unverifiable": 250 warnings.append( 251 f"Note: AI cited column '{cited_col}' which does not match any column " 252 f"in the source data — citation is unverifiable." 253 ) 254 255 if warnings and audit_log_fn is not None: 256 audit_details: dict[str, object] = { 257 "artifact_key": artifact_key, 258 "citation_validation": "warnings_found", 259 "warning_count": len(warnings), 260 "warnings": warnings[:10], 261 } 262 if column_match_results: 263 audit_details["column_match_results"] = column_match_results[:10] 264 audit_log_fn("citation_validation", audit_details) 265 266 return warnings
def validate_citations(
    artifact_key: str,
    analysis_text: str,
    csv_path: Path,
    citation_spot_check_limit: int,
    audit_log_fn: Any = None,
) -> list[str]:
    """Verify AI-cited timestamps, row references, and column names against the CSV.

    Pulls ISO timestamps, ``row <N>`` references, and column/field name
    mentions out of *analysis_text* and checks each one against the data
    actually present in *csv_path*.

    Args:
        artifact_key: Artifact identifier used for logging.
        analysis_text: The AI's analysis text to scan for citations.
        csv_path: Path to the source CSV file.
        citation_spot_check_limit: Maximum citations to validate per
            category.
        audit_log_fn: Optional callable ``(action, details)`` for audit
            logging.

    Returns:
        A list of human-readable warning strings for values that could
        not be verified.
    """
    if analysis_text.startswith("Analysis failed:"):
        return []

    limit = citation_spot_check_limit

    cited_timestamps = CITED_ISO_TIMESTAMP_RE.findall(analysis_text)
    cited_row_refs = CITED_ROW_REF_RE.findall(analysis_text)

    # Collect column mentions from whichever capture group fired, then
    # deduplicate while preserving first-occurrence order.
    extracted = (
        (m.group(1) or m.group(2) or m.group(3))
        for m in CITED_COLUMN_REF_RE.finditer(analysis_text)
    )
    cited_columns = list(dict.fromkeys(
        col.strip() for col in extracted if col and col.strip()
    ))

    if not (cited_timestamps or cited_row_refs or cited_columns):
        return []

    # Single pass over the CSV builds every lookup structure at once.
    ts_lookup: set[str] = set()
    known_row_refs: set[str] = set()
    headers: list[str] = []
    try:
        with csv_path.open("r", newline="", encoding="utf-8-sig", errors="replace") as handle:
            reader = csv.DictReader(handle)
            headers = [str(name) for name in (reader.fieldnames or []) if name not in (None, "")]
            ts_headers = [name for name in headers if looks_like_timestamp_column(name)]
            use_row_ref_column = "row_ref" in headers
            for index, record in enumerate(reader, start=1):
                if use_row_ref_column:
                    ref = stringify_value(record.get("row_ref"))
                    if ref:
                        known_row_refs.add(ref)
                else:
                    known_row_refs.add(str(index))
                for name in ts_headers:
                    cell = stringify_value(record.get(name))
                    if cell:
                        ts_lookup.update(timestamp_lookup_keys(cell))
    except OSError:
        # Unreadable CSV: skip validation rather than fail the pipeline.
        return []

    warnings: list[str] = []
    column_match_results: list[dict[str, str]] = []

    warnings.extend(
        f"Note: AI cited timestamp {ts} which could not be verified in the source data."
        for ts in cited_timestamps[:limit]
        if not timestamp_found_in_csv(ts, ts_lookup)
    )

    warnings.extend(
        f"Note: AI cited row {ref} which could not be verified in the source data."
        for ref in cited_row_refs[:limit]
        if ref not in known_row_refs
    )

    for name in cited_columns[:limit]:
        status, header = match_column_name(name, headers)
        column_match_results.append(
            {"cited": name, "match_status": status, "matched_header": header or ""}
        )
        if status == "fuzzy":
            LOGGER.warning(
                "AI cited column '%s' is a fuzzy match for CSV header '%s' "
                "(case/whitespace difference) in artifact %s.",
                name,
                header,
                artifact_key,
            )
            warnings.append(
                f"Note: AI cited column '{name}' is a fuzzy match for CSV header "
                f"'{header}' (case or whitespace difference)."
            )
        elif status == "unverifiable":
            warnings.append(
                f"Note: AI cited column '{name}' which does not match any column "
                f"in the source data — citation is unverifiable."
            )

    if warnings and audit_log_fn is not None:
        details: dict[str, object] = {
            "artifact_key": artifact_key,
            "citation_validation": "warnings_found",
            "warning_count": len(warnings),
            "warnings": warnings[:10],
        }
        if column_match_results:
            details["column_match_results"] = column_match_results[:10]
        audit_log_fn("citation_validation", details)

    return warnings
Spot-check timestamps, row references, and column names cited by the AI.
Extracts ISO timestamps, row <N> references, and column/field
name references from the AI's analysis text, then verifies each
against the source CSV data.
Arguments:
- artifact_key: Artifact identifier used for logging.
- analysis_text: The AI's analysis text to scan for citations.
- csv_path: Path to the source CSV file.
- citation_spot_check_limit: Maximum citations to validate per category.
- audit_log_fn: Optional callable `(action, details)` for audit logging.
Returns:
A list of human-readable warning strings for values that could not be verified.
32def timestamp_lookup_keys(value: str) -> set[str]: 33 """Build comparable lookup keys for a timestamp string. 34 35 Generates multiple normalized representations of the input timestamp 36 (with/without timezone, with/without fractional seconds, space vs ``T`` 37 separator) so that citation checks can match regardless of formatting. 38 39 Args: 40 value: Raw timestamp string from the CSV data. 41 42 Returns: 43 A set of non-empty string keys suitable for membership testing. 44 """ 45 text = value.strip() 46 if not text: 47 return set() 48 49 normalized = text.replace(" ", "T") 50 keys: set[str] = {text, normalized} 51 52 match = CITED_ISO_TIMESTAMP_RE.search(normalized) 53 if match: 54 token = match.group() 55 keys.add(token) 56 normalized_token = token.replace(" ", "T") 57 keys.add(normalized_token) 58 59 if normalized_token.endswith("Z"): 60 keys.add(f"{normalized_token[:-1]}+00:00") 61 62 token_without_tz = normalized_token 63 suffix = "" 64 if token_without_tz.endswith("Z"): 65 suffix = "Z" 66 token_without_tz = token_without_tz[:-1] 67 elif len(token_without_tz) >= 6 and token_without_tz[-6] in {"+", "-"} and token_without_tz[-3] == ":": 68 suffix = token_without_tz[-6:] 69 token_without_tz = token_without_tz[:-6] 70 71 if "." in token_without_tz: 72 base_seconds = token_without_tz.split(".", 1)[0] 73 keys.add(base_seconds) 74 if suffix: 75 keys.add(f"{base_seconds}{suffix}") 76 else: 77 keys.add(token_without_tz) 78 79 try: 80 parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00")) 81 except ValueError: 82 parsed = None 83 84 if parsed is not None: 85 if parsed.tzinfo is not None: 86 parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None) 87 keys.add(parsed.isoformat(timespec="seconds")) 88 keys.add(parsed.isoformat(timespec="microseconds")) 89 90 return {key for key in keys if key}
Build comparable lookup keys for a timestamp string.
Generates multiple normalized representations of the input timestamp
(with/without timezone, with/without fractional seconds, space vs T
separator) so that citation checks can match regardless of formatting.
Arguments:
- value: Raw timestamp string from the CSV data.
Returns:
A set of non-empty string keys suitable for membership testing.
93def timestamp_found_in_csv(cited: str, csv_timestamp_lookup: set[str]) -> bool: 94 """Check whether a cited timestamp matches preloaded CSV timestamp lookup keys. 95 96 Args: 97 cited: Timestamp string cited by the AI in its analysis text. 98 csv_timestamp_lookup: Pre-built set of normalized timestamp keys 99 from the source CSV. 100 101 Returns: 102 ``True`` if any normalized form of *cited* is present in the 103 lookup set, ``False`` otherwise. 104 """ 105 if not csv_timestamp_lookup: 106 return False 107 return any(key in csv_timestamp_lookup for key in timestamp_lookup_keys(cited))
Check whether a cited timestamp matches preloaded CSV timestamp lookup keys.
Arguments:
- cited: Timestamp string cited by the AI in its analysis text.
- csv_timestamp_lookup: Pre-built set of normalized timestamp keys from the source CSV.
Returns:
`True` if any normalized form of *cited* is present in the lookup set, `False` otherwise.
110def match_column_name( 111 cited_column: str, csv_columns: list[str] 112) -> tuple[str, str | None]: 113 """Match an AI-cited column name against actual CSV headers. 114 115 Performs a three-tier match: exact, then case-insensitive with 116 whitespace/underscore normalization (fuzzy), then reports unverifiable. 117 118 Args: 119 cited_column: Column name string cited by the AI. 120 csv_columns: Actual CSV header column names. 121 122 Returns: 123 A 2-tuple of ``(match_status, matched_header)`` where 124 *match_status* is one of ``"exact"``, ``"fuzzy"``, or 125 ``"unverifiable"``. 126 """ 127 cited_stripped = cited_column.strip() 128 129 for header in csv_columns: 130 if header == cited_stripped: 131 return "exact", header 132 133 def _normalize_col(name: str) -> str: 134 """Normalize a column name for fuzzy comparison.""" 135 return name.strip().lower().replace("_", "").replace(" ", "") 136 137 cited_norm = _normalize_col(cited_stripped) 138 for header in csv_columns: 139 if _normalize_col(header) == cited_norm: 140 return "fuzzy", header 141 142 return "unverifiable", None
Match an AI-cited column name against actual CSV headers.
Performs a three-tier match: exact, then case-insensitive with whitespace/underscore normalization (fuzzy), then reports unverifiable.
Arguments:
- cited_column: Column name string cited by the AI.
- csv_columns: Actual CSV header column names.
Returns:
A 2-tuple of `(match_status, matched_header)` where *match_status* is one of `"exact"`, `"fuzzy"`, or `"unverifiable"`.