app.analyzer.chunking

Chunked analysis and hierarchical merge for large artifact datasets.

When artifact CSV data exceeds the AI model's context window, this module splits the data into row-boundary-aligned chunks, analyses each chunk independently, and hierarchically merges the per-chunk findings via additional AI calls until a single consolidated analysis remains.

Attributes:
  • LOGGER: Module-level logger instance.
  1"""Chunked analysis and hierarchical merge for large artifact datasets.
  2
  3When artifact CSV data exceeds the AI model's context window, this module
  4splits the data into row-boundary-aligned chunks, analyses each chunk
  5independently, and hierarchically merges the per-chunk findings via
  6additional AI calls until a single consolidated analysis remains.
  7
  8Attributes:
  9    LOGGER: Module-level logger instance.
 10"""
 11
 12from __future__ import annotations
 13
 14import csv
 15import io
 16import logging
 17from typing import Any
 18
 19from .constants import CSV_DATA_SECTION_RE, CSV_TRAILING_FENCE_RE
 20from .utils import sanitize_filename, emit_analysis_progress
 21
 22LOGGER = logging.getLogger(__name__)
 23
 24__all__ = [
 25    "analyze_artifact_chunked",
 26    "split_csv_and_suffix",
 27    "split_csv_into_chunks",
 28]
 29
 30
 31def _serialize_row(row: list[str]) -> str:
 32    """Serialize a single parsed CSV row back to a CSV string.
 33
 34    Uses the ``csv`` module so that fields containing commas, quotes,
 35    or newlines are properly quoted.
 36
 37    Args:
 38        row: List of field values.
 39
 40    Returns:
 41        A single CSV line (without trailing newline).
 42    """
 43    buf = io.StringIO()
 44    writer = csv.writer(buf)
 45    writer.writerow(row)
 46    return buf.getvalue().rstrip("\r\n")
 47
 48
 49def split_csv_into_chunks(csv_text: str, max_chars: int) -> list[str]:
 50    """Split CSV text into chunks that each fit within *max_chars*.
 51
 52    Parsing is done via the ``csv`` module so that quoted fields with
 53    embedded newlines are kept intact as single records.  Every chunk
 54    retains the original header row.
 55
 56    Args:
 57        csv_text: Full CSV text including the header row.
 58        max_chars: Maximum character count per chunk (including header).
 59
 60    Returns:
 61        A list of CSV text chunks, each starting with the header row.
 62    """
 63    if max_chars <= 0 or len(csv_text) <= max_chars:
 64        return [csv_text]
 65
 66    reader = csv.reader(io.StringIO(csv_text))
 67    try:
 68        header_fields = next(reader)
 69    except StopIteration:
 70        return [csv_text]
 71
 72    header_line = _serialize_row(header_fields)
 73
 74    data_rows: list[str] = []
 75    for row in reader:
 76        data_rows.append(_serialize_row(row))
 77
 78    if not data_rows:
 79        return [csv_text]
 80
 81    header_overhead = len(header_line) + 1  # +1 for the joining newline
 82    chunk_data_budget = max_chars - header_overhead
 83    if chunk_data_budget <= 0:
 84        return [csv_text]
 85
 86    chunks: list[str] = []
 87    current_rows: list[str] = []
 88    current_size = 0
 89
 90    for serialized_row in data_rows:
 91        row_size = len(serialized_row) + 1  # +1 for joining newline
 92        if current_rows and current_size + row_size > chunk_data_budget:
 93            chunks.append(header_line + "\n" + "\n".join(current_rows))
 94            current_rows = []
 95            current_size = 0
 96        current_rows.append(serialized_row)
 97        current_size += row_size
 98
 99    if current_rows:
100        chunks.append(header_line + "\n" + "\n".join(current_rows))
101
102    return chunks if chunks else [csv_text]
103
104
def split_csv_and_suffix(raw_csv_tail: str) -> tuple[str, str]:
    """Split the text after the CSV heading into data and trailing suffix.

    A rendered prompt may carry a ``## Final Context Reminder`` section
    and/or a trailing Markdown code fence after the CSV rows.  Both are
    cut off the data and returned together as a suffix, so that only the
    rows are chunked and the suffix can be re-attached to every chunk.

    Args:
        raw_csv_tail: The portion of the rendered prompt that follows
            the ``## Full Data (CSV)`` heading.

    Returns:
        A ``(csv_data, suffix)`` tuple.  ``csv_data`` is stripped of
        surrounding whitespace; ``suffix`` is the matched fence (if any)
        followed by the reminder section (if any), or an empty string.
    """
    remaining = raw_csv_tail

    # Everything from the first reminder heading onwards is the reminder.
    reminder_part = ""
    reminder_at = remaining.find("## Final Context Reminder")
    if reminder_at >= 0:
        reminder_part = "\n\n" + remaining[reminder_at:].strip()
        remaining = remaining[:reminder_at]

    # A code fence is only looked for in what precedes the reminder.
    fence_part = ""
    fence = CSV_TRAILING_FENCE_RE.search(remaining)
    if fence:
        fence_part = fence.group()
        remaining = remaining[: fence.start()]

    return remaining.strip(), fence_part + reminder_part
144
145
def analyze_artifact_chunked(
    artifact_prompt: str,
    artifact_key: str,
    artifact_name: str,
    investigation_context: str,
    model: str,
    *,
    system_prompt: str,
    ai_response_max_tokens: int,
    chunk_csv_budget: int,
    chunk_merge_prompt_template: str,
    max_merge_rounds: int,
    call_ai_with_retry_fn: Any,
    ai_provider: Any,
    audit_log_fn: Any = None,
    save_case_prompt_fn: Any = None,
    progress_callback: Any | None = None,
) -> str:
    """Analyze an artifact chunk by chunk and merge the chunk findings.

    The prompt is cut at the CSV data section marker.  The CSV rows that
    follow are split into row-aligned chunks, each chunk is sent to the AI
    provider together with the instructions and the trailing suffix, and
    the per-chunk answers are merged hierarchically.  When the marker is
    missing or the data yields a single chunk, the unmodified prompt is
    sent in one call instead.

    Args:
        artifact_prompt: The fully rendered artifact analysis prompt.
        artifact_key: Unique identifier for the artifact.
        artifact_name: Human-readable artifact name.
        investigation_context: The user's investigation context text.
        model: AI model identifier, included in progress events.
        system_prompt: The system prompt sent to the AI provider.
        ai_response_max_tokens: Token budget for the AI response.
        chunk_csv_budget: Character budget for one chunk prompt; the
            lengths of the instructions, system prompt and suffix are
            subtracted from it to get the CSV budget (at least 1000).
        chunk_merge_prompt_template: Template for merging chunk findings.
        max_merge_rounds: Maximum hierarchical merge iterations.
        call_ai_with_retry_fn: Callable that runs a zero-argument AI call
            with retry handling and returns its result.
        ai_provider: The AI provider instance (its ``analyze`` is called).
        audit_log_fn: Optional callable ``(action, details)`` for audit.
        save_case_prompt_fn: Optional callable ``(filename, system, user)``
            for saving prompts.
        progress_callback: Optional callback for streaming progress.

    Returns:
        The provider's answer for an unchunked prompt, otherwise the
        merged analysis text from all chunks.
    """

    def _ask_model(user_prompt: str) -> Any:
        """Send one user prompt to the provider through the retry wrapper."""
        return call_ai_with_retry_fn(
            lambda: ai_provider.analyze(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=ai_response_max_tokens,
            )
        )

    section = CSV_DATA_SECTION_RE.search(artifact_prompt)
    if section is None:
        # No CSV section to split on: analyze the prompt as it is.
        return _ask_model(artifact_prompt)

    split_at = section.end()
    preamble = artifact_prompt[:split_at]
    csv_data, trailer = split_csv_and_suffix(artifact_prompt[split_at:])

    # Everything that is repeated in each chunk prompt eats into the budget.
    fixed_chars = len(preamble) + len(system_prompt) + len(trailer)
    csv_budget = max(1000, chunk_csv_budget - fixed_chars)

    chunks = split_csv_into_chunks(csv_data, csv_budget)
    total_chunks = len(chunks)
    if total_chunks <= 1:
        return _ask_model(artifact_prompt)

    LOGGER.info(
        "Chunked analysis for %s: splitting into %d chunks (budget %d chars/chunk).",
        artifact_key, total_chunks, csv_budget,
    )
    if audit_log_fn is not None:
        audit_details = {
            "artifact_key": artifact_key,
            "total_chunks": total_chunks,
            "csv_budget_per_chunk": csv_budget,
        }
        audit_log_fn("chunked_analysis_started", audit_details)

    chunk_findings: list[str] = []
    for position, chunk_csv in enumerate(chunks, start=1):
        chunk_prompt = preamble + chunk_csv + trailer
        chunk_label = f"chunk {position}/{total_chunks}"

        if progress_callback is not None:
            progress_payload = {
                "artifact_key": artifact_key,
                "artifact_name": artifact_name,
                "thinking_text": f"Analyzing {chunk_label}...",
                "partial_text": "",
                "model": model,
            }
            emit_analysis_progress(
                progress_callback, artifact_key, "thinking", progress_payload,
            )

        safe_key = sanitize_filename(artifact_key)
        if save_case_prompt_fn is not None:
            prompt_filename = f"artifact_{safe_key}_chunk_{position}.md"
            save_case_prompt_fn(prompt_filename, system_prompt, chunk_prompt)

        LOGGER.info("Analyzing %s %s...", artifact_key, chunk_label)
        answer = _ask_model(chunk_prompt)
        # The heading lets the merge step tell the chunk answers apart.
        chunk_findings.append(f"### Chunk {position} of {total_chunks}\n{answer}")

    merged_text = _hierarchical_merge_findings(
        chunk_findings=chunk_findings,
        artifact_key=artifact_key,
        artifact_name=artifact_name,
        investigation_context=investigation_context,
        model=model,
        system_prompt=system_prompt,
        ai_response_max_tokens=ai_response_max_tokens,
        chunk_csv_budget=chunk_csv_budget,
        chunk_merge_prompt_template=chunk_merge_prompt_template,
        max_merge_rounds=max_merge_rounds,
        call_ai_with_retry_fn=call_ai_with_retry_fn,
        ai_provider=ai_provider,
        save_case_prompt_fn=save_case_prompt_fn,
        progress_callback=progress_callback,
    )
    LOGGER.info(
        "Chunked analysis for %s complete: %d chunks merged.",
        artifact_key, total_chunks,
    )
    return merged_text
292
293
294def _build_merge_prompt(
295    findings_text: str,
296    batch_count: int,
297    artifact_key: str,
298    artifact_name: str,
299    investigation_context: str,
300    chunk_merge_prompt_template: str,
301) -> str:
302    """Fill the chunk-merge template with the given findings.
303
304    Args:
305        findings_text: Combined text of per-chunk findings to merge.
306        batch_count: Number of chunks/batches.
307        artifact_key: Unique identifier for the artifact.
308        artifact_name: Human-readable artifact name.
309        investigation_context: The user's investigation context text.
310        chunk_merge_prompt_template: The merge template string.
311
312    Returns:
313        The fully rendered merge prompt string.
314    """
315    prompt = chunk_merge_prompt_template
316    for placeholder, value in {
317        "chunk_count": str(batch_count),
318        "investigation_context": investigation_context.strip() or "No investigation context provided.",
319        "artifact_name": artifact_name,
320        "artifact_key": artifact_key,
321        "per_chunk_findings": findings_text,
322    }.items():
323        prompt = prompt.replace(f"{{{{{placeholder}}}}}", value)
324    return prompt
325
326
def _hierarchical_merge_findings(
    chunk_findings: list[str],
    artifact_key: str,
    artifact_name: str,
    investigation_context: str,
    model: str,
    *,
    system_prompt: str,
    ai_response_max_tokens: int,
    chunk_csv_budget: int,
    chunk_merge_prompt_template: str,
    max_merge_rounds: int,
    call_ai_with_retry_fn: Any,
    ai_provider: Any,
    save_case_prompt_fn: Any = None,
    progress_callback: Any | None = None,
) -> str:
    """Merge chunk findings hierarchically until one result remains.

    In every round the findings are packed into batches that fit the
    findings budget, and each batch is merged by one AI call.  The merged
    texts become the findings of the next round.  A round that consists
    of a single batch produces the final result, which is returned as the
    AI's text without the internal ``### Merged batch`` heading.

    When more than *max_merge_rounds* rounds would be needed, the
    remaining findings are concatenated (each truncated if the total
    exceeds the budget) and merged with one last AI call.

    Args:
        chunk_findings: List of per-chunk finding texts to merge.
        artifact_key: Unique identifier for the artifact.
        artifact_name: Human-readable artifact name.
        investigation_context: The user's investigation context text.
        model: AI model identifier, included in progress events.
        system_prompt: The system prompt sent to the AI provider.
        ai_response_max_tokens: Token budget for the AI response.
        chunk_csv_budget: Character budget for one prompt; the template
            length, system prompt length and a 500-character margin are
            subtracted to get the findings budget (at least 2000).
        chunk_merge_prompt_template: Template for merging findings.
        max_merge_rounds: Maximum merge iterations.
        call_ai_with_retry_fn: Callable that runs a zero-argument AI call
            with retry handling and returns its result.
        ai_provider: The AI provider instance (its ``analyze`` is called).
        save_case_prompt_fn: Optional callable ``(filename, system, user)``
            for saving prompts.
        progress_callback: Optional callback for streaming progress.

    Returns:
        A single merged analysis text.  A single input finding is
        returned unchanged and an empty input yields ``""``.
    """
    overhead = len(chunk_merge_prompt_template) + len(system_prompt) + 500
    findings_budget = max(2000, chunk_csv_budget - overhead)
    current_findings = list(chunk_findings)
    # The file-name stem is the same for every prompt and is only needed
    # when prompts are actually saved.
    safe_key = sanitize_filename(artifact_key) if save_case_prompt_fn is not None else ""

    def _report(thinking_text: str) -> None:
        """Emit a "thinking" progress event if a callback was supplied."""
        if progress_callback is None:
            return
        emit_analysis_progress(
            progress_callback, artifact_key, "thinking",
            {
                "artifact_key": artifact_key,
                "artifact_name": artifact_name,
                "thinking_text": thinking_text,
                "partial_text": "",
                "model": model,
            },
        )

    def _merge(findings_text: str, batch_count: int, prompt_filename: str) -> Any:
        """Render, optionally save, and send one merge prompt."""
        merge_prompt = _build_merge_prompt(
            findings_text=findings_text,
            batch_count=batch_count,
            artifact_key=artifact_key,
            artifact_name=artifact_name,
            investigation_context=investigation_context,
            chunk_merge_prompt_template=chunk_merge_prompt_template,
        )
        if save_case_prompt_fn is not None:
            save_case_prompt_fn(prompt_filename, system_prompt, merge_prompt)
        return call_ai_with_retry_fn(
            lambda: ai_provider.analyze(
                system_prompt=system_prompt,
                user_prompt=merge_prompt,
                max_tokens=ai_response_max_tokens,
            )
        )

    merge_round = 0
    while len(current_findings) > 1:
        merge_round += 1

        if merge_round > max_merge_rounds:
            LOGGER.warning(
                "Hierarchical merge for %s hit %d-round limit with %d findings remaining. "
                "Falling back to concatenation.",
                artifact_key, max_merge_rounds, len(current_findings),
            )
            _report(
                f"Merge round limit reached ({max_merge_rounds}). "
                f"Concatenating {len(current_findings)} remaining findings..."
            )
            total_chars = sum(len(finding) for finding in current_findings)
            if total_chars > findings_budget:
                # Give every finding an equal share of the budget and cut
                # off whatever exceeds it.
                per_finding_budget = max(200, findings_budget // len(current_findings))
                parts = [
                    finding[:per_finding_budget] + "\n[... truncated ...]"
                    if len(finding) > per_finding_budget
                    else finding
                    for finding in current_findings
                ]
            else:
                parts = current_findings
            return _merge(
                "\n\n".join(parts),
                len(current_findings),
                f"artifact_{safe_key}_merge_fallback.md",
            )

        # Greedily pack consecutive findings into budget-sized batches.
        batches: list[list[str]] = []
        current_batch: list[str] = []
        current_batch_size = 0
        for finding in current_findings:
            entry_size = len(finding) + 2  # +2 for the "\n\n" separator
            if current_batch and current_batch_size + entry_size > findings_budget:
                batches.append(current_batch)
                current_batch = []
                current_batch_size = 0
            current_batch.append(finding)
            current_batch_size += entry_size
        if current_batch:
            batches.append(current_batch)

        # One finding per batch would not reduce the number of findings,
        # so merge everything at once to guarantee progress.
        if len(batches) >= len(current_findings):
            batches = [current_findings]

        total_batches = len(batches)
        label_prefix = f"merge round {merge_round}" if merge_round > 1 else "merge"

        LOGGER.info(
            "Hierarchical %s for %s: %d batches from %d findings (budget %d chars).",
            label_prefix, artifact_key, total_batches,
            len(current_findings), findings_budget,
        )
        _report(
            f"Merging findings ({label_prefix}: "
            f"{len(current_findings)} findings into {total_batches} groups)..."
        )

        merged_texts = [
            _merge(
                "\n\n".join(batch),
                len(batch),
                f"artifact_{safe_key}_merge_r{merge_round}_b{batch_index}.md",
            )
            for batch_index, batch in enumerate(batches, start=1)
        ]

        if total_batches == 1:
            # This is the consolidated analysis: return it as produced,
            # like the fallback path does, without a bookkeeping heading.
            return merged_texts[0]

        # Headings let the next round tell the intermediate results apart.
        current_findings = [
            f"### Merged batch {batch_index}\n{merged}"
            for batch_index, merged in enumerate(merged_texts, start=1)
        ]

    return current_findings[0] if current_findings else ""
def analyze_artifact_chunked( artifact_prompt: str, artifact_key: str, artifact_name: str, investigation_context: str, model: str, *, system_prompt: str, ai_response_max_tokens: int, chunk_csv_budget: int, chunk_merge_prompt_template: str, max_merge_rounds: int, call_ai_with_retry_fn: Any, ai_provider: Any, audit_log_fn: Any = None, save_case_prompt_fn: Any = None, progress_callback: typing.Any | None = None) -> str:
def analyze_artifact_chunked(
    artifact_prompt: str,
    artifact_key: str,
    artifact_name: str,
    investigation_context: str,
    model: str,
    *,
    system_prompt: str,
    ai_response_max_tokens: int,
    chunk_csv_budget: int,
    chunk_merge_prompt_template: str,
    max_merge_rounds: int,
    call_ai_with_retry_fn: Any,
    ai_provider: Any,
    audit_log_fn: Any = None,
    save_case_prompt_fn: Any = None,
    progress_callback: Any | None = None,
) -> str:
    """Analyze an artifact chunk by chunk and merge the chunk findings.

    The prompt is cut at the CSV data section marker.  The CSV rows that
    follow are split into row-aligned chunks, each chunk is sent to the AI
    provider together with the instructions and the trailing suffix, and
    the per-chunk answers are merged hierarchically.  When the marker is
    missing or the data yields a single chunk, the unmodified prompt is
    sent in one call instead.

    Args:
        artifact_prompt: The fully rendered artifact analysis prompt.
        artifact_key: Unique identifier for the artifact.
        artifact_name: Human-readable artifact name.
        investigation_context: The user's investigation context text.
        model: AI model identifier, included in progress events.
        system_prompt: The system prompt sent to the AI provider.
        ai_response_max_tokens: Token budget for the AI response.
        chunk_csv_budget: Character budget for one chunk prompt; the
            lengths of the instructions, system prompt and suffix are
            subtracted from it to get the CSV budget (at least 1000).
        chunk_merge_prompt_template: Template for merging chunk findings.
        max_merge_rounds: Maximum hierarchical merge iterations.
        call_ai_with_retry_fn: Callable that runs a zero-argument AI call
            with retry handling and returns its result.
        ai_provider: The AI provider instance (its ``analyze`` is called).
        audit_log_fn: Optional callable ``(action, details)`` for audit.
        save_case_prompt_fn: Optional callable ``(filename, system, user)``
            for saving prompts.
        progress_callback: Optional callback for streaming progress.

    Returns:
        The provider's answer for an unchunked prompt, otherwise the
        merged analysis text from all chunks.
    """

    def _ask_model(user_prompt: str) -> Any:
        """Send one user prompt to the provider through the retry wrapper."""
        return call_ai_with_retry_fn(
            lambda: ai_provider.analyze(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=ai_response_max_tokens,
            )
        )

    section = CSV_DATA_SECTION_RE.search(artifact_prompt)
    if section is None:
        # No CSV section to split on: analyze the prompt as it is.
        return _ask_model(artifact_prompt)

    split_at = section.end()
    preamble = artifact_prompt[:split_at]
    csv_data, trailer = split_csv_and_suffix(artifact_prompt[split_at:])

    # Everything that is repeated in each chunk prompt eats into the budget.
    fixed_chars = len(preamble) + len(system_prompt) + len(trailer)
    csv_budget = max(1000, chunk_csv_budget - fixed_chars)

    chunks = split_csv_into_chunks(csv_data, csv_budget)
    total_chunks = len(chunks)
    if total_chunks <= 1:
        return _ask_model(artifact_prompt)

    LOGGER.info(
        "Chunked analysis for %s: splitting into %d chunks (budget %d chars/chunk).",
        artifact_key, total_chunks, csv_budget,
    )
    if audit_log_fn is not None:
        audit_details = {
            "artifact_key": artifact_key,
            "total_chunks": total_chunks,
            "csv_budget_per_chunk": csv_budget,
        }
        audit_log_fn("chunked_analysis_started", audit_details)

    chunk_findings: list[str] = []
    for position, chunk_csv in enumerate(chunks, start=1):
        chunk_prompt = preamble + chunk_csv + trailer
        chunk_label = f"chunk {position}/{total_chunks}"

        if progress_callback is not None:
            progress_payload = {
                "artifact_key": artifact_key,
                "artifact_name": artifact_name,
                "thinking_text": f"Analyzing {chunk_label}...",
                "partial_text": "",
                "model": model,
            }
            emit_analysis_progress(
                progress_callback, artifact_key, "thinking", progress_payload,
            )

        safe_key = sanitize_filename(artifact_key)
        if save_case_prompt_fn is not None:
            prompt_filename = f"artifact_{safe_key}_chunk_{position}.md"
            save_case_prompt_fn(prompt_filename, system_prompt, chunk_prompt)

        LOGGER.info("Analyzing %s %s...", artifact_key, chunk_label)
        answer = _ask_model(chunk_prompt)
        # The heading lets the merge step tell the chunk answers apart.
        chunk_findings.append(f"### Chunk {position} of {total_chunks}\n{answer}")

    merged_text = _hierarchical_merge_findings(
        chunk_findings=chunk_findings,
        artifact_key=artifact_key,
        artifact_name=artifact_name,
        investigation_context=investigation_context,
        model=model,
        system_prompt=system_prompt,
        ai_response_max_tokens=ai_response_max_tokens,
        chunk_csv_budget=chunk_csv_budget,
        chunk_merge_prompt_template=chunk_merge_prompt_template,
        max_merge_rounds=max_merge_rounds,
        call_ai_with_retry_fn=call_ai_with_retry_fn,
        ai_provider=ai_provider,
        save_case_prompt_fn=save_case_prompt_fn,
        progress_callback=progress_callback,
    )
    LOGGER.info(
        "Chunked analysis for %s complete: %d chunks merged.",
        artifact_key, total_chunks,
    )
    return merged_text

Analyze an artifact in multiple chunks when data exceeds context budget.

Splits the CSV portion of the prompt into row-boundary-aligned chunks, analyzes each independently via the AI provider, then merges the per-chunk findings hierarchically.

Arguments:
  • artifact_prompt: The fully rendered artifact analysis prompt.
  • artifact_key: Unique identifier for the artifact.
  • artifact_name: Human-readable artifact name.
  • investigation_context: The user's investigation context text.
  • model: AI model identifier for progress reporting.
  • system_prompt: The system prompt sent to the AI provider.
  • ai_response_max_tokens: Token budget for the AI response.
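  • chunk_csv_budget: Character budget for a whole chunk prompt; the lengths of the instructions, system prompt and suffix are subtracted from it to obtain the CSV budget per chunk (never below 1000 characters).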
  • chunk_merge_prompt_template: Template for merging chunk findings.
  • max_merge_rounds: Maximum hierarchical merge iterations.
  • call_ai_with_retry_fn: Callable wrapping AI calls with retry.
  • ai_provider: The AI provider instance.
  • audit_log_fn: Optional callable (action, details) for audit.
  • save_case_prompt_fn: Optional callable (filename, system, user) for saving prompts.
  • progress_callback: Optional callback for streaming progress.
Returns:

The merged analysis text from all chunks.

def split_csv_and_suffix(raw_csv_tail: str) -> tuple[str, str]:
def split_csv_and_suffix(raw_csv_tail: str) -> tuple[str, str]:
    """Split the text after the CSV heading into data and trailing suffix.

    A rendered prompt may carry a ``## Final Context Reminder`` section
    and/or a trailing Markdown code fence after the CSV rows.  Both are
    cut off the data and returned together as a suffix, so that only the
    rows are chunked and the suffix can be re-attached to every chunk.

    Args:
        raw_csv_tail: The portion of the rendered prompt that follows
            the ``## Full Data (CSV)`` heading.

    Returns:
        A ``(csv_data, suffix)`` tuple.  ``csv_data`` is stripped of
        surrounding whitespace; ``suffix`` is the matched fence (if any)
        followed by the reminder section (if any), or an empty string.
    """
    remaining = raw_csv_tail

    # Everything from the first reminder heading onwards is the reminder.
    reminder_part = ""
    reminder_at = remaining.find("## Final Context Reminder")
    if reminder_at >= 0:
        reminder_part = "\n\n" + remaining[reminder_at:].strip()
        remaining = remaining[:reminder_at]

    # A code fence is only looked for in what precedes the reminder.
    fence_part = ""
    fence = CSV_TRAILING_FENCE_RE.search(remaining)
    if fence:
        fence_part = fence.group()
        remaining = remaining[: fence.start()]

    return remaining.strip(), fence_part + reminder_part

Separate CSV rows from trailing content in a rendered prompt.

File-based templates may append a Markdown code fence and/or a Final Context Reminder section after the CSV data placeholder. This function separates the actual CSV rows from those trailing elements so that only the data is chunked, while the suffix is appended to every chunk prompt.

Arguments:
  • raw_csv_tail: The portion of the rendered prompt that follows the ## Full Data (CSV) heading.
Returns:

A (csv_data, suffix) tuple.

def split_csv_into_chunks(csv_text: str, max_chars: int) -> list[str]:
 50def split_csv_into_chunks(csv_text: str, max_chars: int) -> list[str]:
 51    """Split CSV text into chunks that each fit within *max_chars*.
 52
 53    Parsing is done via the ``csv`` module so that quoted fields with
 54    embedded newlines are kept intact as single records.  Every chunk
 55    retains the original header row.
 56
 57    Args:
 58        csv_text: Full CSV text including the header row.
 59        max_chars: Maximum character count per chunk (including header).
 60
 61    Returns:
 62        A list of CSV text chunks, each starting with the header row.
 63    """
 64    if max_chars <= 0 or len(csv_text) <= max_chars:
 65        return [csv_text]
 66
 67    reader = csv.reader(io.StringIO(csv_text))
 68    try:
 69        header_fields = next(reader)
 70    except StopIteration:
 71        return [csv_text]
 72
 73    header_line = _serialize_row(header_fields)
 74
 75    data_rows: list[str] = []
 76    for row in reader:
 77        data_rows.append(_serialize_row(row))
 78
 79    if not data_rows:
 80        return [csv_text]
 81
 82    header_overhead = len(header_line) + 1  # +1 for the joining newline
 83    chunk_data_budget = max_chars - header_overhead
 84    if chunk_data_budget <= 0:
 85        return [csv_text]
 86
 87    chunks: list[str] = []
 88    current_rows: list[str] = []
 89    current_size = 0
 90
 91    for serialized_row in data_rows:
 92        row_size = len(serialized_row) + 1  # +1 for joining newline
 93        if current_rows and current_size + row_size > chunk_data_budget:
 94            chunks.append(header_line + "\n" + "\n".join(current_rows))
 95            current_rows = []
 96            current_size = 0
 97        current_rows.append(serialized_row)
 98        current_size += row_size
 99
100    if current_rows:
101        chunks.append(header_line + "\n" + "\n".join(current_rows))
102
103    return chunks if chunks else [csv_text]

Split CSV text into chunks that each fit within max_chars.

Parsing is done via the csv module so that quoted fields with embedded newlines are kept intact as single records. Every chunk retains the original header row.

Arguments:
  • csv_text: Full CSV text including the header row.
  • max_chars: Maximum character count per chunk (including header).
Returns:

A list of CSV text chunks, each starting with the header row.