app.ai_providers.local_provider

OpenAI-compatible local provider implementation.

Uses the openai Python SDK pointed at a local endpoint (Ollama, LM Studio, vLLM, or similar). Supports synchronous and streaming generation, CSV file attachments via the Responses API when available, automatic reasoning-block stripping for local reasoning models, and configurable request timeouts.

Attributes:
  • logger: Module-level logger for local provider operations.
  1"""OpenAI-compatible local provider implementation.
  2
  3Uses the ``openai`` Python SDK pointed at a local endpoint (Ollama,
  4LM Studio, vLLM, or similar). Supports synchronous and streaming
  5generation, CSV file attachments via the Responses API when available,
  6automatic reasoning-block stripping for local reasoning models, and
  7configurable request timeouts.
  8
  9Attributes:
 10    logger: Module-level logger for local provider operations.
 11"""
 12
from __future__ import annotations

import logging
import time
from typing import Any, Callable, Iterator, Mapping, NoReturn

from .base import (
    AIProvider,
    AIProviderError,
    DEFAULT_LOCAL_BASE_URL,
    DEFAULT_MAX_TOKENS,
    DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
    _is_attachment_unsupported_error,
    _is_context_length_error,
    _normalize_api_key_value,
    _normalize_openai_compatible_base_url,
    _resolve_timeout_seconds,
    _run_with_rate_limit_retries,
    _T,
)
from .utils import (
    _clean_streamed_answer_text,
    _extract_openai_delta_text,
    _extract_openai_text,
    _inline_attachment_data_into_prompt,
    _strip_leading_reasoning_blocks,
    upload_and_request_via_responses_api,
)
 41
 42logger = logging.getLogger(__name__)
 43
# Default local model identifier.
# NOTE(review): not referenced within this module; presumably consumed by
# configuration/factory code elsewhere — confirm against callers.
DEFAULT_LOCAL_MODEL = "llama3.1:70b"
 46
 47class LocalProvider(AIProvider):
 48    """OpenAI-compatible local provider implementation.
 49
 50    Attributes:
 51        base_url (str): The normalized local endpoint base URL.
 52        model (str): The local model identifier.
 53        api_key (str): The API key for the local endpoint.
 54        attach_csv_as_file (bool): Whether to attempt file-attachment mode.
 55        request_timeout_seconds (float): HTTP timeout in seconds.
 56        client: The ``openai.OpenAI`` SDK client instance.
 57    """
 58
 59    def __init__(
 60        self,
 61        base_url: str,
 62        model: str,
 63        api_key: str = "not-needed",
 64        attach_csv_as_file: bool = True,
 65        request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
 66    ) -> None:
 67        """Initialize the local provider.
 68
 69        Args:
 70            base_url: Base URL for the local endpoint. Normalized to
 71                include ``/v1`` if missing.
 72            model: Model identifier.
 73            api_key: API key. Defaults to ``"not-needed"``.
 74            attach_csv_as_file: If ``True``, attempt file uploads.
 75            request_timeout_seconds: HTTP timeout in seconds.
 76
 77        Raises:
 78            AIProviderError: If the ``openai`` SDK is not installed.
 79        """
 80        try:
 81            import openai
 82        except ImportError as error:
 83            raise AIProviderError(
 84                "openai SDK is not installed. Install it with `pip install openai`."
 85            ) from error
 86
 87        normalized_api_key = _normalize_api_key_value(api_key) or "not-needed"
 88
 89        self._openai = openai
 90        self.base_url = _normalize_openai_compatible_base_url(
 91            base_url=base_url,
 92            default_base_url=DEFAULT_LOCAL_BASE_URL,
 93        )
 94        self.model = model
 95        self.api_key = normalized_api_key
 96        self.attach_csv_as_file = bool(attach_csv_as_file)
 97        self.request_timeout_seconds = _resolve_timeout_seconds(
 98            request_timeout_seconds,
 99            DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
100        )
101        self._api_timeout_error_type = getattr(openai, "APITimeoutError", None)
102        self._csv_attachment_supported: bool | None = None
103        self.client = openai.OpenAI(
104            api_key=normalized_api_key,
105            base_url=self.base_url,
106            timeout=self.request_timeout_seconds,
107            max_retries=0,
108        )
109        logger.info(
110            "Initialized local provider at %s with model %s (timeout %.1fs)",
111            self.base_url,
112            model,
113            self.request_timeout_seconds,
114        )
115
116    def analyze(
117        self,
118        system_prompt: str,
119        user_prompt: str,
120        max_tokens: int = DEFAULT_MAX_TOKENS,
121    ) -> str:
122        """Send a prompt to the local endpoint and return the generated text.
123
124        Args:
125            system_prompt: The system-level instruction text.
126            user_prompt: The user-facing prompt with investigation context.
127            max_tokens: Maximum completion tokens.
128
129        Returns:
130            The generated analysis text.
131
132        Raises:
133            AIProviderError: On any API or network failure.
134        """
135        return self.analyze_with_attachments(
136            system_prompt=system_prompt,
137            user_prompt=user_prompt,
138            attachments=None,
139            max_tokens=max_tokens,
140        )
141
142    def analyze_stream(
143        self,
144        system_prompt: str,
145        user_prompt: str,
146        max_tokens: int = DEFAULT_MAX_TOKENS,
147    ) -> Iterator[str]:
148        """Stream generated text chunks from the local endpoint.
149
150        Falls back to non-streaming if the endpoint reports streaming
151        is unsupported.
152
153        Args:
154            system_prompt: The system-level instruction text.
155            user_prompt: The user-facing prompt with investigation context.
156            max_tokens: Maximum completion tokens.
157
158        Yields:
159            Text chunk strings as they are generated.
160
161        Raises:
162            AIProviderError: On empty response or API failure.
163        """
164        def _stream() -> Iterator[str]:
165            try:
166                prompt_for_completion = self._build_chat_completion_prompt(
167                    user_prompt=user_prompt,
168                    attachments=None,
169                )
170                try:
171                    stream = _run_with_rate_limit_retries(
172                        request_fn=lambda: self.client.chat.completions.create(
173                            model=self.model,
174                            max_tokens=max_tokens,
175                            messages=[
176                                {"role": "system", "content": system_prompt},
177                                {"role": "user", "content": prompt_for_completion},
178                            ],
179                            stream=True,
180                        ),
181                        rate_limit_error_type=self._openai.RateLimitError,
182                        provider_name="Local provider",
183                    )
184                except self._openai.BadRequestError as error:
185                    lowered_error = str(error).lower()
186                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
187                        fallback_text = self._request_non_stream(
188                            system_prompt=system_prompt,
189                            user_prompt=user_prompt,
190                            max_tokens=max_tokens,
191                            attachments=None,
192                        )
193                        if fallback_text:
194                            yield fallback_text
195                            return
196                    raise
197
198                emitted = False
199                for chunk in stream:
200                    choices = getattr(chunk, "choices", None)
201                    if not choices:
202                        continue
203                    choice = choices[0]
204                    delta = getattr(choice, "delta", None)
205                    if delta is None and isinstance(choice, dict):
206                        delta = choice.get("delta")
207                    chunk_text = _extract_openai_delta_text(
208                        delta,
209                        ("content", "reasoning_content", "reasoning", "thinking"),
210                    )
211                    if not chunk_text:
212                        continue
213                    emitted = True
214                    yield chunk_text
215
216                if not emitted:
217                    raise AIProviderError(
218                        "Local AI provider returned an empty streamed response. "
219                        "Try a different local model or increase max tokens."
220                    )
221            except AIProviderError:
222                raise
223            except self._openai.APIConnectionError as error:
224                self._raise_connection_error(error)
225            except self._openai.AuthenticationError as error:
226                raise AIProviderError(
227                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
228                ) from error
229            except self._openai.BadRequestError as error:
230                if _is_context_length_error(error):
231                    raise AIProviderError(
232                        "Local model request exceeded the context length. Reduce prompt size and retry."
233                    ) from error
234                raise AIProviderError(f"Local provider request was rejected: {error}") from error
235            except self._openai.APIError as error:
236                self._raise_api_error(error)
237            except Exception as error:
238                raise AIProviderError(f"Unexpected local provider error: {error}") from error
239
240        return _stream()
241
242    def analyze_with_attachments(
243        self,
244        system_prompt: str,
245        user_prompt: str,
246        attachments: list[Mapping[str, str]] | None,
247        max_tokens: int = DEFAULT_MAX_TOKENS,
248    ) -> str:
249        """Analyze with optional CSV file attachments.
250
251        Args:
252            system_prompt: The system-level instruction text.
253            user_prompt: The user-facing prompt with investigation context.
254            attachments: Optional list of attachment descriptors.
255            max_tokens: Maximum completion tokens.
256
257        Returns:
258            The generated analysis text.
259
260        Raises:
261            AIProviderError: On any API or network failure.
262        """
263        def _request() -> str:
264            return self._request_non_stream(
265                system_prompt=system_prompt,
266                user_prompt=user_prompt,
267                max_tokens=max_tokens,
268                attachments=attachments,
269            )
270
271        return self._run_local_request(_request)
272
273    def _run_local_request(self, request_fn: Callable[[], _T]) -> _T:
274        """Execute a local request with rate-limit retries and error mapping.
275
276        Args:
277            request_fn: A zero-argument callable that performs the request.
278
279        Returns:
280            The return value of ``request_fn`` on success.
281
282        Raises:
283            AIProviderError: On any OpenAI SDK error (with local messages).
284        """
285        try:
286            return _run_with_rate_limit_retries(
287                request_fn=request_fn,
288                rate_limit_error_type=self._openai.RateLimitError,
289                provider_name="Local provider",
290            )
291        except AIProviderError:
292            raise
293        except self._openai.APIConnectionError as error:
294            self._raise_connection_error(error)
295        except self._openai.AuthenticationError as error:
296            raise AIProviderError(
297                "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
298            ) from error
299        except self._openai.BadRequestError as error:
300            if _is_context_length_error(error):
301                raise AIProviderError(
302                    "Local model request exceeded the context length. Reduce prompt size and retry."
303                ) from error
304            raise AIProviderError(f"Local provider request was rejected: {error}") from error
305        except self._openai.APIError as error:
306            self._raise_api_error(error)
307        except Exception as error:
308            raise AIProviderError(f"Unexpected local provider error: {error}") from error
309
310    def _raise_connection_error(self, error: Exception) -> None:
311        """Map APIConnectionError to AIProviderError with timeout detection.
312
313        Args:
314            error: The connection error to map.
315
316        Raises:
317            AIProviderError: Always raised with appropriate message.
318        """
319        if (
320            self._api_timeout_error_type is not None
321            and isinstance(error, self._api_timeout_error_type)
322        ) or "timeout" in str(error).lower():
323            raise AIProviderError(
324                "Local AI request timed out after "
325                f"{self.request_timeout_seconds:g} seconds. "
326                "Increase `ai.local.request_timeout_seconds` for long-running prompts."
327            ) from error
328        raise AIProviderError(
329            "Unable to connect to local AI endpoint. Check `ai.local.base_url` and ensure the server is running."
330        ) from error
331
332    def _raise_api_error(self, error: Exception) -> None:
333        """Map APIError to AIProviderError with 404 detection.
334
335        Args:
336            error: The API error to map.
337
338        Raises:
339            AIProviderError: Always raised with appropriate message.
340        """
341        error_text = str(error).lower()
342        if "404" in error_text or "not found" in error_text:
343            raise AIProviderError(
344                "Local AI endpoint returned 404 (not found). "
345                "This is often caused by a base URL missing `/v1`. "
346                f"Current base URL: {self.base_url}"
347            ) from error
348        raise AIProviderError(f"Local provider API error: {error}") from error
349
350    def analyze_with_progress(
351        self,
352        system_prompt: str,
353        user_prompt: str,
354        progress_callback: Callable[[dict[str, str]], None] | None,
355        attachments: list[Mapping[str, str]] | None = None,
356        max_tokens: int = DEFAULT_MAX_TOKENS,
357    ) -> str:
358        """Analyze with streamed progress updates when supported.
359
360        Streams the response and periodically invokes ``progress_callback``
361        with accumulated thinking and answer text. Falls back to
362        ``analyze_with_attachments`` when no callback is provided.
363
364        Args:
365            system_prompt: The system-level instruction text.
366            user_prompt: The user-facing prompt with investigation context.
367            progress_callback: Optional callable receiving progress dicts.
368            attachments: Optional list of attachment descriptors.
369            max_tokens: Maximum completion tokens.
370
371        Returns:
372            The generated analysis text with reasoning blocks removed.
373
374        Raises:
375            AIProviderError: On empty response or API failure.
376        """
377        if progress_callback is None:
378            return self.analyze_with_attachments(
379                system_prompt=system_prompt,
380                user_prompt=user_prompt,
381                attachments=attachments,
382                max_tokens=max_tokens,
383            )
384
385        def _request() -> str:
386            result = self._build_stream_or_result(
387                system_prompt=system_prompt,
388                user_prompt=user_prompt,
389                max_tokens=max_tokens,
390                attachments=attachments,
391            )
392            if isinstance(result, str):
393                return result
394            stream = result
395
396            thinking_parts: list[str] = []
397            answer_parts: list[str] = []
398            last_emit_at = 0.0
399            last_sent_thinking = ""
400            last_sent_answer = ""
401
402            for chunk in stream:
403                chunk_result = self._process_stream_chunk(chunk)
404                if chunk_result is None:
405                    continue
406
407                thinking_delta, answer_delta = chunk_result
408                if thinking_delta:
409                    thinking_parts.append(thinking_delta)
410                if answer_delta:
411                    answer_parts.append(answer_delta)
412
413                current_thinking = "".join(thinking_parts).strip()
414                current_answer = _clean_streamed_answer_text(
415                    answer_text="".join(answer_parts),
416                    thinking_text=current_thinking,
417                )
418
419                last_emit_at, last_sent_thinking, last_sent_answer = (
420                    self._emit_progress_if_needed(
421                        progress_callback=progress_callback,
422                        current_thinking=current_thinking,
423                        current_answer=current_answer,
424                        last_emit_at=last_emit_at,
425                        last_sent_thinking=last_sent_thinking,
426                        last_sent_answer=last_sent_answer,
427                    )
428                )
429
430            return self._finalize_stream_response(thinking_parts, answer_parts)
431
432        return self._run_local_request(_request)
433
434    def _build_stream_or_result(
435        self,
436        system_prompt: str,
437        user_prompt: str,
438        max_tokens: int,
439        attachments: list[Mapping[str, str]] | None,
440    ) -> Any | str:
441        """Set up the streaming request, returning a stream or a final string.
442
443        Attempts CSV file attachment first. If that succeeds, returns the
444        completed text directly. Otherwise creates a streaming chat completion.
445        Falls back to non-streaming if the endpoint rejects streaming.
446
447        Args:
448            system_prompt: The system-level instruction text.
449            user_prompt: The user-facing prompt text.
450            max_tokens: Maximum completion tokens.
451            attachments: Optional list of attachment descriptors.
452
453        Returns:
454            A streaming response object, or a ``str`` if the result was
455            obtained without streaming (attachment path or fallback).
456
457        Raises:
458            AIProviderError: If the non-streaming fallback also fails.
459        """
460        attachment_response = self._request_with_csv_attachments(
461            system_prompt=system_prompt,
462            user_prompt=user_prompt,
463            max_tokens=max_tokens,
464            attachments=attachments,
465        )
466        if attachment_response:
467            cleaned = _strip_leading_reasoning_blocks(attachment_response)
468            return cleaned or attachment_response.strip()
469
470        prompt_for_completion = self._build_chat_completion_prompt(
471            user_prompt=user_prompt,
472            attachments=attachments,
473        )
474
475        try:
476            return self.client.chat.completions.create(
477                model=self.model,
478                max_tokens=max_tokens,
479                messages=[
480                    {"role": "system", "content": system_prompt},
481                    {"role": "user", "content": prompt_for_completion},
482                ],
483                stream=True,
484            )
485        except self._openai.BadRequestError as error:
486            lowered_error = str(error).lower()
487            if "stream" in lowered_error and (
488                "unsupported" in lowered_error or "not support" in lowered_error
489            ):
490                return self._request_non_stream(
491                    system_prompt=system_prompt,
492                    user_prompt=user_prompt,
493                    max_tokens=max_tokens,
494                    attachments=attachments,
495                )
496            raise
497
498    @staticmethod
499    def _process_stream_chunk(chunk: Any) -> tuple[str, str] | None:
500        """Extract thinking and answer deltas from a single stream chunk.
501
502        Args:
503            chunk: A streaming response chunk from the OpenAI SDK.
504
505        Returns:
506            A ``(thinking_delta, answer_delta)`` tuple, or ``None`` if the
507            chunk contains no usable text.
508        """
509        choices = getattr(chunk, "choices", None)
510        if not choices:
511            return None
512        choice = choices[0]
513        delta = getattr(choice, "delta", None)
514        if delta is None and isinstance(choice, dict):
515            delta = choice.get("delta")
516        if delta is None:
517            return None
518
519        answer_delta = _extract_openai_delta_text(delta, ("content",))
520        thinking_delta = _extract_openai_delta_text(
521            delta,
522            ("reasoning_content", "reasoning", "thinking"),
523        )
524
525        if not answer_delta and not thinking_delta:
526            return None
527        return (thinking_delta, answer_delta)
528
529    @staticmethod
530    def _emit_progress_if_needed(
531        progress_callback: Callable[[dict[str, str]], None],
532        current_thinking: str,
533        current_answer: str,
534        last_emit_at: float,
535        last_sent_thinking: str,
536        last_sent_answer: str,
537    ) -> tuple[float, str, str]:
538        """Send a progress callback if enough content has changed.
539
540        Applies rate-limiting so the callback fires at most every 0.35 s
541        unless at least 80 characters have been added to either channel.
542
543        Args:
544            progress_callback: The callable to invoke with progress data.
545            current_thinking: Accumulated thinking text so far.
546            current_answer: Accumulated answer text so far.
547            last_emit_at: Monotonic timestamp of the last emission.
548            last_sent_thinking: Thinking text sent in the last emission.
549            last_sent_answer: Answer text sent in the last emission.
550
551        Returns:
552            Updated ``(last_emit_at, last_sent_thinking, last_sent_answer)``.
553        """
554        if not current_thinking and not current_answer:
555            return last_emit_at, last_sent_thinking, last_sent_answer
556
557        changed = (
558            current_thinking != last_sent_thinking
559            or current_answer != last_sent_answer
560        )
561        if not changed:
562            return last_emit_at, last_sent_thinking, last_sent_answer
563
564        now = time.monotonic()
565        if now - last_emit_at < 0.35 and (
566            len(current_thinking) - len(last_sent_thinking) < 80
567            and len(current_answer) - len(last_sent_answer) < 80
568        ):
569            return last_emit_at, last_sent_thinking, last_sent_answer
570
571        try:
572            progress_callback(
573                {
574                    "status": "thinking",
575                    "thinking_text": current_thinking,
576                    "partial_text": current_answer,
577                }
578            )
579        except Exception:
580            pass
581
582        return now, current_thinking, current_answer
583
584    @staticmethod
585    def _finalize_stream_response(
586        thinking_parts: list[str],
587        answer_parts: list[str],
588    ) -> str:
589        """Assemble the final response text from accumulated stream parts.
590
591        Args:
592            thinking_parts: Collected thinking-channel text fragments.
593            answer_parts: Collected answer-channel text fragments.
594
595        Returns:
596            The cleaned final answer, or the thinking text if no answer
597            was produced.
598
599        Raises:
600            AIProviderError: If both channels are empty.
601        """
602        final_thinking = "".join(thinking_parts).strip()
603        final_answer = _clean_streamed_answer_text(
604            answer_text="".join(answer_parts),
605            thinking_text=final_thinking,
606        )
607        if final_answer:
608            return final_answer
609        if final_thinking:
610            return final_thinking
611        raise AIProviderError(
612            "Local AI provider returned an empty streamed response. "
613            "Try a different local model or increase max tokens."
614        )
615
616    def _request_non_stream(
617        self,
618        system_prompt: str,
619        user_prompt: str,
620        max_tokens: int,
621        attachments: list[Mapping[str, str]] | None = None,
622    ) -> str:
623        """Perform a non-streaming local request with attachment handling.
624
625        Args:
626            system_prompt: The system-level instruction text.
627            user_prompt: The user-facing prompt text.
628            max_tokens: Maximum completion tokens.
629            attachments: Optional list of attachment descriptors.
630
631        Returns:
632            The generated analysis text with reasoning blocks removed.
633
634        Raises:
635            AIProviderError: If the response is empty.
636        """
637        attachment_response = self._request_with_csv_attachments(
638            system_prompt=system_prompt,
639            user_prompt=user_prompt,
640            max_tokens=max_tokens,
641            attachments=attachments,
642        )
643        if attachment_response:
644            cleaned_attachment_response = _strip_leading_reasoning_blocks(attachment_response)
645            if cleaned_attachment_response:
646                return cleaned_attachment_response
647            return attachment_response.strip()
648
649        prompt_for_completion = self._build_chat_completion_prompt(
650            user_prompt=user_prompt,
651            attachments=attachments,
652        )
653
654        response = self.client.chat.completions.create(
655            model=self.model,
656            max_tokens=max_tokens,
657            messages=[
658                {"role": "system", "content": system_prompt},
659                {"role": "user", "content": prompt_for_completion},
660            ],
661        )
662        text = _extract_openai_text(response)
663        if text:
664            cleaned_text = _strip_leading_reasoning_blocks(text)
665            if cleaned_text:
666                return cleaned_text
667            return text.strip()
668
669        finish_reason = None
670        choices = getattr(response, "choices", None)
671        if choices:
672            first_choice = choices[0]
673            finish_reason = getattr(first_choice, "finish_reason", None)
674            if finish_reason is None and isinstance(first_choice, dict):
675                finish_reason = first_choice.get("finish_reason")
676        reason_detail = f" (finish_reason={finish_reason})" if finish_reason else ""
677        raise AIProviderError(
678            "Local AI provider returned an empty response"
679            f"{reason_detail}. This can happen with reasoning-only outputs or very low token limits."
680        )
681
682    def _build_chat_completion_prompt(
683        self,
684        user_prompt: str,
685        attachments: list[Mapping[str, str]] | None,
686    ) -> str:
687        """Build the user prompt, inlining attachments if needed.
688
689        Args:
690            user_prompt: The original user-facing prompt text.
691            attachments: Optional list of attachment descriptors.
692
693        Returns:
694            The prompt string, potentially with attachment data appended.
695        """
696        prompt_for_completion = user_prompt
697        if attachments:
698            prompt_for_completion, inlined_attachment_data = _inline_attachment_data_into_prompt(
699                user_prompt=user_prompt,
700                attachments=attachments,
701            )
702            if inlined_attachment_data:
703                logger.info("Local attachment fallback inlined attachment data into prompt.")
704        return prompt_for_completion
705
706    def _request_with_csv_attachments(
707        self,
708        system_prompt: str,
709        user_prompt: str,
710        max_tokens: int,
711        attachments: list[Mapping[str, str]] | None,
712    ) -> str | None:
713        """Attempt to send a request with CSV files via the local Responses API.
714
715        Args:
716            system_prompt: The system-level instruction text.
717            user_prompt: The user-facing prompt text.
718            max_tokens: Maximum completion tokens.
719            attachments: Optional list of attachment descriptors.
720
721        Returns:
722            The generated text if succeeded, or ``None`` if skipped.
723        """
724        normalized_attachments = self._prepare_csv_attachments(
725            attachments,
726            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
727        )
728        if not normalized_attachments:
729            return None
730
731        try:
732            text = upload_and_request_via_responses_api(
733                client=self.client,
734                openai_module=self._openai,
735                model=self.model,
736                normalized_attachments=normalized_attachments,
737                system_prompt=system_prompt,
738                user_prompt=user_prompt,
739                max_tokens=max_tokens,
740                provider_name="Local provider",
741                upload_purpose="assistants",
742                convert_csv_to_txt=False,
743            )
744            self._csv_attachment_supported = True
745            return text
746        except Exception as error:
747            if _is_attachment_unsupported_error(error):
748                self._csv_attachment_supported = False
749                logger.info(
750                    "Local endpoint does not support file attachments via /files + /responses; "
751                    "falling back to chat.completions text mode."
752                )
753                return None
754            raise
755
756    def get_model_info(self) -> dict[str, str]:
757        """Return local provider and model metadata.
758
759        Returns:
760            A dictionary with ``"provider"`` and ``"model"`` keys.
761        """
762        return {"provider": "local", "model": self.model}
logger = <Logger app.ai_providers.local_provider (WARNING)>
DEFAULT_LOCAL_MODEL = 'llama3.1:70b'
class LocalProvider(app.ai_providers.base.AIProvider):
 48class LocalProvider(AIProvider):
 49    """OpenAI-compatible local provider implementation.
 50
 51    Attributes:
 52        base_url (str): The normalized local endpoint base URL.
 53        model (str): The local model identifier.
 54        api_key (str): The API key for the local endpoint.
 55        attach_csv_as_file (bool): Whether to attempt file-attachment mode.
 56        request_timeout_seconds (float): HTTP timeout in seconds.
 57        client: The ``openai.OpenAI`` SDK client instance.
 58    """
 59
 60    def __init__(
 61        self,
 62        base_url: str,
 63        model: str,
 64        api_key: str = "not-needed",
 65        attach_csv_as_file: bool = True,
 66        request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
 67    ) -> None:
 68        """Initialize the local provider.
 69
 70        Args:
 71            base_url: Base URL for the local endpoint. Normalized to
 72                include ``/v1`` if missing.
 73            model: Model identifier.
 74            api_key: API key. Defaults to ``"not-needed"``.
 75            attach_csv_as_file: If ``True``, attempt file uploads.
 76            request_timeout_seconds: HTTP timeout in seconds.
 77
 78        Raises:
 79            AIProviderError: If the ``openai`` SDK is not installed.
 80        """
 81        try:
 82            import openai
 83        except ImportError as error:
 84            raise AIProviderError(
 85                "openai SDK is not installed. Install it with `pip install openai`."
 86            ) from error
 87
 88        normalized_api_key = _normalize_api_key_value(api_key) or "not-needed"
 89
 90        self._openai = openai
 91        self.base_url = _normalize_openai_compatible_base_url(
 92            base_url=base_url,
 93            default_base_url=DEFAULT_LOCAL_BASE_URL,
 94        )
 95        self.model = model
 96        self.api_key = normalized_api_key
 97        self.attach_csv_as_file = bool(attach_csv_as_file)
 98        self.request_timeout_seconds = _resolve_timeout_seconds(
 99            request_timeout_seconds,
100            DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
101        )
102        self._api_timeout_error_type = getattr(openai, "APITimeoutError", None)
103        self._csv_attachment_supported: bool | None = None
104        self.client = openai.OpenAI(
105            api_key=normalized_api_key,
106            base_url=self.base_url,
107            timeout=self.request_timeout_seconds,
108            max_retries=0,
109        )
110        logger.info(
111            "Initialized local provider at %s with model %s (timeout %.1fs)",
112            self.base_url,
113            model,
114            self.request_timeout_seconds,
115        )
116
117    def analyze(
118        self,
119        system_prompt: str,
120        user_prompt: str,
121        max_tokens: int = DEFAULT_MAX_TOKENS,
122    ) -> str:
123        """Send a prompt to the local endpoint and return the generated text.
124
125        Args:
126            system_prompt: The system-level instruction text.
127            user_prompt: The user-facing prompt with investigation context.
128            max_tokens: Maximum completion tokens.
129
130        Returns:
131            The generated analysis text.
132
133        Raises:
134            AIProviderError: On any API or network failure.
135        """
136        return self.analyze_with_attachments(
137            system_prompt=system_prompt,
138            user_prompt=user_prompt,
139            attachments=None,
140            max_tokens=max_tokens,
141        )
142
    def analyze_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> Iterator[str]:
        """Stream generated text chunks from the local endpoint.

        Falls back to non-streaming if the endpoint reports streaming
        is unsupported.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Yields:
            Text chunk strings as they are generated.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        # Inner generator so that setup errors surface on first iteration
        # (inside the try/except below) rather than at call time.
        def _stream() -> Iterator[str]:
            try:
                prompt_for_completion = self._build_chat_completion_prompt(
                    user_prompt=user_prompt,
                    attachments=None,
                )
                try:
                    stream = _run_with_rate_limit_retries(
                        request_fn=lambda: self.client.chat.completions.create(
                            model=self.model,
                            max_tokens=max_tokens,
                            messages=[
                                {"role": "system", "content": system_prompt},
                                {"role": "user", "content": prompt_for_completion},
                            ],
                            stream=True,
                        ),
                        rate_limit_error_type=self._openai.RateLimitError,
                        provider_name="Local provider",
                    )
                except self._openai.BadRequestError as error:
                    # Some local servers reject stream=True outright; detect
                    # that specific rejection and retry without streaming.
                    lowered_error = str(error).lower()
                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
                        fallback_text = self._request_non_stream(
                            system_prompt=system_prompt,
                            user_prompt=user_prompt,
                            max_tokens=max_tokens,
                            attachments=None,
                        )
                        if fallback_text:
                            yield fallback_text
                            return
                    # Not a streaming-support error (or the fallback produced
                    # nothing): let the outer handlers map it.
                    raise

                emitted = False
                for chunk in stream:
                    choices = getattr(chunk, "choices", None)
                    if not choices:
                        continue
                    choice = choices[0]
                    delta = getattr(choice, "delta", None)
                    # Some servers return plain dicts instead of SDK objects.
                    if delta is None and isinstance(choice, dict):
                        delta = choice.get("delta")
                    # Accept answer and reasoning-channel fields so local
                    # reasoning models still produce visible output.
                    chunk_text = _extract_openai_delta_text(
                        delta,
                        ("content", "reasoning_content", "reasoning", "thinking"),
                    )
                    if not chunk_text:
                        continue
                    emitted = True
                    yield chunk_text

                if not emitted:
                    raise AIProviderError(
                        "Local AI provider returned an empty streamed response. "
                        "Try a different local model or increase max tokens."
                    )
            except AIProviderError:
                # Already the provider error type; pass through untouched.
                raise
            except self._openai.APIConnectionError as error:
                self._raise_connection_error(error)
            except self._openai.AuthenticationError as error:
                raise AIProviderError(
                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
                ) from error
            except self._openai.BadRequestError as error:
                if _is_context_length_error(error):
                    raise AIProviderError(
                        "Local model request exceeded the context length. Reduce prompt size and retry."
                    ) from error
                raise AIProviderError(f"Local provider request was rejected: {error}") from error
            except self._openai.APIError as error:
                self._raise_api_error(error)
            except Exception as error:
                raise AIProviderError(f"Unexpected local provider error: {error}") from error

        return _stream()
242
243    def analyze_with_attachments(
244        self,
245        system_prompt: str,
246        user_prompt: str,
247        attachments: list[Mapping[str, str]] | None,
248        max_tokens: int = DEFAULT_MAX_TOKENS,
249    ) -> str:
250        """Analyze with optional CSV file attachments.
251
252        Args:
253            system_prompt: The system-level instruction text.
254            user_prompt: The user-facing prompt with investigation context.
255            attachments: Optional list of attachment descriptors.
256            max_tokens: Maximum completion tokens.
257
258        Returns:
259            The generated analysis text.
260
261        Raises:
262            AIProviderError: On any API or network failure.
263        """
264        def _request() -> str:
265            return self._request_non_stream(
266                system_prompt=system_prompt,
267                user_prompt=user_prompt,
268                max_tokens=max_tokens,
269                attachments=attachments,
270            )
271
272        return self._run_local_request(_request)
273
    def _run_local_request(self, request_fn: Callable[[], _T]) -> _T:
        """Execute a local request with rate-limit retries and error mapping.

        Args:
            request_fn: A zero-argument callable that performs the request.

        Returns:
            The return value of ``request_fn`` on success.

        Raises:
            AIProviderError: On any OpenAI SDK error (with local messages).
        """
        try:
            return _run_with_rate_limit_retries(
                request_fn=request_fn,
                rate_limit_error_type=self._openai.RateLimitError,
                provider_name="Local provider",
            )
        except AIProviderError:
            # Already a provider error (e.g. raised inside request_fn);
            # pass through without re-wrapping.
            raise
        # Handler order matters: specific SDK error types must be handled
        # before the generic APIError catch-all below.
        except self._openai.APIConnectionError as error:
            self._raise_connection_error(error)
        except self._openai.AuthenticationError as error:
            raise AIProviderError(
                "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
            ) from error
        except self._openai.BadRequestError as error:
            if _is_context_length_error(error):
                raise AIProviderError(
                    "Local model request exceeded the context length. Reduce prompt size and retry."
                ) from error
            raise AIProviderError(f"Local provider request was rejected: {error}") from error
        except self._openai.APIError as error:
            self._raise_api_error(error)
        except Exception as error:
            raise AIProviderError(f"Unexpected local provider error: {error}") from error
310
311    def _raise_connection_error(self, error: Exception) -> None:
312        """Map APIConnectionError to AIProviderError with timeout detection.
313
314        Args:
315            error: The connection error to map.
316
317        Raises:
318            AIProviderError: Always raised with appropriate message.
319        """
320        if (
321            self._api_timeout_error_type is not None
322            and isinstance(error, self._api_timeout_error_type)
323        ) or "timeout" in str(error).lower():
324            raise AIProviderError(
325                "Local AI request timed out after "
326                f"{self.request_timeout_seconds:g} seconds. "
327                "Increase `ai.local.request_timeout_seconds` for long-running prompts."
328            ) from error
329        raise AIProviderError(
330            "Unable to connect to local AI endpoint. Check `ai.local.base_url` and ensure the server is running."
331        ) from error
332
333    def _raise_api_error(self, error: Exception) -> None:
334        """Map APIError to AIProviderError with 404 detection.
335
336        Args:
337            error: The API error to map.
338
339        Raises:
340            AIProviderError: Always raised with appropriate message.
341        """
342        error_text = str(error).lower()
343        if "404" in error_text or "not found" in error_text:
344            raise AIProviderError(
345                "Local AI endpoint returned 404 (not found). "
346                "This is often caused by a base URL missing `/v1`. "
347                f"Current base URL: {self.base_url}"
348            ) from error
349        raise AIProviderError(f"Local provider API error: {error}") from error
350
    def analyze_with_progress(
        self,
        system_prompt: str,
        user_prompt: str,
        progress_callback: Callable[[dict[str, str]], None] | None,
        attachments: list[Mapping[str, str]] | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with streamed progress updates when supported.

        Streams the response and periodically invokes ``progress_callback``
        with accumulated thinking and answer text. Falls back to
        ``analyze_with_attachments`` when no callback is provided.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            progress_callback: Optional callable receiving progress dicts.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        # No callback means nobody observes progress: use the plain
        # non-streaming path instead of paying for stream handling.
        if progress_callback is None:
            return self.analyze_with_attachments(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                attachments=attachments,
                max_tokens=max_tokens,
            )

        def _request() -> str:
            result = self._build_stream_or_result(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )
            # A str means the answer was already produced without streaming
            # (attachment path or non-streaming fallback).
            if isinstance(result, str):
                return result
            stream = result

            # Accumulate thinking/answer channels separately so the callback
            # can render them independently.
            thinking_parts: list[str] = []
            answer_parts: list[str] = []
            last_emit_at = 0.0
            last_sent_thinking = ""
            last_sent_answer = ""

            for chunk in stream:
                chunk_result = self._process_stream_chunk(chunk)
                if chunk_result is None:
                    continue

                thinking_delta, answer_delta = chunk_result
                if thinking_delta:
                    thinking_parts.append(thinking_delta)
                if answer_delta:
                    answer_parts.append(answer_delta)

                current_thinking = "".join(thinking_parts).strip()
                current_answer = _clean_streamed_answer_text(
                    answer_text="".join(answer_parts),
                    thinking_text=current_thinking,
                )

                # Emission state is threaded through so the helper can
                # rate-limit callbacks without instance-level mutation.
                last_emit_at, last_sent_thinking, last_sent_answer = (
                    self._emit_progress_if_needed(
                        progress_callback=progress_callback,
                        current_thinking=current_thinking,
                        current_answer=current_answer,
                        last_emit_at=last_emit_at,
                        last_sent_thinking=last_sent_thinking,
                        last_sent_answer=last_sent_answer,
                    )
                )

            return self._finalize_stream_response(thinking_parts, answer_parts)

        # _run_local_request supplies retries and SDK-error mapping.
        return self._run_local_request(_request)
434
435    def _build_stream_or_result(
436        self,
437        system_prompt: str,
438        user_prompt: str,
439        max_tokens: int,
440        attachments: list[Mapping[str, str]] | None,
441    ) -> Any | str:
442        """Set up the streaming request, returning a stream or a final string.
443
444        Attempts CSV file attachment first. If that succeeds, returns the
445        completed text directly. Otherwise creates a streaming chat completion.
446        Falls back to non-streaming if the endpoint rejects streaming.
447
448        Args:
449            system_prompt: The system-level instruction text.
450            user_prompt: The user-facing prompt text.
451            max_tokens: Maximum completion tokens.
452            attachments: Optional list of attachment descriptors.
453
454        Returns:
455            A streaming response object, or a ``str`` if the result was
456            obtained without streaming (attachment path or fallback).
457
458        Raises:
459            AIProviderError: If the non-streaming fallback also fails.
460        """
461        attachment_response = self._request_with_csv_attachments(
462            system_prompt=system_prompt,
463            user_prompt=user_prompt,
464            max_tokens=max_tokens,
465            attachments=attachments,
466        )
467        if attachment_response:
468            cleaned = _strip_leading_reasoning_blocks(attachment_response)
469            return cleaned or attachment_response.strip()
470
471        prompt_for_completion = self._build_chat_completion_prompt(
472            user_prompt=user_prompt,
473            attachments=attachments,
474        )
475
476        try:
477            return self.client.chat.completions.create(
478                model=self.model,
479                max_tokens=max_tokens,
480                messages=[
481                    {"role": "system", "content": system_prompt},
482                    {"role": "user", "content": prompt_for_completion},
483                ],
484                stream=True,
485            )
486        except self._openai.BadRequestError as error:
487            lowered_error = str(error).lower()
488            if "stream" in lowered_error and (
489                "unsupported" in lowered_error or "not support" in lowered_error
490            ):
491                return self._request_non_stream(
492                    system_prompt=system_prompt,
493                    user_prompt=user_prompt,
494                    max_tokens=max_tokens,
495                    attachments=attachments,
496                )
497            raise
498
499    @staticmethod
500    def _process_stream_chunk(chunk: Any) -> tuple[str, str] | None:
501        """Extract thinking and answer deltas from a single stream chunk.
502
503        Args:
504            chunk: A streaming response chunk from the OpenAI SDK.
505
506        Returns:
507            A ``(thinking_delta, answer_delta)`` tuple, or ``None`` if the
508            chunk contains no usable text.
509        """
510        choices = getattr(chunk, "choices", None)
511        if not choices:
512            return None
513        choice = choices[0]
514        delta = getattr(choice, "delta", None)
515        if delta is None and isinstance(choice, dict):
516            delta = choice.get("delta")
517        if delta is None:
518            return None
519
520        answer_delta = _extract_openai_delta_text(delta, ("content",))
521        thinking_delta = _extract_openai_delta_text(
522            delta,
523            ("reasoning_content", "reasoning", "thinking"),
524        )
525
526        if not answer_delta and not thinking_delta:
527            return None
528        return (thinking_delta, answer_delta)
529
530    @staticmethod
531    def _emit_progress_if_needed(
532        progress_callback: Callable[[dict[str, str]], None],
533        current_thinking: str,
534        current_answer: str,
535        last_emit_at: float,
536        last_sent_thinking: str,
537        last_sent_answer: str,
538    ) -> tuple[float, str, str]:
539        """Send a progress callback if enough content has changed.
540
541        Applies rate-limiting so the callback fires at most every 0.35 s
542        unless at least 80 characters have been added to either channel.
543
544        Args:
545            progress_callback: The callable to invoke with progress data.
546            current_thinking: Accumulated thinking text so far.
547            current_answer: Accumulated answer text so far.
548            last_emit_at: Monotonic timestamp of the last emission.
549            last_sent_thinking: Thinking text sent in the last emission.
550            last_sent_answer: Answer text sent in the last emission.
551
552        Returns:
553            Updated ``(last_emit_at, last_sent_thinking, last_sent_answer)``.
554        """
555        if not current_thinking and not current_answer:
556            return last_emit_at, last_sent_thinking, last_sent_answer
557
558        changed = (
559            current_thinking != last_sent_thinking
560            or current_answer != last_sent_answer
561        )
562        if not changed:
563            return last_emit_at, last_sent_thinking, last_sent_answer
564
565        now = time.monotonic()
566        if now - last_emit_at < 0.35 and (
567            len(current_thinking) - len(last_sent_thinking) < 80
568            and len(current_answer) - len(last_sent_answer) < 80
569        ):
570            return last_emit_at, last_sent_thinking, last_sent_answer
571
572        try:
573            progress_callback(
574                {
575                    "status": "thinking",
576                    "thinking_text": current_thinking,
577                    "partial_text": current_answer,
578                }
579            )
580        except Exception:
581            pass
582
583        return now, current_thinking, current_answer
584
585    @staticmethod
586    def _finalize_stream_response(
587        thinking_parts: list[str],
588        answer_parts: list[str],
589    ) -> str:
590        """Assemble the final response text from accumulated stream parts.
591
592        Args:
593            thinking_parts: Collected thinking-channel text fragments.
594            answer_parts: Collected answer-channel text fragments.
595
596        Returns:
597            The cleaned final answer, or the thinking text if no answer
598            was produced.
599
600        Raises:
601            AIProviderError: If both channels are empty.
602        """
603        final_thinking = "".join(thinking_parts).strip()
604        final_answer = _clean_streamed_answer_text(
605            answer_text="".join(answer_parts),
606            thinking_text=final_thinking,
607        )
608        if final_answer:
609            return final_answer
610        if final_thinking:
611            return final_thinking
612        raise AIProviderError(
613            "Local AI provider returned an empty streamed response. "
614            "Try a different local model or increase max tokens."
615        )
616
    def _request_non_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None = None,
    ) -> str:
        """Perform a non-streaming local request with attachment handling.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: If the response is empty.
        """
        # Attempt the file-attachment path first; a truthy result means it
        # succeeded and we can skip chat.completions entirely.
        attachment_response = self._request_with_csv_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=max_tokens,
            attachments=attachments,
        )
        if attachment_response:
            cleaned_attachment_response = _strip_leading_reasoning_blocks(attachment_response)
            if cleaned_attachment_response:
                return cleaned_attachment_response
            # Stripping removed everything (reasoning-only output):
            # return the raw text rather than an empty string.
            return attachment_response.strip()

        prompt_for_completion = self._build_chat_completion_prompt(
            user_prompt=user_prompt,
            attachments=attachments,
        )

        response = self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt_for_completion},
            ],
        )
        text = _extract_openai_text(response)
        if text:
            cleaned_text = _strip_leading_reasoning_blocks(text)
            if cleaned_text:
                return cleaned_text
            return text.strip()

        # Empty response: dig out finish_reason (object or dict shaped)
        # to make the error message diagnosable.
        finish_reason = None
        choices = getattr(response, "choices", None)
        if choices:
            first_choice = choices[0]
            finish_reason = getattr(first_choice, "finish_reason", None)
            if finish_reason is None and isinstance(first_choice, dict):
                finish_reason = first_choice.get("finish_reason")
        reason_detail = f" (finish_reason={finish_reason})" if finish_reason else ""
        raise AIProviderError(
            "Local AI provider returned an empty response"
            f"{reason_detail}. This can happen with reasoning-only outputs or very low token limits."
        )
682
683    def _build_chat_completion_prompt(
684        self,
685        user_prompt: str,
686        attachments: list[Mapping[str, str]] | None,
687    ) -> str:
688        """Build the user prompt, inlining attachments if needed.
689
690        Args:
691            user_prompt: The original user-facing prompt text.
692            attachments: Optional list of attachment descriptors.
693
694        Returns:
695            The prompt string, potentially with attachment data appended.
696        """
697        prompt_for_completion = user_prompt
698        if attachments:
699            prompt_for_completion, inlined_attachment_data = _inline_attachment_data_into_prompt(
700                user_prompt=user_prompt,
701                attachments=attachments,
702            )
703            if inlined_attachment_data:
704                logger.info("Local attachment fallback inlined attachment data into prompt.")
705        return prompt_for_completion
706
707    def _request_with_csv_attachments(
708        self,
709        system_prompt: str,
710        user_prompt: str,
711        max_tokens: int,
712        attachments: list[Mapping[str, str]] | None,
713    ) -> str | None:
714        """Attempt to send a request with CSV files via the local Responses API.
715
716        Args:
717            system_prompt: The system-level instruction text.
718            user_prompt: The user-facing prompt text.
719            max_tokens: Maximum completion tokens.
720            attachments: Optional list of attachment descriptors.
721
722        Returns:
723            The generated text if succeeded, or ``None`` if skipped.
724        """
725        normalized_attachments = self._prepare_csv_attachments(
726            attachments,
727            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
728        )
729        if not normalized_attachments:
730            return None
731
732        try:
733            text = upload_and_request_via_responses_api(
734                client=self.client,
735                openai_module=self._openai,
736                model=self.model,
737                normalized_attachments=normalized_attachments,
738                system_prompt=system_prompt,
739                user_prompt=user_prompt,
740                max_tokens=max_tokens,
741                provider_name="Local provider",
742                upload_purpose="assistants",
743                convert_csv_to_txt=False,
744            )
745            self._csv_attachment_supported = True
746            return text
747        except Exception as error:
748            if _is_attachment_unsupported_error(error):
749                self._csv_attachment_supported = False
750                logger.info(
751                    "Local endpoint does not support file attachments via /files + /responses; "
752                    "falling back to chat.completions text mode."
753                )
754                return None
755            raise
756
757    def get_model_info(self) -> dict[str, str]:
758        """Return local provider and model metadata.
759
760        Returns:
761            A dictionary with ``"provider"`` and ``"model"`` keys.
762        """
763        return {"provider": "local", "model": self.model}

OpenAI-compatible local provider implementation.

Attributes:
  • base_url (str): The normalized local endpoint base URL.
  • model (str): The local model identifier.
  • api_key (str): The API key for the local endpoint.
  • attach_csv_as_file (bool): Whether to attempt file-attachment mode.
  • request_timeout_seconds (float): HTTP timeout in seconds.
  • client: The openai.OpenAI SDK client instance.
LocalProvider( base_url: str, model: str, api_key: str = 'not-needed', attach_csv_as_file: bool = True, request_timeout_seconds: float = 3600.0)
 60    def __init__(
 61        self,
 62        base_url: str,
 63        model: str,
 64        api_key: str = "not-needed",
 65        attach_csv_as_file: bool = True,
 66        request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
 67    ) -> None:
 68        """Initialize the local provider.
 69
 70        Args:
 71            base_url: Base URL for the local endpoint. Normalized to
 72                include ``/v1`` if missing.
 73            model: Model identifier.
 74            api_key: API key. Defaults to ``"not-needed"``.
 75            attach_csv_as_file: If ``True``, attempt file uploads.
 76            request_timeout_seconds: HTTP timeout in seconds.
 77
 78        Raises:
 79            AIProviderError: If the ``openai`` SDK is not installed.
 80        """
 81        try:
 82            import openai
 83        except ImportError as error:
 84            raise AIProviderError(
 85                "openai SDK is not installed. Install it with `pip install openai`."
 86            ) from error
 87
 88        normalized_api_key = _normalize_api_key_value(api_key) or "not-needed"
 89
 90        self._openai = openai
 91        self.base_url = _normalize_openai_compatible_base_url(
 92            base_url=base_url,
 93            default_base_url=DEFAULT_LOCAL_BASE_URL,
 94        )
 95        self.model = model
 96        self.api_key = normalized_api_key
 97        self.attach_csv_as_file = bool(attach_csv_as_file)
 98        self.request_timeout_seconds = _resolve_timeout_seconds(
 99            request_timeout_seconds,
100            DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
101        )
102        self._api_timeout_error_type = getattr(openai, "APITimeoutError", None)
103        self._csv_attachment_supported: bool | None = None
104        self.client = openai.OpenAI(
105            api_key=normalized_api_key,
106            base_url=self.base_url,
107            timeout=self.request_timeout_seconds,
108            max_retries=0,
109        )
110        logger.info(
111            "Initialized local provider at %s with model %s (timeout %.1fs)",
112            self.base_url,
113            model,
114            self.request_timeout_seconds,
115        )

Initialize the local provider.

Arguments:
  • base_url: Base URL for the local endpoint. Normalized to include /v1 if missing.
  • model: Model identifier.
  • api_key: API key. Defaults to "not-needed".
  • attach_csv_as_file: If True, attempt file uploads.
  • request_timeout_seconds: HTTP timeout in seconds.
Raises:
  • AIProviderError: If the openai SDK is not installed.
base_url
model
api_key
attach_csv_as_file
request_timeout_seconds
client
def analyze( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> str:
117    def analyze(
118        self,
119        system_prompt: str,
120        user_prompt: str,
121        max_tokens: int = DEFAULT_MAX_TOKENS,
122    ) -> str:
123        """Send a prompt to the local endpoint and return the generated text.
124
125        Args:
126            system_prompt: The system-level instruction text.
127            user_prompt: The user-facing prompt with investigation context.
128            max_tokens: Maximum completion tokens.
129
130        Returns:
131            The generated analysis text.
132
133        Raises:
134            AIProviderError: On any API or network failure.
135        """
136        return self.analyze_with_attachments(
137            system_prompt=system_prompt,
138            user_prompt=user_prompt,
139            attachments=None,
140            max_tokens=max_tokens,
141        )

Send a prompt to the local endpoint and return the generated text.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_stream( self, system_prompt: str, user_prompt: str, max_tokens: int = 256000) -> Iterator[str]:
    def analyze_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> Iterator[str]:
        """Stream generated text chunks from the local endpoint.

        Falls back to non-streaming if the endpoint reports streaming
        is unsupported.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Yields:
            Text chunk strings as they are generated.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        # Inner generator keeps the streaming lazy: nothing executes until
        # the caller starts iterating the returned iterator.
        def _stream() -> Iterator[str]:
            try:
                # Streaming path never carries attachments.
                prompt_for_completion = self._build_chat_completion_prompt(
                    user_prompt=user_prompt,
                    attachments=None,
                )
                try:
                    stream = _run_with_rate_limit_retries(
                        request_fn=lambda: self.client.chat.completions.create(
                            model=self.model,
                            max_tokens=max_tokens,
                            messages=[
                                {"role": "system", "content": system_prompt},
                                {"role": "user", "content": prompt_for_completion},
                            ],
                            stream=True,
                        ),
                        rate_limit_error_type=self._openai.RateLimitError,
                        provider_name="Local provider",
                    )
                except self._openai.BadRequestError as error:
                    # Some local servers reject `stream=True`; detect that
                    # specific rejection by message text and retry once
                    # without streaming, yielding the full text as one chunk.
                    lowered_error = str(error).lower()
                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
                        fallback_text = self._request_non_stream(
                            system_prompt=system_prompt,
                            user_prompt=user_prompt,
                            max_tokens=max_tokens,
                            attachments=None,
                        )
                        if fallback_text:
                            yield fallback_text
                            return
                    # Not a streaming-support error (or empty fallback):
                    # re-raise so the outer handlers classify it.
                    raise

                emitted = False
                for chunk in stream:
                    choices = getattr(chunk, "choices", None)
                    if not choices:
                        continue
                    choice = choices[0]
                    delta = getattr(choice, "delta", None)
                    # Some servers return plain dicts rather than SDK objects.
                    if delta is None and isinstance(choice, dict):
                        delta = choice.get("delta")
                    # Local reasoning models may emit text under alternate
                    # delta keys; surface all of them to the caller.
                    chunk_text = _extract_openai_delta_text(
                        delta,
                        ("content", "reasoning_content", "reasoning", "thinking"),
                    )
                    if not chunk_text:
                        continue
                    emitted = True
                    yield chunk_text

                # A stream that produced no text at all is treated as an error.
                if not emitted:
                    raise AIProviderError(
                        "Local AI provider returned an empty streamed response. "
                        "Try a different local model or increase max tokens."
                    )
            except AIProviderError:
                # Already a user-facing provider error; propagate unchanged.
                raise
            except self._openai.APIConnectionError as error:
                self._raise_connection_error(error)
            except self._openai.AuthenticationError as error:
                raise AIProviderError(
                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
                ) from error
            except self._openai.BadRequestError as error:
                # Context-length overflows get a targeted message; other bad
                # requests are wrapped with the server's own error text.
                if _is_context_length_error(error):
                    raise AIProviderError(
                        "Local model request exceeded the context length. Reduce prompt size and retry."
                    ) from error
                raise AIProviderError(f"Local provider request was rejected: {error}") from error
            except self._openai.APIError as error:
                self._raise_api_error(error)
            except Exception as error:
                # Catch-all boundary: wrap anything unexpected in the
                # provider's own error type for uniform caller handling.
                raise AIProviderError(f"Unexpected local provider error: {error}") from error

        return _stream()

Stream generated text chunks from the local endpoint.

Falls back to non-streaming if the endpoint reports streaming is unsupported.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • max_tokens: Maximum completion tokens.
Yields:

Text chunk strings as they are generated.

Raises:
  • AIProviderError: On empty response or API failure.
def analyze_with_attachments( self, system_prompt: str, user_prompt: str, attachments: list[typing.Mapping[str, str]] | None, max_tokens: int = 256000) -> str:
243    def analyze_with_attachments(
244        self,
245        system_prompt: str,
246        user_prompt: str,
247        attachments: list[Mapping[str, str]] | None,
248        max_tokens: int = DEFAULT_MAX_TOKENS,
249    ) -> str:
250        """Analyze with optional CSV file attachments.
251
252        Args:
253            system_prompt: The system-level instruction text.
254            user_prompt: The user-facing prompt with investigation context.
255            attachments: Optional list of attachment descriptors.
256            max_tokens: Maximum completion tokens.
257
258        Returns:
259            The generated analysis text.
260
261        Raises:
262            AIProviderError: On any API or network failure.
263        """
264        def _request() -> str:
265            return self._request_non_stream(
266                system_prompt=system_prompt,
267                user_prompt=user_prompt,
268                max_tokens=max_tokens,
269                attachments=attachments,
270            )
271
272        return self._run_local_request(_request)

Analyze with optional CSV file attachments.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text.

Raises:
  • AIProviderError: On any API or network failure.
def analyze_with_progress( self, system_prompt: str, user_prompt: str, progress_callback: Callable[[dict[str, str]], None] | None, attachments: list[typing.Mapping[str, str]] | None = None, max_tokens: int = 256000) -> str:
351    def analyze_with_progress(
352        self,
353        system_prompt: str,
354        user_prompt: str,
355        progress_callback: Callable[[dict[str, str]], None] | None,
356        attachments: list[Mapping[str, str]] | None = None,
357        max_tokens: int = DEFAULT_MAX_TOKENS,
358    ) -> str:
359        """Analyze with streamed progress updates when supported.
360
361        Streams the response and periodically invokes ``progress_callback``
362        with accumulated thinking and answer text. Falls back to
363        ``analyze_with_attachments`` when no callback is provided.
364
365        Args:
366            system_prompt: The system-level instruction text.
367            user_prompt: The user-facing prompt with investigation context.
368            progress_callback: Optional callable receiving progress dicts.
369            attachments: Optional list of attachment descriptors.
370            max_tokens: Maximum completion tokens.
371
372        Returns:
373            The generated analysis text with reasoning blocks removed.
374
375        Raises:
376            AIProviderError: On empty response or API failure.
377        """
378        if progress_callback is None:
379            return self.analyze_with_attachments(
380                system_prompt=system_prompt,
381                user_prompt=user_prompt,
382                attachments=attachments,
383                max_tokens=max_tokens,
384            )
385
386        def _request() -> str:
387            result = self._build_stream_or_result(
388                system_prompt=system_prompt,
389                user_prompt=user_prompt,
390                max_tokens=max_tokens,
391                attachments=attachments,
392            )
393            if isinstance(result, str):
394                return result
395            stream = result
396
397            thinking_parts: list[str] = []
398            answer_parts: list[str] = []
399            last_emit_at = 0.0
400            last_sent_thinking = ""
401            last_sent_answer = ""
402
403            for chunk in stream:
404                chunk_result = self._process_stream_chunk(chunk)
405                if chunk_result is None:
406                    continue
407
408                thinking_delta, answer_delta = chunk_result
409                if thinking_delta:
410                    thinking_parts.append(thinking_delta)
411                if answer_delta:
412                    answer_parts.append(answer_delta)
413
414                current_thinking = "".join(thinking_parts).strip()
415                current_answer = _clean_streamed_answer_text(
416                    answer_text="".join(answer_parts),
417                    thinking_text=current_thinking,
418                )
419
420                last_emit_at, last_sent_thinking, last_sent_answer = (
421                    self._emit_progress_if_needed(
422                        progress_callback=progress_callback,
423                        current_thinking=current_thinking,
424                        current_answer=current_answer,
425                        last_emit_at=last_emit_at,
426                        last_sent_thinking=last_sent_thinking,
427                        last_sent_answer=last_sent_answer,
428                    )
429                )
430
431            return self._finalize_stream_response(thinking_parts, answer_parts)
432
433        return self._run_local_request(_request)

Analyze with streamed progress updates when supported.

Streams the response and periodically invokes progress_callback with accumulated thinking and answer text. Falls back to analyze_with_attachments when no callback is provided.

Arguments:
  • system_prompt: The system-level instruction text.
  • user_prompt: The user-facing prompt with investigation context.
  • progress_callback: Optional callable receiving progress dicts.
  • attachments: Optional list of attachment descriptors.
  • max_tokens: Maximum completion tokens.
Returns:

The generated analysis text with reasoning blocks removed.

Raises:
  • AIProviderError: On empty response or API failure.
def get_model_info(self) -> dict[str, str]:
757    def get_model_info(self) -> dict[str, str]:
758        """Return local provider and model metadata.
759
760        Returns:
761            A dictionary with ``"provider"`` and ``"model"`` keys.
762        """
763        return {"provider": "local", "model": self.model}

Return local provider and model metadata.

Returns:

A dictionary with "provider" and "model" keys.