app.analyzer.prompts
Prompt template loading and construction for forensic analysis.
Provides functions for loading prompt templates from disk, loading per-artifact instruction prompts, resolving AI column projection configurations, and building the cross-artifact summary prompt.
These were extracted from ForensicAnalyzer to keep the core
orchestration class focused on pipeline coordination.
Attributes:
- LOGGER: Module-level logger instance.
1"""Prompt template loading and construction for forensic analysis. 2 3Provides functions for loading prompt templates from disk, loading 4per-artifact instruction prompts, resolving AI column projection 5configurations, and building the cross-artifact summary prompt. 6 7These were extracted from ``ForensicAnalyzer`` to keep the core 8orchestration class focused on pipeline coordination. 9 10Attributes: 11 LOGGER: Module-level logger instance. 12""" 13 14from __future__ import annotations 15 16import logging 17from pathlib import Path 18from typing import Any, Mapping 19 20import yaml 21 22from .constants import PROJECT_ROOT 23from .ioc import build_priority_directives, format_ioc_targets 24from .utils import coerce_projection_columns, normalize_artifact_key, normalize_os_type 25 26LOGGER = logging.getLogger(__name__) 27 28__all__ = [ 29 "build_summary_prompt", 30 "load_artifact_ai_column_projections", 31 "load_artifact_instruction_prompts", 32 "load_prompt_template", 33 "resolve_artifact_ai_columns_config_path", 34] 35 36 37def load_prompt_template(prompts_dir: Path, filename: str, default: str) -> str: 38 """Read a prompt template file from the prompts directory. 39 40 Args: 41 prompts_dir: Directory containing prompt template files. 42 filename: Name of the template file. 43 default: Fallback template string if the file cannot be read. 44 45 Returns: 46 The template text, or *default* if reading fails. 47 """ 48 try: 49 prompt_path = prompts_dir / filename 50 return prompt_path.read_text(encoding="utf-8") 51 except OSError: 52 return default 53 54 55def load_artifact_instruction_prompts( 56 prompts_dir: Path, 57 os_type: str = "windows", 58) -> dict[str, str]: 59 """Load per-artifact analysis instruction prompts from disk. 
60 61 Selects the OS-specific instruction directory based on *os_type*: 62 63 - ``"windows"`` (default): ``artifact_instructions/`` 64 - ``"linux"``: ``artifact_instructions_linux/`` 65 66 For any other OS value the function falls back to the Windows 67 directory. 68 69 Args: 70 prompts_dir: Directory containing prompt template files. 71 os_type: Operating system identifier (e.g. ``"windows"``, 72 ``"linux"``). Determines which sub-directory to scan. 73 74 Returns: 75 A dict mapping lowercased artifact keys to instruction prompt text. 76 """ 77 normalized_os = normalize_os_type(os_type) 78 if normalized_os == "linux": 79 instructions_dir = prompts_dir / "artifact_instructions_linux" 80 else: 81 instructions_dir = prompts_dir / "artifact_instructions" 82 83 if not instructions_dir.exists() or not instructions_dir.is_dir(): 84 return {} 85 86 prompts: dict[str, str] = {} 87 for prompt_path in sorted(instructions_dir.glob("*.md")): 88 try: 89 prompt_text = prompt_path.read_text(encoding="utf-8").strip() 90 except OSError: 91 continue 92 if not prompt_text: 93 continue 94 prompts[prompt_path.stem.strip().lower()] = prompt_text 95 return prompts 96 97 98def resolve_artifact_ai_columns_config_path( 99 configured_path: str | Path, 100 case_dir: Path | None, 101) -> Path: 102 """Resolve the artifact AI columns config path to an absolute Path. 103 104 Checks for the file at the configured path, then relative to the 105 case directory, and finally relative to the project root. 106 107 Args: 108 configured_path: The configured path (may be relative). 109 case_dir: Path to the current case directory, or ``None``. 110 111 Returns: 112 Resolved absolute ``Path`` to the YAML config file. 
113 """ 114 configured = Path(configured_path).expanduser() 115 if configured.is_absolute(): 116 return configured 117 118 candidates: list[Path] = [] 119 if case_dir is not None: 120 candidates.append(case_dir / configured) 121 candidates.append(PROJECT_ROOT / configured) 122 123 for candidate in candidates: 124 if candidate.exists(): 125 return candidate 126 return candidates[-1] 127 128 129def load_artifact_ai_column_projections( 130 config_path: Path, 131 os_type: str = "windows", 132) -> dict[str, tuple[str, ...]]: 133 """Load per-artifact column projection configuration from YAML. 134 135 Handles OS-suffixed keys (e.g. ``services_linux``) by mapping them 136 to the base key only when the active *os_type* matches the suffix. 137 This prevents Linux-specific column definitions from overwriting 138 their Windows counterparts (and vice-versa). 139 140 Args: 141 config_path: Absolute path to the YAML config file. 142 os_type: Active operating system type (``"windows"``, 143 ``"linux"``, etc.). Controls which OS-suffixed keys are 144 accepted. 145 146 Returns: 147 A dict mapping normalized artifact keys to tuples of column names. 148 """ 149 normalized_os = normalize_os_type(os_type) 150 151 try: 152 with config_path.open("r", encoding="utf-8") as handle: 153 parsed = yaml.safe_load(handle) or {} 154 except (OSError, yaml.YAMLError) as error: 155 LOGGER.warning( 156 "Failed to load AI column projection config from %s: %s. 
" 157 "AI column projection is disabled.", config_path, error, 158 ) 159 return {} 160 161 if not isinstance(parsed, Mapping): 162 LOGGER.warning( 163 "Invalid AI column projection config in %s: expected a mapping, got %s.", 164 config_path, type(parsed).__name__, 165 ) 166 return {} 167 168 source: Any = parsed.get("artifact_ai_columns", parsed) 169 if not isinstance(source, Mapping): 170 LOGGER.warning( 171 "Invalid AI column projection config in %s: 'artifact_ai_columns' must be a mapping, got %s.", 172 config_path, type(source).__name__, 173 ) 174 return {} 175 176 projections: dict[str, tuple[str, ...]] = {} 177 for artifact_key, raw_columns in source.items(): 178 if artifact_key is None: 179 continue 180 raw_key = str(artifact_key) 181 182 # Handle OS-suffixed keys like "services_linux": accept only 183 # when the suffix matches the current OS, and store under the 184 # base key so the correct projection wins. 185 os_suffix = f"_{normalized_os}" 186 if raw_key.endswith(os_suffix): 187 effective_key = raw_key[: -len(os_suffix)] 188 elif raw_key.rsplit("_", 1)[-1] in ("linux", "windows", "esxi"): 189 # OS-suffixed key for a *different* OS — skip it. 190 continue 191 else: 192 effective_key = raw_key 193 194 normalized_key = normalize_artifact_key(effective_key) 195 columns = coerce_projection_columns(raw_columns) 196 if columns: 197 projections[normalized_key] = tuple(columns) 198 return projections 199 200 201def build_summary_prompt( 202 summary_prompt_template: str, 203 investigation_context: str, 204 per_artifact_results: list[Mapping[str, Any]], 205 metadata_map: Mapping[str, Any], 206) -> str: 207 """Build the cross-artifact summary prompt from a template. 208 209 Assembles per-artifact findings into a single prompt using the 210 summary template, filling in investigation context, IOC targets, 211 priority directives, and host metadata placeholders. 
212 213 Args: 214 summary_prompt_template: The summary template string with 215 ``{{placeholder}}`` markers. 216 investigation_context: The user's investigation context text. 217 per_artifact_results: List of per-artifact result dicts, each 218 with ``artifact_key``, ``artifact_name``, and ``analysis``. 219 metadata_map: Host metadata mapping with optional ``hostname``, 220 ``os_version``, ``os_type``, and ``domain`` keys. 221 222 Returns: 223 The fully rendered summary prompt string. 224 """ 225 findings_blocks: list[str] = [] 226 for result in per_artifact_results: 227 artifact_key = str(result.get("artifact_key", "unknown")) 228 artifact_name = str(result.get("artifact_name", artifact_key)) 229 analysis = str(result.get("analysis", "")).strip() 230 findings_blocks.append(f"### {artifact_name} ({artifact_key})\n{analysis}") 231 232 findings_text = ( 233 "\n\n".join(findings_blocks) 234 if findings_blocks 235 else "No per-artifact findings available." 236 ) 237 238 priority_directives = build_priority_directives(investigation_context) 239 ioc_targets = format_ioc_targets(investigation_context) 240 241 summary_prompt = summary_prompt_template 242 replacements = { 243 "priority_directives": priority_directives, 244 "investigation_context": investigation_context.strip() or "No investigation context provided.", 245 "ioc_targets": ioc_targets, 246 "hostname": str(metadata_map.get("hostname", "Unknown")), 247 "os_version": str(metadata_map.get("os_version", "Unknown")), 248 "os_type": str(metadata_map.get("os_type", "Unknown")), 249 "domain": str(metadata_map.get("domain", "Unknown")), 250 "per_artifact_findings": findings_text, 251 } 252 for placeholder, value in replacements.items(): 253 summary_prompt = summary_prompt.replace(f"{{{{{placeholder}}}}}", value) 254 255 return summary_prompt
def build_summary_prompt(
    summary_prompt_template: str,
    investigation_context: str,
    per_artifact_results: list[Mapping[str, Any]],
    metadata_map: Mapping[str, Any],
) -> str:
    """Render the cross-artifact summary prompt.

    Stitches every per-artifact analysis into a findings section, then
    substitutes the ``{{placeholder}}`` markers in the template with the
    investigation context, IOC targets, priority directives, and host
    metadata values.

    Args:
        summary_prompt_template: Template text containing
            ``{{placeholder}}`` markers.
        investigation_context: Free-form investigation context supplied
            by the user.
        per_artifact_results: Result dicts carrying ``artifact_key``,
            ``artifact_name``, and ``analysis`` entries.
        metadata_map: Host metadata with optional ``hostname``,
            ``os_version``, ``os_type``, and ``domain`` keys.

    Returns:
        The template with all known placeholders substituted.
    """
    findings: list[str] = []
    for entry in per_artifact_results:
        key = str(entry.get("artifact_key", "unknown"))
        name = str(entry.get("artifact_name", key))
        body = str(entry.get("analysis", "")).strip()
        findings.append(f"### {name} ({key})\n{body}")

    if findings:
        findings_text = "\n\n".join(findings)
    else:
        findings_text = "No per-artifact findings available."

    context_text = investigation_context.strip()
    if not context_text:
        context_text = "No investigation context provided."

    def _meta(field: str) -> str:
        # Host metadata fields default to "Unknown" when absent.
        return str(metadata_map.get(field, "Unknown"))

    replacements = {
        "priority_directives": build_priority_directives(investigation_context),
        "investigation_context": context_text,
        "ioc_targets": format_ioc_targets(investigation_context),
        "hostname": _meta("hostname"),
        "os_version": _meta("os_version"),
        "os_type": _meta("os_type"),
        "domain": _meta("domain"),
        "per_artifact_findings": findings_text,
    }

    rendered = summary_prompt_template
    for marker, value in replacements.items():
        rendered = rendered.replace("{{" + marker + "}}", value)
    return rendered
Build the cross-artifact summary prompt from a template.
Assembles per-artifact findings into a single prompt using the summary template, filling in investigation context, IOC targets, priority directives, and host metadata placeholders.
Arguments:
- summary_prompt_template: The summary template string with
`{{placeholder}}` markers.
- investigation_context: The user's investigation context text.
- per_artifact_results: List of per-artifact result dicts, each
with `artifact_key`, `artifact_name`, and `analysis`.
- metadata_map: Host metadata mapping with optional
`hostname`, `os_version`, `os_type`, and `domain` keys.
Returns:
The fully rendered summary prompt string.
def load_artifact_ai_column_projections(
    config_path: Path,
    os_type: str = "windows",
) -> dict[str, tuple[str, ...]]:
    """Load per-artifact column projection configuration from YAML.

    Handles OS-suffixed keys (e.g. ``services_linux``) by mapping them
    to the base key only when the active *os_type* matches the suffix.
    This prevents Linux-specific column definitions from overwriting
    their Windows counterparts (and vice-versa). A key without an
    underscore is always treated as a base artifact key, even when it
    happens to equal an OS name (e.g. a literal ``linux`` key).

    Args:
        config_path: Absolute path to the YAML config file.
        os_type: Active operating system type (``"windows"``,
            ``"linux"``, etc.). Controls which OS-suffixed keys are
            accepted.

    Returns:
        A dict mapping normalized artifact keys to tuples of column names.
    """
    normalized_os = normalize_os_type(os_type)

    try:
        with config_path.open("r", encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle) or {}
    except (OSError, yaml.YAMLError) as error:
        LOGGER.warning(
            "Failed to load AI column projection config from %s: %s. "
            "AI column projection is disabled.", config_path, error,
        )
        return {}

    if not isinstance(parsed, Mapping):
        LOGGER.warning(
            "Invalid AI column projection config in %s: expected a mapping, got %s.",
            config_path, type(parsed).__name__,
        )
        return {}

    source: Any = parsed.get("artifact_ai_columns", parsed)
    if not isinstance(source, Mapping):
        LOGGER.warning(
            "Invalid AI column projection config in %s: 'artifact_ai_columns' must be a mapping, got %s.",
            config_path, type(source).__name__,
        )
        return {}

    projections: dict[str, tuple[str, ...]] = {}
    os_suffix = f"_{normalized_os}"  # loop-invariant: hoisted out of the loop
    for artifact_key, raw_columns in source.items():
        if artifact_key is None:
            continue
        raw_key = str(artifact_key)

        # Handle OS-suffixed keys like "services_linux": accept only
        # when the suffix matches the current OS, and store under the
        # base key so the correct projection wins.
        if raw_key.endswith(os_suffix):
            effective_key = raw_key[: -len(os_suffix)]
        elif "_" in raw_key and raw_key.rsplit("_", 1)[-1] in ("linux", "windows", "esxi"):
            # OS-suffixed key for a *different* OS — skip it. The
            # underscore guard keeps a bare key such as "linux" (no
            # suffix at all) from being dropped by mistake.
            continue
        else:
            effective_key = raw_key

        normalized_key = normalize_artifact_key(effective_key)
        columns = coerce_projection_columns(raw_columns)
        if columns:
            projections[normalized_key] = tuple(columns)
    return projections
Load per-artifact column projection configuration from YAML.
Handles OS-suffixed keys (e.g. services_linux) by mapping them
to the base key only when the active os_type matches the suffix.
This prevents Linux-specific column definitions from overwriting
their Windows counterparts (and vice-versa).
Arguments:
- config_path: Absolute path to the YAML config file.
- os_type: Active operating system type (`"windows"`,
`"linux"`, etc.). Controls which OS-suffixed keys are accepted.
Returns:
A dict mapping normalized artifact keys to tuples of column names.
56def load_artifact_instruction_prompts( 57 prompts_dir: Path, 58 os_type: str = "windows", 59) -> dict[str, str]: 60 """Load per-artifact analysis instruction prompts from disk. 61 62 Selects the OS-specific instruction directory based on *os_type*: 63 64 - ``"windows"`` (default): ``artifact_instructions/`` 65 - ``"linux"``: ``artifact_instructions_linux/`` 66 67 For any other OS value the function falls back to the Windows 68 directory. 69 70 Args: 71 prompts_dir: Directory containing prompt template files. 72 os_type: Operating system identifier (e.g. ``"windows"``, 73 ``"linux"``). Determines which sub-directory to scan. 74 75 Returns: 76 A dict mapping lowercased artifact keys to instruction prompt text. 77 """ 78 normalized_os = normalize_os_type(os_type) 79 if normalized_os == "linux": 80 instructions_dir = prompts_dir / "artifact_instructions_linux" 81 else: 82 instructions_dir = prompts_dir / "artifact_instructions" 83 84 if not instructions_dir.exists() or not instructions_dir.is_dir(): 85 return {} 86 87 prompts: dict[str, str] = {} 88 for prompt_path in sorted(instructions_dir.glob("*.md")): 89 try: 90 prompt_text = prompt_path.read_text(encoding="utf-8").strip() 91 except OSError: 92 continue 93 if not prompt_text: 94 continue 95 prompts[prompt_path.stem.strip().lower()] = prompt_text 96 return prompts
Load per-artifact analysis instruction prompts from disk.
Selects the OS-specific instruction directory based on `os_type`:
- `"windows"` (default): `artifact_instructions/`
- `"linux"`: `artifact_instructions_linux/`
For any other OS value the function falls back to the Windows directory.
Arguments:
- prompts_dir: Directory containing prompt template files.
- os_type: Operating system identifier (e.g.
`"windows"`, `"linux"`). Determines which sub-directory to scan.
Returns:
A dict mapping lowercased artifact keys to instruction prompt text.
38def load_prompt_template(prompts_dir: Path, filename: str, default: str) -> str: 39 """Read a prompt template file from the prompts directory. 40 41 Args: 42 prompts_dir: Directory containing prompt template files. 43 filename: Name of the template file. 44 default: Fallback template string if the file cannot be read. 45 46 Returns: 47 The template text, or *default* if reading fails. 48 """ 49 try: 50 prompt_path = prompts_dir / filename 51 return prompt_path.read_text(encoding="utf-8") 52 except OSError: 53 return default
Read a prompt template file from the prompts directory.
Arguments:
- prompts_dir: Directory containing prompt template files.
- filename: Name of the template file.
- default: Fallback template string if the file cannot be read.
Returns:
The template text, or default if reading fails.
99def resolve_artifact_ai_columns_config_path( 100 configured_path: str | Path, 101 case_dir: Path | None, 102) -> Path: 103 """Resolve the artifact AI columns config path to an absolute Path. 104 105 Checks for the file at the configured path, then relative to the 106 case directory, and finally relative to the project root. 107 108 Args: 109 configured_path: The configured path (may be relative). 110 case_dir: Path to the current case directory, or ``None``. 111 112 Returns: 113 Resolved absolute ``Path`` to the YAML config file. 114 """ 115 configured = Path(configured_path).expanduser() 116 if configured.is_absolute(): 117 return configured 118 119 candidates: list[Path] = [] 120 if case_dir is not None: 121 candidates.append(case_dir / configured) 122 candidates.append(PROJECT_ROOT / configured) 123 124 for candidate in candidates: 125 if candidate.exists(): 126 return candidate 127 return candidates[-1]
Resolve the artifact AI columns config path to an absolute Path.
Checks for the file at the configured path, then relative to the case directory, and finally relative to the project root.
Arguments:
- configured_path: The configured path (may be relative).
- case_dir: Path to the current case directory, or `None`.
Returns:
Resolved absolute `Path` to the YAML config file.