app.analyzer.constants

Constants, regex patterns, and prompt templates for the analyzer module.

This module centralises all compile-time constants used by the forensic analysis pipeline so they can be imported by multiple sub-modules without circular dependencies.

Attributes:
  • TOKEN_CHAR_RATIO (int): Approximate characters per token for ASCII text.
  • AI_MAX_TOKENS (int): Default AI context window size in tokens.
  • DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS (int): Token threshold for switching to the small-context prompt template.
  • AI_RETRY_ATTEMPTS (int): Maximum retry attempts for transient AI failures.
  • AI_RETRY_BASE_DELAY (float): Base delay in seconds for retry backoff.
  • MAX_MERGE_ROUNDS (int): Maximum hierarchical merge iterations.
  • ARTIFACT_DEDUPLICATION_ENABLED (bool): Default deduplication toggle.
  • DEDUPLICATED_PARSED_DIRNAME (str): Directory name for deduplicated CSVs.
  • DEDUP_COMMENT_COLUMN (str): Column name for deduplication annotations.
  • CITATION_SPOT_CHECK_LIMIT (int): Max citations to validate per artifact.
  • PROJECT_ROOT (Path): Absolute path to the project root directory.
  • DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH (Path): Default column projection YAML config path.
  1"""Constants, regex patterns, and prompt templates for the analyzer module.
  2
  3This module centralises all compile-time constants used by the forensic
  4analysis pipeline so they can be imported by multiple sub-modules without
  5circular dependencies.
  6
  7Attributes:
  8    TOKEN_CHAR_RATIO (int): Approximate characters per token for ASCII text.
  9    AI_MAX_TOKENS (int): Default AI context window size in tokens.
 10    DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS (int): Token threshold for
 11        switching to the small-context prompt template.
 12    AI_RETRY_ATTEMPTS (int): Maximum retry attempts for transient AI failures.
 13    AI_RETRY_BASE_DELAY (float): Base delay in seconds for retry backoff.
 14    MAX_MERGE_ROUNDS (int): Maximum hierarchical merge iterations.
 15    ARTIFACT_DEDUPLICATION_ENABLED (bool): Default deduplication toggle.
 16    DEDUPLICATED_PARSED_DIRNAME (str): Directory name for deduplicated CSVs.
 17    DEDUP_COMMENT_COLUMN (str): Column name for deduplication annotations.
 18    CITATION_SPOT_CHECK_LIMIT (int): Max citations to validate per artifact.
 19    PROJECT_ROOT (Path): Absolute path to the project root directory.
 20    DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH (Path): Default column projection
 21        YAML config path.
 22"""
 23
 24from __future__ import annotations
 25
 26import re
 27from pathlib import Path
 28
 29from ..ai_providers import AIProviderError
 30
 31__all__ = [
 32    "TOKEN_CHAR_RATIO",
 33    "AI_MAX_TOKENS",
 34    "DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS",
 35    "AI_RETRY_ATTEMPTS",
 36    "AI_RETRY_BASE_DELAY",
 37    "MAX_MERGE_ROUNDS",
 38    "ARTIFACT_DEDUPLICATION_ENABLED",
 39    "DEDUPLICATED_PARSED_DIRNAME",
 40    "DEDUP_COMMENT_COLUMN",
 41    "CITATION_SPOT_CHECK_LIMIT",
 42    "PROJECT_ROOT",
 43    "DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH",
 44    "UnavailableProvider",
 45]
 46
 47# ---------------------------------------------------------------------------
 48# Numeric / string constants
 49# ---------------------------------------------------------------------------
 50
 51TOKEN_CHAR_RATIO = 4
 52AI_MAX_TOKENS = 128000
 53DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS = 64000
 54AI_RETRY_ATTEMPTS = 3
 55AI_RETRY_BASE_DELAY = 1.0
 56MAX_MERGE_ROUNDS = 5
 57ARTIFACT_DEDUPLICATION_ENABLED = True
 58DEDUPLICATED_PARSED_DIRNAME = "parsed_deduplicated"
 59DEDUP_COMMENT_COLUMN = "_dedup_comment"
 60CITATION_SPOT_CHECK_LIMIT = 20
 61PROJECT_ROOT = Path(__file__).resolve().parents[2]
 62DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH = PROJECT_ROOT / "config" / "artifact_ai_columns.yaml"
 63
 64# ---------------------------------------------------------------------------
 65# IOC-extraction regex patterns
 66# ---------------------------------------------------------------------------
 67
 68IOC_URL_RE = re.compile(r"\bhttps?://[^\s\"'<>]+", flags=re.IGNORECASE)
 69IOC_IPV4_RE = re.compile(
 70    r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b"
 71)
 72IOC_HASH_RE = re.compile(r"\b(?:[A-Fa-f0-9]{32}|[A-Fa-f0-9]{40}|[A-Fa-f0-9]{64})\b")
 73IOC_DOMAIN_RE = re.compile(r"\b(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,63}\b")
 74IOC_EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,63}\b")
 75IOC_FILENAME_RE = re.compile(
 76    r"\b[A-Za-z0-9_.-]+\.(?:exe|dll|sys|bat|cmd|ps1|vbs|vbe|msi|msp|scr|cpl|lnk|jar)\b",
 77    flags=re.IGNORECASE,
 78)
 79
 80KNOWN_MALICIOUS_TOOL_KEYWORDS = (
 81    "mimikatz",
 82    "rubeus",
 83    "psexec",
 84    "procdump",
 85    "nanodump",
 86    "comsvcs",
 87    "secretsdump",
 88    "wmiexec",
 89    "atexec",
 90    "cobalt strike",
 91    "beacon",
 92    "bloodhound",
 93    "adfind",
 94    "ligolo",
 95    "metasploit",
 96)
 97
 98DOMAIN_EXCLUDED_SUFFIXES = {".local", ".lan", ".internal"}
 99
# ---------------------------------------------------------------------------
# Citation-validation regex patterns
# ---------------------------------------------------------------------------

# ISO-8601-style timestamps with optional fractional seconds and Z/offset.
CITED_ISO_TIMESTAMP_RE = re.compile(
    r"\b\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?\b"
)
# Row references such as "row 12", "row_ref: 12", "row #12"; group 1 is the number.
CITED_ROW_REF_RE = re.compile(r"\brow[_ ]?(?:ref(?:erence)?)?[:\s#]*(\d+)\b", re.IGNORECASE)
# Column/field references in three shapes: `name`, column "name", or "name" column.
# Exactly one of capture groups 1-3 is populated per match.
CITED_COLUMN_REF_RE = re.compile(
    r"(?:`([^`]{2,60})`"
    r"|(?:column|field)\s+[\"']([^\"']{2,60})[\"']"
    r"|[\"']([^\"']{2,60})[\"']\s+(?:column|field))",
    re.IGNORECASE,
)

# ---------------------------------------------------------------------------
# CSV / prompt section regex patterns
# ---------------------------------------------------------------------------

# The "## Full Data (CSV)" heading, optionally followed by an opening code fence.
CSV_DATA_SECTION_RE = re.compile(
    r"#{2,3}\s+Full\s+Data\s+\(CSV\)\s*\n(?:```\s*\n)?",
    flags=re.IGNORECASE,
)
# A closing code fence at the very end of the text.
CSV_TRAILING_FENCE_RE = re.compile(r"\n```\s*$")
# Windows drive-letter paths, stopping at quotes, whitespace, or delimiters.
WINDOWS_PATH_RE = re.compile(r"[A-Za-z]:\\[^\"'\s,;)]*")
# Signed decimal integers.
INTEGER_RE = re.compile(r"-?\d+")
126
# ---------------------------------------------------------------------------
# Lookup tables
# ---------------------------------------------------------------------------

# Substrings suggesting a column holds timestamps.
TIMESTAMP_COLUMN_HINTS = (
    "ts", "timestamp", "time", "date", "created",
    "modified", "last", "first", "written",
)

# Column names that hold per-record identifiers, and are therefore safe to
# disregard when comparing rows during deduplication — presumably; confirm
# against the deduplication logic that consumes this set.
DEDUP_SAFE_IDENTIFIER_HINTS = {
    "recordid", "record_id", "entryid", "entry_id",
    "index", "row_id", "rowid", "sequence_number", "sequencenumber",
}

# Bookkeeping columns added by the pipeline rather than the artifact data.
METADATA_COLUMNS = {
    "_source", "_classification", "_generated", "_version",
    "source", "row_ref", "_row_ref",
}

# Cell values carrying no analytic signal (all-lowercase; callers presumably
# normalize case before membership tests — verify at call site).
LOW_SIGNAL_VALUES = {"", "none", "null", "n/a", "na", "unknown", "-"}
147
# ---------------------------------------------------------------------------
# Default prompt templates
# ---------------------------------------------------------------------------
# Placeholders use double-brace ``{{name}}`` syntax; the substitution
# mechanism lives in the consumer of these templates, not here.

# System prompt applied to every AI call.
DEFAULT_SYSTEM_PROMPT = (
    "You are a digital forensic analyst. "
    "Analyze ONLY the data provided to you. "
    "Do not fabricate evidence. "
    "Prioritize incident-relevant findings and response actions; use baseline only as supporting context."
)

# Full per-artifact analysis prompt, including the Statistics section.
DEFAULT_ARTIFACT_PROMPT_TEMPLATE = (
    "## Priority Directives\n{{priority_directives}}\n\n"
    "## Investigation Context\n{{investigation_context}}\n\n"
    "## IOC Targets\n{{ioc_targets}}\n\n"
    "## Artifact\n- Key: {{artifact_key}}\n- Name: {{artifact_name}}\n- Description: {{artifact_description}}\n\n"
    "## Dataset Scope\n- Total records: {{total_records}}\n"
    "- Time range start: {{time_range_start}}\n- Time range end: {{time_range_end}}\n\n"
    "## Statistics\n{{statistics}}\n\n"
    "## Incident Focus\n"
    "- Prioritize suspicious activity that advances detection, scoping, containment, or remediation.\n"
    "- Use baseline and statistics only as supporting context for behavior shifts.\n\n"
    "## Analysis Instructions\n{{analysis_instructions}}\n\n"
    "## Full Data (CSV)\n{{data_csv}}\n"
)

# Compact variant used below DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS; omits
# the Statistics section to save tokens.
DEFAULT_ARTIFACT_PROMPT_TEMPLATE_SMALL_CONTEXT = (
    "## Priority Directives\n{{priority_directives}}\n\n"
    "## Investigation Context\n{{investigation_context}}\n\n"
    "## IOC Targets\n{{ioc_targets}}\n\n"
    "## Artifact\n- Key: {{artifact_key}}\n- Name: {{artifact_name}}\n- Description: {{artifact_description}}\n\n"
    "## Dataset Scope\n- Total records: {{total_records}}\n"
    "- Time range start: {{time_range_start}}\n- Time range end: {{time_range_end}}\n\n"
    "## Incident Focus\n"
    "- Prioritize suspicious activity that advances detection, scoping, containment, or remediation.\n"
    "- Use baseline references only as supporting context for behavior shifts.\n\n"
    "## Analysis Instructions\n{{analysis_instructions}}\n\n"
    "## Full Data (CSV)\n{{data_csv}}\n"
)

# Host-level summary prompt correlating all per-artifact findings.
DEFAULT_SUMMARY_PROMPT_TEMPLATE = (
    "## Priority Directives\n{{priority_directives}}\n\n"
    "## Investigation Context\n{{investigation_context}}\n\n"
    "## IOC Targets\n{{ioc_targets}}\n\n"
    "## Host Context\n- Hostname: {{hostname}}\n- OS Version: {{os_version}}\n- Domain: {{domain}}\n\n"
    "## Per-Artifact Findings\n{{per_artifact_findings}}\n\n"
    "## Incident Focus\n"
    "- Correlate findings to identify likely intrusion activity, scope, and priority response actions.\n"
    "- Use baseline references only when they strengthen incident conclusions.\n"
)

# Prompt used to merge per-chunk analyses of one oversized artifact dataset.
DEFAULT_CHUNK_MERGE_PROMPT_TEMPLATE = (
    "You analyzed the same artifact dataset in {{chunk_count}} separate chunks "
    "because the data was too large for a single pass.\n"
    "Below are your findings from each chunk. Merge them into one final analysis.\n\n"
    "## Investigation Context\n{{investigation_context}}\n\n"
    "## Artifact: {{artifact_name}} ({{artifact_key}})\n\n"
    "## Per-Chunk Findings\n{{per_chunk_findings}}\n\n"
    "## Task\n"
    "Merge the above chunk analyses into one coherent analysis. "
    "Deduplicate repeated findings, reconcile contradictions, "
    "and re-rank by severity then confidence. "
    "Use the same output format as a single-pass artifact analysis:\n"
    "- **Findings** (severity/confidence, evidence, alternative explanation, verify)\n"
    "- **IOC Status** (if applicable)\n"
    "- **Data Gaps**\n"
)
215
216
# ---------------------------------------------------------------------------
# Fallback AI provider
# ---------------------------------------------------------------------------

class UnavailableProvider:
    """Fallback AI provider used when the configured provider fails to initialize.

    This sentinel object implements the same interface as a real AI provider
    but raises ``AIProviderError`` on every ``analyze`` call, ensuring that
    the analyzer reports a clear error instead of crashing with an
    ``AttributeError``.

    Attributes:
        _error_message: Human-readable description of why the real provider
            could not be created.
    """

    def __init__(self, error_message: str) -> None:
        """Initialize the unavailable provider with an error message.

        Args:
            error_message: Description of the initialization failure to
                surface when ``analyze`` is called.
        """
        # Fall back to a generic message when the caller passes "" or None.
        self._error_message = error_message or "AI provider is unavailable."

    def analyze(self, system_prompt: str, user_prompt: str, max_tokens: int = AI_MAX_TOKENS) -> str:
        """Always raises ``AIProviderError`` with the stored error message.

        Args:
            system_prompt: The system prompt (unused).
            user_prompt: The user prompt (unused).
            max_tokens: Maximum response tokens (unused).

        Raises:
            AIProviderError: Always raised with the initialization error.
        """
        raise AIProviderError(self._error_message)

    def get_model_info(self) -> dict[str, str]:
        """Return placeholder model info indicating unavailability.

        Returns:
            A dict with ``provider`` and ``model`` both set to
            ``"unavailable"``.
        """
        return {"provider": "unavailable", "model": "unavailable"}
TOKEN_CHAR_RATIO = 4
AI_MAX_TOKENS = 128000
DEFAULT_SHORTENED_PROMPT_CUTOFF_TOKENS = 64000
AI_RETRY_ATTEMPTS = 3
AI_RETRY_BASE_DELAY = 1.0
MAX_MERGE_ROUNDS = 5
ARTIFACT_DEDUPLICATION_ENABLED = True
DEDUPLICATED_PARSED_DIRNAME = 'parsed_deduplicated'
DEDUP_COMMENT_COLUMN = '_dedup_comment'
CITATION_SPOT_CHECK_LIMIT = 20
PROJECT_ROOT = PosixPath('/home/runner/work/AIFT/AIFT')
DEFAULT_ARTIFACT_AI_COLUMNS_CONFIG_PATH = PosixPath('/home/runner/work/AIFT/AIFT/config/artifact_ai_columns.yaml')
class UnavailableProvider:
222class UnavailableProvider:
223    """Fallback AI provider used when the configured provider fails to initialize.
224
225    This sentinel object implements the same interface as a real AI provider
226    but raises ``AIProviderError`` on every ``analyze`` call, ensuring that
227    the analyzer reports a clear error instead of crashing with an
228    ``AttributeError``.
229
230    Attributes:
231        _error_message: Human-readable description of why the real provider
232            could not be created.
233    """
234
235    def __init__(self, error_message: str) -> None:
236        """Initialize the unavailable provider with an error message.
237
238        Args:
239            error_message: Description of the initialization failure to
240                surface when ``analyze`` is called.
241        """
242        self._error_message = error_message or "AI provider is unavailable."
243
244    def analyze(self, system_prompt: str, user_prompt: str, max_tokens: int = AI_MAX_TOKENS) -> str:
245        """Always raises ``AIProviderError`` with the stored error message.
246
247        Args:
248            system_prompt: The system prompt (unused).
249            user_prompt: The user prompt (unused).
250            max_tokens: Maximum response tokens (unused).
251
252        Raises:
253            AIProviderError: Always raised with the initialization error.
254        """
255        raise AIProviderError(self._error_message)
256
257    def get_model_info(self) -> dict[str, str]:
258        """Return placeholder model info indicating unavailability.
259
260        Returns:
261            A dict with ``provider`` and ``model`` both set to
262            ``"unavailable"``.
263        """
264        return {"provider": "unavailable", "model": "unavailable"}

Fallback AI provider used when the configured provider fails to initialize.

This sentinel object implements the same interface as a real AI provider but raises AIProviderError on every analyze call, ensuring that the analyzer reports a clear error instead of crashing with an AttributeError.

Attributes:
  • _error_message: Human-readable description of why the real provider could not be created.
UnavailableProvider(error_message: str)
235    def __init__(self, error_message: str) -> None:
236        """Initialize the unavailable provider with an error message.
237
238        Args:
239            error_message: Description of the initialization failure to
240                surface when ``analyze`` is called.
241        """
242        self._error_message = error_message or "AI provider is unavailable."

Initialize the unavailable provider with an error message.

Arguments:
  • error_message: Description of the initialization failure to surface when analyze is called.
def analyze(self, system_prompt: str, user_prompt: str, max_tokens: int = 128000) -> str:
244    def analyze(self, system_prompt: str, user_prompt: str, max_tokens: int = AI_MAX_TOKENS) -> str:
245        """Always raises ``AIProviderError`` with the stored error message.
246
247        Args:
248            system_prompt: The system prompt (unused).
249            user_prompt: The user prompt (unused).
250            max_tokens: Maximum response tokens (unused).
251
252        Raises:
253            AIProviderError: Always raised with the initialization error.
254        """
255        raise AIProviderError(self._error_message)

Always raises AIProviderError with the stored error message.

Arguments:
  • system_prompt: The system prompt (unused).
  • user_prompt: The user prompt (unused).
  • max_tokens: Maximum response tokens (unused).
Raises:
  • AIProviderError: Always raised with the initialization error.
def get_model_info(self) -> dict[str, str]:
257    def get_model_info(self) -> dict[str, str]:
258        """Return placeholder model info indicating unavailability.
259
260        Returns:
261            A dict with ``provider`` and ``model`` both set to
262            ``"unavailable"``.
263        """
264        return {"provider": "unavailable", "model": "unavailable"}

Return placeholder model info indicating unavailability.

Returns:
  • A dict with provider and model both set to "unavailable".