app.ai_providers

Multi-provider AI abstraction layer for AIFT forensic analysis.

This package provides a unified interface for interacting with multiple AI providers used in AI Forensic Triage (AIFT) analysis workflows. It abstracts away provider-specific SDK differences behind a common AIProvider base class, enabling the rest of the application to perform AI-powered forensic analysis without coupling to any single vendor.

Supported providers:

  • Claude (Anthropic) -- via the anthropic Python SDK.
  • OpenAI -- via the openai Python SDK (Chat Completions and Responses APIs).
  • Moonshot Kimi -- via the openai Python SDK pointed at the Moonshot API base URL.
  • OpenAI-compatible local endpoints -- Ollama, LM Studio, vLLM, or any server exposing an OpenAI-compatible /v1/chat/completions endpoint.
 1"""Multi-provider AI abstraction layer for AIFT forensic analysis.
 2
 3This package provides a unified interface for interacting with multiple AI
 4providers used in AI Forensic Triage (AIFT) analysis workflows. It abstracts
 5away provider-specific SDK differences behind a common ``AIProvider`` base
 6class, enabling the rest of the application to perform AI-powered forensic
 7analysis without coupling to any single vendor.
 8
 9Supported providers:
10
11* **Claude (Anthropic)** -- via the ``anthropic`` Python SDK.
12* **OpenAI** -- via the ``openai`` Python SDK (Chat Completions and
13  Responses APIs).
14* **Moonshot Kimi** -- via the ``openai`` Python SDK pointed at the
15  Moonshot API base URL.
16* **OpenAI-compatible local endpoints** -- Ollama, LM Studio, vLLM, or
17  any server exposing an OpenAI-compatible ``/v1/chat/completions``
18  endpoint.
19"""
20
21from __future__ import annotations
22
23from .base import (
24    AIProvider,
25    AIProviderError,
26    DEFAULT_CLAUDE_MODEL,
27    DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
28    DEFAULT_KIMI_BASE_URL,
29    DEFAULT_KIMI_FILE_UPLOAD_PURPOSE,
30    DEFAULT_KIMI_MODEL,
31    DEFAULT_LOCAL_BASE_URL,
32    DEFAULT_LOCAL_MODEL,
33    DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
34    DEFAULT_MAX_TOKENS,
35    DEFAULT_OPENAI_MODEL,
36    RATE_LIMIT_MAX_RETRIES,
37    RateLimitState,
38    _extract_retry_after_seconds,
39    _is_context_length_error,
40    _normalize_api_key_value,
41    _normalize_openai_compatible_base_url,
42    _resolve_api_key,
43    _resolve_api_key_candidates,
44)
45from .claude_provider import ClaudeProvider
46from .factory import create_provider
47from .kimi_provider import KimiProvider
48from .local_provider import LocalProvider
49from .openai_provider import OpenAIProvider
50from .utils import (
51    _extract_anthropic_text,
52    _extract_openai_text,
53    normalize_attachment_input as _normalize_attachment_input,
54    normalize_attachment_inputs as _normalize_attachment_inputs,
55)
56
57__all__ = [
58    "AIProvider",
59    "AIProviderError",
60    "ClaudeProvider",
61    "OpenAIProvider",
62    "KimiProvider",
63    "LocalProvider",
64    "create_provider",
65    "base",
66    "claude_provider",
67    "factory",
68    "kimi_provider",
69    "local_provider",
70    "openai_provider",
71    "utils",
72]
class AIProvider(ABC):
    """Abstract base class defining the interface for all AI providers.

    Every concrete provider (Claude, OpenAI, Kimi, Local) implements this
    interface so that the forensic analysis engine can call any provider
    interchangeably.

    Subclasses must implement:
        * ``analyze`` -- single-shot prompt-to-text generation.
        * ``analyze_stream`` -- incremental (streaming) text generation.
        * ``get_model_info`` -- provider/model metadata dictionary.

    Subclasses may override:
        * ``analyze_with_attachments`` -- analysis with CSV file attachments.

    Attributes:
        attach_csv_as_file (bool): Whether to attempt uploading CSV artifacts
            as file attachments rather than inlining them into the prompt.
    """

    @abstractmethod
    def analyze(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Send a prompt to the provider and return the complete generated text.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum number of tokens the model may generate.

        Returns:
            The generated text response as a string.

        Raises:
            AIProviderError: If the request fails for any reason.
        """

    @abstractmethod
    def analyze_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> Iterator[str]:
        """Stream generated text chunks for the provided prompt.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum number of tokens the model may generate.

        Yields:
            Individual text chunks (deltas) as they are generated.

        Raises:
            AIProviderError: If the streaming request fails or produces no output.
        """

    @abstractmethod
    def get_model_info(self) -> dict[str, str]:
        """Return provider and model metadata for audit logging and reports.

        Returns:
            A dictionary with at least ``"provider"`` and ``"model"`` keys.
        """

    def analyze_with_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        attachments: list[Mapping[str, str]] | None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with optional file attachments.

        Providers that support file uploads override this method. The default
        implementation inlines attachment content into the prompt so the model
        always receives the evidence data, then delegates to ``analyze``.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            attachments: Optional list of attachment descriptors with
                ``"path"``, ``"name"``, and ``"mime_type"`` keys.
            max_tokens: Maximum number of tokens the model may generate.

        Returns:
            The generated text response as a string.

        Raises:
            AIProviderError: If the request fails.
        """
        # Local import avoids a circular dependency between base and utils.
        from .utils import _inline_attachment_data_into_prompt

        effective_prompt = user_prompt
        if attachments:
            effective_prompt, inlined = _inline_attachment_data_into_prompt(
                user_prompt=user_prompt,
                attachments=attachments,
            )
            if inlined:
                logger.info("Base provider inlined attachment data into prompt.")

        return self.analyze(
            system_prompt=system_prompt,
            user_prompt=effective_prompt,
            max_tokens=max_tokens,
        )

    def _prepare_csv_attachments(
        self,
        attachments: list[Mapping[str, str]] | None,
        *,
        supports_file_attachments: bool = True,
    ) -> list[dict[str, str]] | None:
        """Apply shared CSV-attachment preflight checks and normalization.

        Args:
            attachments: Raw attachment descriptors from the caller.
            supports_file_attachments: Whether the provider's SDK client
                exposes the necessary file-upload APIs.

        Returns:
            A list of normalized attachment dicts, or ``None`` if attachment
            mode should be skipped.
        """
        # Local import avoids a circular dependency between base and utils.
        from .utils import normalize_attachment_inputs

        if not bool(getattr(self, "attach_csv_as_file", False)):
            return None
        if not attachments:
            return None
        # A previous attempt proved attachments unsupported; skip silently.
        if getattr(self, "_csv_attachment_supported", None) is False:
            return None
        if not supports_file_attachments:
            # Remember the negative result so future calls short-circuit.
            if hasattr(self, "_csv_attachment_supported"):
                setattr(self, "_csv_attachment_supported", False)
            return None

        normalized_attachments = normalize_attachment_inputs(attachments)
        if not normalized_attachments:
            return None
        return normalized_attachments

Abstract base class defining the interface for all AI providers.

Every concrete provider (Claude, OpenAI, Kimi, Local) implements this interface so that the forensic analysis engine can call any provider interchangeably.

Subclasses must implement:
  • analyze -- single-shot prompt-to-text generation.
  • analyze_stream -- incremental (streaming) text generation.
  • get_model_info -- provider/model metadata dictionary.
Subclasses may override:
  • analyze_with_attachments -- analysis with CSV file attachments.
Attributes:
  • attach_csv_as_file (bool): Whether to attempt uploading CSV artifacts as file attachments rather than inlining them into the prompt.
@abstractmethod
def analyze( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> str:
173    @abstractmethod
174    def analyze(
175        self,
176        system_prompt: str,
177        user_prompt: str,
178        max_tokens: int = DEFAULT_MAX_TOKENS,
179    ) -> str:
180        """Send a prompt to the provider and return the complete generated text.
181
182        Args:
183            system_prompt: The system-level instruction text.
184            user_prompt: The user-facing prompt with investigation context.
185            max_tokens: Maximum number of tokens the model may generate.
186
187        Returns:
188            The generated text response as a string.
189
190        Raises:
191            AIProviderError: If the request fails for any reason.
192        """

Send a prompt to the provider and return the complete generated text.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum number of tokens the model may generate.
Returns:

The generated text response as a string.

Raises:
  • AIProviderError: If the request fails for any reason.
@abstractmethod
def analyze_stream( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> Iterator[str]:
194    @abstractmethod
195    def analyze_stream(
196        self,
197        system_prompt: str,
198        user_prompt: str,
199        max_tokens: int = DEFAULT_MAX_TOKENS,
200    ) -> Iterator[str]:
201        """Stream generated text chunks for the provided prompt.
202
203        Args:
204            system_prompt: The system-level instruction text.
205            user_prompt: The user-facing prompt with investigation context.
206            max_tokens: Maximum number of tokens the model may generate.
207
208        Yields:
209            Individual text chunks (deltas) as they are generated.
210
211        Raises:
212            AIProviderError: If the streaming request fails or produces no output.
213        """

Stream generated text chunks for the provided prompt.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum number of tokens the model may generate.
Yields:

Individual text chunks (deltas) as they are generated.

Raises:
  • AIProviderError: If the streaming request fails or produces no output.
@abstractmethod
def get_model_info(self) -> dict[str, str]:
215    @abstractmethod
216    def get_model_info(self) -> dict[str, str]:
217        """Return provider and model metadata for audit logging and reports.
218
219        Returns:
220            A dictionary with at least ``"provider"`` and ``"model"`` keys.
221        """

Return provider and model metadata for audit logging and reports.

Returns:

A dictionary with at least "provider" and "model" keys.

def analyze_with_attachments( self, system_prompt: str, user_prompt: str, attachments: list[typing.Mapping[str, str]] | None, max_tokens: int = 256000) -> str:
223    def analyze_with_attachments(
224        self,
225        system_prompt: str,
226        user_prompt: str,
227        attachments: list[Mapping[str, str]] | None,
228        max_tokens: int = DEFAULT_MAX_TOKENS,
229    ) -> str:
230        """Analyze with optional file attachments.
231
232        Providers that support file uploads override this method. The default
233        implementation inlines attachment content into the prompt so the model
234        always receives the evidence data, then delegates to ``analyze``.
235
236        Args:
237            system_prompt: The system-level instruction text.
238            user_prompt: The user-facing prompt with investigation context.
239            attachments: Optional list of attachment descriptors with
240                ``"path"``, ``"name"``, and ``"mime_type"`` keys.
241            max_tokens: Maximum number of tokens the model may generate.
242
243        Returns:
244            The generated text response as a string.
245
246        Raises:
247            AIProviderError: If the request fails.
248        """
249        from .utils import _inline_attachment_data_into_prompt
250
251        effective_prompt = user_prompt
252        if attachments:
253            effective_prompt, inlined = _inline_attachment_data_into_prompt(
254                user_prompt=user_prompt,
255                attachments=attachments,
256            )
257            if inlined:
258                logger.info("Base provider inlined attachment data into prompt.")
259
260        return self.analyze(
261            system_prompt=system_prompt,
262            user_prompt=effective_prompt,
263            max_tokens=max_tokens,
264        )

Analyze with optional file attachments.

Providers that support file uploads override this method. The default implementation inlines attachment content into the prompt so the model always receives the evidence data, then delegates to analyze.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • attachments: Optional list of attachment descriptors with "path", "name", and "mime_type" keys.
  • max_tokens: Maximum number of tokens the model may generate.
Returns:

The generated text response as a string.

Raises:
  • AIProviderError: If the request fails.
class AIProviderError(RuntimeError):
    """Raised when an AI provider request fails with a user-facing message.

    All provider implementations translate SDK-specific exceptions into this
    single exception type so that callers only need one ``except`` clause for
    AI-related failures. The message is safe for display in the web UI.
    """

Raised when an AI provider request fails with a user-facing message.

All provider implementations translate SDK-specific exceptions into this single exception type so that callers only need one except clause for AI-related failures. The message is safe for display in the web UI.

class ClaudeProvider(AIProvider):
    """Anthropic Claude provider implementation.

    Supports both synchronous and streaming generation, CSV file attachments
    via content blocks (base64-encoded PDFs or inline text), and automatic
    token-limit retry when ``max_tokens`` exceeds the model's maximum.

    Attributes:
        api_key (str): The Anthropic API key.
        model (str): The Claude model identifier.
        attach_csv_as_file (bool): Whether to upload CSV artifacts as
            content blocks.
        request_timeout_seconds (float): HTTP timeout in seconds.
        client: The ``anthropic.Anthropic`` SDK client instance.
    """

    def __init__(
        self,
        api_key: str,
        model: str = DEFAULT_CLAUDE_MODEL,
        attach_csv_as_file: bool = True,
        request_timeout_seconds: float = DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
    ) -> None:
        """Initialize the Claude provider.

        Args:
            api_key: Anthropic API key. Must be non-empty.
            model: Claude model identifier.
            attach_csv_as_file: If ``True``, send CSV artifacts as structured
                content blocks.
            request_timeout_seconds: HTTP timeout in seconds.

        Raises:
            AIProviderError: If the ``anthropic`` SDK is not installed or
                the API key is empty.
        """
        try:
            import anthropic
        except ImportError as error:
            raise AIProviderError(
                "anthropic SDK is not installed. Install it with `pip install anthropic`."
            ) from error

        normalized_api_key = _normalize_api_key_value(api_key)
        if not normalized_api_key:
            raise AIProviderError(
                "Claude API key is not configured. "
                "Set `ai.claude.api_key` in config.yaml or the ANTHROPIC_API_KEY environment variable."
            )

        self._anthropic = anthropic
        self.api_key = normalized_api_key
        self.model = model
        self.attach_csv_as_file = bool(attach_csv_as_file)
        # None = not yet probed; True/False once an attachment attempt settles it.
        self._csv_attachment_supported: bool | None = None
        self.request_timeout_seconds = _resolve_timeout_seconds(
            request_timeout_seconds,
            DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
        )
        self.client = anthropic.Anthropic(
            api_key=normalized_api_key,
            timeout=self.request_timeout_seconds,
        )
        logger.info("Initialized Claude provider with model %s (timeout %.1fs)", model, self.request_timeout_seconds)

    def analyze(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Send a prompt to Claude and return the generated text.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text.

        Raises:
            AIProviderError: On any API or network failure.
        """
        # Delegate to the attachment path with no attachments so both code
        # paths share rate-limit retries and error mapping.
        return self.analyze_with_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            attachments=None,
            max_tokens=max_tokens,
        )

    def analyze_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> Iterator[str]:
        """Stream generated text chunks from Claude.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Yields:
            Text chunk strings as they are generated.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        def _stream() -> Iterator[str]:
            request_kwargs: dict[str, Any] = {
                "model": self.model,
                "max_tokens": max_tokens,
                "system": system_prompt,
                "messages": [{"role": "user", "content": user_prompt}],
                "stream": True,
            }
            try:
                stream = _run_with_rate_limit_retries(
                    request_fn=lambda: self._with_token_limit_retry(
                        lambda kw: self.client.messages.create(**kw),
                        request_kwargs,
                    ),
                    rate_limit_error_type=self._anthropic.RateLimitError,
                    provider_name="Claude",
                )
                emitted = False
                for event in stream:
                    chunk_text = _extract_anthropic_stream_text(event)
                    if not chunk_text:
                        continue
                    emitted = True
                    yield chunk_text
                if not emitted:
                    raise AIProviderError("Claude returned an empty response.")
            except AIProviderError:
                raise
            except self._anthropic.APIConnectionError as error:
                raise AIProviderError(
                    "Unable to connect to Claude API. Check network access and endpoint configuration."
                ) from error
            except self._anthropic.AuthenticationError as error:
                raise AIProviderError(
                    "Claude authentication failed. Check `ai.claude.api_key` or ANTHROPIC_API_KEY."
                ) from error
            except self._anthropic.BadRequestError as error:
                if _is_context_length_error(error):
                    raise AIProviderError(
                        "Claude request exceeded the model context length. Reduce prompt size and retry."
                    ) from error
                raise AIProviderError(f"Claude request was rejected: {error}") from error
            except self._anthropic.APIError as error:
                raise AIProviderError(f"Claude API error: {error}") from error
            except Exception as error:
                raise AIProviderError(f"Unexpected Claude provider error: {error}") from error

        # Return the generator rather than being one ourselves, so argument
        # validation above runs eagerly at call time.
        return _stream()

    def analyze_with_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        attachments: list[Mapping[str, str]] | None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with optional CSV file attachments via Claude content blocks.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text.

        Raises:
            AIProviderError: On any API or network failure.
        """
        def _request() -> str:
            # First try attachment mode; a truthy response means it succeeded.
            attachment_response = self._request_with_csv_attachments(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )
            if attachment_response:
                return attachment_response

            # Fallback: inline attachment data directly into the prompt text.
            effective_prompt = user_prompt
            if attachments:
                effective_prompt, inlined = _inline_attachment_data_into_prompt(
                    user_prompt=user_prompt,
                    attachments=attachments,
                )
                if inlined:
                    logger.info("Claude attachment fallback inlined attachment data into prompt.")

            response = self._create_message_with_stream_fallback(
                system_prompt=system_prompt,
                messages=[{"role": "user", "content": effective_prompt}],
                max_tokens=max_tokens,
            )
            text = _extract_anthropic_text(response)
            if not text:
                raise AIProviderError("Claude returned an empty response.")
            return text

        return self._run_claude_request(_request)

    def _run_claude_request(self, request_fn: Callable[[], _T]) -> _T:
        """Execute a Claude request with rate-limit retries and error mapping.

        Args:
            request_fn: A zero-argument callable that performs the API request.

        Returns:
            The return value of ``request_fn`` on success.

        Raises:
            AIProviderError: On any Anthropic SDK error.
        """
        try:
            return _run_with_rate_limit_retries(
                request_fn=request_fn,
                rate_limit_error_type=self._anthropic.RateLimitError,
                provider_name="Claude",
            )
        except AIProviderError:
            raise
        except self._anthropic.APIConnectionError as error:
            raise AIProviderError(
                "Unable to connect to Claude API. Check network access and endpoint configuration."
            ) from error
        except self._anthropic.AuthenticationError as error:
            raise AIProviderError(
                "Claude authentication failed. Check `ai.claude.api_key` or ANTHROPIC_API_KEY."
            ) from error
        except self._anthropic.BadRequestError as error:
            if _is_context_length_error(error):
                raise AIProviderError(
                    "Claude request exceeded the model context length. Reduce prompt size and retry."
                ) from error
            raise AIProviderError(f"Claude request was rejected: {error}") from error
        except self._anthropic.APIError as error:
            raise AIProviderError(f"Claude API error: {error}") from error
        except Exception as error:
            raise AIProviderError(f"Unexpected Claude provider error: {error}") from error

    def _request_with_csv_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None,
    ) -> str | None:
        """Attempt to send a request with CSV files as Claude content blocks.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            The generated text if attachment mode succeeded, or ``None``
            if attachments were skipped or unsupported.
        """
        normalized_attachments = self._prepare_csv_attachments(attachments)
        if not normalized_attachments:
            return None

        try:
            content_blocks: list[dict[str, Any]] = [{"type": "text", "text": user_prompt}]
            for attachment in normalized_attachments:
                attachment_path = Path(attachment["path"])
                mime_type = attachment["mime_type"].lower()
                if mime_type == "application/pdf":
                    # PDFs go through the Claude document content block as base64.
                    encoded_data = base64.b64encode(attachment_path.read_bytes()).decode("ascii")
                    content_blocks.append(
                        {
                            "type": "document",
                            "source": {
                                "type": "base64",
                                "media_type": "application/pdf",
                                "data": encoded_data,
                            },
                        }
                    )
                else:
                    # Everything else (CSV/text) is inlined as a delimited text block.
                    attachment_name = attachment.get("name", attachment_path.name)
                    try:
                        attachment_text = attachment_path.read_text(
                            encoding="utf-8-sig", errors="replace"
                        )
                    except OSError:
                        # Unreadable file: skip this attachment rather than fail.
                        continue
                    content_blocks.append(
                        {
                            "type": "text",
                            "text": (
                                f"--- BEGIN ATTACHMENT: {attachment_name} ---\n"
                                f"{attachment_text.rstrip()}\n"
                                f"--- END ATTACHMENT: {attachment_name} ---"
                            ),
                        }
                    )

            response = self._create_message_with_stream_fallback(
                system_prompt=system_prompt,
                messages=[{"role": "user", "content": content_blocks}],
                max_tokens=max_tokens,
            )
            text = _extract_anthropic_text(response)
            if not text:
                raise AIProviderError("Claude returned an empty response for file-attachment mode.")

            self._csv_attachment_supported = True
            return text
        except Exception as error:
            if _is_attachment_unsupported_error(error):
                # Cache the negative capability and let the caller fall back.
                self._csv_attachment_supported = False
                logger.info(
                    "Claude endpoint does not support CSV attachments; "
                    "falling back to standard text mode."
                )
                return None
            raise

    def _create_message_with_stream_fallback(
        self,
        system_prompt: str,
        messages: list[dict[str, Any]],
        max_tokens: int,
    ) -> Any:
        """Create a Claude message, falling back to streaming for long requests.

        Args:
            system_prompt: The system-level instruction text.
            messages: The conversation messages list.
            max_tokens: Maximum completion tokens.

        Returns:
            The Anthropic ``Message`` response object.
        """
        request_kwargs: dict[str, Any] = {
            "model": self.model,
            "max_tokens": max_tokens,
            "system": system_prompt,
            "messages": messages,
        }
        try:
            return self._with_token_limit_retry(
                lambda kw: self.client.messages.create(**kw),
                request_kwargs,
            )
        except ValueError as error:
            # The SDK raises ValueError when a non-streaming call would take
            # too long; retry the same request through messages.stream().
            if not _is_anthropic_streaming_required_error(error):
                raise
            logger.info(
                "Claude SDK requires streaming for long request; retrying with messages.stream()."
            )
            return self._with_token_limit_retry(
                lambda kw: self._stream_and_collect(**kw),
                request_kwargs,
            )

    def _stream_and_collect(self, **kwargs: Any) -> Any:
        """Stream a Claude request and return the final message.

        Args:
            **kwargs: Keyword arguments for ``client.messages.stream``.

        Returns:
            The final Anthropic ``Message`` response object.
        """
        with self.client.messages.stream(**kwargs) as stream:
            return stream.get_final_message()

    def _with_token_limit_retry(
        self,
        create_fn: Callable[[dict[str, Any]], Any],
        request_kwargs: dict[str, Any],
    ) -> Any:
        """Execute a Claude API call with automatic token-limit retry.

        If the initial request is rejected because ``max_tokens`` exceeds
        the model's supported maximum, retries once with the lower limit
        extracted from the error message.

        This single method replaces the three near-identical retry methods
        that existed previously.

        Args:
            create_fn: A callable that takes the request kwargs dict and
                performs the API call.
            request_kwargs: Keyword arguments for the API call.

        Raises:
            anthropic.BadRequestError: If the request fails for a reason
                other than token limits, or if the retry also fails.

        Returns:
            The API response object.
        """
        effective_kwargs: dict[str, Any] = dict(request_kwargs)
        for _ in range(2):
            try:
                return create_fn(effective_kwargs)
            except self._anthropic.BadRequestError as error:
                requested_tokens = int(effective_kwargs.get("max_tokens", 0))
                retry_token_count = _resolve_completion_token_retry_limit(
                    error=error,
                    requested_tokens=requested_tokens,
                )
                if retry_token_count is None:
                    raise
                logger.warning(
                    "Claude rejected max_tokens=%d; retrying with max_tokens=%d.",
                    requested_tokens,
                    retry_token_count,
                )
                effective_kwargs["max_tokens"] = retry_token_count
        # Final attempt after two token-limit adjustments; errors propagate.
        return create_fn(effective_kwargs)

    def get_model_info(self) -> dict[str, str]:
        """Return Claude provider and model metadata.

        Returns:
            A dictionary with ``"provider"`` and ``"model"`` keys.
        """
        return {"provider": "claude", "model": self.model}

Anthropic Claude provider implementation.

Supports synchronous and streaming generation, CSV file attachments via content blocks (base64-encoded PDFs or inline text), and automatic token-limit retry when max_tokens exceeds the model's maximum.

Attributes:
  • api_key (str): The Anthropic API key.
  • model (str): The Claude model identifier.
  • attach_csv_as_file (bool): Whether to upload CSV artifacts as content blocks.
  • request_timeout_seconds (float): HTTP timeout in seconds.
  • client: The anthropic.Anthropic SDK client instance.
ClaudeProvider( api_key: str, model: str = 'claude-opus-4-6', attach_csv_as_file: bool = True, request_timeout_seconds: float = 600.0)
 60    def __init__(
 61        self,
 62        api_key: str,
 63        model: str = DEFAULT_CLAUDE_MODEL,
 64        attach_csv_as_file: bool = True,
 65        request_timeout_seconds: float = DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 66    ) -> None:
 67        """Initialize the Claude provider.
 68
 69        Args:
 70            api_key: Anthropic API key. Must be non-empty.
 71            model: Claude model identifier.
 72            attach_csv_as_file: If ``True``, send CSV artifacts as structured
 73                content blocks.
 74            request_timeout_seconds: HTTP timeout in seconds.
 75
 76        Raises:
 77            AIProviderError: If the ``anthropic`` SDK is not installed or
 78                the API key is empty.
 79        """
 80        try:
 81            import anthropic
 82        except ImportError as error:
 83            raise AIProviderError(
 84                "anthropic SDK is not installed. Install it with `pip install anthropic`."
 85            ) from error
 86
 87        normalized_api_key = _normalize_api_key_value(api_key)
 88        if not normalized_api_key:
 89            raise AIProviderError(
 90                "Claude API key is not configured. "
 91                "Set `ai.claude.api_key` in config.yaml or the ANTHROPIC_API_KEY environment variable."
 92            )
 93
 94        self._anthropic = anthropic
 95        self.api_key = normalized_api_key
 96        self.model = model
 97        self.attach_csv_as_file = bool(attach_csv_as_file)
 98        self._csv_attachment_supported: bool | None = None
 99        self.request_timeout_seconds = _resolve_timeout_seconds(
100            request_timeout_seconds,
101            DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
102        )
103        self.client = anthropic.Anthropic(
104            api_key=normalized_api_key,
105            timeout=self.request_timeout_seconds,
106        )
107        logger.info("Initialized Claude provider with model %s (timeout %.1fs)", model, self.request_timeout_seconds)

Initialize the Claude provider.

Arguments:
  • api_key: Anthropic API key. Must be non-empty.
  • model: Claude model identifier.
  • attach_csv_as_file: If True, send CSV artifacts as structured content blocks.
  • request_timeout_seconds: HTTP timeout in seconds.
Raises:
  • AIProviderError: If the anthropic SDK is not installed or the API key is empty.
api_key
model
attach_csv_as_file
request_timeout_seconds
client
def analyze( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> str:
109    def analyze(
110        self,
111        system_prompt: str,
112        user_prompt: str,
113        max_tokens: int = DEFAULT_MAX_TOKENS,
114    ) -> str:
115        """Send a prompt to Claude and return the generated text.
116
117        Args:
118            system_prompt: The system-level instruction text.
119            user_prompt: The user-facing prompt with investigation context.
120            max_tokens: Maximum completion tokens.
121
122        Returns:
123            The generated analysis text.
124
125        Raises:
126            AIProviderError: On any API or network failure.
127        """
128        return self.analyze_with_attachments(
129            system_prompt=system_prompt,
130            user_prompt=user_prompt,
131            attachments=None,
132            max_tokens=max_tokens,
133        )

Send a prompt to Claude and return the generated text.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_stream( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> Iterator[str]:
135    def analyze_stream(
136        self,
137        system_prompt: str,
138        user_prompt: str,
139        max_tokens: int = DEFAULT_MAX_TOKENS,
140    ) -> Iterator[str]:
141        """Stream generated text chunks from Claude.
142
143        Args:
144            system_prompt: The system-level instruction text.
145            user_prompt: The user-facing prompt with investigation context.
146            max_tokens: Maximum completion tokens.
147
148        Yields:
149            Text chunk strings as they are generated.
150
151        Raises:
152            AIProviderError: On empty response or API failure.
153        """
154        def _stream() -> Iterator[str]:
155            request_kwargs: dict[str, Any] = {
156                "model": self.model,
157                "max_tokens": max_tokens,
158                "system": system_prompt,
159                "messages": [{"role": "user", "content": user_prompt}],
160                "stream": True,
161            }
162            try:
163                stream = _run_with_rate_limit_retries(
164                    request_fn=lambda: self._with_token_limit_retry(
165                        lambda kw: self.client.messages.create(**kw),
166                        request_kwargs,
167                    ),
168                    rate_limit_error_type=self._anthropic.RateLimitError,
169                    provider_name="Claude",
170                )
171                emitted = False
172                for event in stream:
173                    chunk_text = _extract_anthropic_stream_text(event)
174                    if not chunk_text:
175                        continue
176                    emitted = True
177                    yield chunk_text
178                if not emitted:
179                    raise AIProviderError("Claude returned an empty response.")
180            except AIProviderError:
181                raise
182            except self._anthropic.APIConnectionError as error:
183                raise AIProviderError(
184                    "Unable to connect to Claude API. Check network access and endpoint configuration."
185                ) from error
186            except self._anthropic.AuthenticationError as error:
187                raise AIProviderError(
188                    "Claude authentication failed. Check `ai.claude.api_key` or ANTHROPIC_API_KEY."
189                ) from error
190            except self._anthropic.BadRequestError as error:
191                if _is_context_length_error(error):
192                    raise AIProviderError(
193                        "Claude request exceeded the model context length. Reduce prompt size and retry."
194                    ) from error
195                raise AIProviderError(f"Claude request was rejected: {error}") from error
196            except self._anthropic.APIError as error:
197                raise AIProviderError(f"Claude API error: {error}") from error
198            except Exception as error:
199                raise AIProviderError(f"Unexpected Claude provider error: {error}") from error
200
201        return _stream()

Stream generated text chunks from Claude.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Yields:

Text chunk strings as they are generated.

Raises:
  • AIProviderError: On empty response or API failure.
def analyze_with_attachments( self, system_prompt: str, user_prompt: str, attachments: list[typing.Mapping[str, str]] | None, max_tokens: int = 256000) -> str:
203    def analyze_with_attachments(
204        self,
205        system_prompt: str,
206        user_prompt: str,
207        attachments: list[Mapping[str, str]] | None,
208        max_tokens: int = DEFAULT_MAX_TOKENS,
209    ) -> str:
210        """Analyze with optional CSV file attachments via Claude content blocks.
211
212        Args:
213            system_prompt: The system-level instruction text.
214            user_prompt: The user-facing prompt with investigation context.
215            attachments: Optional list of attachment descriptors.
216            max_tokens: Maximum completion tokens.
217
218        Returns:
219            The generated analysis text.
220
221        Raises:
222            AIProviderError: On any API or network failure.
223        """
224        def _request() -> str:
225            attachment_response = self._request_with_csv_attachments(
226                system_prompt=system_prompt,
227                user_prompt=user_prompt,
228                max_tokens=max_tokens,
229                attachments=attachments,
230            )
231            if attachment_response:
232                return attachment_response
233
234            effective_prompt = user_prompt
235            if attachments:
236                effective_prompt, inlined = _inline_attachment_data_into_prompt(
237                    user_prompt=user_prompt,
238                    attachments=attachments,
239                )
240                if inlined:
241                    logger.info("Claude attachment fallback inlined attachment data into prompt.")
242
243            response = self._create_message_with_stream_fallback(
244                system_prompt=system_prompt,
245                messages=[{"role": "user", "content": effective_prompt}],
246                max_tokens=max_tokens,
247            )
248            text = _extract_anthropic_text(response)
249            if not text:
250                raise AIProviderError("Claude returned an empty response.")
251            return text
252
253        return self._run_claude_request(_request)

Analyze with optional CSV file attachments via Claude content blocks.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def get_model_info(self) -> dict[str, str]:
470    def get_model_info(self) -> dict[str, str]:
471        """Return Claude provider and model metadata.
472
473        Returns:
474            A dictionary with ``"provider"`` and ``"model"`` keys.
475        """
476        return {"provider": "claude", "model": self.model}

Return Claude provider and model metadata.

Returns:

A dictionary with "provider" and "model" keys.

class OpenAIProvider(app.ai_providers.AIProvider):
 44class OpenAIProvider(AIProvider):
 45    """OpenAI API provider implementation.
 46
 47    Attributes:
 48        api_key (str): The OpenAI API key.
 49        model (str): The OpenAI model identifier.
 50        attach_csv_as_file (bool): Whether to upload CSV artifacts as
 51            file attachments via the Responses API.
 52        request_timeout_seconds (float): HTTP timeout in seconds.
 53        client: The ``openai.OpenAI`` SDK client instance.
 54    """
 55
 56    def __init__(
 57        self,
 58        api_key: str,
 59        model: str = DEFAULT_OPENAI_MODEL,
 60        attach_csv_as_file: bool = True,
 61        request_timeout_seconds: float = DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 62    ) -> None:
 63        """Initialize the OpenAI provider.
 64
 65        Args:
 66            api_key: OpenAI API key. Must be non-empty.
 67            model: OpenAI model identifier.
 68            attach_csv_as_file: If ``True``, attempt file uploads via
 69                the Responses API.
 70            request_timeout_seconds: HTTP timeout in seconds.
 71
 72        Raises:
 73            AIProviderError: If the ``openai`` SDK is not installed or
 74                the API key is empty.
 75        """
 76        try:
 77            import openai
 78        except ImportError as error:
 79            raise AIProviderError(
 80                "openai SDK is not installed. Install it with `pip install openai`."
 81            ) from error
 82
 83        normalized_api_key = _normalize_api_key_value(api_key)
 84        if not normalized_api_key:
 85            raise AIProviderError(
 86                "OpenAI API key is not configured. "
 87                "Set `ai.openai.api_key` in config.yaml or the OPENAI_API_KEY environment variable."
 88            )
 89
 90        self._openai = openai
 91        self.api_key = normalized_api_key
 92        self.model = model
 93        self.attach_csv_as_file = bool(attach_csv_as_file)
 94        self._csv_attachment_supported: bool | None = None
 95        self.request_timeout_seconds = _resolve_timeout_seconds(
 96            request_timeout_seconds,
 97            DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 98        )
 99        self.client = openai.OpenAI(
100            api_key=normalized_api_key,
101            timeout=self.request_timeout_seconds,
102        )
103        logger.info("Initialized OpenAI provider with model %s (timeout %.1fs)", model, self.request_timeout_seconds)
104
105    def analyze(
106        self,
107        system_prompt: str,
108        user_prompt: str,
109        max_tokens: int = DEFAULT_MAX_TOKENS,
110    ) -> str:
111        """Send a prompt to OpenAI and return the generated text.
112
113        Args:
114            system_prompt: The system-level instruction text.
115            user_prompt: The user-facing prompt with investigation context.
116            max_tokens: Maximum completion tokens.
117
118        Returns:
119            The generated analysis text.
120
121        Raises:
122            AIProviderError: On any API or network failure.
123        """
124        return self.analyze_with_attachments(
125            system_prompt=system_prompt,
126            user_prompt=user_prompt,
127            attachments=None,
128            max_tokens=max_tokens,
129        )
130
131    def analyze_stream(
132        self,
133        system_prompt: str,
134        user_prompt: str,
135        max_tokens: int = DEFAULT_MAX_TOKENS,
136    ) -> Iterator[str]:
137        """Stream generated text chunks from OpenAI.
138
139        Args:
140            system_prompt: The system-level instruction text.
141            user_prompt: The user-facing prompt with investigation context.
142            max_tokens: Maximum completion tokens.
143
144        Yields:
145            Text chunk strings as they are generated.
146
147        Raises:
148            AIProviderError: On empty response or API failure.
149        """
150        def _stream() -> Iterator[str]:
151            messages = [
152                {"role": "system", "content": system_prompt},
153                {"role": "user", "content": user_prompt},
154            ]
155            stream = self._run_openai_request(
156                lambda: self._create_chat_completion(
157                    messages=messages,
158                    max_tokens=max_tokens,
159                    stream=True,
160                )
161            )
162            emitted = False
163            try:
164                for chunk in stream:
165                    choices = getattr(chunk, "choices", None)
166                    if not choices:
167                        continue
168                    choice = choices[0]
169                    delta = getattr(choice, "delta", None)
170                    if delta is None and isinstance(choice, dict):
171                        delta = choice.get("delta")
172                    chunk_text = _extract_openai_delta_text(
173                        delta,
174                        ("content", "reasoning_content", "reasoning", "refusal"),
175                    )
176                    if not chunk_text:
177                        continue
178                    emitted = True
179                    yield chunk_text
180            except AIProviderError:
181                raise
182            except self._openai.APIConnectionError as error:
183                raise AIProviderError(
184                    "Unable to connect to OpenAI API. Check network access and endpoint configuration."
185                ) from error
186            except self._openai.AuthenticationError as error:
187                raise AIProviderError(
188                    "OpenAI authentication failed. Check `ai.openai.api_key` or OPENAI_API_KEY."
189                ) from error
190            except self._openai.BadRequestError as error:
191                if _is_context_length_error(error):
192                    raise AIProviderError(
193                        "OpenAI request exceeded the model context length. Reduce prompt size and retry."
194                    ) from error
195                raise AIProviderError(f"OpenAI request was rejected: {error}") from error
196            except self._openai.APIError as error:
197                raise AIProviderError(f"OpenAI API error: {error}") from error
198            except Exception as error:
199                raise AIProviderError(f"Unexpected OpenAI provider error: {error}") from error
200
201            if not emitted:
202                raise AIProviderError("OpenAI returned an empty response.")
203
204        return _stream()
205
206    def analyze_with_attachments(
207        self,
208        system_prompt: str,
209        user_prompt: str,
210        attachments: list[Mapping[str, str]] | None,
211        max_tokens: int = DEFAULT_MAX_TOKENS,
212    ) -> str:
213        """Analyze with optional CSV file attachments via the Responses API.
214
215        Args:
216            system_prompt: The system-level instruction text.
217            user_prompt: The user-facing prompt with investigation context.
218            attachments: Optional list of attachment descriptors.
219            max_tokens: Maximum completion tokens.
220
221        Returns:
222            The generated analysis text.
223
224        Raises:
225            AIProviderError: On any API or network failure.
226        """
227        def _request() -> str:
228            return self._request_non_stream(
229                system_prompt=system_prompt,
230                user_prompt=user_prompt,
231                max_tokens=max_tokens,
232                attachments=attachments,
233            )
234
235        return self._run_openai_request(_request)
236
237    def _run_openai_request(self, request_fn: Callable[[], _T]) -> _T:
238        """Execute an OpenAI request with rate-limit retries and error mapping.
239
240        Args:
241            request_fn: A zero-argument callable that performs the request.
242
243        Returns:
244            The return value of ``request_fn`` on success.
245
246        Raises:
247            AIProviderError: On any OpenAI SDK error.
248        """
249        try:
250            return _run_with_rate_limit_retries(
251                request_fn=request_fn,
252                rate_limit_error_type=self._openai.RateLimitError,
253                provider_name="OpenAI",
254            )
255        except AIProviderError:
256            raise
257        except self._openai.APIConnectionError as error:
258            raise AIProviderError(
259                "Unable to connect to OpenAI API. Check network access and endpoint configuration."
260            ) from error
261        except self._openai.AuthenticationError as error:
262            raise AIProviderError(
263                "OpenAI authentication failed. Check `ai.openai.api_key` or OPENAI_API_KEY."
264            ) from error
265        except self._openai.BadRequestError as error:
266            if _is_context_length_error(error):
267                raise AIProviderError(
268                    "OpenAI request exceeded the model context length. Reduce prompt size and retry."
269                ) from error
270            raise AIProviderError(f"OpenAI request was rejected: {error}") from error
271        except self._openai.APIError as error:
272            raise AIProviderError(f"OpenAI API error: {error}") from error
273        except Exception as error:
274            raise AIProviderError(f"Unexpected OpenAI provider error: {error}") from error
275
276    def _request_non_stream(
277        self,
278        system_prompt: str,
279        user_prompt: str,
280        max_tokens: int,
281        attachments: list[Mapping[str, str]] | None = None,
282    ) -> str:
283        """Perform a non-streaming OpenAI request with attachment handling.
284
285        Tries file-attachment mode first, then falls back to inlining
286        attachment data, and finally issues a plain Chat Completions request.
287
288        Args:
289            system_prompt: The system-level instruction text.
290            user_prompt: The user-facing prompt text.
291            max_tokens: Maximum completion tokens.
292            attachments: Optional list of attachment descriptors.
293
294        Returns:
295            The generated analysis text.
296
297        Raises:
298            AIProviderError: If the response is empty.
299        """
300        attachment_response = self._request_with_csv_attachments(
301            system_prompt=system_prompt,
302            user_prompt=user_prompt,
303            max_tokens=max_tokens,
304            attachments=attachments,
305        )
306        if attachment_response:
307            return attachment_response
308
309        prompt_for_completion = user_prompt
310        if attachments:
311            prompt_for_completion, inlined_attachment_data = _inline_attachment_data_into_prompt(
312                user_prompt=user_prompt,
313                attachments=attachments,
314            )
315            if inlined_attachment_data:
316                logger.info("OpenAI attachment fallback inlined attachment data into prompt.")
317
318        messages = [
319            {"role": "system", "content": system_prompt},
320            {"role": "user", "content": prompt_for_completion},
321        ]
322        response = self._create_chat_completion(
323            messages=messages,
324            max_tokens=max_tokens,
325        )
326        text = _extract_openai_text(response)
327        if not text:
328            raise AIProviderError("OpenAI returned an empty response.")
329        return text
330
331    def _create_chat_completion(
332        self,
333        messages: list[dict[str, str]],
334        max_tokens: int,
335        stream: bool = False,
336    ) -> Any:
337        """Create a Chat Completions request with token parameter fallback.
338
339        Tries ``max_completion_tokens`` first, then falls back to
340        ``max_tokens`` if the endpoint reports the parameter as unsupported.
341        Also retries with a reduced token count when the provider rejects
342        the requested maximum.
343
344        Args:
345            messages: The conversation messages list.
346            max_tokens: Maximum completion tokens.
347            stream: If ``True``, return a streaming response iterator.
348
349        Returns:
350            The OpenAI ``ChatCompletion`` response or streaming iterator.
351        """
352        def _create_with_token_parameter(token_parameter: str, token_count: int) -> Any:
353            """Try creating with a specific token parameter, retrying on token limit."""
354            request_kwargs: dict[str, Any] = {
355                "model": self.model,
356                "messages": messages,
357                token_parameter: token_count,
358            }
359            if stream:
360                request_kwargs["stream"] = True
361            try:
362                return self.client.chat.completions.create(**request_kwargs)
363            except self._openai.BadRequestError as error:
364                retry_token_count = _resolve_completion_token_retry_limit(
365                    error=error,
366                    requested_tokens=token_count,
367                )
368                if retry_token_count is None:
369                    raise
370                logger.warning(
371                    "OpenAI rejected %s=%d; retrying with %s=%d.",
372                    token_parameter,
373                    token_count,
374                    token_parameter,
375                    retry_token_count,
376                )
377                request_kwargs[token_parameter] = retry_token_count
378                return self.client.chat.completions.create(**request_kwargs)
379
380        try:
381            return _create_with_token_parameter(
382                token_parameter="max_completion_tokens",
383                token_count=max_tokens,
384            )
385        except self._openai.BadRequestError as error:
386            if not _is_unsupported_parameter_error(error, "max_completion_tokens"):
387                raise
388            return _create_with_token_parameter(
389                token_parameter="max_tokens",
390                token_count=max_tokens,
391            )
392
393    def _request_with_csv_attachments(
394        self,
395        system_prompt: str,
396        user_prompt: str,
397        max_tokens: int,
398        attachments: list[Mapping[str, str]] | None,
399    ) -> str | None:
400        """Attempt to send a request with CSV files via the Responses API.
401
402        Args:
403            system_prompt: The system-level instruction text.
404            user_prompt: The user-facing prompt text.
405            max_tokens: Maximum completion tokens.
406            attachments: Optional list of attachment descriptors.
407
408        Returns:
409            The generated text if succeeded, or ``None`` if skipped.
410        """
411        normalized_attachments = self._prepare_csv_attachments(
412            attachments,
413            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
414        )
415        if not normalized_attachments:
416            return None
417
418        try:
419            text = upload_and_request_via_responses_api(
420                client=self.client,
421                openai_module=self._openai,
422                model=self.model,
423                normalized_attachments=normalized_attachments,
424                system_prompt=system_prompt,
425                user_prompt=user_prompt,
426                max_tokens=max_tokens,
427                provider_name="OpenAI",
428                upload_purpose="assistants",
429                convert_csv_to_txt=True,
430            )
431            self._csv_attachment_supported = True
432            return text
433        except Exception as error:
434            if _is_attachment_unsupported_error(error):
435                self._csv_attachment_supported = False
436                logger.info(
437                    "OpenAI endpoint does not support CSV attachments via /files + /responses; "
438                    "falling back to chat.completions text mode."
439                )
440                return None
441            raise
442
443    def get_model_info(self) -> dict[str, str]:
444        """Return OpenAI provider and model metadata.
445
446        Returns:
447            A dictionary with ``"provider"`` and ``"model"`` keys.
448        """
449        return {"provider": "openai", "model": self.model}

OpenAI API provider implementation.

Attributes:
  • api_key (str): The OpenAI API key.
  • model (str): The OpenAI model identifier.
  • attach_csv_as_file (bool): Whether to upload CSV artifacts as file attachments via the Responses API.
  • request_timeout_seconds (float): HTTP timeout in seconds.
  • client: The openai.OpenAI SDK client instance.
OpenAIProvider( api_key: str, model: str = 'gpt-5.4', attach_csv_as_file: bool = True, request_timeout_seconds: float = 600.0)
 56    def __init__(
 57        self,
 58        api_key: str,
 59        model: str = DEFAULT_OPENAI_MODEL,
 60        attach_csv_as_file: bool = True,
 61        request_timeout_seconds: float = DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 62    ) -> None:
 63        """Initialize the OpenAI provider.
 64
 65        Args:
 66            api_key: OpenAI API key. Must be non-empty.
 67            model: OpenAI model identifier.
 68            attach_csv_as_file: If ``True``, attempt file uploads via
 69                the Responses API.
 70            request_timeout_seconds: HTTP timeout in seconds.
 71
 72        Raises:
 73            AIProviderError: If the ``openai`` SDK is not installed or
 74                the API key is empty.
 75        """
 76        try:
 77            import openai
 78        except ImportError as error:
 79            raise AIProviderError(
 80                "openai SDK is not installed. Install it with `pip install openai`."
 81            ) from error
 82
 83        normalized_api_key = _normalize_api_key_value(api_key)
 84        if not normalized_api_key:
 85            raise AIProviderError(
 86                "OpenAI API key is not configured. "
 87                "Set `ai.openai.api_key` in config.yaml or the OPENAI_API_KEY environment variable."
 88            )
 89
 90        self._openai = openai
 91        self.api_key = normalized_api_key
 92        self.model = model
 93        self.attach_csv_as_file = bool(attach_csv_as_file)
 94        self._csv_attachment_supported: bool | None = None
 95        self.request_timeout_seconds = _resolve_timeout_seconds(
 96            request_timeout_seconds,
 97            DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 98        )
 99        self.client = openai.OpenAI(
100            api_key=normalized_api_key,
101            timeout=self.request_timeout_seconds,
102        )
103        logger.info("Initialized OpenAI provider with model %s (timeout %.1fs)", model, self.request_timeout_seconds)

Initialize the OpenAI provider.

Arguments:
  • api_key: OpenAI API key. Must be non-empty.
  • model: OpenAI model identifier.
  • attach_csv_as_file: If True, attempt file uploads via the Responses API.
  • request_timeout_seconds: HTTP timeout in seconds.
Raises:
  • AIProviderError: If the openai SDK is not installed or the API key is empty.
api_key
model
attach_csv_as_file
request_timeout_seconds
client
def analyze( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> str:
105    def analyze(
106        self,
107        system_prompt: str,
108        user_prompt: str,
109        max_tokens: int = DEFAULT_MAX_TOKENS,
110    ) -> str:
111        """Send a prompt to OpenAI and return the generated text.
112
113        Args:
114            system_prompt: The system-level instruction text.
115            user_prompt: The user-facing prompt with investigation context.
116            max_tokens: Maximum completion tokens.
117
118        Returns:
119            The generated analysis text.
120
121        Raises:
122            AIProviderError: On any API or network failure.
123        """
124        return self.analyze_with_attachments(
125            system_prompt=system_prompt,
126            user_prompt=user_prompt,
127            attachments=None,
128            max_tokens=max_tokens,
129        )

Send a prompt to OpenAI and return the generated text.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Returns:
  • The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_stream( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> Iterator[str]:
131    def analyze_stream(
132        self,
133        system_prompt: str,
134        user_prompt: str,
135        max_tokens: int = DEFAULT_MAX_TOKENS,
136    ) -> Iterator[str]:
137        """Stream generated text chunks from OpenAI.
138
139        Args:
140            system_prompt: The system-level instruction text.
141            user_prompt: The user-facing prompt with investigation context.
142            max_tokens: Maximum completion tokens.
143
144        Yields:
145            Text chunk strings as they are generated.
146
147        Raises:
148            AIProviderError: On empty response or API failure.
149        """
150        def _stream() -> Iterator[str]:
151            messages = [
152                {"role": "system", "content": system_prompt},
153                {"role": "user", "content": user_prompt},
154            ]
155            stream = self._run_openai_request(
156                lambda: self._create_chat_completion(
157                    messages=messages,
158                    max_tokens=max_tokens,
159                    stream=True,
160                )
161            )
162            emitted = False
163            try:
164                for chunk in stream:
165                    choices = getattr(chunk, "choices", None)
166                    if not choices:
167                        continue
168                    choice = choices[0]
169                    delta = getattr(choice, "delta", None)
170                    if delta is None and isinstance(choice, dict):
171                        delta = choice.get("delta")
172                    chunk_text = _extract_openai_delta_text(
173                        delta,
174                        ("content", "reasoning_content", "reasoning", "refusal"),
175                    )
176                    if not chunk_text:
177                        continue
178                    emitted = True
179                    yield chunk_text
180            except AIProviderError:
181                raise
182            except self._openai.APIConnectionError as error:
183                raise AIProviderError(
184                    "Unable to connect to OpenAI API. Check network access and endpoint configuration."
185                ) from error
186            except self._openai.AuthenticationError as error:
187                raise AIProviderError(
188                    "OpenAI authentication failed. Check `ai.openai.api_key` or OPENAI_API_KEY."
189                ) from error
190            except self._openai.BadRequestError as error:
191                if _is_context_length_error(error):
192                    raise AIProviderError(
193                        "OpenAI request exceeded the model context length. Reduce prompt size and retry."
194                    ) from error
195                raise AIProviderError(f"OpenAI request was rejected: {error}") from error
196            except self._openai.APIError as error:
197                raise AIProviderError(f"OpenAI API error: {error}") from error
198            except Exception as error:
199                raise AIProviderError(f"Unexpected OpenAI provider error: {error}") from error
200
201            if not emitted:
202                raise AIProviderError("OpenAI returned an empty response.")
203
204        return _stream()

Stream generated text chunks from OpenAI.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Yields:
  • Text chunk strings as they are generated.

Raises:
  • AIProviderError: On empty response or API failure.
def analyze_with_attachments( self, system_prompt: str, user_prompt: str, attachments: list[typing.Mapping[str, str]] | None, max_tokens: int = 256000) -> str:
206    def analyze_with_attachments(
207        self,
208        system_prompt: str,
209        user_prompt: str,
210        attachments: list[Mapping[str, str]] | None,
211        max_tokens: int = DEFAULT_MAX_TOKENS,
212    ) -> str:
213        """Analyze with optional CSV file attachments via the Responses API.
214
215        Args:
216            system_prompt: The system-level instruction text.
217            user_prompt: The user-facing prompt with investigation context.
218            attachments: Optional list of attachment descriptors.
219            max_tokens: Maximum completion tokens.
220
221        Returns:
222            The generated analysis text.
223
224        Raises:
225            AIProviderError: On any API or network failure.
226        """
227        def _request() -> str:
228            return self._request_non_stream(
229                system_prompt=system_prompt,
230                user_prompt=user_prompt,
231                max_tokens=max_tokens,
232                attachments=attachments,
233            )
234
235        return self._run_openai_request(_request)

Analyze with optional CSV file attachments via the Responses API.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:
  • The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def get_model_info(self) -> dict[str, str]:
443    def get_model_info(self) -> dict[str, str]:
444        """Return OpenAI provider and model metadata.
445
446        Returns:
447            A dictionary with ``"provider"`` and ``"model"`` keys.
448        """
449        return {"provider": "openai", "model": self.model}

Return OpenAI provider and model metadata.

Returns:

A dictionary with "provider" and "model" keys.

class KimiProvider(app.ai_providers.AIProvider):
 46class KimiProvider(AIProvider):
 47    """Moonshot Kimi API provider implementation.
 48
 49    Attributes:
 50        api_key (str): The Moonshot/Kimi API key.
 51        model (str): The Kimi model identifier.
 52        base_url (str): The normalized Kimi API base URL.
 53        attach_csv_as_file (bool): Whether to upload CSV artifacts as
 54            file attachments.
 55        request_timeout_seconds (float): HTTP timeout in seconds.
 56        client: The ``openai.OpenAI`` SDK client instance configured for Kimi.
 57    """
 58
 59    def __init__(
 60        self,
 61        api_key: str,
 62        model: str = DEFAULT_KIMI_MODEL,
 63        base_url: str = DEFAULT_KIMI_BASE_URL,
 64        attach_csv_as_file: bool = True,
 65        request_timeout_seconds: float = DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 66    ) -> None:
 67        """Initialize the Kimi provider.
 68
 69        Args:
 70            api_key: Moonshot/Kimi API key. Must be non-empty.
 71            model: Kimi model identifier. Deprecated aliases are mapped.
 72            base_url: Kimi API base URL.
 73            attach_csv_as_file: If ``True``, attempt file uploads.
 74            request_timeout_seconds: HTTP timeout in seconds.
 75
 76        Raises:
 77            AIProviderError: If the ``openai`` SDK is not installed or
 78                the API key is empty.
 79        """
 80        try:
 81            import openai
 82        except ImportError as error:
 83            raise AIProviderError(
 84                "openai SDK is not installed. Install it with `pip install openai`."
 85            ) from error
 86
 87        normalized_api_key = _normalize_api_key_value(api_key)
 88        if not normalized_api_key:
 89            raise AIProviderError(
 90                "Kimi API key is not configured. "
 91                "Set `ai.kimi.api_key` in config.yaml or the MOONSHOT_API_KEY environment variable."
 92            )
 93
 94        self._openai = openai
 95        self.api_key = normalized_api_key
 96        self.model = _normalize_kimi_model_name(model)
 97        self.base_url = _normalize_openai_compatible_base_url(
 98            base_url=base_url,
 99            default_base_url=DEFAULT_KIMI_BASE_URL,
100        )
101        self.attach_csv_as_file = bool(attach_csv_as_file)
102        self._csv_attachment_supported: bool | None = None
103        self.request_timeout_seconds = _resolve_timeout_seconds(
104            request_timeout_seconds,
105            DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
106        )
107        self.client = openai.OpenAI(
108            api_key=normalized_api_key,
109            base_url=self.base_url,
110            timeout=self.request_timeout_seconds,
111        )
112        logger.info("Initialized Kimi provider at %s with model %s (timeout %.1fs)", self.base_url, self.model, self.request_timeout_seconds)
113
114    def analyze(
115        self,
116        system_prompt: str,
117        user_prompt: str,
118        max_tokens: int = DEFAULT_MAX_TOKENS,
119    ) -> str:
120        """Send a prompt to Kimi and return the generated text.
121
122        Args:
123            system_prompt: The system-level instruction text.
124            user_prompt: The user-facing prompt with investigation context.
125            max_tokens: Maximum completion tokens.
126
127        Returns:
128            The generated analysis text.
129
130        Raises:
131            AIProviderError: On any API or network failure.
132        """
133        return self.analyze_with_attachments(
134            system_prompt=system_prompt,
135            user_prompt=user_prompt,
136            attachments=None,
137            max_tokens=max_tokens,
138        )
139
140    def analyze_stream(
141        self,
142        system_prompt: str,
143        user_prompt: str,
144        max_tokens: int = DEFAULT_MAX_TOKENS,
145    ) -> Iterator[str]:
146        """Stream generated text chunks from Kimi.
147
148        Args:
149            system_prompt: The system-level instruction text.
150            user_prompt: The user-facing prompt with investigation context.
151            max_tokens: Maximum completion tokens.
152
153        Yields:
154            Text chunk strings as they are generated.
155
156        Raises:
157            AIProviderError: On empty response or API failure.
158        """
159        def _stream() -> Iterator[str]:
160            stream = self._run_kimi_request(
161                lambda: self.client.chat.completions.create(
162                    model=self.model,
163                    max_tokens=max_tokens,
164                    messages=[
165                        {"role": "system", "content": system_prompt},
166                        {"role": "user", "content": user_prompt},
167                    ],
168                    stream=True,
169                )
170            )
171            emitted = False
172            try:
173                for chunk in stream:
174                    choices = getattr(chunk, "choices", None)
175                    if not choices:
176                        continue
177                    choice = choices[0]
178                    delta = getattr(choice, "delta", None)
179                    if delta is None and isinstance(choice, dict):
180                        delta = choice.get("delta")
181                    chunk_text = _extract_openai_delta_text(
182                        delta,
183                        ("content", "reasoning_content", "reasoning", "refusal"),
184                    )
185                    if not chunk_text:
186                        continue
187                    emitted = True
188                    yield chunk_text
189            except AIProviderError:
190                raise
191            except self._openai.APIConnectionError as error:
192                raise AIProviderError(
193                    "Unable to connect to Kimi API. Check `ai.kimi.base_url` and network access."
194                ) from error
195            except self._openai.AuthenticationError as error:
196                raise AIProviderError(
197                    "Kimi authentication failed. Check `ai.kimi.api_key`, MOONSHOT_API_KEY, or KIMI_API_KEY."
198                ) from error
199            except self._openai.BadRequestError as error:
200                if _is_context_length_error(error):
201                    raise AIProviderError(
202                        "Kimi request exceeded the model context length. Reduce prompt size and retry."
203                    ) from error
204                raise AIProviderError(f"Kimi request was rejected: {error}") from error
205            except self._openai.APIError as error:
206                if _is_kimi_model_not_available_error(error):
207                    raise AIProviderError(
208                        "Kimi rejected the configured model. "
209                        f"Current model: `{self.model}`. "
210                        "Set `ai.kimi.model` to a model enabled for your Moonshot account "
211                        "(for example `kimi-k2-turbo-preview`) and retry."
212                    ) from error
213                raise AIProviderError(f"Kimi API error: {error}") from error
214            except Exception as error:
215                raise AIProviderError(f"Unexpected Kimi provider error: {error}") from error
216
217            if not emitted:
218                raise AIProviderError("Kimi returned an empty response.")
219
220        return _stream()
221
222    def analyze_with_attachments(
223        self,
224        system_prompt: str,
225        user_prompt: str,
226        attachments: list[Mapping[str, str]] | None,
227        max_tokens: int = DEFAULT_MAX_TOKENS,
228    ) -> str:
229        """Analyze with optional CSV file attachments via the Kimi Responses API.
230
231        Args:
232            system_prompt: The system-level instruction text.
233            user_prompt: The user-facing prompt with investigation context.
234            attachments: Optional list of attachment descriptors.
235            max_tokens: Maximum completion tokens.
236
237        Returns:
238            The generated analysis text.
239
240        Raises:
241            AIProviderError: On any API or network failure.
242        """
243        def _request() -> str:
244            return self._request_non_stream(
245                system_prompt=system_prompt,
246                user_prompt=user_prompt,
247                max_tokens=max_tokens,
248                attachments=attachments,
249            )
250
251        return self._run_kimi_request(_request)
252
253    def _run_kimi_request(self, request_fn: Callable[[], _T]) -> _T:
254        """Execute a Kimi request with rate-limit retries and error mapping.
255
256        Args:
257            request_fn: A zero-argument callable that performs the request.
258
259        Returns:
260            The return value of ``request_fn`` on success.
261
262        Raises:
263            AIProviderError: On any OpenAI SDK error (with Kimi messages).
264        """
265        try:
266            return _run_with_rate_limit_retries(
267                request_fn=request_fn,
268                rate_limit_error_type=self._openai.RateLimitError,
269                provider_name="Kimi",
270            )
271        except AIProviderError:
272            raise
273        except self._openai.APIConnectionError as error:
274            raise AIProviderError(
275                "Unable to connect to Kimi API. Check `ai.kimi.base_url` and network access."
276            ) from error
277        except self._openai.AuthenticationError as error:
278            raise AIProviderError(
279                "Kimi authentication failed. Check `ai.kimi.api_key`, MOONSHOT_API_KEY, or KIMI_API_KEY."
280            ) from error
281        except self._openai.BadRequestError as error:
282            if _is_context_length_error(error):
283                raise AIProviderError(
284                    "Kimi request exceeded the model context length. Reduce prompt size and retry."
285                ) from error
286            raise AIProviderError(f"Kimi request was rejected: {error}") from error
287        except self._openai.APIError as error:
288            if _is_kimi_model_not_available_error(error):
289                raise AIProviderError(
290                    "Kimi rejected the configured model. "
291                    f"Current model: `{self.model}`. "
292                    "Set `ai.kimi.model` to a model enabled for your Moonshot account "
293                    "(for example `kimi-k2-turbo-preview`) and retry."
294                ) from error
295            raise AIProviderError(f"Kimi API error: {error}") from error
296        except Exception as error:
297            raise AIProviderError(f"Unexpected Kimi provider error: {error}") from error
298
299    def _request_non_stream(
300        self,
301        system_prompt: str,
302        user_prompt: str,
303        max_tokens: int,
304        attachments: list[Mapping[str, str]] | None = None,
305    ) -> str:
306        """Perform a non-streaming Kimi request with attachment handling.
307
308        Args:
309            system_prompt: The system-level instruction text.
310            user_prompt: The user-facing prompt text.
311            max_tokens: Maximum completion tokens.
312            attachments: Optional list of attachment descriptors.
313
314        Returns:
315            The generated analysis text.
316
317        Raises:
318            AIProviderError: If the response is empty.
319        """
320        attachment_response = self._request_with_csv_attachments(
321            system_prompt=system_prompt,
322            user_prompt=user_prompt,
323            max_tokens=max_tokens,
324            attachments=attachments,
325        )
326        if attachment_response:
327            return attachment_response
328
329        prompt_for_completion = user_prompt
330        if attachments:
331            prompt_for_completion, inlined = _inline_attachment_data_into_prompt(
332                user_prompt=user_prompt,
333                attachments=attachments,
334            )
335            if inlined:
336                logger.info("Kimi attachment fallback inlined attachment data into prompt.")
337
338        response = self.client.chat.completions.create(
339            model=self.model,
340            max_tokens=max_tokens,
341            messages=[
342                {"role": "system", "content": system_prompt},
343                {"role": "user", "content": prompt_for_completion},
344            ],
345        )
346        text = _extract_openai_text(response)
347        if not text:
348            raise AIProviderError("Kimi returned an empty response.")
349        return text
350
351    def _request_with_csv_attachments(
352        self,
353        system_prompt: str,
354        user_prompt: str,
355        max_tokens: int,
356        attachments: list[Mapping[str, str]] | None,
357    ) -> str | None:
358        """Attempt to send a request with CSV files via the Kimi Responses API.
359
360        Args:
361            system_prompt: The system-level instruction text.
362            user_prompt: The user-facing prompt text.
363            max_tokens: Maximum completion tokens.
364            attachments: Optional list of attachment descriptors.
365
366        Returns:
367            The generated text if succeeded, or ``None`` if skipped.
368        """
369        normalized_attachments = self._prepare_csv_attachments(
370            attachments,
371            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
372        )
373        if not normalized_attachments:
374            return None
375
376        try:
377            text = upload_and_request_via_responses_api(
378                client=self.client,
379                openai_module=self._openai,
380                model=self.model,
381                normalized_attachments=normalized_attachments,
382                system_prompt=system_prompt,
383                user_prompt=user_prompt,
384                max_tokens=max_tokens,
385                provider_name="Kimi",
386                upload_purpose=DEFAULT_KIMI_FILE_UPLOAD_PURPOSE,
387                convert_csv_to_txt=False,
388            )
389            self._csv_attachment_supported = True
390            return text
391        except Exception as error:
392            if _is_attachment_unsupported_error(error):
393                self._csv_attachment_supported = False
394                logger.info(
395                    "Kimi endpoint does not support CSV attachments via /files + /responses; "
396                    "falling back to chat.completions text mode."
397                )
398                return None
399            raise
400
401    def get_model_info(self) -> dict[str, str]:
402        """Return Kimi provider and model metadata.
403
404        Returns:
405            A dictionary with ``"provider"`` and ``"model"`` keys.
406        """
407        return {"provider": "kimi", "model": self.model}

Moonshot Kimi API provider implementation.

Attributes:
  • api_key (str): The Moonshot/Kimi API key.
  • model (str): The Kimi model identifier.
  • base_url (str): The normalized Kimi API base URL.
  • attach_csv_as_file (bool): Whether to upload CSV artifacts as file attachments.
  • request_timeout_seconds (float): HTTP timeout in seconds.
  • client: The openai.OpenAI SDK client instance configured for Kimi.
KimiProvider( api_key: str, model: str = 'kimi-k2-turbo-preview', base_url: str = 'https://api.moonshot.ai/v1', attach_csv_as_file: bool = True, request_timeout_seconds: float = 600.0)
 59    def __init__(
 60        self,
 61        api_key: str,
 62        model: str = DEFAULT_KIMI_MODEL,
 63        base_url: str = DEFAULT_KIMI_BASE_URL,
 64        attach_csv_as_file: bool = True,
 65        request_timeout_seconds: float = DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
 66    ) -> None:
 67        """Initialize the Kimi provider.
 68
 69        Args:
 70            api_key: Moonshot/Kimi API key. Must be non-empty.
 71            model: Kimi model identifier. Deprecated aliases are mapped.
 72            base_url: Kimi API base URL.
 73            attach_csv_as_file: If ``True``, attempt file uploads.
 74            request_timeout_seconds: HTTP timeout in seconds.
 75
 76        Raises:
 77            AIProviderError: If the ``openai`` SDK is not installed or
 78                the API key is empty.
 79        """
 80        try:
 81            import openai
 82        except ImportError as error:
 83            raise AIProviderError(
 84                "openai SDK is not installed. Install it with `pip install openai`."
 85            ) from error
 86
 87        normalized_api_key = _normalize_api_key_value(api_key)
 88        if not normalized_api_key:
 89            raise AIProviderError(
 90                "Kimi API key is not configured. "
 91                "Set `ai.kimi.api_key` in config.yaml or the MOONSHOT_API_KEY environment variable."
 92            )
 93
 94        self._openai = openai
 95        self.api_key = normalized_api_key
 96        self.model = _normalize_kimi_model_name(model)
 97        self.base_url = _normalize_openai_compatible_base_url(
 98            base_url=base_url,
 99            default_base_url=DEFAULT_KIMI_BASE_URL,
100        )
101        self.attach_csv_as_file = bool(attach_csv_as_file)
102        self._csv_attachment_supported: bool | None = None
103        self.request_timeout_seconds = _resolve_timeout_seconds(
104            request_timeout_seconds,
105            DEFAULT_CLOUD_REQUEST_TIMEOUT_SECONDS,
106        )
107        self.client = openai.OpenAI(
108            api_key=normalized_api_key,
109            base_url=self.base_url,
110            timeout=self.request_timeout_seconds,
111        )
112        logger.info("Initialized Kimi provider at %s with model %s (timeout %.1fs)", self.base_url, self.model, self.request_timeout_seconds)

Initialize the Kimi provider.

Arguments:
  • api_key: Moonshot/Kimi API key. Must be non-empty.
  • model: Kimi model identifier. Deprecated aliases are mapped.
  • base_url: Kimi API base URL.
  • attach_csv_as_file: If True, attempt file uploads.
  • request_timeout_seconds: HTTP timeout in seconds.
Raises:
  • AIProviderError: If the openai SDK is not installed or the API key is empty.
api_key
model
base_url
attach_csv_as_file
request_timeout_seconds
client
def analyze( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> str:
114    def analyze(
115        self,
116        system_prompt: str,
117        user_prompt: str,
118        max_tokens: int = DEFAULT_MAX_TOKENS,
119    ) -> str:
120        """Send a prompt to Kimi and return the generated text.
121
122        Args:
123            system_prompt: The system-level instruction text.
124            user_prompt: The user-facing prompt with investigation context.
125            max_tokens: Maximum completion tokens.
126
127        Returns:
128            The generated analysis text.
129
130        Raises:
131            AIProviderError: On any API or network failure.
132        """
133        return self.analyze_with_attachments(
134            system_prompt=system_prompt,
135            user_prompt=user_prompt,
136            attachments=None,
137            max_tokens=max_tokens,
138        )

Send a prompt to Kimi and return the generated text.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_stream( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> Iterator[str]:
140    def analyze_stream(
141        self,
142        system_prompt: str,
143        user_prompt: str,
144        max_tokens: int = DEFAULT_MAX_TOKENS,
145    ) -> Iterator[str]:
146        """Stream generated text chunks from Kimi.
147
148        Args:
149            system_prompt: The system-level instruction text.
150            user_prompt: The user-facing prompt with investigation context.
151            max_tokens: Maximum completion tokens.
152
153        Yields:
154            Text chunk strings as they are generated.
155
156        Raises:
157            AIProviderError: On empty response or API failure.
158        """
159        def _stream() -> Iterator[str]:
160            stream = self._run_kimi_request(
161                lambda: self.client.chat.completions.create(
162                    model=self.model,
163                    max_tokens=max_tokens,
164                    messages=[
165                        {"role": "system", "content": system_prompt},
166                        {"role": "user", "content": user_prompt},
167                    ],
168                    stream=True,
169                )
170            )
171            emitted = False
172            try:
173                for chunk in stream:
174                    choices = getattr(chunk, "choices", None)
175                    if not choices:
176                        continue
177                    choice = choices[0]
178                    delta = getattr(choice, "delta", None)
179                    if delta is None and isinstance(choice, dict):
180                        delta = choice.get("delta")
181                    chunk_text = _extract_openai_delta_text(
182                        delta,
183                        ("content", "reasoning_content", "reasoning", "refusal"),
184                    )
185                    if not chunk_text:
186                        continue
187                    emitted = True
188                    yield chunk_text
189            except AIProviderError:
190                raise
191            except self._openai.APIConnectionError as error:
192                raise AIProviderError(
193                    "Unable to connect to Kimi API. Check `ai.kimi.base_url` and network access."
194                ) from error
195            except self._openai.AuthenticationError as error:
196                raise AIProviderError(
197                    "Kimi authentication failed. Check `ai.kimi.api_key`, MOONSHOT_API_KEY, or KIMI_API_KEY."
198                ) from error
199            except self._openai.BadRequestError as error:
200                if _is_context_length_error(error):
201                    raise AIProviderError(
202                        "Kimi request exceeded the model context length. Reduce prompt size and retry."
203                    ) from error
204                raise AIProviderError(f"Kimi request was rejected: {error}") from error
205            except self._openai.APIError as error:
206                if _is_kimi_model_not_available_error(error):
207                    raise AIProviderError(
208                        "Kimi rejected the configured model. "
209                        f"Current model: `{self.model}`. "
210                        "Set `ai.kimi.model` to a model enabled for your Moonshot account "
211                        "(for example `kimi-k2-turbo-preview`) and retry."
212                    ) from error
213                raise AIProviderError(f"Kimi API error: {error}") from error
214            except Exception as error:
215                raise AIProviderError(f"Unexpected Kimi provider error: {error}") from error
216
217            if not emitted:
218                raise AIProviderError("Kimi returned an empty response.")
219
220        return _stream()

Stream generated text chunks from Kimi.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Yields:

Text chunk strings as they are generated.

Raises:
  • AIProviderError: On empty response or API failure.
def analyze_with_attachments( self, system_prompt: str, user_prompt: str, attachments: list[typing.Mapping[str, str]] | None, max_tokens: int = 256000) -> str:
222    def analyze_with_attachments(
223        self,
224        system_prompt: str,
225        user_prompt: str,
226        attachments: list[Mapping[str, str]] | None,
227        max_tokens: int = DEFAULT_MAX_TOKENS,
228    ) -> str:
229        """Analyze with optional CSV file attachments via the Kimi Responses API.
230
231        Args:
232            system_prompt: The system-level instruction text.
233            user_prompt: The user-facing prompt with investigation context.
234            attachments: Optional list of attachment descriptors.
235            max_tokens: Maximum completion tokens.
236
237        Returns:
238            The generated analysis text.
239
240        Raises:
241            AIProviderError: On any API or network failure.
242        """
243        def _request() -> str:
244            return self._request_non_stream(
245                system_prompt=system_prompt,
246                user_prompt=user_prompt,
247                max_tokens=max_tokens,
248                attachments=attachments,
249            )
250
251        return self._run_kimi_request(_request)

Analyze with optional CSV file attachments via the Kimi Responses API.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def get_model_info(self) -> dict[str, str]:
401    def get_model_info(self) -> dict[str, str]:
402        """Return Kimi provider and model metadata.
403
404        Returns:
405            A dictionary with ``"provider"`` and ``"model"`` keys.
406        """
407        return {"provider": "kimi", "model": self.model}

Return Kimi provider and model metadata.

Returns:

A dictionary with "provider" and "model" keys.

class LocalProvider(AIProvider):
    """Provider for OpenAI-compatible local endpoints (Ollama, LM Studio, vLLM).

    Attributes:
        base_url (str): The normalized local endpoint base URL.
        model (str): The local model identifier.
        api_key (str): The API key for the local endpoint.
        attach_csv_as_file (bool): Whether to attempt file-attachment mode.
        request_timeout_seconds (float): HTTP timeout in seconds.
        client: The ``openai.OpenAI`` SDK client instance.
    """

    def __init__(
        self,
        base_url: str,
        model: str,
        api_key: str = "not-needed",
        attach_csv_as_file: bool = True,
        request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
    ) -> None:
        """Create a client for an OpenAI-compatible local server.

        Args:
            base_url: Base URL for the local endpoint; normalized to
                include ``/v1`` if missing.
            model: Model identifier.
            api_key: API key. Defaults to ``"not-needed"`` since most
                local servers ignore it.
            attach_csv_as_file: If ``True``, attempt file uploads.
            request_timeout_seconds: HTTP timeout in seconds.

        Raises:
            AIProviderError: If the ``openai`` SDK is not installed.
        """
        try:
            import openai
        except ImportError as error:
            raise AIProviderError(
                "openai SDK is not installed. Install it with `pip install openai`."
            ) from error

        self._openai = openai
        self.base_url = _normalize_openai_compatible_base_url(
            base_url=base_url,
            default_base_url=DEFAULT_LOCAL_BASE_URL,
        )
        self.model = model
        self.api_key = _normalize_api_key_value(api_key) or "not-needed"
        self.attach_csv_as_file = bool(attach_csv_as_file)
        self.request_timeout_seconds = _resolve_timeout_seconds(
            request_timeout_seconds,
            DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
        )
        # Older SDK versions may lack APITimeoutError; probe defensively.
        self._api_timeout_error_type = getattr(openai, "APITimeoutError", None)
        # Tri-state: None = unknown, set after the first attachment attempt.
        self._csv_attachment_supported: bool | None = None
        self.client = openai.OpenAI(
            api_key=self.api_key,
            base_url=self.base_url,
            timeout=self.request_timeout_seconds,
            max_retries=0,
        )
        logger.info(
            "Initialized local provider at %s with model %s (timeout %.1fs)",
            self.base_url,
            model,
            self.request_timeout_seconds,
        )
117    def analyze(
118        self,
119        system_prompt: str,
120        user_prompt: str,
121        max_tokens: int = DEFAULT_MAX_TOKENS,
122    ) -> str:
123        """Send a prompt to the local endpoint and return the generated text.
124
125        Args:
126            system_prompt: The system-level instruction text.
127            user_prompt: The user-facing prompt with investigation context.
128            max_tokens: Maximum completion tokens.
129
130        Returns:
131            The generated analysis text.
132
133        Raises:
134            AIProviderError: On any API or network failure.
135        """
136        return self.analyze_with_attachments(
137            system_prompt=system_prompt,
138            user_prompt=user_prompt,
139            attachments=None,
140            max_tokens=max_tokens,
141        )
142
143    def analyze_stream(
144        self,
145        system_prompt: str,
146        user_prompt: str,
147        max_tokens: int = DEFAULT_MAX_TOKENS,
148    ) -> Iterator[str]:
149        """Stream generated text chunks from the local endpoint.
150
151        Falls back to non-streaming if the endpoint reports streaming
152        is unsupported.
153
154        Args:
155            system_prompt: The system-level instruction text.
156            user_prompt: The user-facing prompt with investigation context.
157            max_tokens: Maximum completion tokens.
158
159        Yields:
160            Text chunk strings as they are generated.
161
162        Raises:
163            AIProviderError: On empty response or API failure.
164        """
165        def _stream() -> Iterator[str]:
166            try:
167                prompt_for_completion = self._build_chat_completion_prompt(
168                    user_prompt=user_prompt,
169                    attachments=None,
170                )
171                try:
172                    stream = _run_with_rate_limit_retries(
173                        request_fn=lambda: self.client.chat.completions.create(
174                            model=self.model,
175                            max_tokens=max_tokens,
176                            messages=[
177                                {"role": "system", "content": system_prompt},
178                                {"role": "user", "content": prompt_for_completion},
179                            ],
180                            stream=True,
181                        ),
182                        rate_limit_error_type=self._openai.RateLimitError,
183                        provider_name="Local provider",
184                    )
185                except self._openai.BadRequestError as error:
186                    lowered_error = str(error).lower()
187                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
188                        fallback_text = self._request_non_stream(
189                            system_prompt=system_prompt,
190                            user_prompt=user_prompt,
191                            max_tokens=max_tokens,
192                            attachments=None,
193                        )
194                        if fallback_text:
195                            yield fallback_text
196                            return
197                    raise
198
199                emitted = False
200                for chunk in stream:
201                    choices = getattr(chunk, "choices", None)
202                    if not choices:
203                        continue
204                    choice = choices[0]
205                    delta = getattr(choice, "delta", None)
206                    if delta is None and isinstance(choice, dict):
207                        delta = choice.get("delta")
208                    chunk_text = _extract_openai_delta_text(
209                        delta,
210                        ("content", "reasoning_content", "reasoning", "thinking"),
211                    )
212                    if not chunk_text:
213                        continue
214                    emitted = True
215                    yield chunk_text
216
217                if not emitted:
218                    raise AIProviderError(
219                        "Local AI provider returned an empty streamed response. "
220                        "Try a different local model or increase max tokens."
221                    )
222            except AIProviderError:
223                raise
224            except self._openai.APIConnectionError as error:
225                self._raise_connection_error(error)
226            except self._openai.AuthenticationError as error:
227                raise AIProviderError(
228                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
229                ) from error
230            except self._openai.BadRequestError as error:
231                if _is_context_length_error(error):
232                    raise AIProviderError(
233                        "Local model request exceeded the context length. Reduce prompt size and retry."
234                    ) from error
235                raise AIProviderError(f"Local provider request was rejected: {error}") from error
236            except self._openai.APIError as error:
237                self._raise_api_error(error)
238            except Exception as error:
239                raise AIProviderError(f"Unexpected local provider error: {error}") from error
240
241        return _stream()
242
243    def analyze_with_attachments(
244        self,
245        system_prompt: str,
246        user_prompt: str,
247        attachments: list[Mapping[str, str]] | None,
248        max_tokens: int = DEFAULT_MAX_TOKENS,
249    ) -> str:
250        """Analyze with optional CSV file attachments.
251
252        Args:
253            system_prompt: The system-level instruction text.
254            user_prompt: The user-facing prompt with investigation context.
255            attachments: Optional list of attachment descriptors.
256            max_tokens: Maximum completion tokens.
257
258        Returns:
259            The generated analysis text.
260
261        Raises:
262            AIProviderError: On any API or network failure.
263        """
264        def _request() -> str:
265            return self._request_non_stream(
266                system_prompt=system_prompt,
267                user_prompt=user_prompt,
268                max_tokens=max_tokens,
269                attachments=attachments,
270            )
271
272        return self._run_local_request(_request)
273
274    def _run_local_request(self, request_fn: Callable[[], _T]) -> _T:
275        """Execute a local request with rate-limit retries and error mapping.
276
277        Args:
278            request_fn: A zero-argument callable that performs the request.
279
280        Returns:
281            The return value of ``request_fn`` on success.
282
283        Raises:
284            AIProviderError: On any OpenAI SDK error (with local messages).
285        """
286        try:
287            return _run_with_rate_limit_retries(
288                request_fn=request_fn,
289                rate_limit_error_type=self._openai.RateLimitError,
290                provider_name="Local provider",
291            )
292        except AIProviderError:
293            raise
294        except self._openai.APIConnectionError as error:
295            self._raise_connection_error(error)
296        except self._openai.AuthenticationError as error:
297            raise AIProviderError(
298                "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
299            ) from error
300        except self._openai.BadRequestError as error:
301            if _is_context_length_error(error):
302                raise AIProviderError(
303                    "Local model request exceeded the context length. Reduce prompt size and retry."
304                ) from error
305            raise AIProviderError(f"Local provider request was rejected: {error}") from error
306        except self._openai.APIError as error:
307            self._raise_api_error(error)
308        except Exception as error:
309            raise AIProviderError(f"Unexpected local provider error: {error}") from error
310
311    def _raise_connection_error(self, error: Exception) -> None:
312        """Map APIConnectionError to AIProviderError with timeout detection.
313
314        Args:
315            error: The connection error to map.
316
317        Raises:
318            AIProviderError: Always raised with appropriate message.
319        """
320        if (
321            self._api_timeout_error_type is not None
322            and isinstance(error, self._api_timeout_error_type)
323        ) or "timeout" in str(error).lower():
324            raise AIProviderError(
325                "Local AI request timed out after "
326                f"{self.request_timeout_seconds:g} seconds. "
327                "Increase `ai.local.request_timeout_seconds` for long-running prompts."
328            ) from error
329        raise AIProviderError(
330            "Unable to connect to local AI endpoint. Check `ai.local.base_url` and ensure the server is running."
331        ) from error
332
333    def _raise_api_error(self, error: Exception) -> None:
334        """Map APIError to AIProviderError with 404 detection.
335
336        Args:
337            error: The API error to map.
338
339        Raises:
340            AIProviderError: Always raised with appropriate message.
341        """
342        error_text = str(error).lower()
343        if "404" in error_text or "not found" in error_text:
344            raise AIProviderError(
345                "Local AI endpoint returned 404 (not found). "
346                "This is often caused by a base URL missing `/v1`. "
347                f"Current base URL: {self.base_url}"
348            ) from error
349        raise AIProviderError(f"Local provider API error: {error}") from error
350
351    def analyze_with_progress(
352        self,
353        system_prompt: str,
354        user_prompt: str,
355        progress_callback: Callable[[dict[str, str]], None] | None,
356        attachments: list[Mapping[str, str]] | None = None,
357        max_tokens: int = DEFAULT_MAX_TOKENS,
358    ) -> str:
359        """Analyze with streamed progress updates when supported.
360
361        Streams the response and periodically invokes ``progress_callback``
362        with accumulated thinking and answer text. Falls back to
363        ``analyze_with_attachments`` when no callback is provided.
364
365        Args:
366            system_prompt: The system-level instruction text.
367            user_prompt: The user-facing prompt with investigation context.
368            progress_callback: Optional callable receiving progress dicts.
369            attachments: Optional list of attachment descriptors.
370            max_tokens: Maximum completion tokens.
371
372        Returns:
373            The generated analysis text with reasoning blocks removed.
374
375        Raises:
376            AIProviderError: On empty response or API failure.
377        """
378        if progress_callback is None:
379            return self.analyze_with_attachments(
380                system_prompt=system_prompt,
381                user_prompt=user_prompt,
382                attachments=attachments,
383                max_tokens=max_tokens,
384            )
385
386        def _request() -> str:
387            result = self._build_stream_or_result(
388                system_prompt=system_prompt,
389                user_prompt=user_prompt,
390                max_tokens=max_tokens,
391                attachments=attachments,
392            )
393            if isinstance(result, str):
394                return result
395            stream = result
396
397            thinking_parts: list[str] = []
398            answer_parts: list[str] = []
399            last_emit_at = 0.0
400            last_sent_thinking = ""
401            last_sent_answer = ""
402
403            for chunk in stream:
404                chunk_result = self._process_stream_chunk(chunk)
405                if chunk_result is None:
406                    continue
407
408                thinking_delta, answer_delta = chunk_result
409                if thinking_delta:
410                    thinking_parts.append(thinking_delta)
411                if answer_delta:
412                    answer_parts.append(answer_delta)
413
414                current_thinking = "".join(thinking_parts).strip()
415                current_answer = _clean_streamed_answer_text(
416                    answer_text="".join(answer_parts),
417                    thinking_text=current_thinking,
418                )
419
420                last_emit_at, last_sent_thinking, last_sent_answer = (
421                    self._emit_progress_if_needed(
422                        progress_callback=progress_callback,
423                        current_thinking=current_thinking,
424                        current_answer=current_answer,
425                        last_emit_at=last_emit_at,
426                        last_sent_thinking=last_sent_thinking,
427                        last_sent_answer=last_sent_answer,
428                    )
429                )
430
431            return self._finalize_stream_response(thinking_parts, answer_parts)
432
433        return self._run_local_request(_request)
434
435    def _build_stream_or_result(
436        self,
437        system_prompt: str,
438        user_prompt: str,
439        max_tokens: int,
440        attachments: list[Mapping[str, str]] | None,
441    ) -> Any | str:
442        """Set up the streaming request, returning a stream or a final string.
443
444        Attempts CSV file attachment first. If that succeeds, returns the
445        completed text directly. Otherwise creates a streaming chat completion.
446        Falls back to non-streaming if the endpoint rejects streaming.
447
448        Args:
449            system_prompt: The system-level instruction text.
450            user_prompt: The user-facing prompt text.
451            max_tokens: Maximum completion tokens.
452            attachments: Optional list of attachment descriptors.
453
454        Returns:
455            A streaming response object, or a ``str`` if the result was
456            obtained without streaming (attachment path or fallback).
457
458        Raises:
459            AIProviderError: If the non-streaming fallback also fails.
460        """
461        attachment_response = self._request_with_csv_attachments(
462            system_prompt=system_prompt,
463            user_prompt=user_prompt,
464            max_tokens=max_tokens,
465            attachments=attachments,
466        )
467        if attachment_response:
468            cleaned = _strip_leading_reasoning_blocks(attachment_response)
469            return cleaned or attachment_response.strip()
470
471        prompt_for_completion = self._build_chat_completion_prompt(
472            user_prompt=user_prompt,
473            attachments=attachments,
474        )
475
476        try:
477            return self.client.chat.completions.create(
478                model=self.model,
479                max_tokens=max_tokens,
480                messages=[
481                    {"role": "system", "content": system_prompt},
482                    {"role": "user", "content": prompt_for_completion},
483                ],
484                stream=True,
485            )
486        except self._openai.BadRequestError as error:
487            lowered_error = str(error).lower()
488            if "stream" in lowered_error and (
489                "unsupported" in lowered_error or "not support" in lowered_error
490            ):
491                return self._request_non_stream(
492                    system_prompt=system_prompt,
493                    user_prompt=user_prompt,
494                    max_tokens=max_tokens,
495                    attachments=attachments,
496                )
497            raise
498
499    @staticmethod
500    def _process_stream_chunk(chunk: Any) -> tuple[str, str] | None:
501        """Extract thinking and answer deltas from a single stream chunk.
502
503        Args:
504            chunk: A streaming response chunk from the OpenAI SDK.
505
506        Returns:
507            A ``(thinking_delta, answer_delta)`` tuple, or ``None`` if the
508            chunk contains no usable text.
509        """
510        choices = getattr(chunk, "choices", None)
511        if not choices:
512            return None
513        choice = choices[0]
514        delta = getattr(choice, "delta", None)
515        if delta is None and isinstance(choice, dict):
516            delta = choice.get("delta")
517        if delta is None:
518            return None
519
520        answer_delta = _extract_openai_delta_text(delta, ("content",))
521        thinking_delta = _extract_openai_delta_text(
522            delta,
523            ("reasoning_content", "reasoning", "thinking"),
524        )
525
526        if not answer_delta and not thinking_delta:
527            return None
528        return (thinking_delta, answer_delta)
529
530    @staticmethod
531    def _emit_progress_if_needed(
532        progress_callback: Callable[[dict[str, str]], None],
533        current_thinking: str,
534        current_answer: str,
535        last_emit_at: float,
536        last_sent_thinking: str,
537        last_sent_answer: str,
538    ) -> tuple[float, str, str]:
539        """Send a progress callback if enough content has changed.
540
541        Applies rate-limiting so the callback fires at most every 0.35 s
542        unless at least 80 characters have been added to either channel.
543
544        Args:
545            progress_callback: The callable to invoke with progress data.
546            current_thinking: Accumulated thinking text so far.
547            current_answer: Accumulated answer text so far.
548            last_emit_at: Monotonic timestamp of the last emission.
549            last_sent_thinking: Thinking text sent in the last emission.
550            last_sent_answer: Answer text sent in the last emission.
551
552        Returns:
553            Updated ``(last_emit_at, last_sent_thinking, last_sent_answer)``.
554        """
555        if not current_thinking and not current_answer:
556            return last_emit_at, last_sent_thinking, last_sent_answer
557
558        changed = (
559            current_thinking != last_sent_thinking
560            or current_answer != last_sent_answer
561        )
562        if not changed:
563            return last_emit_at, last_sent_thinking, last_sent_answer
564
565        now = time.monotonic()
566        if now - last_emit_at < 0.35 and (
567            len(current_thinking) - len(last_sent_thinking) < 80
568            and len(current_answer) - len(last_sent_answer) < 80
569        ):
570            return last_emit_at, last_sent_thinking, last_sent_answer
571
572        try:
573            progress_callback(
574                {
575                    "status": "thinking",
576                    "thinking_text": current_thinking,
577                    "partial_text": current_answer,
578                }
579            )
580        except Exception:
581            pass
582
583        return now, current_thinking, current_answer
584
585    @staticmethod
586    def _finalize_stream_response(
587        thinking_parts: list[str],
588        answer_parts: list[str],
589    ) -> str:
590        """Assemble the final response text from accumulated stream parts.
591
592        Args:
593            thinking_parts: Collected thinking-channel text fragments.
594            answer_parts: Collected answer-channel text fragments.
595
596        Returns:
597            The cleaned final answer, or the thinking text if no answer
598            was produced.
599
600        Raises:
601            AIProviderError: If both channels are empty.
602        """
603        final_thinking = "".join(thinking_parts).strip()
604        final_answer = _clean_streamed_answer_text(
605            answer_text="".join(answer_parts),
606            thinking_text=final_thinking,
607        )
608        if final_answer:
609            return final_answer
610        if final_thinking:
611            return final_thinking
612        raise AIProviderError(
613            "Local AI provider returned an empty streamed response. "
614            "Try a different local model or increase max tokens."
615        )
616
617    def _request_non_stream(
618        self,
619        system_prompt: str,
620        user_prompt: str,
621        max_tokens: int,
622        attachments: list[Mapping[str, str]] | None = None,
623    ) -> str:
624        """Perform a non-streaming local request with attachment handling.
625
626        Args:
627            system_prompt: The system-level instruction text.
628            user_prompt: The user-facing prompt text.
629            max_tokens: Maximum completion tokens.
630            attachments: Optional list of attachment descriptors.
631
632        Returns:
633            The generated analysis text with reasoning blocks removed.
634
635        Raises:
636            AIProviderError: If the response is empty.
637        """
638        attachment_response = self._request_with_csv_attachments(
639            system_prompt=system_prompt,
640            user_prompt=user_prompt,
641            max_tokens=max_tokens,
642            attachments=attachments,
643        )
644        if attachment_response:
645            cleaned_attachment_response = _strip_leading_reasoning_blocks(attachment_response)
646            if cleaned_attachment_response:
647                return cleaned_attachment_response
648            return attachment_response.strip()
649
650        prompt_for_completion = self._build_chat_completion_prompt(
651            user_prompt=user_prompt,
652            attachments=attachments,
653        )
654
655        response = self.client.chat.completions.create(
656            model=self.model,
657            max_tokens=max_tokens,
658            messages=[
659                {"role": "system", "content": system_prompt},
660                {"role": "user", "content": prompt_for_completion},
661            ],
662        )
663        text = _extract_openai_text(response)
664        if text:
665            cleaned_text = _strip_leading_reasoning_blocks(text)
666            if cleaned_text:
667                return cleaned_text
668            return text.strip()
669
670        finish_reason = None
671        choices = getattr(response, "choices", None)
672        if choices:
673            first_choice = choices[0]
674            finish_reason = getattr(first_choice, "finish_reason", None)
675            if finish_reason is None and isinstance(first_choice, dict):
676                finish_reason = first_choice.get("finish_reason")
677        reason_detail = f" (finish_reason={finish_reason})" if finish_reason else ""
678        raise AIProviderError(
679            "Local AI provider returned an empty response"
680            f"{reason_detail}. This can happen with reasoning-only outputs or very low token limits."
681        )
682
683    def _build_chat_completion_prompt(
684        self,
685        user_prompt: str,
686        attachments: list[Mapping[str, str]] | None,
687    ) -> str:
688        """Build the user prompt, inlining attachments if needed.
689
690        Args:
691            user_prompt: The original user-facing prompt text.
692            attachments: Optional list of attachment descriptors.
693
694        Returns:
695            The prompt string, potentially with attachment data appended.
696        """
697        prompt_for_completion = user_prompt
698        if attachments:
699            prompt_for_completion, inlined_attachment_data = _inline_attachment_data_into_prompt(
700                user_prompt=user_prompt,
701                attachments=attachments,
702            )
703            if inlined_attachment_data:
704                logger.info("Local attachment fallback inlined attachment data into prompt.")
705        return prompt_for_completion
706
707    def _request_with_csv_attachments(
708        self,
709        system_prompt: str,
710        user_prompt: str,
711        max_tokens: int,
712        attachments: list[Mapping[str, str]] | None,
713    ) -> str | None:
714        """Attempt to send a request with CSV files via the local Responses API.
715
716        Args:
717            system_prompt: The system-level instruction text.
718            user_prompt: The user-facing prompt text.
719            max_tokens: Maximum completion tokens.
720            attachments: Optional list of attachment descriptors.
721
722        Returns:
723            The generated text if succeeded, or ``None`` if skipped.
724        """
725        normalized_attachments = self._prepare_csv_attachments(
726            attachments,
727            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
728        )
729        if not normalized_attachments:
730            return None
731
732        try:
733            text = upload_and_request_via_responses_api(
734                client=self.client,
735                openai_module=self._openai,
736                model=self.model,
737                normalized_attachments=normalized_attachments,
738                system_prompt=system_prompt,
739                user_prompt=user_prompt,
740                max_tokens=max_tokens,
741                provider_name="Local provider",
742                upload_purpose="assistants",
743                convert_csv_to_txt=False,
744            )
745            self._csv_attachment_supported = True
746            return text
747        except Exception as error:
748            if _is_attachment_unsupported_error(error):
749                self._csv_attachment_supported = False
750                logger.info(
751                    "Local endpoint does not support file attachments via /files + /responses; "
752                    "falling back to chat.completions text mode."
753                )
754                return None
755            raise
756
757    def get_model_info(self) -> dict[str, str]:
758        """Return local provider and model metadata.
759
760        Returns:
761            A dictionary with ``"provider"`` and ``"model"`` keys.
762        """
763        return {"provider": "local", "model": self.model}

OpenAI-compatible local provider implementation.

Attributes:
  • base_url (str): The normalized local endpoint base URL.
  • model (str): The local model identifier.
  • api_key (str): The API key for the local endpoint.
  • attach_csv_as_file (bool): Whether to attempt file-attachment mode.
  • request_timeout_seconds (float): HTTP timeout in seconds.
  • client: The openai.OpenAI SDK client instance.
LocalProvider( base_url: str, model: str, api_key: str = 'not-needed', attach_csv_as_file: bool = True, request_timeout_seconds: float = 3600.0)
 60    def __init__(
 61        self,
 62        base_url: str,
 63        model: str,
 64        api_key: str = "not-needed",
 65        attach_csv_as_file: bool = True,
 66        request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
 67    ) -> None:
 68        """Initialize the local provider.
 69
 70        Args:
 71            base_url: Base URL for the local endpoint. Normalized to
 72                include ``/v1`` if missing.
 73            model: Model identifier.
 74            api_key: API key. Defaults to ``"not-needed"``.
 75            attach_csv_as_file: If ``True``, attempt file uploads.
 76            request_timeout_seconds: HTTP timeout in seconds.
 77
 78        Raises:
 79            AIProviderError: If the ``openai`` SDK is not installed.
 80        """
 81        try:
 82            import openai
 83        except ImportError as error:
 84            raise AIProviderError(
 85                "openai SDK is not installed. Install it with `pip install openai`."
 86            ) from error
 87
 88        normalized_api_key = _normalize_api_key_value(api_key) or "not-needed"
 89
 90        self._openai = openai
 91        self.base_url = _normalize_openai_compatible_base_url(
 92            base_url=base_url,
 93            default_base_url=DEFAULT_LOCAL_BASE_URL,
 94        )
 95        self.model = model
 96        self.api_key = normalized_api_key
 97        self.attach_csv_as_file = bool(attach_csv_as_file)
 98        self.request_timeout_seconds = _resolve_timeout_seconds(
 99            request_timeout_seconds,
100            DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
101        )
102        self._api_timeout_error_type = getattr(openai, "APITimeoutError", None)
103        self._csv_attachment_supported: bool | None = None
104        self.client = openai.OpenAI(
105            api_key=normalized_api_key,
106            base_url=self.base_url,
107            timeout=self.request_timeout_seconds,
108            max_retries=0,
109        )
110        logger.info(
111            "Initialized local provider at %s with model %s (timeout %.1fs)",
112            self.base_url,
113            model,
114            self.request_timeout_seconds,
115        )

Initialize the local provider.

Arguments:
  • base_url: Base URL for the local endpoint. Normalized to include /v1 if missing.
  • model: Model identifier.
  • api_key: API key. Defaults to "not-needed".
  • attach_csv_as_file: If True, attempt file uploads.
  • request_timeout_seconds: HTTP timeout in seconds.
Raises:
  • AIProviderError: If the openai SDK is not installed.
base_url
model
api_key
attach_csv_as_file
request_timeout_seconds
client
def analyze( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> str:
117    def analyze(
118        self,
119        system_prompt: str,
120        user_prompt: str,
121        max_tokens: int = DEFAULT_MAX_TOKENS,
122    ) -> str:
123        """Send a prompt to the local endpoint and return the generated text.
124
125        Args:
126            system_prompt: The system-level instruction text.
127            user_prompt: The user-facing prompt with investigation context.
128            max_tokens: Maximum completion tokens.
129
130        Returns:
131            The generated analysis text.
132
133        Raises:
134            AIProviderError: On any API or network failure.
135        """
136        return self.analyze_with_attachments(
137            system_prompt=system_prompt,
138            user_prompt=user_prompt,
139            attachments=None,
140            max_tokens=max_tokens,
141        )

Send a prompt to the local endpoint and return the generated text.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_stream( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> Iterator[str]:
143    def analyze_stream(
144        self,
145        system_prompt: str,
146        user_prompt: str,
147        max_tokens: int = DEFAULT_MAX_TOKENS,
148    ) -> Iterator[str]:
149        """Stream generated text chunks from the local endpoint.
150
151        Falls back to non-streaming if the endpoint reports streaming
152        is unsupported.
153
154        Args:
155            system_prompt: The system-level instruction text.
156            user_prompt: The user-facing prompt with investigation context.
157            max_tokens: Maximum completion tokens.
158
159        Yields:
160            Text chunk strings as they are generated.
161
162        Raises:
163            AIProviderError: On empty response or API failure.
164        """
165        def _stream() -> Iterator[str]:
166            try:
167                prompt_for_completion = self._build_chat_completion_prompt(
168                    user_prompt=user_prompt,
169                    attachments=None,
170                )
171                try:
172                    stream = _run_with_rate_limit_retries(
173                        request_fn=lambda: self.client.chat.completions.create(
174                            model=self.model,
175                            max_tokens=max_tokens,
176                            messages=[
177                                {"role": "system", "content": system_prompt},
178                                {"role": "user", "content": prompt_for_completion},
179                            ],
180                            stream=True,
181                        ),
182                        rate_limit_error_type=self._openai.RateLimitError,
183                        provider_name="Local provider",
184                    )
185                except self._openai.BadRequestError as error:
186                    lowered_error = str(error).lower()
187                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
188                        fallback_text = self._request_non_stream(
189                            system_prompt=system_prompt,
190                            user_prompt=user_prompt,
191                            max_tokens=max_tokens,
192                            attachments=None,
193                        )
194                        if fallback_text:
195                            yield fallback_text
196                            return
197                    raise
198
199                emitted = False
200                for chunk in stream:
201                    choices = getattr(chunk, "choices", None)
202                    if not choices:
203                        continue
204                    choice = choices[0]
205                    delta = getattr(choice, "delta", None)
206                    if delta is None and isinstance(choice, dict):
207                        delta = choice.get("delta")
208                    chunk_text = _extract_openai_delta_text(
209                        delta,
210                        ("content", "reasoning_content", "reasoning", "thinking"),
211                    )
212                    if not chunk_text:
213                        continue
214                    emitted = True
215                    yield chunk_text
216
217                if not emitted:
218                    raise AIProviderError(
219                        "Local AI provider returned an empty streamed response. "
220                        "Try a different local model or increase max tokens."
221                    )
222            except AIProviderError:
223                raise
224            except self._openai.APIConnectionError as error:
225                self._raise_connection_error(error)
226            except self._openai.AuthenticationError as error:
227                raise AIProviderError(
228                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
229                ) from error
230            except self._openai.BadRequestError as error:
231                if _is_context_length_error(error):
232                    raise AIProviderError(
233                        "Local model request exceeded the context length. Reduce prompt size and retry."
234                    ) from error
235                raise AIProviderError(f"Local provider request was rejected: {error}") from error
236            except self._openai.APIError as error:
237                self._raise_api_error(error)
238            except Exception as error:
239                raise AIProviderError(f"Unexpected local provider error: {error}") from error
240
241        return _stream()

Stream generated text chunks from the local endpoint.

Falls back to non-streaming if the endpoint reports streaming is unsupported.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Yields:

Text chunk strings as they are generated.

Raises:
  • AIProviderError: On empty response or API failure.
def analyze_with_attachments( self, system_prompt: str, user_prompt: str, attachments: list[typing.Mapping[str, str]] | None, max_tokens: int = 256000) -> str:
243    def analyze_with_attachments(
244        self,
245        system_prompt: str,
246        user_prompt: str,
247        attachments: list[Mapping[str, str]] | None,
248        max_tokens: int = DEFAULT_MAX_TOKENS,
249    ) -> str:
250        """Analyze with optional CSV file attachments.
251
252        Args:
253            system_prompt: The system-level instruction text.
254            user_prompt: The user-facing prompt with investigation context.
255            attachments: Optional list of attachment descriptors.
256            max_tokens: Maximum completion tokens.
257
258        Returns:
259            The generated analysis text.
260
261        Raises:
262            AIProviderError: On any API or network failure.
263        """
264        def _request() -> str:
265            return self._request_non_stream(
266                system_prompt=system_prompt,
267                user_prompt=user_prompt,
268                max_tokens=max_tokens,
269                attachments=attachments,
270            )
271
272        return self._run_local_request(_request)

Analyze with optional CSV file attachments.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_with_progress( self, system_prompt: str, user_prompt: str, progress_callback: Optional[Callable[[dict[str, str]], NoneType]], attachments: list[typing.Mapping[str, str]] | None = None, max_tokens: int = 256000) -> str:
351    def analyze_with_progress(
352        self,
353        system_prompt: str,
354        user_prompt: str,
355        progress_callback: Callable[[dict[str, str]], None] | None,
356        attachments: list[Mapping[str, str]] | None = None,
357        max_tokens: int = DEFAULT_MAX_TOKENS,
358    ) -> str:
359        """Analyze with streamed progress updates when supported.
360
361        Streams the response and periodically invokes ``progress_callback``
362        with accumulated thinking and answer text. Falls back to
363        ``analyze_with_attachments`` when no callback is provided.
364
365        Args:
366            system_prompt: The system-level instruction text.
367            user_prompt: The user-facing prompt with investigation context.
368            progress_callback: Optional callable receiving progress dicts.
369            attachments: Optional list of attachment descriptors.
370            max_tokens: Maximum completion tokens.
371
372        Returns:
373            The generated analysis text with reasoning blocks removed.
374
375        Raises:
376            AIProviderError: On empty response or API failure.
377        """
378        if progress_callback is None:
379            return self.analyze_with_attachments(
380                system_prompt=system_prompt,
381                user_prompt=user_prompt,
382                attachments=attachments,
383                max_tokens=max_tokens,
384            )
385
386        def _request() -> str:
387            result = self._build_stream_or_result(
388                system_prompt=system_prompt,
389                user_prompt=user_prompt,
390                max_tokens=max_tokens,
391                attachments=attachments,
392            )
393            if isinstance(result, str):
394                return result
395            stream = result
396
397            thinking_parts: list[str] = []
398            answer_parts: list[str] = []
399            last_emit_at = 0.0
400            last_sent_thinking = ""
401            last_sent_answer = ""
402
403            for chunk in stream:
404                chunk_result = self._process_stream_chunk(chunk)
405                if chunk_result is None:
406                    continue
407
408                thinking_delta, answer_delta = chunk_result
409                if thinking_delta:
410                    thinking_parts.append(thinking_delta)
411                if answer_delta:
412                    answer_parts.append(answer_delta)
413
414                current_thinking = "".join(thinking_parts).strip()
415                current_answer = _clean_streamed_answer_text(
416                    answer_text="".join(answer_parts),
417                    thinking_text=current_thinking,
418                )
419
420                last_emit_at, last_sent_thinking, last_sent_answer = (
421                    self._emit_progress_if_needed(
422                        progress_callback=progress_callback,
423                        current_thinking=current_thinking,
424                        current_answer=current_answer,
425                        last_emit_at=last_emit_at,
426                        last_sent_thinking=last_sent_thinking,
427                        last_sent_answer=last_sent_answer,
428                    )
429                )
430
431            return self._finalize_stream_response(thinking_parts, answer_parts)
432
433        return self._run_local_request(_request)

Analyze with streamed progress updates when supported.

Streams the response and periodically invokes progress_callback with accumulated thinking and answer text. Falls back to analyze_with_attachments when no callback is provided.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • progress_callback: Optional callable receiving progress dicts.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text with reasoning blocks removed.

Raises:
  • AIProviderError: On empty response or API failure.
def get_model_info(self) -> dict[str, str]:
757    def get_model_info(self) -> dict[str, str]:
758        """Return local provider and model metadata.
759
760        Returns:
761            A dictionary with ``"provider"`` and ``"model"`` keys.
762        """
763        return {"provider": "local", "model": self.model}

Return local provider and model metadata.

Returns:

A dictionary with "provider" and "model" keys.

def create_provider(config: dict[str, typing.Any]) -> AIProvider:
def create_provider(config: dict[str, Any]) -> AIProvider:
    """Create and return an AI provider instance based on application config.

    Reads the ``ai.provider`` key from the configuration dictionary and
    constructs the corresponding provider class with settings from the
    provider-specific sub-section. When the ``ai`` section or the
    ``provider`` key is absent, the Claude provider is used by default.

    Args:
        config: The application configuration dictionary. May contain an
            ``"ai"`` dictionary section with a ``"provider"`` key — one of
            ``"claude"``, ``"openai"``, ``"kimi"``, ``"local"``
            (case-insensitive, surrounding whitespace ignored).

    Returns:
        A configured ``AIProvider`` instance ready for use.

    Raises:
        ValueError: If the ``ai`` section is present but not a dictionary,
            or if the provider name is not supported.
        AIProviderError: If the selected provider cannot be initialized.
    """
    ai_config = config.get("ai", {})
    if not isinstance(ai_config, dict):
        raise ValueError("Invalid configuration: `ai` section must be a dictionary.")

    # Normalize so "Claude", " OPENAI " etc. all resolve correctly.
    provider_name = str(ai_config.get("provider", "claude")).strip().lower()

    # Kept as an if-chain (not a dispatch dict) so an unsupported name never
    # evaluates any factory reference before raising.
    if provider_name == "claude":
        return _create_claude_provider(ai_config)

    if provider_name == "openai":
        return _create_openai_provider(ai_config)

    if provider_name == "local":
        return _create_local_provider(ai_config)

    if provider_name == "kimi":
        return _create_kimi_provider(ai_config)

    raise ValueError(
        f"Unsupported AI provider '{provider_name}'. Expected one of: claude, openai, kimi, local."
    )

Create and return an AI provider instance based on application config.

Reads the ai.provider key from the configuration dictionary and constructs the corresponding provider class with settings from the provider-specific sub-section.

Arguments:
  • config: The application configuration dictionary, expected to contain an "ai" section with a "provider" key.
Returns:

A configured AIProvider instance ready for use.

Raises:
  • ValueError: If the ai section is missing or the provider name is not supported.
  • AIProviderError: If the selected provider cannot be initialized.