app.chat.csv_retrieval

CSV data retrieval for chat-based forensic Q&A.

Provides heuristic matching of user questions to parsed artifact CSV files, reading and formatting relevant rows for injection into AI prompts so the model can answer data-specific queries.

Key responsibilities:

  • Artifact matching -- Matches user questions against CSV filenames using generated aliases (stem, space-separated, base without part suffixes).
  • Column matching -- Falls back to matching against CSV column headers when artifact-name matching finds nothing.
  • Row sampling -- Reads up to a configurable limit of rows, compacting values and truncating long strings to keep prompt size manageable.
Attributes:
  • CSV_RETRIEVAL_KEYWORDS: Tuple of lowercase keyword phrases that indicate the user is requesting raw data.
  • CSV_ROW_LIMIT: Maximum number of CSV rows to include in a single retrieval response.
  1"""CSV data retrieval for chat-based forensic Q&A.
  2
  3Provides heuristic matching of user questions to parsed artifact CSV
  4files, reading and formatting relevant rows for injection into AI
  5prompts so the model can answer data-specific queries.
  6
  7Key responsibilities:
  8
  9* **Artifact matching** -- Matches user questions against CSV filenames
 10  using generated aliases (stem, space-separated, base without part
 11  suffixes).
 12* **Column matching** -- Falls back to matching against CSV column headers
 13  when artifact-name matching finds nothing.
 14* **Row sampling** -- Reads up to a configurable limit of rows, compacting
 15  values and truncating long strings to keep prompt size manageable.
 16
 17Attributes:
 18    CSV_RETRIEVAL_KEYWORDS: Tuple of lowercase keyword phrases that
 19        indicate the user is requesting raw data.
 20    CSV_ROW_LIMIT: Maximum number of CSV rows to include in a single
 21        retrieval response.
 22"""
 23
 24from __future__ import annotations
 25
 26import csv
 27import logging
 28import re
 29from pathlib import Path
 30from typing import Any
 31
 32__all__ = [
 33    "retrieve_csv_data",
 34    "build_csv_aliases",
 35    "contains_heuristic_term",
 36]
 37
 38log = logging.getLogger(__name__)
 39
 40CSV_RETRIEVAL_KEYWORDS = (
 41    "show me",
 42    "list",
 43    "csv",
 44    "rows",
 45    "records",
 46    "check the",
 47    "look in",
 48)
 49
 50CSV_ROW_LIMIT = 500
 51
 52
 53def _stringify(value: Any, default: str = "") -> str:
 54    """Convert *value* to a stripped string, returning *default* when empty.
 55
 56    Args:
 57        value: Arbitrary value to stringify.
 58        default: Fallback string when *value* is *None* or blank.
 59
 60    Returns:
 61        The stripped string representation or *default*.
 62    """
 63    text = str(value).strip() if value is not None else ""
 64    return text or default
 65
 66
 67def retrieve_csv_data(
 68    question: str,
 69    parsed_dir: str | Path,
 70    row_limit: int = CSV_ROW_LIMIT,
 71) -> dict[str, Any]:
 72    """Best-effort retrieval of raw CSV rows for data-centric chat questions.
 73
 74    Heuristically matches the user's *question* against parsed artifact
 75    CSV filenames and column headers.  When a match is found, up to
 76    *row_limit* rows are read and formatted as a structured text block
 77    for injection into the AI prompt.
 78
 79    Args:
 80        question: The user's chat question text.
 81        parsed_dir: Path to the directory containing parsed artifact
 82            CSV files.
 83        row_limit: Maximum total rows to include across all matched
 84            CSVs.  Defaults to :data:`CSV_ROW_LIMIT`.
 85
 86    Returns:
 87        A dictionary with a ``retrieved`` boolean.  When *True*, also
 88        includes ``artifacts`` (list of matched CSV filenames) and
 89        ``data`` (formatted row text).
 90    """
 91    question_text = _stringify(question)
 92    if not question_text:
 93        return {"retrieved": False}
 94
 95    parsed_path = Path(parsed_dir)
 96    if not parsed_path.exists() or not parsed_path.is_dir():
 97        return {"retrieved": False}
 98
 99    csv_paths = sorted(path for path in parsed_path.glob("*.csv") if path.is_file())
100    if not csv_paths:
101        return {"retrieved": False}
102
103    question_lower = question_text.lower()
104    keyword_detected = any(kw in question_lower for kw in CSV_RETRIEVAL_KEYWORDS)
105
106    target_paths = _match_target_paths(csv_paths, question_lower, keyword_detected)
107    if target_paths is None:
108        return {"retrieved": False}
109
110    target_paths = list(dict.fromkeys(target_paths))
111    artifacts = [path.name for path in target_paths]
112    formatted_blocks: list[str] = []
113    rows_remaining = row_limit
114
115    for csv_path in target_paths:
116        if rows_remaining <= 0:
117            break
118        headers, rows, total_row_count = _read_csv_rows(csv_path, limit=rows_remaining)
119        if not headers and not rows:
120            continue
121
122        rows_remaining -= len(rows)
123        formatted_blocks.append(
124            _format_csv_block(csv_path.name, headers, rows, total_row_count)
125        )
126
127    if not formatted_blocks:
128        return {
129            "retrieved": True,
130            "artifacts": artifacts,
131            "data": "No readable rows found in selected CSV files.",
132        }
133
134    return {
135        "retrieved": True,
136        "artifacts": artifacts,
137        "data": "\n\n".join(formatted_blocks),
138    }
139
140
141def _match_target_paths(
142    csv_paths: list[Path],
143    question_lower: str,
144    keyword_detected: bool,
145) -> list[Path] | None:
146    """Determine which CSV files match the user's question.
147
148    Tries artifact-name matching first, then column-header matching,
149    then falls back to returning all CSVs if keywords were detected
150    and the collection is small.
151
152    Args:
153        csv_paths: Sorted list of available CSV file paths.
154        question_lower: Lowercased question text.
155        keyword_detected: Whether retrieval keywords were found in
156            the question.
157
158    Returns:
159        A list of matched paths, or *None* when no match is found.
160    """
161    aliases_by_path = {path: build_csv_aliases(path) for path in csv_paths}
162    artifact_matches = [
163        path
164        for path, aliases in aliases_by_path.items()
165        if any(contains_heuristic_term(question_lower, alias) for alias in aliases)
166    ]
167
168    if artifact_matches:
169        return artifact_matches
170
171    # Only scan CSV headers when artifact-name matching didn't find anything,
172    # to avoid reading every CSV file on every chat message.
173    headers_by_path = {path: _read_csv_headers(path) for path in csv_paths}
174    matched_columns = {
175        header.lower()
176        for headers in headers_by_path.values()
177        for header in headers
178        if contains_heuristic_term(question_lower, header.lower())
179    }
180    if matched_columns:
181        return [
182            path
183            for path, headers in headers_by_path.items()
184            if any(header.lower() in matched_columns for header in headers)
185        ]
186
187    if keyword_detected and len(csv_paths) <= 3:
188        return csv_paths
189
190    return None
191
192
193def build_csv_aliases(csv_path: Path) -> set[str]:
194    """Build a set of lowercase name aliases for a CSV file.
195
196    Aliases include the full filename, stem, space-separated stem,
197    base name (without ``_partN`` suffixes), and leading segments
198    before the first underscore.
199
200    Args:
201        csv_path: Path to the CSV file.
202
203    Returns:
204        A set of non-empty lowercase alias strings.
205    """
206    stem = csv_path.stem.lower()
207    base = re.sub(r"_part\d+$", "", stem)
208    aliases = {
209        csv_path.name.lower(),
210        stem,
211        stem.replace("_", " "),
212        base,
213        base.replace("_", " "),
214    }
215    if "_" in stem:
216        aliases.add(stem.split("_", 1)[0])
217    if "_" in base:
218        aliases.add(base.split("_", 1)[0])
219    return {alias.strip() for alias in aliases if alias.strip()}
220
221
222def contains_heuristic_term(question_lower: str, term: str) -> bool:
223    """Check whether *term* appears as a distinct token in *question_lower*.
224
225    Uses a word-boundary regex so that short substrings do not
226    produce false positives.  Terms shorter than 3 characters are
227    always rejected.
228
229    Args:
230        question_lower: Lowercased question text to search.
231        term: Candidate term to look for.
232
233    Returns:
234        *True* when *term* (>= 3 chars) appears on a word boundary
235        in *question_lower*.
236    """
237    normalized = term.strip().lower()
238    if len(normalized) < 3:
239        return False
240    pattern = rf"(?<![a-z0-9]){re.escape(normalized)}(?![a-z0-9])"
241    return re.search(pattern, question_lower) is not None
242
243
244def _read_csv_headers(csv_path: Path) -> list[str]:
245    """Read and return the header row from a CSV file.
246
247    Args:
248        csv_path: Path to the CSV file.
249
250    Returns:
251        A list of non-empty, stripped header strings.  Returns an
252        empty list on read failure.
253    """
254    try:
255        with csv_path.open("r", encoding="utf-8-sig", newline="", errors="replace") as csv_stream:
256            header_row = next(csv.reader(csv_stream), [])
257    except Exception:
258        log.warning("Failed to read CSV headers from %s", csv_path, exc_info=True)
259        return []
260
261    return [_stringify(h) for h in header_row if _stringify(h)]
262
263
264def _read_csv_rows(
265    csv_path: Path,
266    limit: int,
267) -> tuple[list[str], list[dict[str, str]], int]:
268    """Read up to *limit* data rows from a CSV file.
269
270    Values are whitespace-collapsed and truncated to 240 characters
271    to keep the resulting text compact for AI prompt injection.
272
273    After reading the sampled rows, the remainder of the file is
274    consumed (without storing data) to obtain an accurate total row
275    count.
276
277    Args:
278        csv_path: Path to the CSV file.
279        limit: Maximum number of data rows to read.
280
281    Returns:
282        A tuple of ``(headers, rows, total_row_count)`` where
283        *headers* is a list of column name strings, *rows* is a
284        list of ordered dictionaries mapping column names to string
285        values, and *total_row_count* is the total number of data
286        rows in the file (including those beyond *limit*).  Returns
287        ``([], [], 0)`` on read failure or when *limit* is
288        non-positive.
289    """
290    if limit <= 0:
291        return [], [], 0
292
293    try:
294        with csv_path.open("r", encoding="utf-8-sig", newline="", errors="replace") as csv_stream:
295            reader = csv.DictReader(csv_stream)
296            headers = [_stringify(field) for field in (reader.fieldnames or []) if _stringify(field)]
297
298            rows: list[dict[str, str]] = []
299            total_row_count = 0
300            for row in reader:
301                total_row_count += 1
302                if len(rows) < limit:
303                    compact_row: dict[str, str] = {}
304                    for column in headers:
305                        value = _stringify(row.get(column, ""))
306                        value = re.sub(r"\s+", " ", value)
307                        if len(value) > 240:
308                            value = f"{value[:237]}..."
309                        compact_row[column] = value
310                    rows.append(compact_row)
311    except Exception:
312        log.warning("Failed to read CSV rows from %s", csv_path, exc_info=True)
313        return [], [], 0
314
315    return headers, rows, total_row_count
316
317
318def _format_csv_block(
319    filename: str,
320    headers: list[str],
321    rows: list[dict[str, str]],
322    total_row_count: int,
323) -> str:
324    """Format CSV data as a readable text block for AI prompt injection.
325
326    Args:
327        filename: The CSV filename for the block header.
328        headers: Column name strings.
329        rows: List of row dictionaries.
330        total_row_count: Total rows in the source file.
331
332    Returns:
333        A formatted multi-line text block.
334    """
335    block_lines = [f"Artifact: {filename}"]
336    block_lines.append(
337        f"Total rows: {total_row_count}"
338        + (f" (showing first {len(rows)})" if len(rows) < total_row_count else "")
339    )
340    if headers:
341        block_lines.append(f"Columns: {', '.join(headers)}")
342    if rows:
343        block_lines.append("Rows:")
344        for row_index, row in enumerate(rows, start=1):
345            parts = [f"{column}={value}" for column, value in row.items()]
346            block_lines.append(f"{row_index}. " + " | ".join(parts))
347    else:
348        block_lines.append("Rows: none")
349    return "\n".join(block_lines)
def retrieve_csv_data( question: str, parsed_dir: str | pathlib.Path, row_limit: int = 500) -> dict[str, typing.Any]:
 68def retrieve_csv_data(
 69    question: str,
 70    parsed_dir: str | Path,
 71    row_limit: int = CSV_ROW_LIMIT,
 72) -> dict[str, Any]:
 73    """Best-effort retrieval of raw CSV rows for data-centric chat questions.
 74
 75    Heuristically matches the user's *question* against parsed artifact
 76    CSV filenames and column headers.  When a match is found, up to
 77    *row_limit* rows are read and formatted as a structured text block
 78    for injection into the AI prompt.
 79
 80    Args:
 81        question: The user's chat question text.
 82        parsed_dir: Path to the directory containing parsed artifact
 83            CSV files.
 84        row_limit: Maximum total rows to include across all matched
 85            CSVs.  Defaults to :data:`CSV_ROW_LIMIT`.
 86
 87    Returns:
 88        A dictionary with a ``retrieved`` boolean.  When *True*, also
 89        includes ``artifacts`` (list of matched CSV filenames) and
 90        ``data`` (formatted row text).
 91    """
 92    question_text = _stringify(question)
 93    if not question_text:
 94        return {"retrieved": False}
 95
 96    parsed_path = Path(parsed_dir)
 97    if not parsed_path.exists() or not parsed_path.is_dir():
 98        return {"retrieved": False}
 99
100    csv_paths = sorted(path for path in parsed_path.glob("*.csv") if path.is_file())
101    if not csv_paths:
102        return {"retrieved": False}
103
104    question_lower = question_text.lower()
105    keyword_detected = any(kw in question_lower for kw in CSV_RETRIEVAL_KEYWORDS)
106
107    target_paths = _match_target_paths(csv_paths, question_lower, keyword_detected)
108    if target_paths is None:
109        return {"retrieved": False}
110
111    target_paths = list(dict.fromkeys(target_paths))
112    artifacts = [path.name for path in target_paths]
113    formatted_blocks: list[str] = []
114    rows_remaining = row_limit
115
116    for csv_path in target_paths:
117        if rows_remaining <= 0:
118            break
119        headers, rows, total_row_count = _read_csv_rows(csv_path, limit=rows_remaining)
120        if not headers and not rows:
121            continue
122
123        rows_remaining -= len(rows)
124        formatted_blocks.append(
125            _format_csv_block(csv_path.name, headers, rows, total_row_count)
126        )
127
128    if not formatted_blocks:
129        return {
130            "retrieved": True,
131            "artifacts": artifacts,
132            "data": "No readable rows found in selected CSV files.",
133        }
134
135    return {
136        "retrieved": True,
137        "artifacts": artifacts,
138        "data": "\n\n".join(formatted_blocks),
139    }

Best-effort retrieval of raw CSV rows for data-centric chat questions.

Heuristically matches the user's question against parsed artifact CSV filenames and column headers. When a match is found, up to row_limit rows are read and formatted as a structured text block for injection into the AI prompt.

Arguments:
  • question: The user's chat question text.
  • parsed_dir: Path to the directory containing parsed artifact CSV files.
  • row_limit: Maximum total rows to include across all matched CSVs. Defaults to CSV_ROW_LIMIT.
Returns:

A dictionary with a retrieved boolean. When True, also includes artifacts (list of matched CSV filenames) and data (formatted row text).

def build_csv_aliases(csv_path: pathlib.Path) -> set[str]:
194def build_csv_aliases(csv_path: Path) -> set[str]:
195    """Build a set of lowercase name aliases for a CSV file.
196
197    Aliases include the full filename, stem, space-separated stem,
198    base name (without ``_partN`` suffixes), and leading segments
199    before the first underscore.
200
201    Args:
202        csv_path: Path to the CSV file.
203
204    Returns:
205        A set of non-empty lowercase alias strings.
206    """
207    stem = csv_path.stem.lower()
208    base = re.sub(r"_part\d+$", "", stem)
209    aliases = {
210        csv_path.name.lower(),
211        stem,
212        stem.replace("_", " "),
213        base,
214        base.replace("_", " "),
215    }
216    if "_" in stem:
217        aliases.add(stem.split("_", 1)[0])
218    if "_" in base:
219        aliases.add(base.split("_", 1)[0])
220    return {alias.strip() for alias in aliases if alias.strip()}

Build a set of lowercase name aliases for a CSV file.

Aliases include the full filename, stem, space-separated stem, base name (without _partN suffixes), and leading segments before the first underscore.

Arguments:
  • csv_path: Path to the CSV file.
Returns:

A set of non-empty lowercase alias strings.

def contains_heuristic_term(question_lower: str, term: str) -> bool:
223def contains_heuristic_term(question_lower: str, term: str) -> bool:
224    """Check whether *term* appears as a distinct token in *question_lower*.
225
226    Uses a word-boundary regex so that short substrings do not
227    produce false positives.  Terms shorter than 3 characters are
228    always rejected.
229
230    Args:
231        question_lower: Lowercased question text to search.
232        term: Candidate term to look for.
233
234    Returns:
235        *True* when *term* (>= 3 chars) appears on a word boundary
236        in *question_lower*.
237    """
238    normalized = term.strip().lower()
239    if len(normalized) < 3:
240        return False
241    pattern = rf"(?<![a-z0-9]){re.escape(normalized)}(?![a-z0-9])"
242    return re.search(pattern, question_lower) is not None

Check whether term appears as a distinct token in question_lower.

Uses a word-boundary regex so that short substrings do not produce false positives. Terms shorter than 3 characters are always rejected.

Arguments:
  • question_lower: Lowercased question text to search.
  • term: Candidate term to look for.
Returns:

True when term (>= 3 chars) appears on a word boundary in question_lower.