app.analyzer.chunking
Chunked analysis and hierarchical merge for large artifact datasets.
When artifact CSV data exceeds the AI model's context window, this module splits the data into row-boundary-aligned chunks, analyzes each chunk independently, and hierarchically merges the per-chunk findings via additional AI calls until a single consolidated analysis remains.
Attributes:
- LOGGER: Module-level logger instance.
"""Chunked analysis and hierarchical merge for large artifact datasets.

When artifact CSV data exceeds the AI model's context window, this module
splits the data into row-boundary-aligned chunks, analyzes each chunk
independently, and hierarchically merges the per-chunk findings via
additional AI calls until a single consolidated analysis remains.

Attributes:
    LOGGER: Module-level logger instance.
"""

from __future__ import annotations

import csv
import io
import logging
from typing import Any

from .constants import CSV_DATA_SECTION_RE, CSV_TRAILING_FENCE_RE
from .utils import sanitize_filename, emit_analysis_progress

LOGGER = logging.getLogger(__name__)

__all__ = [
    "analyze_artifact_chunked",
    "split_csv_and_suffix",
    "split_csv_into_chunks",
]


def _serialize_row(row: list[str]) -> str:
    """Serialize a single parsed CSV row back to a CSV string.

    Uses the ``csv`` module so that fields containing commas, quotes,
    or newlines are properly quoted.

    Args:
        row: List of field values.

    Returns:
        A single CSV line (without trailing newline).
    """
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(row)
    return buf.getvalue().rstrip("\r\n")


def split_csv_into_chunks(csv_text: str, max_chars: int) -> list[str]:
    """Split CSV text into chunks that each fit within *max_chars*.

    Parsing is done via the ``csv`` module so that quoted fields with
    embedded newlines are kept intact as single records. Every chunk
    retains the original header row.

    Args:
        csv_text: Full CSV text including the header row.
        max_chars: Maximum character count per chunk (including header).

    Returns:
        A list of CSV text chunks, each starting with the header row.
    """
    if max_chars <= 0 or len(csv_text) <= max_chars:
        return [csv_text]

    reader = csv.reader(io.StringIO(csv_text))
    try:
        header_fields = next(reader)
    except StopIteration:
        return [csv_text]

    header_line = _serialize_row(header_fields)

    data_rows: list[str] = []
    for row in reader:
        data_rows.append(_serialize_row(row))

    if not data_rows:
        return [csv_text]

    header_overhead = len(header_line) + 1  # +1 for the joining newline
    chunk_data_budget = max_chars - header_overhead
    if chunk_data_budget <= 0:
        return [csv_text]

    chunks: list[str] = []
    current_rows: list[str] = []
    current_size = 0

    for serialized_row in data_rows:
        row_size = len(serialized_row) + 1  # +1 for joining newline
        if current_rows and current_size + row_size > chunk_data_budget:
            chunks.append(header_line + "\n" + "\n".join(current_rows))
            current_rows = []
            current_size = 0
        current_rows.append(serialized_row)
        current_size += row_size

    if current_rows:
        chunks.append(header_line + "\n" + "\n".join(current_rows))

    return chunks if chunks else [csv_text]


def split_csv_and_suffix(raw_csv_tail: str) -> tuple[str, str]:
    """Separate CSV rows from trailing content in a rendered prompt.

    File-based templates may append a Markdown code fence and/or a
    Final Context Reminder section after the CSV data placeholder.
    This function extracts the actual CSV rows from those trailing
    elements so that only the data is chunked, while the suffix is
    appended to every chunk prompt.

    Args:
        raw_csv_tail: The portion of the rendered prompt that follows
            the ``## Full Data (CSV)`` heading.

    Returns:
        A ``(csv_data, suffix)`` tuple.
    """
    text = raw_csv_tail

    reminder_marker = "## Final Context Reminder"
    reminder_pos = text.find(reminder_marker)
    context_suffix = ""
    if reminder_pos >= 0:
        context_suffix = "\n\n" + text[reminder_pos:].strip()
        text = text[:reminder_pos]

    trailing_fence = ""
    fence_match = CSV_TRAILING_FENCE_RE.search(text)
    if fence_match:
        trailing_fence = fence_match.group()
        text = text[: fence_match.start()]

    csv_data = text.strip()

    suffix = ""
    if trailing_fence:
        suffix += trailing_fence
    if context_suffix:
        suffix += context_suffix
    return csv_data, suffix


def analyze_artifact_chunked(
    artifact_prompt: str,
    artifact_key: str,
    artifact_name: str,
    investigation_context: str,
    model: str,
    *,
    system_prompt: str,
    ai_response_max_tokens: int,
    chunk_csv_budget: int,
    chunk_merge_prompt_template: str,
    max_merge_rounds: int,
    call_ai_with_retry_fn: Any,
    ai_provider: Any,
    audit_log_fn: Any = None,
    save_case_prompt_fn: Any = None,
    progress_callback: Any | None = None,
) -> str:
    """Analyze an artifact in multiple chunks when data exceeds context budget.

    Splits the CSV portion of the prompt into row-boundary-aligned
    chunks, analyzes each independently via the AI provider, then
    merges the per-chunk findings hierarchically.

    Args:
        artifact_prompt: The fully rendered artifact analysis prompt.
        artifact_key: Unique identifier for the artifact.
        artifact_name: Human-readable artifact name.
        investigation_context: The user's investigation context text.
        model: AI model identifier for progress reporting.
        system_prompt: The system prompt sent to the AI provider.
        ai_response_max_tokens: Token budget for the AI response.
        chunk_csv_budget: Character budget for CSV data per chunk.
        chunk_merge_prompt_template: Template for merging chunk findings.
        max_merge_rounds: Maximum hierarchical merge iterations.
        call_ai_with_retry_fn: Callable wrapping AI calls with retry.
        ai_provider: The AI provider instance.
        audit_log_fn: Optional callable ``(action, details)`` for audit.
        save_case_prompt_fn: Optional callable ``(filename, system, user)``
            for saving prompts.
        progress_callback: Optional callback for streaming progress.

    Returns:
        The merged analysis text from all chunks.
    """
    marker_match = CSV_DATA_SECTION_RE.search(artifact_prompt)
    if marker_match is None:
        return call_ai_with_retry_fn(
            lambda: ai_provider.analyze(
                system_prompt=system_prompt,
                user_prompt=artifact_prompt,
                max_tokens=ai_response_max_tokens,
            )
        )

    instructions_portion = artifact_prompt[: marker_match.end()]
    raw_csv_tail = artifact_prompt[marker_match.end():]

    csv_data, context_suffix = split_csv_and_suffix(raw_csv_tail)

    suffix_chars = len(context_suffix)
    instructions_chars = len(instructions_portion) + len(system_prompt) + suffix_chars
    csv_budget = max(1000, chunk_csv_budget - instructions_chars)

    chunks = split_csv_into_chunks(csv_data, csv_budget)
    total_chunks = len(chunks)

    if total_chunks <= 1:
        return call_ai_with_retry_fn(
            lambda: ai_provider.analyze(
                system_prompt=system_prompt,
                user_prompt=artifact_prompt,
                max_tokens=ai_response_max_tokens,
            )
        )

    LOGGER.info(
        "Chunked analysis for %s: splitting into %d chunks (budget %d chars/chunk).",
        artifact_key, total_chunks, csv_budget,
    )
    if audit_log_fn is not None:
        audit_log_fn(
            "chunked_analysis_started",
            {
                "artifact_key": artifact_key,
                "total_chunks": total_chunks,
                "csv_budget_per_chunk": csv_budget,
            },
        )

    chunk_findings: list[str] = []
    for chunk_index, chunk_csv in enumerate(chunks, start=1):
        chunk_prompt = f"{instructions_portion}{chunk_csv}{context_suffix}"
        chunk_label = f"chunk {chunk_index}/{total_chunks}"

        if progress_callback is not None:
            emit_analysis_progress(
                progress_callback, artifact_key, "thinking",
                {
                    "artifact_key": artifact_key,
                    "artifact_name": artifact_name,
                    "thinking_text": f"Analyzing {chunk_label}...",
                    "partial_text": "",
                    "model": model,
                },
            )

        safe_key = sanitize_filename(artifact_key)
        if save_case_prompt_fn is not None:
            save_case_prompt_fn(
                f"artifact_{safe_key}_chunk_{chunk_index}.md",
                system_prompt,
                chunk_prompt,
            )

        LOGGER.info("Analyzing %s %s...", artifact_key, chunk_label)
        chunk_text = call_ai_with_retry_fn(
            lambda prompt=chunk_prompt: ai_provider.analyze(
                system_prompt=system_prompt,
                user_prompt=prompt,
                max_tokens=ai_response_max_tokens,
            )
        )
        chunk_findings.append(f"### Chunk {chunk_index} of {total_chunks}\n{chunk_text}")

    merged_text = _hierarchical_merge_findings(
        chunk_findings=chunk_findings,
        artifact_key=artifact_key,
        artifact_name=artifact_name,
        investigation_context=investigation_context,
        model=model,
        system_prompt=system_prompt,
        ai_response_max_tokens=ai_response_max_tokens,
        chunk_csv_budget=chunk_csv_budget,
        chunk_merge_prompt_template=chunk_merge_prompt_template,
        max_merge_rounds=max_merge_rounds,
        call_ai_with_retry_fn=call_ai_with_retry_fn,
        ai_provider=ai_provider,
        save_case_prompt_fn=save_case_prompt_fn,
        progress_callback=progress_callback,
    )
    LOGGER.info(
        "Chunked analysis for %s complete: %d chunks merged.",
        artifact_key, total_chunks,
    )
    return merged_text


def _build_merge_prompt(
    findings_text: str,
    batch_count: int,
    artifact_key: str,
    artifact_name: str,
    investigation_context: str,
    chunk_merge_prompt_template: str,
) -> str:
    """Fill the chunk-merge template with the given findings.

    Args:
        findings_text: Combined text of per-chunk findings to merge.
        batch_count: Number of chunks/batches.
        artifact_key: Unique identifier for the artifact.
        artifact_name: Human-readable artifact name.
        investigation_context: The user's investigation context text.
        chunk_merge_prompt_template: The merge template string.

    Returns:
        The fully rendered merge prompt string.
    """
    prompt = chunk_merge_prompt_template
    for placeholder, value in {
        "chunk_count": str(batch_count),
        "investigation_context": investigation_context.strip() or "No investigation context provided.",
        "artifact_name": artifact_name,
        "artifact_key": artifact_key,
        "per_chunk_findings": findings_text,
    }.items():
        prompt = prompt.replace(f"{{{{{placeholder}}}}}", value)
    return prompt


def _hierarchical_merge_findings(
    chunk_findings: list[str],
    artifact_key: str,
    artifact_name: str,
    investigation_context: str,
    model: str,
    *,
    system_prompt: str,
    ai_response_max_tokens: int,
    chunk_csv_budget: int,
    chunk_merge_prompt_template: str,
    max_merge_rounds: int,
    call_ai_with_retry_fn: Any,
    ai_provider: Any,
    save_case_prompt_fn: Any = None,
    progress_callback: Any | None = None,
) -> str:
    """Merge chunk findings hierarchically until one result remains.

    Args:
        chunk_findings: List of per-chunk finding texts to merge.
        artifact_key: Unique identifier for the artifact.
        artifact_name: Human-readable artifact name.
        investigation_context: The user's investigation context text.
        model: AI model identifier for progress reporting.
        system_prompt: The system prompt sent to the AI provider.
        ai_response_max_tokens: Token budget for the AI response.
        chunk_csv_budget: Character budget for CSV data per chunk.
        chunk_merge_prompt_template: Template for merging findings.
        max_merge_rounds: Maximum merge iterations.
        call_ai_with_retry_fn: Callable wrapping AI calls with retry.
        ai_provider: The AI provider instance.
        save_case_prompt_fn: Optional callable for saving prompts.
        progress_callback: Optional callback for streaming progress.

    Returns:
        A single merged analysis text.
    """
    overhead = len(chunk_merge_prompt_template) + len(system_prompt) + 500
    findings_budget = max(2000, chunk_csv_budget - overhead)
    current_findings = list(chunk_findings)
    merge_round = 0

    while len(current_findings) > 1:
        merge_round += 1

        if merge_round > max_merge_rounds:
            LOGGER.warning(
                "Hierarchical merge for %s hit %d-round limit with %d findings remaining. "
                "Falling back to concatenation.",
                artifact_key, max_merge_rounds, len(current_findings),
            )
            if progress_callback is not None:
                emit_analysis_progress(
                    progress_callback, artifact_key, "thinking",
                    {
                        "artifact_key": artifact_key,
                        "artifact_name": artifact_name,
                        "thinking_text": (
                            f"Merge round limit reached ({max_merge_rounds}). "
                            f"Concatenating {len(current_findings)} remaining findings..."
                        ),
                        "partial_text": "",
                        "model": model,
                    },
                )
            total_chars = sum(len(f) for f in current_findings)
            if total_chars > findings_budget:
                per_finding_budget = max(200, findings_budget // len(current_findings))
                capped = []
                for f in current_findings:
                    if len(f) > per_finding_budget:
                        capped.append(f[:per_finding_budget] + "\n[... truncated ...]")
                    else:
                        capped.append(f)
                concatenated = "\n\n".join(capped)
            else:
                concatenated = "\n\n".join(current_findings)

            merge_prompt = _build_merge_prompt(
                findings_text=concatenated,
                batch_count=len(current_findings),
                artifact_key=artifact_key,
                artifact_name=artifact_name,
                investigation_context=investigation_context,
                chunk_merge_prompt_template=chunk_merge_prompt_template,
            )
            safe_key = sanitize_filename(artifact_key)
            if save_case_prompt_fn is not None:
                save_case_prompt_fn(
                    f"artifact_{safe_key}_merge_fallback.md",
                    system_prompt,
                    merge_prompt,
                )
            return call_ai_with_retry_fn(
                lambda prompt=merge_prompt: ai_provider.analyze(
                    system_prompt=system_prompt,
                    user_prompt=prompt,
                    max_tokens=ai_response_max_tokens,
                )
            )

        batches: list[list[str]] = []
        current_batch: list[str] = []
        current_batch_size = 0

        for finding in current_findings:
            entry_size = len(finding) + 2
            if current_batch and current_batch_size + entry_size > findings_budget:
                batches.append(current_batch)
                current_batch = []
                current_batch_size = 0
            current_batch.append(finding)
            current_batch_size += entry_size

        if current_batch:
            batches.append(current_batch)

        if len(batches) >= len(current_findings):
            batches = [current_findings]

        total_batches = len(batches)
        label_prefix = f"merge round {merge_round}" if merge_round > 1 else "merge"

        LOGGER.info(
            "Hierarchical %s for %s: %d batches from %d findings (budget %d chars).",
            label_prefix, artifact_key, total_batches,
            len(current_findings), findings_budget,
        )

        if progress_callback is not None:
            emit_analysis_progress(
                progress_callback, artifact_key, "thinking",
                {
                    "artifact_key": artifact_key,
                    "artifact_name": artifact_name,
                    "thinking_text": (
                        f"Merging findings ({label_prefix}: "
                        f"{len(current_findings)} findings into {total_batches} groups)..."
                    ),
                    "partial_text": "",
                    "model": model,
                },
            )

        next_findings: list[str] = []
        for batch_index, batch in enumerate(batches, start=1):
            batch_text = "\n\n".join(batch)
            merge_prompt = _build_merge_prompt(
                findings_text=batch_text,
                batch_count=len(batch),
                artifact_key=artifact_key,
                artifact_name=artifact_name,
                investigation_context=investigation_context,
                chunk_merge_prompt_template=chunk_merge_prompt_template,
            )

            safe_key = sanitize_filename(artifact_key)
            if save_case_prompt_fn is not None:
                save_case_prompt_fn(
                    f"artifact_{safe_key}_merge_r{merge_round}_b{batch_index}.md",
                    system_prompt,
                    merge_prompt,
                )

            merged = call_ai_with_retry_fn(
                lambda prompt=merge_prompt: ai_provider.analyze(
                    system_prompt=system_prompt,
                    user_prompt=prompt,
                    max_tokens=ai_response_max_tokens,
                )
            )
            next_findings.append(f"### Merged batch {batch_index}\n{merged}")

        current_findings = next_findings

    return current_findings[0] if current_findings else ""
analyze_artifact_chunked(artifact_prompt: str, artifact_key: str, artifact_name: str, investigation_context: str, model: str, *, system_prompt: str, ai_response_max_tokens: int, chunk_csv_budget: int, chunk_merge_prompt_template: str, max_merge_rounds: int, call_ai_with_retry_fn: Any, ai_provider: Any, audit_log_fn: Any = None, save_case_prompt_fn: Any = None, progress_callback: Any | None = None) -> str
Analyze an artifact in multiple chunks when data exceeds context budget.
Splits the CSV portion of the prompt into row-boundary-aligned chunks, analyzes each independently via the AI provider, then merges the per-chunk findings hierarchically.
Arguments:
- artifact_prompt: The fully rendered artifact analysis prompt.
- artifact_key: Unique identifier for the artifact.
- artifact_name: Human-readable artifact name.
- investigation_context: The user's investigation context text.
- model: AI model identifier for progress reporting.
- system_prompt: The system prompt sent to the AI provider.
- ai_response_max_tokens: Token budget for the AI response.
- chunk_csv_budget: Character budget for CSV data per chunk.
- chunk_merge_prompt_template: Template for merging chunk findings.
- max_merge_rounds: Maximum hierarchical merge iterations.
- call_ai_with_retry_fn: Callable wrapping AI calls with retry.
- ai_provider: The AI provider instance.
- audit_log_fn: Optional callable `(action, details)` for audit.
- save_case_prompt_fn: Optional callable `(filename, system, user)` for saving prompts.
- progress_callback: Optional callback for streaming progress.
Returns:
The merged analysis text from all chunks.
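The hierarchical merge this function hands off to can be sketched in isolation. The `merge_batch` function below is a toy stand-in for the templated AI merge call, so this is an illustration of the control flow (batch findings under a character budget, merge each batch, repeat until one result remains), not the module's API:

```python
def merge_batch(findings: list[str]) -> str:
    # Toy stand-in for the AI merge call; the real code renders a merge
    # prompt from a template and sends it to the AI provider.
    return " + ".join(findings)


def hierarchical_merge(findings: list[str], budget: int, max_rounds: int = 10) -> str:
    """Collapse findings round by round until a single result remains."""
    current = list(findings)
    rounds = 0
    while len(current) > 1 and rounds < max_rounds:
        rounds += 1
        # Group findings into batches that fit the character budget
        # (+2 per entry accounts for the "\n\n" joiner).
        batches: list[list[str]] = []
        batch: list[str] = []
        size = 0
        for finding in current:
            if batch and size + len(finding) + 2 > budget:
                batches.append(batch)
                batch, size = [], 0
            batch.append(finding)
            size += len(finding) + 2
        if batch:
            batches.append(batch)
        # If batching made no progress, merge everything in one call.
        if len(batches) >= len(current):
            batches = [current]
        current = [merge_batch(b) for b in batches]
    return current[0] if current else ""


result = hierarchical_merge([f"finding-{i}" for i in range(8)], budget=30)
assert all(f"finding-{i}" in result for i in range(8))
```

With a 30-character budget, the eight findings collapse in two rounds: four pairwise merges, then one final merge of the four partial results. The real implementation additionally caps rounds with `max_merge_rounds` and falls back to truncated concatenation when the limit is hit.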
split_csv_and_suffix(raw_csv_tail: str) -> tuple[str, str]
Separate CSV rows from trailing content in a rendered prompt.
File-based templates may append a Markdown code fence and/or a Final Context Reminder section after the CSV data placeholder. This function extracts the actual CSV rows from those trailing elements so that only the data is chunked, while the suffix is appended to every chunk prompt.
Arguments:
- raw_csv_tail: The portion of the rendered prompt that follows the `## Full Data (CSV)` heading.
Returns:
A `(csv_data, suffix)` tuple.
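The trailing-content separation can be sketched with a stand-in fence pattern. The real `CSV_TRAILING_FENCE_RE` comes from the module's constants and is not shown in this file, so the regex below is an assumption (a closing Markdown fence at the end of the section):

```python
import re

# Assumed stand-in for CSV_TRAILING_FENCE_RE (defined in .constants,
# not shown here): a closing Markdown fence at the end of the text.
TRAILING_FENCE_RE = re.compile(r"\n```\s*$")


def split_csv_and_suffix(raw_csv_tail: str) -> tuple[str, str]:
    """Separate CSV rows from a trailing fence and/or reminder section."""
    text = raw_csv_tail

    # Peel off the Final Context Reminder section, if present.
    marker = "## Final Context Reminder"
    pos = text.find(marker)
    context_suffix = ""
    if pos >= 0:
        context_suffix = "\n\n" + text[pos:].strip()
        text = text[:pos]

    # Peel off a trailing code fence, if present.
    trailing_fence = ""
    match = TRAILING_FENCE_RE.search(text)
    if match:
        trailing_fence = match.group()
        text = text[: match.start()]

    # Only the bare CSV rows get chunked; the suffix is re-appended
    # to every chunk prompt.
    return text.strip(), trailing_fence + context_suffix


tail = "a,b\n1,2\n```\n\n## Final Context Reminder\nStay focused on the context."
csv_data, suffix = split_csv_and_suffix(tail)
assert csv_data == "a,b\n1,2"
assert "## Final Context Reminder" in suffix
```

The reminder is stripped before the fence search so the fence regex only has to match at the end of the remaining text; both pieces are preserved in order in the suffix.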
split_csv_into_chunks(csv_text: str, max_chars: int) -> list[str]
Split CSV text into chunks that each fit within `max_chars`.

Parsing is done via the `csv` module so that quoted fields with embedded newlines are kept intact as single records. Every chunk retains the original header row.
Arguments:
- csv_text: Full CSV text including the header row.
- max_chars: Maximum character count per chunk (including header).
Returns:
A list of CSV text chunks, each starting with the header row.
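The behaviour above can be sketched as a self-contained re-implementation (standard library only; a minimal illustration that mirrors the described logic rather than importing the module):

```python
import csv
import io


def serialize_row(row: list[str]) -> str:
    # Round-trip through csv.writer so commas, quotes, and newlines
    # in fields stay properly quoted.
    buf = io.StringIO()
    csv.writer(buf).writerow(row)
    return buf.getvalue().rstrip("\r\n")


def split_csv_into_chunks(csv_text: str, max_chars: int) -> list[str]:
    """Split CSV into header-prefixed chunks of at most max_chars each."""
    if max_chars <= 0 or len(csv_text) <= max_chars:
        return [csv_text]

    rows = list(csv.reader(io.StringIO(csv_text)))
    if len(rows) < 2:  # header only (or empty): nothing to split
        return [csv_text]

    header = serialize_row(rows[0])
    budget = max_chars - len(header) - 1  # reserve header + joining newline
    if budget <= 0:
        return [csv_text]

    chunks: list[str] = []
    current: list[str] = []
    size = 0
    for row in rows[1:]:
        line = serialize_row(row)
        if current and size + len(line) + 1 > budget:
            chunks.append(header + "\n" + "\n".join(current))
            current, size = [], 0
        current.append(line)
        size += len(line) + 1
    if current:
        chunks.append(header + "\n" + "\n".join(current))
    return chunks or [csv_text]


csv_text = "id,value\n" + "\n".join(f"{i},row-{i}" for i in range(100))
chunks = split_csv_into_chunks(csv_text, 200)
assert all(c.startswith("id,value\n") for c in chunks)
assert all(len(c) <= 200 for c in chunks)
```

Because records are parsed with `csv.reader`, a chunk boundary can never fall inside a quoted multi-line field, and every chunk stays independently parseable because it repeats the header.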