app.chat.csv_retrieval
CSV data retrieval for chat-based forensic Q&A.
Provides heuristic matching of user questions to parsed artifact CSV files, reading and formatting relevant rows for injection into AI prompts so the model can answer data-specific queries.
Key responsibilities:
- Artifact matching -- Matches user questions against CSV filenames using generated aliases (stem, space-separated, base without part suffixes).
- Column matching -- Falls back to matching against CSV column headers when artifact-name matching finds nothing.
- Row sampling -- Reads up to a configurable limit of rows, compacting values and truncating long strings to keep prompt size manageable.
Attributes:
- CSV_RETRIEVAL_KEYWORDS: Tuple of lowercase keyword phrases that indicate the user is requesting raw data.
- CSV_ROW_LIMIT: Maximum number of CSV rows to include in a single retrieval response.
1"""CSV data retrieval for chat-based forensic Q&A. 2 3Provides heuristic matching of user questions to parsed artifact CSV 4files, reading and formatting relevant rows for injection into AI 5prompts so the model can answer data-specific queries. 6 7Key responsibilities: 8 9* **Artifact matching** -- Matches user questions against CSV filenames 10 using generated aliases (stem, space-separated, base without part 11 suffixes). 12* **Column matching** -- Falls back to matching against CSV column headers 13 when artifact-name matching finds nothing. 14* **Row sampling** -- Reads up to a configurable limit of rows, compacting 15 values and truncating long strings to keep prompt size manageable. 16 17Attributes: 18 CSV_RETRIEVAL_KEYWORDS: Tuple of lowercase keyword phrases that 19 indicate the user is requesting raw data. 20 CSV_ROW_LIMIT: Maximum number of CSV rows to include in a single 21 retrieval response. 22""" 23 24from __future__ import annotations 25 26import csv 27import logging 28import re 29from pathlib import Path 30from typing import Any 31 32__all__ = [ 33 "retrieve_csv_data", 34 "build_csv_aliases", 35 "contains_heuristic_term", 36] 37 38log = logging.getLogger(__name__) 39 40CSV_RETRIEVAL_KEYWORDS = ( 41 "show me", 42 "list", 43 "csv", 44 "rows", 45 "records", 46 "check the", 47 "look in", 48) 49 50CSV_ROW_LIMIT = 500 51 52 53def _stringify(value: Any, default: str = "") -> str: 54 """Convert *value* to a stripped string, returning *default* when empty. 55 56 Args: 57 value: Arbitrary value to stringify. 58 default: Fallback string when *value* is *None* or blank. 59 60 Returns: 61 The stripped string representation or *default*. 62 """ 63 text = str(value).strip() if value is not None else "" 64 return text or default 65 66 67def retrieve_csv_data( 68 question: str, 69 parsed_dir: str | Path, 70 row_limit: int = CSV_ROW_LIMIT, 71) -> dict[str, Any]: 72 """Best-effort retrieval of raw CSV rows for data-centric chat questions. 73 74 Heuristically matches the user's *question* against parsed artifact 75 CSV filenames and column headers. When a match is found, up to 76 *row_limit* rows are read and formatted as a structured text block 77 for injection into the AI prompt. 78 79 Args: 80 question: The user's chat question text. 81 parsed_dir: Path to the directory containing parsed artifact 82 CSV files. 83 row_limit: Maximum total rows to include across all matched 84 CSVs. Defaults to :data:`CSV_ROW_LIMIT`. 85 86 Returns: 87 A dictionary with a ``retrieved`` boolean. When *True*, also 88 includes ``artifacts`` (list of matched CSV filenames) and 89 ``data`` (formatted row text). 90 """ 91 question_text = _stringify(question) 92 if not question_text: 93 return {"retrieved": False} 94 95 parsed_path = Path(parsed_dir) 96 if not parsed_path.exists() or not parsed_path.is_dir(): 97 return {"retrieved": False} 98 99 csv_paths = sorted(path for path in parsed_path.glob("*.csv") if path.is_file()) 100 if not csv_paths: 101 return {"retrieved": False} 102 103 question_lower = question_text.lower() 104 keyword_detected = any(kw in question_lower for kw in CSV_RETRIEVAL_KEYWORDS) 105 106 target_paths = _match_target_paths(csv_paths, question_lower, keyword_detected) 107 if target_paths is None: 108 return {"retrieved": False} 109 110 target_paths = list(dict.fromkeys(target_paths)) 111 artifacts = [path.name for path in target_paths] 112 formatted_blocks: list[str] = [] 113 rows_remaining = row_limit 114 115 for csv_path in target_paths: 116 if rows_remaining <= 0: 117 break 118 headers, rows, total_row_count = _read_csv_rows(csv_path, limit=rows_remaining) 119 if not headers and not rows: 120 continue 121 122 rows_remaining -= len(rows) 123 formatted_blocks.append( 124 _format_csv_block(csv_path.name, headers, rows, total_row_count) 125 ) 126 127 if not formatted_blocks: 128 return { 129 "retrieved": True, 130 "artifacts": artifacts, 131 "data": "No readable rows found in selected CSV files.", 132 } 133 134 return { 135 "retrieved": True, 136 "artifacts": artifacts, 137 "data": "\n\n".join(formatted_blocks), 138 } 139 140 141def _match_target_paths( 142 csv_paths: list[Path], 143 question_lower: str, 144 keyword_detected: bool, 145) -> list[Path] | None: 146 """Determine which CSV files match the user's question. 147 148 Tries artifact-name matching first, then column-header matching, 149 then falls back to returning all CSVs if keywords were detected 150 and the collection is small. 151 152 Args: 153 csv_paths: Sorted list of available CSV file paths. 154 question_lower: Lowercased question text. 155 keyword_detected: Whether retrieval keywords were found in 156 the question. 157 158 Returns: 159 A list of matched paths, or *None* when no match is found. 160 """ 161 aliases_by_path = {path: build_csv_aliases(path) for path in csv_paths} 162 artifact_matches = [ 163 path 164 for path, aliases in aliases_by_path.items() 165 if any(contains_heuristic_term(question_lower, alias) for alias in aliases) 166 ] 167 168 if artifact_matches: 169 return artifact_matches 170 171 # Only scan CSV headers when artifact-name matching didn't find anything, 172 # to avoid reading every CSV file on every chat message. 173 headers_by_path = {path: _read_csv_headers(path) for path in csv_paths} 174 matched_columns = { 175 header.lower() 176 for headers in headers_by_path.values() 177 for header in headers 178 if contains_heuristic_term(question_lower, header.lower()) 179 } 180 if matched_columns: 181 return [ 182 path 183 for path, headers in headers_by_path.items() 184 if any(header.lower() in matched_columns for header in headers) 185 ] 186 187 if keyword_detected and len(csv_paths) <= 3: 188 return csv_paths 189 190 return None 191 192 193def build_csv_aliases(csv_path: Path) -> set[str]: 194 """Build a set of lowercase name aliases for a CSV file. 195 196 Aliases include the full filename, stem, space-separated stem, 197 base name (without ``_partN`` suffixes), and leading segments 198 before the first underscore. 199 200 Args: 201 csv_path: Path to the CSV file. 202 203 Returns: 204 A set of non-empty lowercase alias strings. 205 """ 206 stem = csv_path.stem.lower() 207 base = re.sub(r"_part\d+$", "", stem) 208 aliases = { 209 csv_path.name.lower(), 210 stem, 211 stem.replace("_", " "), 212 base, 213 base.replace("_", " "), 214 } 215 if "_" in stem: 216 aliases.add(stem.split("_", 1)[0]) 217 if "_" in base: 218 aliases.add(base.split("_", 1)[0]) 219 return {alias.strip() for alias in aliases if alias.strip()} 220 221 222def contains_heuristic_term(question_lower: str, term: str) -> bool: 223 """Check whether *term* appears as a distinct token in *question_lower*. 224 225 Uses a word-boundary regex so that short substrings do not 226 produce false positives. Terms shorter than 3 characters are 227 always rejected. 228 229 Args: 230 question_lower: Lowercased question text to search. 231 term: Candidate term to look for. 232 233 Returns: 234 *True* when *term* (>= 3 chars) appears on a word boundary 235 in *question_lower*. 236 """ 237 normalized = term.strip().lower() 238 if len(normalized) < 3: 239 return False 240 pattern = rf"(?<![a-z0-9]){re.escape(normalized)}(?![a-z0-9])" 241 return re.search(pattern, question_lower) is not None 242 243 244def _read_csv_headers(csv_path: Path) -> list[str]: 245 """Read and return the header row from a CSV file. 246 247 Args: 248 csv_path: Path to the CSV file. 249 250 Returns: 251 A list of non-empty, stripped header strings. Returns an 252 empty list on read failure. 253 """ 254 try: 255 with csv_path.open("r", encoding="utf-8-sig", newline="", errors="replace") as csv_stream: 256 header_row = next(csv.reader(csv_stream), []) 257 except Exception: 258 log.warning("Failed to read CSV headers from %s", csv_path, exc_info=True) 259 return [] 260 261 return [_stringify(h) for h in header_row if _stringify(h)] 262 263 264def _read_csv_rows( 265 csv_path: Path, 266 limit: int, 267) -> tuple[list[str], list[dict[str, str]], int]: 268 """Read up to *limit* data rows from a CSV file. 269 270 Values are whitespace-collapsed and truncated to 240 characters 271 to keep the resulting text compact for AI prompt injection. 272 273 After reading the sampled rows, the remainder of the file is 274 consumed (without storing data) to obtain an accurate total row 275 count. 276 277 Args: 278 csv_path: Path to the CSV file. 279 limit: Maximum number of data rows to read. 280 281 Returns: 282 A tuple of ``(headers, rows, total_row_count)`` where 283 *headers* is a list of column name strings, *rows* is a 284 list of ordered dictionaries mapping column names to string 285 values, and *total_row_count* is the total number of data 286 rows in the file (including those beyond *limit*). Returns 287 ``([], [], 0)`` on read failure or when *limit* is 288 non-positive. 289 """ 290 if limit <= 0: 291 return [], [], 0 292 293 try: 294 with csv_path.open("r", encoding="utf-8-sig", newline="", errors="replace") as csv_stream: 295 reader = csv.DictReader(csv_stream) 296 headers = [_stringify(field) for field in (reader.fieldnames or []) if _stringify(field)] 297 298 rows: list[dict[str, str]] = [] 299 total_row_count = 0 300 for row in reader: 301 total_row_count += 1 302 if len(rows) < limit: 303 compact_row: dict[str, str] = {} 304 for column in headers: 305 value = _stringify(row.get(column, "")) 306 value = re.sub(r"\s+", " ", value) 307 if len(value) > 240: 308 value = f"{value[:237]}..." 309 compact_row[column] = value 310 rows.append(compact_row) 311 except Exception: 312 log.warning("Failed to read CSV rows from %s", csv_path, exc_info=True) 313 return [], [], 0 314 315 return headers, rows, total_row_count 316 317 318def _format_csv_block( 319 filename: str, 320 headers: list[str], 321 rows: list[dict[str, str]], 322 total_row_count: int, 323) -> str: 324 """Format CSV data as a readable text block for AI prompt injection. 325 326 Args: 327 filename: The CSV filename for the block header. 328 headers: Column name strings. 329 rows: List of row dictionaries. 330 total_row_count: Total rows in the source file. 331 332 Returns: 333 A formatted multi-line text block. 334 """ 335 block_lines = [f"Artifact: {filename}"] 336 block_lines.append( 337 f"Total rows: {total_row_count}" 338 + (f" (showing first {len(rows)})" if len(rows) < total_row_count else "") 339 ) 340 if headers: 341 block_lines.append(f"Columns: {', '.join(headers)}") 342 if rows: 343 block_lines.append("Rows:") 344 for row_index, row in enumerate(rows, start=1): 345 parts = [f"{column}={value}" for column, value in row.items()] 346 block_lines.append(f"{row_index}. " + " | ".join(parts)) 347 else: 348 block_lines.append("Rows: none") 349 return "\n".join(block_lines)
68def retrieve_csv_data( 69 question: str, 70 parsed_dir: str | Path, 71 row_limit: int = CSV_ROW_LIMIT, 72) -> dict[str, Any]: 73 """Best-effort retrieval of raw CSV rows for data-centric chat questions. 74 75 Heuristically matches the user's *question* against parsed artifact 76 CSV filenames and column headers. When a match is found, up to 77 *row_limit* rows are read and formatted as a structured text block 78 for injection into the AI prompt. 79 80 Args: 81 question: The user's chat question text. 82 parsed_dir: Path to the directory containing parsed artifact 83 CSV files. 84 row_limit: Maximum total rows to include across all matched 85 CSVs. Defaults to :data:`CSV_ROW_LIMIT`. 86 87 Returns: 88 A dictionary with a ``retrieved`` boolean. When *True*, also 89 includes ``artifacts`` (list of matched CSV filenames) and 90 ``data`` (formatted row text). 91 """ 92 question_text = _stringify(question) 93 if not question_text: 94 return {"retrieved": False} 95 96 parsed_path = Path(parsed_dir) 97 if not parsed_path.exists() or not parsed_path.is_dir(): 98 return {"retrieved": False} 99 100 csv_paths = sorted(path for path in parsed_path.glob("*.csv") if path.is_file()) 101 if not csv_paths: 102 return {"retrieved": False} 103 104 question_lower = question_text.lower() 105 keyword_detected = any(kw in question_lower for kw in CSV_RETRIEVAL_KEYWORDS) 106 107 target_paths = _match_target_paths(csv_paths, question_lower, keyword_detected) 108 if target_paths is None: 109 return {"retrieved": False} 110 111 target_paths = list(dict.fromkeys(target_paths)) 112 artifacts = [path.name for path in target_paths] 113 formatted_blocks: list[str] = [] 114 rows_remaining = row_limit 115 116 for csv_path in target_paths: 117 if rows_remaining <= 0: 118 break 119 headers, rows, total_row_count = _read_csv_rows(csv_path, limit=rows_remaining) 120 if not headers and not rows: 121 continue 122 123 rows_remaining -= len(rows) 124 formatted_blocks.append( 125 _format_csv_block(csv_path.name, headers, rows, total_row_count) 126 ) 127 128 if not formatted_blocks: 129 return { 130 "retrieved": True, 131 "artifacts": artifacts, 132 "data": "No readable rows found in selected CSV files.", 133 } 134 135 return { 136 "retrieved": True, 137 "artifacts": artifacts, 138 "data": "\n\n".join(formatted_blocks), 139 }
Best-effort retrieval of raw CSV rows for data-centric chat questions.
Heuristically matches the user's question against parsed artifact CSV filenames and column headers. When a match is found, up to row_limit rows are read and formatted as a structured text block for injection into the AI prompt.
Arguments:
- question: The user's chat question text.
- parsed_dir: Path to the directory containing parsed artifact CSV files.
- row_limit: Maximum total rows to include across all matched
CSVs. Defaults to
CSV_ROW_LIMIT.
Returns:
A dictionary with a
retrievedboolean. When True, also includesartifacts(list of matched CSV filenames) anddata(formatted row text).
194def build_csv_aliases(csv_path: Path) -> set[str]: 195 """Build a set of lowercase name aliases for a CSV file. 196 197 Aliases include the full filename, stem, space-separated stem, 198 base name (without ``_partN`` suffixes), and leading segments 199 before the first underscore. 200 201 Args: 202 csv_path: Path to the CSV file. 203 204 Returns: 205 A set of non-empty lowercase alias strings. 206 """ 207 stem = csv_path.stem.lower() 208 base = re.sub(r"_part\d+$", "", stem) 209 aliases = { 210 csv_path.name.lower(), 211 stem, 212 stem.replace("_", " "), 213 base, 214 base.replace("_", " "), 215 } 216 if "_" in stem: 217 aliases.add(stem.split("_", 1)[0]) 218 if "_" in base: 219 aliases.add(base.split("_", 1)[0]) 220 return {alias.strip() for alias in aliases if alias.strip()}
Build a set of lowercase name aliases for a CSV file.
Aliases include the full filename, stem, space-separated stem,
base name (without _partN suffixes), and leading segments
before the first underscore.
Arguments:
- csv_path: Path to the CSV file.
Returns:
A set of non-empty lowercase alias strings.
223def contains_heuristic_term(question_lower: str, term: str) -> bool: 224 """Check whether *term* appears as a distinct token in *question_lower*. 225 226 Uses a word-boundary regex so that short substrings do not 227 produce false positives. Terms shorter than 3 characters are 228 always rejected. 229 230 Args: 231 question_lower: Lowercased question text to search. 232 term: Candidate term to look for. 233 234 Returns: 235 *True* when *term* (>= 3 chars) appears on a word boundary 236 in *question_lower*. 237 """ 238 normalized = term.strip().lower() 239 if len(normalized) < 3: 240 return False 241 pattern = rf"(?<![a-z0-9]){re.escape(normalized)}(?![a-z0-9])" 242 return re.search(pattern, question_lower) is not None
Check whether term appears as a distinct token in question_lower.
Uses a word-boundary regex so that short substrings do not produce false positives. Terms shorter than 3 characters are always rejected.
Arguments:
- question_lower: Lowercased question text to search.
- term: Candidate term to look for.
Returns:
True when term (>= 3 chars) appears on a word boundary in question_lower.