app.ai_providers.local_provider
OpenAI-compatible local provider implementation.
Uses the openai Python SDK pointed at a local endpoint (Ollama,
LM Studio, vLLM, or similar). Supports synchronous and streaming
generation, CSV file attachments via the Responses API when available,
automatic reasoning-block stripping for local reasoning models, and
configurable request timeouts.
Attributes:
- logger: Module-level logger for local provider operations.
"""OpenAI-compatible local provider implementation.

Uses the ``openai`` Python SDK pointed at a local endpoint (Ollama,
LM Studio, vLLM, or similar). Supports synchronous and streaming
generation, CSV file attachments via the Responses API when available,
automatic reasoning-block stripping for local reasoning models, and
configurable request timeouts.

Attributes:
    logger: Module-level logger for local provider operations.
"""

from __future__ import annotations

import logging
import time
from typing import Any, Callable, Iterator, Mapping

from .base import (
    AIProvider,
    AIProviderError,
    DEFAULT_LOCAL_BASE_URL,
    DEFAULT_MAX_TOKENS,
    DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
    _is_attachment_unsupported_error,
    _is_context_length_error,
    _normalize_api_key_value,
    _normalize_openai_compatible_base_url,
    _resolve_timeout_seconds,
    _run_with_rate_limit_retries,
    _T,
)
from .utils import (
    _clean_streamed_answer_text,
    _extract_openai_delta_text,
    _extract_openai_text,
    _inline_attachment_data_into_prompt,
    _strip_leading_reasoning_blocks,
    upload_and_request_via_responses_api,
)

logger = logging.getLogger(__name__)

# Default model tag used when callers do not supply one (Ollama-style tag).
DEFAULT_LOCAL_MODEL = "llama3.1:70b"


class LocalProvider(AIProvider):
    """OpenAI-compatible local provider implementation.

    Attributes:
        base_url (str): The normalized local endpoint base URL.
        model (str): The local model identifier.
        api_key (str): The API key for the local endpoint.
        attach_csv_as_file (bool): Whether to attempt file-attachment mode.
        request_timeout_seconds (float): HTTP timeout in seconds.
        client: The ``openai.OpenAI`` SDK client instance.
    """

    def __init__(
        self,
        base_url: str,
        model: str,
        api_key: str = "not-needed",
        attach_csv_as_file: bool = True,
        request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
    ) -> None:
        """Initialize the local provider.

        Args:
            base_url: Base URL for the local endpoint. Normalized to
                include ``/v1`` if missing.
            model: Model identifier.
            api_key: API key. Defaults to ``"not-needed"``.
            attach_csv_as_file: If ``True``, attempt file uploads.
            request_timeout_seconds: HTTP timeout in seconds.

        Raises:
            AIProviderError: If the ``openai`` SDK is not installed.
        """
        # Import lazily so the module can be imported without the SDK present.
        try:
            import openai
        except ImportError as error:
            raise AIProviderError(
                "openai SDK is not installed. Install it with `pip install openai`."
            ) from error

        # Local servers usually ignore the key, but the SDK requires a non-empty value.
        normalized_api_key = _normalize_api_key_value(api_key) or "not-needed"

        self._openai = openai
        self.base_url = _normalize_openai_compatible_base_url(
            base_url=base_url,
            default_base_url=DEFAULT_LOCAL_BASE_URL,
        )
        self.model = model
        self.api_key = normalized_api_key
        self.attach_csv_as_file = bool(attach_csv_as_file)
        self.request_timeout_seconds = _resolve_timeout_seconds(
            request_timeout_seconds,
            DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS,
        )
        # getattr: older openai SDKs may not expose APITimeoutError.
        self._api_timeout_error_type = getattr(openai, "APITimeoutError", None)
        # Tri-state capability cache: None = unknown, True/False after first attempt.
        self._csv_attachment_supported: bool | None = None
        self.client = openai.OpenAI(
            api_key=normalized_api_key,
            base_url=self.base_url,
            timeout=self.request_timeout_seconds,
            # SDK-level retries disabled; retrying is handled explicitly by
            # _run_with_rate_limit_retries so behavior is uniform across providers.
            max_retries=0,
        )
        logger.info(
            "Initialized local provider at %s with model %s (timeout %.1fs)",
            self.base_url,
            model,
            self.request_timeout_seconds,
        )

    def analyze(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Send a prompt to the local endpoint and return the generated text.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text.

        Raises:
            AIProviderError: On any API or network failure.
        """
        # Delegates to the attachment-aware path with no attachments.
        return self.analyze_with_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            attachments=None,
            max_tokens=max_tokens,
        )

    def analyze_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> Iterator[str]:
        """Stream generated text chunks from the local endpoint.

        Falls back to non-streaming if the endpoint reports streaming
        is unsupported.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Yields:
            Text chunk strings as they are generated.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        # Inner generator so SDK exceptions raised during iteration are also
        # mapped to AIProviderError (the try wraps both setup and consumption).
        def _stream() -> Iterator[str]:
            try:
                prompt_for_completion = self._build_chat_completion_prompt(
                    user_prompt=user_prompt,
                    attachments=None,
                )
                try:
                    stream = _run_with_rate_limit_retries(
                        request_fn=lambda: self.client.chat.completions.create(
                            model=self.model,
                            max_tokens=max_tokens,
                            messages=[
                                {"role": "system", "content": system_prompt},
                                {"role": "user", "content": prompt_for_completion},
                            ],
                            stream=True,
                        ),
                        rate_limit_error_type=self._openai.RateLimitError,
                        provider_name="Local provider",
                    )
                except self._openai.BadRequestError as error:
                    # Some local servers reject stream=True; detect that from the
                    # message text and fall back to a single non-streaming request.
                    lowered_error = str(error).lower()
                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
                        fallback_text = self._request_non_stream(
                            system_prompt=system_prompt,
                            user_prompt=user_prompt,
                            max_tokens=max_tokens,
                            attachments=None,
                        )
                        if fallback_text:
                            yield fallback_text
                        return
                    raise

                emitted = False
                for chunk in stream:
                    choices = getattr(chunk, "choices", None)
                    if not choices:
                        continue
                    choice = choices[0]
                    # Chunks may be SDK objects or plain dicts depending on server.
                    delta = getattr(choice, "delta", None)
                    if delta is None and isinstance(choice, dict):
                        delta = choice.get("delta")
                    # Reasoning-channel fields are streamed too so the caller
                    # sees progress even from reasoning-only local models.
                    chunk_text = _extract_openai_delta_text(
                        delta,
                        ("content", "reasoning_content", "reasoning", "thinking"),
                    )
                    if not chunk_text:
                        continue
                    emitted = True
                    yield chunk_text

                if not emitted:
                    raise AIProviderError(
                        "Local AI provider returned an empty streamed response. "
                        "Try a different local model or increase max tokens."
                    )
            except AIProviderError:
                # Already mapped; re-raise untouched.
                raise
            except self._openai.APIConnectionError as error:
                self._raise_connection_error(error)
            except self._openai.AuthenticationError as error:
                raise AIProviderError(
                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
                ) from error
            except self._openai.BadRequestError as error:
                if _is_context_length_error(error):
                    raise AIProviderError(
                        "Local model request exceeded the context length. Reduce prompt size and retry."
                    ) from error
                raise AIProviderError(f"Local provider request was rejected: {error}") from error
            except self._openai.APIError as error:
                self._raise_api_error(error)
            except Exception as error:
                raise AIProviderError(f"Unexpected local provider error: {error}") from error

        return _stream()

    def analyze_with_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        attachments: list[Mapping[str, str]] | None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with optional CSV file attachments.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text.

        Raises:
            AIProviderError: On any API or network failure.
        """
        def _request() -> str:
            return self._request_non_stream(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )

        return self._run_local_request(_request)

    def _run_local_request(self, request_fn: Callable[[], _T]) -> _T:
        """Execute a local request with rate-limit retries and error mapping.

        Args:
            request_fn: A zero-argument callable that performs the request.

        Returns:
            The return value of ``request_fn`` on success.

        Raises:
            AIProviderError: On any OpenAI SDK error (with local messages).
        """
        # NOTE: this mirrors the except-chain inside analyze_stream's _stream();
        # keep the two in sync when changing error messages.
        try:
            return _run_with_rate_limit_retries(
                request_fn=request_fn,
                rate_limit_error_type=self._openai.RateLimitError,
                provider_name="Local provider",
            )
        except AIProviderError:
            raise
        except self._openai.APIConnectionError as error:
            self._raise_connection_error(error)
        except self._openai.AuthenticationError as error:
            raise AIProviderError(
                "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
            ) from error
        except self._openai.BadRequestError as error:
            if _is_context_length_error(error):
                raise AIProviderError(
                    "Local model request exceeded the context length. Reduce prompt size and retry."
                ) from error
            raise AIProviderError(f"Local provider request was rejected: {error}") from error
        except self._openai.APIError as error:
            self._raise_api_error(error)
        except Exception as error:
            raise AIProviderError(f"Unexpected local provider error: {error}") from error

    def _raise_connection_error(self, error: Exception) -> None:
        """Map APIConnectionError to AIProviderError with timeout detection.

        Args:
            error: The connection error to map.

        Raises:
            AIProviderError: Always raised with appropriate message.
        """
        # Timeout detection is two-pronged: isinstance check when the SDK
        # exposes APITimeoutError, plus a message-text fallback for older SDKs.
        if (
            self._api_timeout_error_type is not None
            and isinstance(error, self._api_timeout_error_type)
        ) or "timeout" in str(error).lower():
            raise AIProviderError(
                "Local AI request timed out after "
                f"{self.request_timeout_seconds:g} seconds. "
                "Increase `ai.local.request_timeout_seconds` for long-running prompts."
            ) from error
        raise AIProviderError(
            "Unable to connect to local AI endpoint. Check `ai.local.base_url` and ensure the server is running."
        ) from error

    def _raise_api_error(self, error: Exception) -> None:
        """Map APIError to AIProviderError with 404 detection.

        Args:
            error: The API error to map.

        Raises:
            AIProviderError: Always raised with appropriate message.
        """
        error_text = str(error).lower()
        # A 404 against a local server is almost always a base URL missing /v1,
        # so surface the configured URL in the message.
        if "404" in error_text or "not found" in error_text:
            raise AIProviderError(
                "Local AI endpoint returned 404 (not found). "
                "This is often caused by a base URL missing `/v1`. "
                f"Current base URL: {self.base_url}"
            ) from error
        raise AIProviderError(f"Local provider API error: {error}") from error

    def analyze_with_progress(
        self,
        system_prompt: str,
        user_prompt: str,
        progress_callback: Callable[[dict[str, str]], None] | None,
        attachments: list[Mapping[str, str]] | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with streamed progress updates when supported.

        Streams the response and periodically invokes ``progress_callback``
        with accumulated thinking and answer text. Falls back to
        ``analyze_with_attachments`` when no callback is provided.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            progress_callback: Optional callable receiving progress dicts.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        if progress_callback is None:
            # No consumer for progress: use the simpler non-streaming path.
            return self.analyze_with_attachments(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                attachments=attachments,
                max_tokens=max_tokens,
            )

        def _request() -> str:
            result = self._build_stream_or_result(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )
            # A str means the result was already obtained without streaming
            # (attachment path or non-streaming fallback).
            if isinstance(result, str):
                return result
            stream = result

            thinking_parts: list[str] = []
            answer_parts: list[str] = []
            last_emit_at = 0.0
            last_sent_thinking = ""
            last_sent_answer = ""

            for chunk in stream:
                chunk_result = self._process_stream_chunk(chunk)
                if chunk_result is None:
                    continue

                thinking_delta, answer_delta = chunk_result
                if thinking_delta:
                    thinking_parts.append(thinking_delta)
                if answer_delta:
                    answer_parts.append(answer_delta)

                current_thinking = "".join(thinking_parts).strip()
                current_answer = _clean_streamed_answer_text(
                    answer_text="".join(answer_parts),
                    thinking_text=current_thinking,
                )

                last_emit_at, last_sent_thinking, last_sent_answer = (
                    self._emit_progress_if_needed(
                        progress_callback=progress_callback,
                        current_thinking=current_thinking,
                        current_answer=current_answer,
                        last_emit_at=last_emit_at,
                        last_sent_thinking=last_sent_thinking,
                        last_sent_answer=last_sent_answer,
                    )
                )

            return self._finalize_stream_response(thinking_parts, answer_parts)

        return self._run_local_request(_request)

    def _build_stream_or_result(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None,
    ) -> Any | str:
        """Set up the streaming request, returning a stream or a final string.

        Attempts CSV file attachment first. If that succeeds, returns the
        completed text directly. Otherwise creates a streaming chat completion.
        Falls back to non-streaming if the endpoint rejects streaming.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            A streaming response object, or a ``str`` if the result was
            obtained without streaming (attachment path or fallback).

        Raises:
            AIProviderError: If the non-streaming fallback also fails.
        """
        attachment_response = self._request_with_csv_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=max_tokens,
            attachments=attachments,
        )
        if attachment_response:
            # If stripping reasoning blocks leaves nothing, return the raw text.
            cleaned = _strip_leading_reasoning_blocks(attachment_response)
            return cleaned or attachment_response.strip()

        prompt_for_completion = self._build_chat_completion_prompt(
            user_prompt=user_prompt,
            attachments=attachments,
        )

        try:
            return self.client.chat.completions.create(
                model=self.model,
                max_tokens=max_tokens,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt_for_completion},
                ],
                stream=True,
            )
        except self._openai.BadRequestError as error:
            # Same streaming-unsupported heuristic as analyze_stream.
            lowered_error = str(error).lower()
            if "stream" in lowered_error and (
                "unsupported" in lowered_error or "not support" in lowered_error
            ):
                return self._request_non_stream(
                    system_prompt=system_prompt,
                    user_prompt=user_prompt,
                    max_tokens=max_tokens,
                    attachments=attachments,
                )
            raise

    @staticmethod
    def _process_stream_chunk(chunk: Any) -> tuple[str, str] | None:
        """Extract thinking and answer deltas from a single stream chunk.

        Args:
            chunk: A streaming response chunk from the OpenAI SDK.

        Returns:
            A ``(thinking_delta, answer_delta)`` tuple, or ``None`` if the
            chunk contains no usable text.
        """
        choices = getattr(chunk, "choices", None)
        if not choices:
            return None
        choice = choices[0]
        # Chunks may be SDK objects or plain dicts depending on the server.
        delta = getattr(choice, "delta", None)
        if delta is None and isinstance(choice, dict):
            delta = choice.get("delta")
        if delta is None:
            return None

        # "content" is the answer channel; the other keys carry reasoning text.
        answer_delta = _extract_openai_delta_text(delta, ("content",))
        thinking_delta = _extract_openai_delta_text(
            delta,
            ("reasoning_content", "reasoning", "thinking"),
        )

        if not answer_delta and not thinking_delta:
            return None
        return (thinking_delta, answer_delta)

    @staticmethod
    def _emit_progress_if_needed(
        progress_callback: Callable[[dict[str, str]], None],
        current_thinking: str,
        current_answer: str,
        last_emit_at: float,
        last_sent_thinking: str,
        last_sent_answer: str,
    ) -> tuple[float, str, str]:
        """Send a progress callback if enough content has changed.

        Applies rate-limiting so the callback fires at most every 0.35 s
        unless at least 80 characters have been added to either channel.

        Args:
            progress_callback: The callable to invoke with progress data.
            current_thinking: Accumulated thinking text so far.
            current_answer: Accumulated answer text so far.
            last_emit_at: Monotonic timestamp of the last emission.
            last_sent_thinking: Thinking text sent in the last emission.
            last_sent_answer: Answer text sent in the last emission.

        Returns:
            Updated ``(last_emit_at, last_sent_thinking, last_sent_answer)``.
        """
        if not current_thinking and not current_answer:
            return last_emit_at, last_sent_thinking, last_sent_answer

        changed = (
            current_thinking != last_sent_thinking
            or current_answer != last_sent_answer
        )
        if not changed:
            return last_emit_at, last_sent_thinking, last_sent_answer

        # Throttle: skip the emit when it is both recent (< 0.35 s) and small
        # (< 80 new chars on both channels).
        now = time.monotonic()
        if now - last_emit_at < 0.35 and (
            len(current_thinking) - len(last_sent_thinking) < 80
            and len(current_answer) - len(last_sent_answer) < 80
        ):
            return last_emit_at, last_sent_thinking, last_sent_answer

        # Best-effort: a failing callback must never abort the analysis.
        try:
            progress_callback(
                {
                    "status": "thinking",
                    "thinking_text": current_thinking,
                    "partial_text": current_answer,
                }
            )
        except Exception:
            pass

        return now, current_thinking, current_answer

    @staticmethod
    def _finalize_stream_response(
        thinking_parts: list[str],
        answer_parts: list[str],
    ) -> str:
        """Assemble the final response text from accumulated stream parts.

        Args:
            thinking_parts: Collected thinking-channel text fragments.
            answer_parts: Collected answer-channel text fragments.

        Returns:
            The cleaned final answer, or the thinking text if no answer
            was produced.

        Raises:
            AIProviderError: If both channels are empty.
        """
        final_thinking = "".join(thinking_parts).strip()
        final_answer = _clean_streamed_answer_text(
            answer_text="".join(answer_parts),
            thinking_text=final_thinking,
        )
        if final_answer:
            return final_answer
        if final_thinking:
            # Reasoning-only output: better to return the thinking than nothing.
            return final_thinking
        raise AIProviderError(
            "Local AI provider returned an empty streamed response. "
            "Try a different local model or increase max tokens."
        )

    def _request_non_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None = None,
    ) -> str:
        """Perform a non-streaming local request with attachment handling.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: If the response is empty.
        """
        # Try the Responses-API attachment path first; None means "skipped".
        attachment_response = self._request_with_csv_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=max_tokens,
            attachments=attachments,
        )
        if attachment_response:
            cleaned_attachment_response = _strip_leading_reasoning_blocks(attachment_response)
            if cleaned_attachment_response:
                return cleaned_attachment_response
            return attachment_response.strip()

        prompt_for_completion = self._build_chat_completion_prompt(
            user_prompt=user_prompt,
            attachments=attachments,
        )

        response = self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt_for_completion},
            ],
        )
        text = _extract_openai_text(response)
        if text:
            cleaned_text = _strip_leading_reasoning_blocks(text)
            if cleaned_text:
                return cleaned_text
            return text.strip()

        # Empty response: include finish_reason in the error when available,
        # handling both SDK objects and dict-shaped choices.
        finish_reason = None
        choices = getattr(response, "choices", None)
        if choices:
            first_choice = choices[0]
            finish_reason = getattr(first_choice, "finish_reason", None)
            if finish_reason is None and isinstance(first_choice, dict):
                finish_reason = first_choice.get("finish_reason")
        reason_detail = f" (finish_reason={finish_reason})" if finish_reason else ""
        raise AIProviderError(
            "Local AI provider returned an empty response"
            f"{reason_detail}. This can happen with reasoning-only outputs or very low token limits."
        )

    def _build_chat_completion_prompt(
        self,
        user_prompt: str,
        attachments: list[Mapping[str, str]] | None,
    ) -> str:
        """Build the user prompt, inlining attachments if needed.

        Args:
            user_prompt: The original user-facing prompt text.
            attachments: Optional list of attachment descriptors.

        Returns:
            The prompt string, potentially with attachment data appended.
        """
        prompt_for_completion = user_prompt
        if attachments:
            prompt_for_completion, inlined_attachment_data = _inline_attachment_data_into_prompt(
                user_prompt=user_prompt,
                attachments=attachments,
            )
            if inlined_attachment_data:
                logger.info("Local attachment fallback inlined attachment data into prompt.")
        return prompt_for_completion

    def _request_with_csv_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None,
    ) -> str | None:
        """Attempt to send a request with CSV files via the local Responses API.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            The generated text if succeeded, or ``None`` if skipped.
        """
        # NOTE(review): _prepare_csv_attachments is not defined in this file —
        # presumably inherited from AIProvider; confirm it honors
        # self.attach_csv_as_file and self._csv_attachment_supported.
        normalized_attachments = self._prepare_csv_attachments(
            attachments,
            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
        )
        if not normalized_attachments:
            return None

        try:
            text = upload_and_request_via_responses_api(
                client=self.client,
                openai_module=self._openai,
                model=self.model,
                normalized_attachments=normalized_attachments,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                provider_name="Local provider",
                upload_purpose="assistants",
                convert_csv_to_txt=False,
            )
            # Remember the capability so later calls can skip probing.
            self._csv_attachment_supported = True
            return text
        except Exception as error:
            if _is_attachment_unsupported_error(error):
                self._csv_attachment_supported = False
                logger.info(
                    "Local endpoint does not support file attachments via /files + /responses; "
                    "falling back to chat.completions text mode."
                )
                return None
            raise

    def get_model_info(self) -> dict[str, str]:
        """Return local provider and model metadata.

        Returns:
            A dictionary with ``"provider"`` and ``"model"`` keys.
        """
        return {"provider": "local", "model": self.model}
48class LocalProvider(AIProvider): 49 """OpenAI-compatible local provider implementation. 50 51 Attributes: 52 base_url (str): The normalized local endpoint base URL. 53 model (str): The local model identifier. 54 api_key (str): The API key for the local endpoint. 55 attach_csv_as_file (bool): Whether to attempt file-attachment mode. 56 request_timeout_seconds (float): HTTP timeout in seconds. 57 client: The ``openai.OpenAI`` SDK client instance. 58 """ 59 60 def __init__( 61 self, 62 base_url: str, 63 model: str, 64 api_key: str = "not-needed", 65 attach_csv_as_file: bool = True, 66 request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS, 67 ) -> None: 68 """Initialize the local provider. 69 70 Args: 71 base_url: Base URL for the local endpoint. Normalized to 72 include ``/v1`` if missing. 73 model: Model identifier. 74 api_key: API key. Defaults to ``"not-needed"``. 75 attach_csv_as_file: If ``True``, attempt file uploads. 76 request_timeout_seconds: HTTP timeout in seconds. 77 78 Raises: 79 AIProviderError: If the ``openai`` SDK is not installed. 80 """ 81 try: 82 import openai 83 except ImportError as error: 84 raise AIProviderError( 85 "openai SDK is not installed. Install it with `pip install openai`." 
86 ) from error 87 88 normalized_api_key = _normalize_api_key_value(api_key) or "not-needed" 89 90 self._openai = openai 91 self.base_url = _normalize_openai_compatible_base_url( 92 base_url=base_url, 93 default_base_url=DEFAULT_LOCAL_BASE_URL, 94 ) 95 self.model = model 96 self.api_key = normalized_api_key 97 self.attach_csv_as_file = bool(attach_csv_as_file) 98 self.request_timeout_seconds = _resolve_timeout_seconds( 99 request_timeout_seconds, 100 DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS, 101 ) 102 self._api_timeout_error_type = getattr(openai, "APITimeoutError", None) 103 self._csv_attachment_supported: bool | None = None 104 self.client = openai.OpenAI( 105 api_key=normalized_api_key, 106 base_url=self.base_url, 107 timeout=self.request_timeout_seconds, 108 max_retries=0, 109 ) 110 logger.info( 111 "Initialized local provider at %s with model %s (timeout %.1fs)", 112 self.base_url, 113 model, 114 self.request_timeout_seconds, 115 ) 116 117 def analyze( 118 self, 119 system_prompt: str, 120 user_prompt: str, 121 max_tokens: int = DEFAULT_MAX_TOKENS, 122 ) -> str: 123 """Send a prompt to the local endpoint and return the generated text. 124 125 Args: 126 system_prompt: The system-level instruction text. 127 user_prompt: The user-facing prompt with investigation context. 128 max_tokens: Maximum completion tokens. 129 130 Returns: 131 The generated analysis text. 132 133 Raises: 134 AIProviderError: On any API or network failure. 135 """ 136 return self.analyze_with_attachments( 137 system_prompt=system_prompt, 138 user_prompt=user_prompt, 139 attachments=None, 140 max_tokens=max_tokens, 141 ) 142 143 def analyze_stream( 144 self, 145 system_prompt: str, 146 user_prompt: str, 147 max_tokens: int = DEFAULT_MAX_TOKENS, 148 ) -> Iterator[str]: 149 """Stream generated text chunks from the local endpoint. 150 151 Falls back to non-streaming if the endpoint reports streaming 152 is unsupported. 153 154 Args: 155 system_prompt: The system-level instruction text. 
156 user_prompt: The user-facing prompt with investigation context. 157 max_tokens: Maximum completion tokens. 158 159 Yields: 160 Text chunk strings as they are generated. 161 162 Raises: 163 AIProviderError: On empty response or API failure. 164 """ 165 def _stream() -> Iterator[str]: 166 try: 167 prompt_for_completion = self._build_chat_completion_prompt( 168 user_prompt=user_prompt, 169 attachments=None, 170 ) 171 try: 172 stream = _run_with_rate_limit_retries( 173 request_fn=lambda: self.client.chat.completions.create( 174 model=self.model, 175 max_tokens=max_tokens, 176 messages=[ 177 {"role": "system", "content": system_prompt}, 178 {"role": "user", "content": prompt_for_completion}, 179 ], 180 stream=True, 181 ), 182 rate_limit_error_type=self._openai.RateLimitError, 183 provider_name="Local provider", 184 ) 185 except self._openai.BadRequestError as error: 186 lowered_error = str(error).lower() 187 if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error): 188 fallback_text = self._request_non_stream( 189 system_prompt=system_prompt, 190 user_prompt=user_prompt, 191 max_tokens=max_tokens, 192 attachments=None, 193 ) 194 if fallback_text: 195 yield fallback_text 196 return 197 raise 198 199 emitted = False 200 for chunk in stream: 201 choices = getattr(chunk, "choices", None) 202 if not choices: 203 continue 204 choice = choices[0] 205 delta = getattr(choice, "delta", None) 206 if delta is None and isinstance(choice, dict): 207 delta = choice.get("delta") 208 chunk_text = _extract_openai_delta_text( 209 delta, 210 ("content", "reasoning_content", "reasoning", "thinking"), 211 ) 212 if not chunk_text: 213 continue 214 emitted = True 215 yield chunk_text 216 217 if not emitted: 218 raise AIProviderError( 219 "Local AI provider returned an empty streamed response. " 220 "Try a different local model or increase max tokens." 
                )
            # Map SDK exceptions to AIProviderError with actionable,
            # local-endpoint-specific messages. Order matters: the more
            # specific SDK error types are handled before generic APIError.
            except AIProviderError:
                raise
            except self._openai.APIConnectionError as error:
                self._raise_connection_error(error)
            except self._openai.AuthenticationError as error:
                raise AIProviderError(
                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
                ) from error
            except self._openai.BadRequestError as error:
                if _is_context_length_error(error):
                    raise AIProviderError(
                        "Local model request exceeded the context length. Reduce prompt size and retry."
                    ) from error
                raise AIProviderError(f"Local provider request was rejected: {error}") from error
            except self._openai.APIError as error:
                self._raise_api_error(error)
            except Exception as error:
                raise AIProviderError(f"Unexpected local provider error: {error}") from error

        # Errors surface lazily, on iteration of the returned generator.
        return _stream()

    def analyze_with_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        attachments: list[Mapping[str, str]] | None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with optional CSV file attachments.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text.

        Raises:
            AIProviderError: On any API or network failure.
        """
        def _request() -> str:
            return self._request_non_stream(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )

        # Route through the shared retry/error-mapping wrapper.
        return self._run_local_request(_request)

    def _run_local_request(self, request_fn: Callable[[], _T]) -> _T:
        """Execute a local request with rate-limit retries and error mapping.

        Args:
            request_fn: A zero-argument callable that performs the request.

        Returns:
            The return value of ``request_fn`` on success.

        Raises:
            AIProviderError: On any OpenAI SDK error (with local messages).
        """
        try:
            return _run_with_rate_limit_retries(
                request_fn=request_fn,
                rate_limit_error_type=self._openai.RateLimitError,
                provider_name="Local provider",
            )
        except AIProviderError:
            # Already mapped to the provider-scoped error type; propagate.
            raise
        except self._openai.APIConnectionError as error:
            self._raise_connection_error(error)
        except self._openai.AuthenticationError as error:
            raise AIProviderError(
                "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
            ) from error
        except self._openai.BadRequestError as error:
            if _is_context_length_error(error):
                raise AIProviderError(
                    "Local model request exceeded the context length. Reduce prompt size and retry."
                ) from error
            raise AIProviderError(f"Local provider request was rejected: {error}") from error
        except self._openai.APIError as error:
            self._raise_api_error(error)
        except Exception as error:
            raise AIProviderError(f"Unexpected local provider error: {error}") from error

    def _raise_connection_error(self, error: Exception) -> None:
        """Map APIConnectionError to AIProviderError with timeout detection.

        Always raises; never returns normally.

        Args:
            error: The connection error to map.

        Raises:
            AIProviderError: Always raised with appropriate message.
        """
        # Treat it as a timeout when the SDK's APITimeoutError type matches,
        # or when the message mentions "timeout" (string check covers SDK
        # builds where APITimeoutError is absent).
        if (
            self._api_timeout_error_type is not None
            and isinstance(error, self._api_timeout_error_type)
        ) or "timeout" in str(error).lower():
            raise AIProviderError(
                "Local AI request timed out after "
                f"{self.request_timeout_seconds:g} seconds. "
                "Increase `ai.local.request_timeout_seconds` for long-running prompts."
            ) from error
        raise AIProviderError(
            "Unable to connect to local AI endpoint. Check `ai.local.base_url` and ensure the server is running."
        ) from error

    def _raise_api_error(self, error: Exception) -> None:
        """Map APIError to AIProviderError with 404 detection.

        Always raises; never returns normally.

        Args:
            error: The API error to map.

        Raises:
            AIProviderError: Always raised with appropriate message.
        """
        error_text = str(error).lower()
        # 404s from local servers are most often a base URL missing `/v1`.
        if "404" in error_text or "not found" in error_text:
            raise AIProviderError(
                "Local AI endpoint returned 404 (not found). "
                "This is often caused by a base URL missing `/v1`. "
                f"Current base URL: {self.base_url}"
            ) from error
        raise AIProviderError(f"Local provider API error: {error}") from error

    def analyze_with_progress(
        self,
        system_prompt: str,
        user_prompt: str,
        progress_callback: Callable[[dict[str, str]], None] | None,
        attachments: list[Mapping[str, str]] | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with streamed progress updates when supported.

        Streams the response and periodically invokes ``progress_callback``
        with accumulated thinking and answer text. Falls back to
        ``analyze_with_attachments`` when no callback is provided.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            progress_callback: Optional callable receiving progress dicts.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        if progress_callback is None:
            return self.analyze_with_attachments(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                attachments=attachments,
                max_tokens=max_tokens,
            )

        def _request() -> str:
            result = self._build_stream_or_result(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )
            # A plain string means the result was obtained without
            # streaming (attachment path or non-streaming fallback).
            if isinstance(result, str):
                return result
            stream = result

            thinking_parts: list[str] = []
            answer_parts: list[str] = []
            last_emit_at = 0.0
            last_sent_thinking = ""
            last_sent_answer = ""

            for chunk in stream:
                chunk_result = self._process_stream_chunk(chunk)
                if chunk_result is None:
                    continue

                thinking_delta, answer_delta = chunk_result
                if thinking_delta:
                    thinking_parts.append(thinking_delta)
                if answer_delta:
                    answer_parts.append(answer_delta)

                current_thinking = "".join(thinking_parts).strip()
                current_answer = _clean_streamed_answer_text(
                    answer_text="".join(answer_parts),
                    thinking_text=current_thinking,
                )

                last_emit_at, last_sent_thinking, last_sent_answer = (
                    self._emit_progress_if_needed(
                        progress_callback=progress_callback,
                        current_thinking=current_thinking,
                        current_answer=current_answer,
                        last_emit_at=last_emit_at,
                        last_sent_thinking=last_sent_thinking,
                        last_sent_answer=last_sent_answer,
                    )
                )

            return self._finalize_stream_response(thinking_parts, answer_parts)

        return self._run_local_request(_request)

    def _build_stream_or_result(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None,
    ) -> Any | str:
        """Set up the streaming request, returning a stream or a final string.

        Attempts CSV file attachment first. If that succeeds, returns the
        completed text directly. Otherwise creates a streaming chat completion.
        Falls back to non-streaming if the endpoint rejects streaming.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            A streaming response object, or a ``str`` if the result was
            obtained without streaming (attachment path or fallback).

        Raises:
            AIProviderError: If the non-streaming fallback also fails.
        """
        attachment_response = self._request_with_csv_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=max_tokens,
            attachments=attachments,
        )
        if attachment_response:
            cleaned = _strip_leading_reasoning_blocks(attachment_response)
            # If stripping removed everything, fall back to the raw text.
            return cleaned or attachment_response.strip()

        prompt_for_completion = self._build_chat_completion_prompt(
            user_prompt=user_prompt,
            attachments=attachments,
        )

        try:
            return self.client.chat.completions.create(
                model=self.model,
                max_tokens=max_tokens,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt_for_completion},
                ],
                stream=True,
            )
        except self._openai.BadRequestError as error:
            # Some local servers reject stream=True; detect that from the
            # message text and retry without streaming.
            lowered_error = str(error).lower()
            if "stream" in lowered_error and (
                "unsupported" in lowered_error or "not support" in lowered_error
            ):
                return self._request_non_stream(
                    system_prompt=system_prompt,
                    user_prompt=user_prompt,
                    max_tokens=max_tokens,
                    attachments=attachments,
                )
            raise

    @staticmethod
    def _process_stream_chunk(chunk: Any) -> tuple[str, str] | None:
        """Extract thinking and answer deltas from a single stream chunk.

        Args:
            chunk: A streaming response chunk from the OpenAI SDK.

        Returns:
            A ``(thinking_delta, answer_delta)`` tuple, or ``None`` if the
            chunk contains no usable text.
        """
        choices = getattr(chunk, "choices", None)
        if not choices:
            return None
        choice = choices[0]
        # Servers may return dataclass-like objects or plain dicts.
        delta = getattr(choice, "delta", None)
        if delta is None and isinstance(choice, dict):
            delta = choice.get("delta")
        if delta is None:
            return None

        answer_delta = _extract_openai_delta_text(delta, ("content",))
        # Reasoning models expose thinking text under varying field names.
        thinking_delta = _extract_openai_delta_text(
            delta,
            ("reasoning_content", "reasoning", "thinking"),
        )

        if not answer_delta and not thinking_delta:
            return None
        return (thinking_delta, answer_delta)

    @staticmethod
    def _emit_progress_if_needed(
        progress_callback: Callable[[dict[str, str]], None],
        current_thinking: str,
        current_answer: str,
        last_emit_at: float,
        last_sent_thinking: str,
        last_sent_answer: str,
    ) -> tuple[float, str, str]:
        """Send a progress callback if enough content has changed.

        Applies rate-limiting so the callback fires at most every 0.35 s
        unless at least 80 characters have been added to either channel.

        Args:
            progress_callback: The callable to invoke with progress data.
            current_thinking: Accumulated thinking text so far.
            current_answer: Accumulated answer text so far.
            last_emit_at: Monotonic timestamp of the last emission.
            last_sent_thinking: Thinking text sent in the last emission.
            last_sent_answer: Answer text sent in the last emission.

        Returns:
            Updated ``(last_emit_at, last_sent_thinking, last_sent_answer)``.
        """
        if not current_thinking and not current_answer:
            return last_emit_at, last_sent_thinking, last_sent_answer

        changed = (
            current_thinking != last_sent_thinking
            or current_answer != last_sent_answer
        )
        if not changed:
            return last_emit_at, last_sent_thinking, last_sent_answer

        now = time.monotonic()
        if now - last_emit_at < 0.35 and (
            len(current_thinking) - len(last_sent_thinking) < 80
            and len(current_answer) - len(last_sent_answer) < 80
        ):
            return last_emit_at, last_sent_thinking, last_sent_answer

        try:
            progress_callback(
                {
                    "status": "thinking",
                    "thinking_text": current_thinking,
                    "partial_text": current_answer,
                }
            )
        except Exception:
            # Progress reporting must never break generation; swallow
            # callback errors deliberately.
            pass

        return now, current_thinking, current_answer

    @staticmethod
    def _finalize_stream_response(
        thinking_parts: list[str],
        answer_parts: list[str],
    ) -> str:
        """Assemble the final response text from accumulated stream parts.

        Args:
            thinking_parts: Collected thinking-channel text fragments.
            answer_parts: Collected answer-channel text fragments.

        Returns:
            The cleaned final answer, or the thinking text if no answer
            was produced.

        Raises:
            AIProviderError: If both channels are empty.
        """
        final_thinking = "".join(thinking_parts).strip()
        final_answer = _clean_streamed_answer_text(
            answer_text="".join(answer_parts),
            thinking_text=final_thinking,
        )
        if final_answer:
            return final_answer
        if final_thinking:
            # Reasoning-only output: better to return the thinking text
            # than to fail outright.
            return final_thinking
        raise AIProviderError(
            "Local AI provider returned an empty streamed response. "
            "Try a different local model or increase max tokens."
        )

    def _request_non_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None = None,
    ) -> str:
        """Perform a non-streaming local request with attachment handling.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: If the response is empty.
        """
        attachment_response = self._request_with_csv_attachments(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            max_tokens=max_tokens,
            attachments=attachments,
        )
        if attachment_response:
            cleaned_attachment_response = _strip_leading_reasoning_blocks(attachment_response)
            if cleaned_attachment_response:
                return cleaned_attachment_response
            return attachment_response.strip()

        prompt_for_completion = self._build_chat_completion_prompt(
            user_prompt=user_prompt,
            attachments=attachments,
        )

        response = self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt_for_completion},
            ],
        )
        text = _extract_openai_text(response)
        if text:
            cleaned_text = _strip_leading_reasoning_blocks(text)
            if cleaned_text:
                return cleaned_text
            return text.strip()

        # Empty response: include finish_reason in the error when available
        # to help diagnose token-limit or reasoning-only failures.
        finish_reason = None
        choices = getattr(response, "choices", None)
        if choices:
            first_choice = choices[0]
            finish_reason = getattr(first_choice, "finish_reason", None)
            if finish_reason is None and isinstance(first_choice, dict):
                finish_reason = first_choice.get("finish_reason")
        reason_detail = f" (finish_reason={finish_reason})" if finish_reason else ""
        raise AIProviderError(
            "Local AI provider returned an empty response"
            f"{reason_detail}. This can happen with reasoning-only outputs or very low token limits."
        )

    def _build_chat_completion_prompt(
        self,
        user_prompt: str,
        attachments: list[Mapping[str, str]] | None,
    ) -> str:
        """Build the user prompt, inlining attachments if needed.

        Args:
            user_prompt: The original user-facing prompt text.
            attachments: Optional list of attachment descriptors.

        Returns:
            The prompt string, potentially with attachment data appended.
        """
        prompt_for_completion = user_prompt
        if attachments:
            prompt_for_completion, inlined_attachment_data = _inline_attachment_data_into_prompt(
                user_prompt=user_prompt,
                attachments=attachments,
            )
            if inlined_attachment_data:
                logger.info("Local attachment fallback inlined attachment data into prompt.")
        return prompt_for_completion

    def _request_with_csv_attachments(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        attachments: list[Mapping[str, str]] | None,
    ) -> str | None:
        """Attempt to send a request with CSV files via the local Responses API.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt text.
            max_tokens: Maximum completion tokens.
            attachments: Optional list of attachment descriptors.

        Returns:
            The generated text if succeeded, or ``None`` if skipped.
        """
        # Only attempt uploads when the installed client actually exposes
        # the /files and /responses surfaces.
        normalized_attachments = self._prepare_csv_attachments(
            attachments,
            supports_file_attachments=hasattr(self.client, "files") and hasattr(self.client, "responses"),
        )
        if not normalized_attachments:
            return None

        try:
            text = upload_and_request_via_responses_api(
                client=self.client,
                openai_module=self._openai,
                model=self.model,
                normalized_attachments=normalized_attachments,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                provider_name="Local provider",
                upload_purpose="assistants",
                convert_csv_to_txt=False,
            )
            # Remember the capability so later calls can skip probing.
            self._csv_attachment_supported = True
            return text
        except Exception as error:
            if _is_attachment_unsupported_error(error):
                self._csv_attachment_supported = False
                logger.info(
                    "Local endpoint does not support file attachments via /files + /responses; "
                    "falling back to chat.completions text mode."
                )
                return None
            raise

    def get_model_info(self) -> dict[str, str]:
        """Return local provider and model metadata.

        Returns:
            A dictionary with ``"provider"`` and ``"model"`` keys.
        """
        return {"provider": "local", "model": self.model}
OpenAI-compatible local provider implementation.
Attributes:
- base_url (str): The normalized local endpoint base URL.
- model (str): The local model identifier.
- api_key (str): The API key for the local endpoint.
- attach_csv_as_file (bool): Whether to attempt file-attachment mode.
- request_timeout_seconds (float): HTTP timeout in seconds.
- client: The `openai.OpenAI` SDK client instance.
60 def __init__( 61 self, 62 base_url: str, 63 model: str, 64 api_key: str = "not-needed", 65 attach_csv_as_file: bool = True, 66 request_timeout_seconds: float = DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS, 67 ) -> None: 68 """Initialize the local provider. 69 70 Args: 71 base_url: Base URL for the local endpoint. Normalized to 72 include ``/v1`` if missing. 73 model: Model identifier. 74 api_key: API key. Defaults to ``"not-needed"``. 75 attach_csv_as_file: If ``True``, attempt file uploads. 76 request_timeout_seconds: HTTP timeout in seconds. 77 78 Raises: 79 AIProviderError: If the ``openai`` SDK is not installed. 80 """ 81 try: 82 import openai 83 except ImportError as error: 84 raise AIProviderError( 85 "openai SDK is not installed. Install it with `pip install openai`." 86 ) from error 87 88 normalized_api_key = _normalize_api_key_value(api_key) or "not-needed" 89 90 self._openai = openai 91 self.base_url = _normalize_openai_compatible_base_url( 92 base_url=base_url, 93 default_base_url=DEFAULT_LOCAL_BASE_URL, 94 ) 95 self.model = model 96 self.api_key = normalized_api_key 97 self.attach_csv_as_file = bool(attach_csv_as_file) 98 self.request_timeout_seconds = _resolve_timeout_seconds( 99 request_timeout_seconds, 100 DEFAULT_LOCAL_REQUEST_TIMEOUT_SECONDS, 101 ) 102 self._api_timeout_error_type = getattr(openai, "APITimeoutError", None) 103 self._csv_attachment_supported: bool | None = None 104 self.client = openai.OpenAI( 105 api_key=normalized_api_key, 106 base_url=self.base_url, 107 timeout=self.request_timeout_seconds, 108 max_retries=0, 109 ) 110 logger.info( 111 "Initialized local provider at %s with model %s (timeout %.1fs)", 112 self.base_url, 113 model, 114 self.request_timeout_seconds, 115 )
Initialize the local provider.
Arguments:
- base_url: Base URL for the local endpoint. Normalized to
include `/v1` if missing.
- model: Model identifier.
- api_key: API key. Defaults to `"not-needed"`.
- attach_csv_as_file: If `True`, attempt file uploads.
- request_timeout_seconds: HTTP timeout in seconds.
Raises:
- AIProviderError: If the `openai` SDK is not installed.
117 def analyze( 118 self, 119 system_prompt: str, 120 user_prompt: str, 121 max_tokens: int = DEFAULT_MAX_TOKENS, 122 ) -> str: 123 """Send a prompt to the local endpoint and return the generated text. 124 125 Args: 126 system_prompt: The system-level instruction text. 127 user_prompt: The user-facing prompt with investigation context. 128 max_tokens: Maximum completion tokens. 129 130 Returns: 131 The generated analysis text. 132 133 Raises: 134 AIProviderError: On any API or network failure. 135 """ 136 return self.analyze_with_attachments( 137 system_prompt=system_prompt, 138 user_prompt=user_prompt, 139 attachments=None, 140 max_tokens=max_tokens, 141 )
Send a prompt to the local endpoint and return the generated text.
Arguments:
- system_prompt: The system-level instruction text.
- user_prompt: The user-facing prompt with investigation context.
- max_tokens: Maximum completion tokens.
Returns:
The generated analysis text.
Raises:
- AIProviderError: On any API or network failure.
    def analyze_stream(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> Iterator[str]:
        """Stream generated text chunks from the local endpoint.

        Falls back to non-streaming if the endpoint reports streaming
        is unsupported.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            max_tokens: Maximum completion tokens.

        Yields:
            Text chunk strings as they are generated.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        def _stream() -> Iterator[str]:
            try:
                prompt_for_completion = self._build_chat_completion_prompt(
                    user_prompt=user_prompt,
                    attachments=None,
                )
                try:
                    stream = _run_with_rate_limit_retries(
                        request_fn=lambda: self.client.chat.completions.create(
                            model=self.model,
                            max_tokens=max_tokens,
                            messages=[
                                {"role": "system", "content": system_prompt},
                                {"role": "user", "content": prompt_for_completion},
                            ],
                            stream=True,
                        ),
                        rate_limit_error_type=self._openai.RateLimitError,
                        provider_name="Local provider",
                    )
                except self._openai.BadRequestError as error:
                    # Some local servers reject stream=True; detect that
                    # from the message text and retry without streaming.
                    lowered_error = str(error).lower()
                    if "stream" in lowered_error and ("unsupported" in lowered_error or "not support" in lowered_error):
                        fallback_text = self._request_non_stream(
                            system_prompt=system_prompt,
                            user_prompt=user_prompt,
                            max_tokens=max_tokens,
                            attachments=None,
                        )
                        if fallback_text:
                            yield fallback_text
                        return
                    raise

                emitted = False
                for chunk in stream:
                    choices = getattr(chunk, "choices", None)
                    if not choices:
                        continue
                    choice = choices[0]
                    # Servers may return dataclass-like objects or dicts.
                    delta = getattr(choice, "delta", None)
                    if delta is None and isinstance(choice, dict):
                        delta = choice.get("delta")
                    # Also forward reasoning-channel text so callers see
                    # output from local reasoning models.
                    chunk_text = _extract_openai_delta_text(
                        delta,
                        ("content", "reasoning_content", "reasoning", "thinking"),
                    )
                    if not chunk_text:
                        continue
                    emitted = True
                    yield chunk_text

                if not emitted:
                    raise AIProviderError(
                        "Local AI provider returned an empty streamed response. "
                        "Try a different local model or increase max tokens."
                    )
            # Map SDK exceptions to AIProviderError; specific types first.
            except AIProviderError:
                raise
            except self._openai.APIConnectionError as error:
                self._raise_connection_error(error)
            except self._openai.AuthenticationError as error:
                raise AIProviderError(
                    "Local AI endpoint rejected authentication. Check `ai.local.api_key` if your server requires one."
                ) from error
            except self._openai.BadRequestError as error:
                if _is_context_length_error(error):
                    raise AIProviderError(
                        "Local model request exceeded the context length. Reduce prompt size and retry."
                    ) from error
                raise AIProviderError(f"Local provider request was rejected: {error}") from error
            except self._openai.APIError as error:
                self._raise_api_error(error)
            except Exception as error:
                raise AIProviderError(f"Unexpected local provider error: {error}") from error

        # Errors surface lazily, on iteration of the returned generator.
        return _stream()
Stream generated text chunks from the local endpoint.
Falls back to non-streaming if the endpoint reports streaming is unsupported.
Arguments:
- system_prompt: The system-level instruction text.
- user_prompt: The user-facing prompt with investigation context.
- max_tokens: Maximum completion tokens.
Yields:
Text chunk strings as they are generated.
Raises:
- AIProviderError: On empty response or API failure.
243 def analyze_with_attachments( 244 self, 245 system_prompt: str, 246 user_prompt: str, 247 attachments: list[Mapping[str, str]] | None, 248 max_tokens: int = DEFAULT_MAX_TOKENS, 249 ) -> str: 250 """Analyze with optional CSV file attachments. 251 252 Args: 253 system_prompt: The system-level instruction text. 254 user_prompt: The user-facing prompt with investigation context. 255 attachments: Optional list of attachment descriptors. 256 max_tokens: Maximum completion tokens. 257 258 Returns: 259 The generated analysis text. 260 261 Raises: 262 AIProviderError: On any API or network failure. 263 """ 264 def _request() -> str: 265 return self._request_non_stream( 266 system_prompt=system_prompt, 267 user_prompt=user_prompt, 268 max_tokens=max_tokens, 269 attachments=attachments, 270 ) 271 272 return self._run_local_request(_request)
Analyze with optional CSV file attachments.
Arguments:
- system_prompt: The system-level instruction text.
- user_prompt: The user-facing prompt with investigation context.
- attachments: Optional list of attachment descriptors.
- max_tokens: Maximum completion tokens.
Returns:
The generated analysis text.
Raises:
- AIProviderError: On any API or network failure.
    def analyze_with_progress(
        self,
        system_prompt: str,
        user_prompt: str,
        progress_callback: Callable[[dict[str, str]], None] | None,
        attachments: list[Mapping[str, str]] | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ) -> str:
        """Analyze with streamed progress updates when supported.

        Streams the response and periodically invokes ``progress_callback``
        with accumulated thinking and answer text. Falls back to
        ``analyze_with_attachments`` when no callback is provided.

        Args:
            system_prompt: The system-level instruction text.
            user_prompt: The user-facing prompt with investigation context.
            progress_callback: Optional callable receiving progress dicts.
            attachments: Optional list of attachment descriptors.
            max_tokens: Maximum completion tokens.

        Returns:
            The generated analysis text with reasoning blocks removed.

        Raises:
            AIProviderError: On empty response or API failure.
        """
        if progress_callback is None:
            return self.analyze_with_attachments(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                attachments=attachments,
                max_tokens=max_tokens,
            )

        def _request() -> str:
            result = self._build_stream_or_result(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                max_tokens=max_tokens,
                attachments=attachments,
            )
            # A plain string means the result was obtained without
            # streaming (attachment path or non-streaming fallback).
            if isinstance(result, str):
                return result
            stream = result

            thinking_parts: list[str] = []
            answer_parts: list[str] = []
            last_emit_at = 0.0
            last_sent_thinking = ""
            last_sent_answer = ""

            for chunk in stream:
                chunk_result = self._process_stream_chunk(chunk)
                if chunk_result is None:
                    continue

                thinking_delta, answer_delta = chunk_result
                if thinking_delta:
                    thinking_parts.append(thinking_delta)
                if answer_delta:
                    answer_parts.append(answer_delta)

                current_thinking = "".join(thinking_parts).strip()
                current_answer = _clean_streamed_answer_text(
                    answer_text="".join(answer_parts),
                    thinking_text=current_thinking,
                )

                # Emission is rate-limited inside the helper; it returns
                # the updated bookkeeping triple either way.
                last_emit_at, last_sent_thinking, last_sent_answer = (
                    self._emit_progress_if_needed(
                        progress_callback=progress_callback,
                        current_thinking=current_thinking,
                        current_answer=current_answer,
                        last_emit_at=last_emit_at,
                        last_sent_thinking=last_sent_thinking,
                        last_sent_answer=last_sent_answer,
                    )
                )

            return self._finalize_stream_response(thinking_parts, answer_parts)

        return self._run_local_request(_request)
Analyze with streamed progress updates when supported.
Streams the response and periodically invokes progress_callback
with accumulated thinking and answer text. Falls back to
analyze_with_attachments when no callback is provided.
Arguments:
- system_prompt: The system-level instruction text.
- user_prompt: The user-facing prompt with investigation context.
- progress_callback: Optional callable receiving progress dicts.
- attachments: Optional list of attachment descriptors.
- max_tokens: Maximum completion tokens.
Returns:
The generated analysis text with reasoning blocks removed.
Raises:
- AIProviderError: On empty response or API failure.
757 def get_model_info(self) -> dict[str, str]: 758 """Return local provider and model metadata. 759 760 Returns: 761 A dictionary with ``"provider"`` and ``"model"`` keys. 762 """ 763 return {"provider": "local", "model": self.model}
Return local provider and model metadata.
Returns:
A dictionary with `"provider"` and `"model"` keys.