app.analyzer.prompts

Prompt template loading and construction for forensic analysis.

Provides functions for loading prompt templates from disk, loading per-artifact instruction prompts, resolving AI column projection configurations, and building the cross-artifact summary prompt.

These functions were extracted from ForensicAnalyzer so that the core orchestration class stays focused on pipeline coordination.

Attributes:
  • LOGGER: Module-level logger instance.
  1"""Prompt template loading and construction for forensic analysis.
  2
  3Provides functions for loading prompt templates from disk, loading
  4per-artifact instruction prompts, resolving AI column projection
  5configurations, and building the cross-artifact summary prompt.
  6
  7These were extracted from ``ForensicAnalyzer`` to keep the core
  8orchestration class focused on pipeline coordination.
  9
 10Attributes:
 11    LOGGER: Module-level logger instance.
 12"""
 13
 14from __future__ import annotations
 15
 16import logging
 17from pathlib import Path
 18from typing import Any, Mapping
 19
 20import yaml
 21
 22from .constants import PROJECT_ROOT
 23from .ioc import build_priority_directives, format_ioc_targets
 24from .utils import coerce_projection_columns, normalize_artifact_key, normalize_os_type
 25
 26LOGGER = logging.getLogger(__name__)
 27
 28__all__ = [
 29    "build_summary_prompt",
 30    "load_artifact_ai_column_projections",
 31    "load_artifact_instruction_prompts",
 32    "load_prompt_template",
 33    "resolve_artifact_ai_columns_config_path",
 34]
 35
 36
 37def load_prompt_template(prompts_dir: Path, filename: str, default: str) -> str:
 38    """Read a prompt template file from the prompts directory.
 39
 40    Args:
 41        prompts_dir: Directory containing prompt template files.
 42        filename: Name of the template file.
 43        default: Fallback template string if the file cannot be read.
 44
 45    Returns:
 46        The template text, or *default* if reading fails.
 47    """
 48    try:
 49        prompt_path = prompts_dir / filename
 50        return prompt_path.read_text(encoding="utf-8")
 51    except OSError:
 52        return default
 53
 54
 55def load_artifact_instruction_prompts(
 56    prompts_dir: Path,
 57    os_type: str = "windows",
 58) -> dict[str, str]:
 59    """Load per-artifact analysis instruction prompts from disk.
 60
 61    Selects the OS-specific instruction directory based on *os_type*:
 62
 63    - ``"windows"`` (default): ``artifact_instructions/``
 64    - ``"linux"``: ``artifact_instructions_linux/``
 65
 66    For any other OS value the function falls back to the Windows
 67    directory.
 68
 69    Args:
 70        prompts_dir: Directory containing prompt template files.
 71        os_type: Operating system identifier (e.g. ``"windows"``,
 72            ``"linux"``).  Determines which sub-directory to scan.
 73
 74    Returns:
 75        A dict mapping lowercased artifact keys to instruction prompt text.
 76    """
 77    normalized_os = normalize_os_type(os_type)
 78    if normalized_os == "linux":
 79        instructions_dir = prompts_dir / "artifact_instructions_linux"
 80    else:
 81        instructions_dir = prompts_dir / "artifact_instructions"
 82
 83    if not instructions_dir.exists() or not instructions_dir.is_dir():
 84        return {}
 85
 86    prompts: dict[str, str] = {}
 87    for prompt_path in sorted(instructions_dir.glob("*.md")):
 88        try:
 89            prompt_text = prompt_path.read_text(encoding="utf-8").strip()
 90        except OSError:
 91            continue
 92        if not prompt_text:
 93            continue
 94        prompts[prompt_path.stem.strip().lower()] = prompt_text
 95    return prompts
 96
 97
 98def resolve_artifact_ai_columns_config_path(
 99    configured_path: str | Path,
100    case_dir: Path | None,
101) -> Path:
102    """Resolve the artifact AI columns config path to an absolute Path.
103
104    Checks for the file at the configured path, then relative to the
105    case directory, and finally relative to the project root.
106
107    Args:
108        configured_path: The configured path (may be relative).
109        case_dir: Path to the current case directory, or ``None``.
110
111    Returns:
112        Resolved absolute ``Path`` to the YAML config file.
113    """
114    configured = Path(configured_path).expanduser()
115    if configured.is_absolute():
116        return configured
117
118    candidates: list[Path] = []
119    if case_dir is not None:
120        candidates.append(case_dir / configured)
121    candidates.append(PROJECT_ROOT / configured)
122
123    for candidate in candidates:
124        if candidate.exists():
125            return candidate
126    return candidates[-1]
127
128
def load_artifact_ai_column_projections(
    config_path: Path,
    os_type: str = "windows",
) -> dict[str, tuple[str, ...]]:
    """Load per-artifact column projection configuration from YAML.

    The YAML document must be a mapping.  If it has an
    ``artifact_ai_columns`` entry, that entry is used as the source
    mapping; otherwise the document itself is.

    OS-suffixed keys (e.g. ``services_linux``) are folded onto their
    base key only when the suffix matches the active *os_type*; keys
    suffixed with a different known OS (``_linux``, ``_windows``,
    ``_esxi``) are skipped.  When both a generic key and a matching
    OS-suffixed key resolve to the same artifact, the OS-suffixed entry
    wins regardless of the order in which they appear in the file.

    Any load or validation failure is logged as a warning and results
    in an empty mapping (projection disabled) rather than an exception.

    Args:
        config_path: Absolute path to the YAML config file.
        os_type: Active operating system type (``"windows"``,
            ``"linux"``, etc.).  Controls which OS-suffixed keys are
            accepted.

    Returns:
        A dict mapping normalized artifact keys to tuples of column names.
        Entries whose column list coerces to nothing are omitted.
    """
    normalized_os = normalize_os_type(os_type)

    try:
        with config_path.open("r", encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle) or {}
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as error:
        # UnicodeDecodeError comes from the text stream itself and is
        # neither an OSError nor a YAMLError.
        LOGGER.warning(
            "Failed to load AI column projection config from %s: %s. "
            "AI column projection is disabled.", config_path, error,
        )
        return {}

    if not isinstance(parsed, Mapping):
        LOGGER.warning(
            "Invalid AI column projection config in %s: expected a mapping, got %s.",
            config_path, type(parsed).__name__,
        )
        return {}

    source: Any = parsed.get("artifact_ai_columns", parsed)
    if not isinstance(source, Mapping):
        LOGGER.warning(
            "Invalid AI column projection config in %s: 'artifact_ai_columns' must be a mapping, got %s.",
            config_path, type(source).__name__,
        )
        return {}

    # Loop invariants, computed once.
    os_suffix = f"_{normalized_os}"
    known_os_suffixes = ("_linux", "_windows", "_esxi")

    # Generic and OS-specific entries are collected separately so the
    # OS-specific ones can override generics independent of file order.
    generic: dict[str, tuple[str, ...]] = {}
    os_specific: dict[str, tuple[str, ...]] = {}

    for artifact_key, raw_columns in source.items():
        if artifact_key is None:
            continue
        raw_key = str(artifact_key)

        if raw_key.endswith(os_suffix):
            bucket = os_specific
            effective_key = raw_key[: -len(os_suffix)]
        elif raw_key.endswith(known_os_suffixes):
            # OS-suffixed key for a *different* OS -- skip it.  The
            # underscore is part of the test, so a bare key such as
            # "linux" is treated as a normal artifact key.
            continue
        else:
            bucket = generic
            effective_key = raw_key

        normalized_key = normalize_artifact_key(effective_key)
        columns = coerce_projection_columns(raw_columns)
        if columns:
            bucket[normalized_key] = tuple(columns)

    return {**generic, **os_specific}
199
200
def build_summary_prompt(
    summary_prompt_template: str,
    investigation_context: str,
    per_artifact_results: list[Mapping[str, Any]],
    metadata_map: Mapping[str, Any],
) -> str:
    """Build the cross-artifact summary prompt from a template.

    Assembles per-artifact findings into a single prompt using the
    summary template, filling in investigation context, IOC targets,
    priority directives, and host metadata placeholders.

    All placeholders are substituted in a single pass over the
    template, so text inserted for one placeholder (for example
    user-supplied investigation context that happens to contain
    ``{{hostname}}``) is never rescanned and expanded again.
    Placeholders not known to this function are left untouched.

    Args:
        summary_prompt_template: The summary template string with
            ``{{placeholder}}`` markers.
        investigation_context: The user's investigation context text.
        per_artifact_results: List of per-artifact result dicts, each
            with ``artifact_key``, ``artifact_name``, and ``analysis``.
            Missing or empty values fall back to ``"unknown"``, the
            artifact key, and an empty string respectively.
        metadata_map: Host metadata mapping with optional ``hostname``,
            ``os_version``, ``os_type``, and ``domain`` keys.  Missing
            or empty values are rendered as ``"Unknown"``.

    Returns:
        The fully rendered summary prompt string.
    """
    import re  # local import keeps this block free of new file-level deps

    findings_blocks: list[str] = []
    for result in per_artifact_results:
        # ``or`` (not a .get default) so explicit None values do not
        # render as the literal string "None".
        artifact_key = str(result.get("artifact_key") or "unknown")
        artifact_name = str(result.get("artifact_name") or artifact_key)
        analysis = str(result.get("analysis") or "").strip()
        findings_blocks.append(f"### {artifact_name} ({artifact_key})\n{analysis}")

    findings_text = (
        "\n\n".join(findings_blocks)
        if findings_blocks
        else "No per-artifact findings available."
    )

    replacements: dict[str, str] = {
        "priority_directives": build_priority_directives(investigation_context),
        "investigation_context": investigation_context.strip() or "No investigation context provided.",
        "ioc_targets": format_ioc_targets(investigation_context),
        "per_artifact_findings": findings_text,
    }
    for metadata_key in ("hostname", "os_version", "os_type", "domain"):
        replacements[metadata_key] = str(metadata_map.get(metadata_key) or "Unknown")

    # One regex alternation over every known placeholder name; matching
    # the whole template once prevents nested substitution.
    placeholder_pattern = re.compile(
        r"\{\{(" + "|".join(re.escape(name) for name in replacements) + r")\}\}"
    )
    return placeholder_pattern.sub(
        lambda match: replacements[match.group(1)],
        summary_prompt_template,
    )
def build_summary_prompt( summary_prompt_template: str, investigation_context: str, per_artifact_results: list[typing.Mapping[str, typing.Any]], metadata_map: Mapping[str, Any]) -> str:
def build_summary_prompt(
    summary_prompt_template: str,
    investigation_context: str,
    per_artifact_results: list[Mapping[str, Any]],
    metadata_map: Mapping[str, Any],
) -> str:
    """Build the cross-artifact summary prompt from a template.

    Assembles per-artifact findings into a single prompt using the
    summary template, filling in investigation context, IOC targets,
    priority directives, and host metadata placeholders.

    All placeholders are substituted in a single pass over the
    template, so text inserted for one placeholder (for example
    user-supplied investigation context that happens to contain
    ``{{hostname}}``) is never rescanned and expanded again.
    Placeholders not known to this function are left untouched.

    Args:
        summary_prompt_template: The summary template string with
            ``{{placeholder}}`` markers.
        investigation_context: The user's investigation context text.
        per_artifact_results: List of per-artifact result dicts, each
            with ``artifact_key``, ``artifact_name``, and ``analysis``.
            Missing or empty values fall back to ``"unknown"``, the
            artifact key, and an empty string respectively.
        metadata_map: Host metadata mapping with optional ``hostname``,
            ``os_version``, ``os_type``, and ``domain`` keys.  Missing
            or empty values are rendered as ``"Unknown"``.

    Returns:
        The fully rendered summary prompt string.
    """
    import re  # local import keeps this block free of new file-level deps

    findings_blocks: list[str] = []
    for result in per_artifact_results:
        # ``or`` (not a .get default) so explicit None values do not
        # render as the literal string "None".
        artifact_key = str(result.get("artifact_key") or "unknown")
        artifact_name = str(result.get("artifact_name") or artifact_key)
        analysis = str(result.get("analysis") or "").strip()
        findings_blocks.append(f"### {artifact_name} ({artifact_key})\n{analysis}")

    findings_text = (
        "\n\n".join(findings_blocks)
        if findings_blocks
        else "No per-artifact findings available."
    )

    replacements: dict[str, str] = {
        "priority_directives": build_priority_directives(investigation_context),
        "investigation_context": investigation_context.strip() or "No investigation context provided.",
        "ioc_targets": format_ioc_targets(investigation_context),
        "per_artifact_findings": findings_text,
    }
    for metadata_key in ("hostname", "os_version", "os_type", "domain"):
        replacements[metadata_key] = str(metadata_map.get(metadata_key) or "Unknown")

    # One regex alternation over every known placeholder name; matching
    # the whole template once prevents nested substitution.
    placeholder_pattern = re.compile(
        r"\{\{(" + "|".join(re.escape(name) for name in replacements) + r")\}\}"
    )
    return placeholder_pattern.sub(
        lambda match: replacements[match.group(1)],
        summary_prompt_template,
    )

Build the cross-artifact summary prompt from a template.

Assembles per-artifact findings into a single prompt using the summary template, filling in investigation context, IOC targets, priority directives, and host metadata placeholders.

Arguments:
  • summary_prompt_template: The summary template string with {{placeholder}} markers.
  • investigation_context: The user's investigation context text.
  • per_artifact_results: List of per-artifact result dicts, each with artifact_key, artifact_name, and analysis.
  • metadata_map: Host metadata mapping with optional hostname, os_version, os_type, and domain keys.
Returns:

The fully rendered summary prompt string.

def load_artifact_ai_column_projections( config_path: pathlib.Path, os_type: str = 'windows') -> dict[str, tuple[str, ...]]:
def load_artifact_ai_column_projections(
    config_path: Path,
    os_type: str = "windows",
) -> dict[str, tuple[str, ...]]:
    """Load per-artifact column projection configuration from YAML.

    The YAML document must be a mapping.  If it has an
    ``artifact_ai_columns`` entry, that entry is used as the source
    mapping; otherwise the document itself is.

    OS-suffixed keys (e.g. ``services_linux``) are folded onto their
    base key only when the suffix matches the active *os_type*; keys
    suffixed with a different known OS (``_linux``, ``_windows``,
    ``_esxi``) are skipped.  When both a generic key and a matching
    OS-suffixed key resolve to the same artifact, the OS-suffixed entry
    wins regardless of the order in which they appear in the file.

    Any load or validation failure is logged as a warning and results
    in an empty mapping (projection disabled) rather than an exception.

    Args:
        config_path: Absolute path to the YAML config file.
        os_type: Active operating system type (``"windows"``,
            ``"linux"``, etc.).  Controls which OS-suffixed keys are
            accepted.

    Returns:
        A dict mapping normalized artifact keys to tuples of column names.
        Entries whose column list coerces to nothing are omitted.
    """
    normalized_os = normalize_os_type(os_type)

    try:
        with config_path.open("r", encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle) or {}
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as error:
        # UnicodeDecodeError comes from the text stream itself and is
        # neither an OSError nor a YAMLError.
        LOGGER.warning(
            "Failed to load AI column projection config from %s: %s. "
            "AI column projection is disabled.", config_path, error,
        )
        return {}

    if not isinstance(parsed, Mapping):
        LOGGER.warning(
            "Invalid AI column projection config in %s: expected a mapping, got %s.",
            config_path, type(parsed).__name__,
        )
        return {}

    source: Any = parsed.get("artifact_ai_columns", parsed)
    if not isinstance(source, Mapping):
        LOGGER.warning(
            "Invalid AI column projection config in %s: 'artifact_ai_columns' must be a mapping, got %s.",
            config_path, type(source).__name__,
        )
        return {}

    # Loop invariants, computed once.
    os_suffix = f"_{normalized_os}"
    known_os_suffixes = ("_linux", "_windows", "_esxi")

    # Generic and OS-specific entries are collected separately so the
    # OS-specific ones can override generics independent of file order.
    generic: dict[str, tuple[str, ...]] = {}
    os_specific: dict[str, tuple[str, ...]] = {}

    for artifact_key, raw_columns in source.items():
        if artifact_key is None:
            continue
        raw_key = str(artifact_key)

        if raw_key.endswith(os_suffix):
            bucket = os_specific
            effective_key = raw_key[: -len(os_suffix)]
        elif raw_key.endswith(known_os_suffixes):
            # OS-suffixed key for a *different* OS -- skip it.  The
            # underscore is part of the test, so a bare key such as
            # "linux" is treated as a normal artifact key.
            continue
        else:
            bucket = generic
            effective_key = raw_key

        normalized_key = normalize_artifact_key(effective_key)
        columns = coerce_projection_columns(raw_columns)
        if columns:
            bucket[normalized_key] = tuple(columns)

    return {**generic, **os_specific}

Load per-artifact column projection configuration from YAML.

Handles OS-suffixed keys (e.g. services_linux) by mapping them to the base key only when the active os_type matches the suffix. This prevents Linux-specific column definitions from overwriting their Windows counterparts (and vice-versa).

Arguments:
  • config_path: Absolute path to the YAML config file.
  • os_type: Active operating system type ("windows", "linux", etc.). Controls which OS-suffixed keys are accepted.
Returns:

A dict mapping normalized artifact keys to tuples of column names.

def load_artifact_instruction_prompts(prompts_dir: pathlib.Path, os_type: str = 'windows') -> dict[str, str]:
def load_artifact_instruction_prompts(
    prompts_dir: Path,
    os_type: str = "windows",
) -> dict[str, str]:
    """Load per-artifact analysis instruction prompts from disk.

    Selects the OS-specific instruction directory based on *os_type*
    (after passing it through ``normalize_os_type``):

    - ``"linux"``: ``artifact_instructions_linux/``
    - anything else, including the default ``"windows"``:
      ``artifact_instructions/``

    Every ``*.md`` file in that directory is read in sorted path order.
    Files that cannot be read or decoded as UTF-8 are logged and
    skipped, and files that are empty after stripping whitespace are
    ignored, so one bad file never prevents the others from loading.

    Args:
        prompts_dir: Directory containing prompt template files.
        os_type: Operating system identifier (e.g. ``"windows"``,
            ``"linux"``).  Determines which sub-directory to scan.

    Returns:
        A dict mapping lowercased file stems (the artifact keys) to the
        stripped instruction prompt text.  Empty when the instruction
        directory does not exist.
    """
    subdirectory = (
        "artifact_instructions_linux"
        if normalize_os_type(os_type) == "linux"
        else "artifact_instructions"
    )
    instructions_dir = prompts_dir / subdirectory

    # is_dir() is already False for a missing path, so no separate
    # exists() check is needed.
    if not instructions_dir.is_dir():
        return {}

    prompts: dict[str, str] = {}
    for prompt_path in sorted(instructions_dir.glob("*.md")):
        try:
            prompt_text = prompt_path.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError) as error:
            # UnicodeDecodeError is a ValueError, not an OSError; without
            # it a single mis-encoded file would abort the whole load.
            LOGGER.warning(
                "Skipping unreadable artifact instruction prompt %s: %s",
                prompt_path, error,
            )
            continue
        if prompt_text:
            prompts[prompt_path.stem.strip().lower()] = prompt_text
    return prompts

Load per-artifact analysis instruction prompts from disk.

Selects the OS-specific instruction directory based on os_type:

  • "windows" (default): artifact_instructions/
  • "linux": artifact_instructions_linux/

For any other OS value the function falls back to the Windows directory.

Arguments:
  • prompts_dir: Directory containing prompt template files.
  • os_type: Operating system identifier (e.g. "windows", "linux"). Determines which sub-directory to scan.
Returns:

A dict mapping lowercased artifact keys to instruction prompt text.

def load_prompt_template(prompts_dir: pathlib.Path, filename: str, default: str) -> str:
def load_prompt_template(prompts_dir: Path, filename: str, default: str) -> str:
    """Read a prompt template file from the prompts directory.

    Any failure to obtain the template text -- a missing or unreadable
    file, or a file whose bytes are not valid UTF-8 -- yields *default*
    instead of propagating to the caller.

    Args:
        prompts_dir: Directory containing prompt template files.
        filename: Name of the template file, joined onto *prompts_dir*.
        default: Fallback template string if the file cannot be read.

    Returns:
        The template text, or *default* if reading or decoding fails.
    """
    prompt_path = prompts_dir / filename
    try:
        return prompt_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError derives from ValueError, not OSError, so it
        # must be listed explicitly to honour the documented fallback.
        return default

Read a prompt template file from the prompts directory.

Arguments:
  • prompts_dir: Directory containing prompt template files.
  • filename: Name of the template file.
  • default: Fallback template string if the file cannot be read.
Returns:

The template text, or default if reading fails.

def resolve_artifact_ai_columns_config_path( configured_path: str | pathlib.Path, case_dir: pathlib.Path | None) -> pathlib.Path:
 99def resolve_artifact_ai_columns_config_path(
100    configured_path: str | Path,
101    case_dir: Path | None,
102) -> Path:
103    """Resolve the artifact AI columns config path to an absolute Path.
104
105    Checks for the file at the configured path, then relative to the
106    case directory, and finally relative to the project root.
107
108    Args:
109        configured_path: The configured path (may be relative).
110        case_dir: Path to the current case directory, or ``None``.
111
112    Returns:
113        Resolved absolute ``Path`` to the YAML config file.
114    """
115    configured = Path(configured_path).expanduser()
116    if configured.is_absolute():
117        return configured
118
119    candidates: list[Path] = []
120    if case_dir is not None:
121        candidates.append(case_dir / configured)
122    candidates.append(PROJECT_ROOT / configured)
123
124    for candidate in candidates:
125        if candidate.exists():
126            return candidate
127    return candidates[-1]

Resolve the artifact AI columns config path to an absolute Path.

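An absolute configured path (after ~ expansion) is returned as-is, without checking that it exists. A relative path is looked up first relative to the case directory (when one is given) and then relative to the project root; if neither location exists, the project-root location is returned anyway.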

Arguments:
  • configured_path: The configured path (may be relative).
  • case_dir: Path to the current case directory, or None.
Returns:

Resolved absolute Path to the YAML config file.