app.analyzer.utils

Pure utility functions for the forensic analyzer pipeline.

Provides string manipulation, datetime parsing, CSV normalisation, filename sanitisation, token estimation, and other stateless helpers used across the analyzer sub-modules.

  1"""Pure utility functions for the forensic analyzer pipeline.
  2
  3Provides string manipulation, datetime parsing, CSV normalisation, filename
  4sanitisation, token estimation, and other stateless helpers used across the
  5analyzer sub-modules.
  6"""
  7
  8from __future__ import annotations
  9
 10import re
 11from datetime import datetime, timezone
 12from pathlib import Path
 13from typing import Any, Iterable, Mapping
 14
 15from .constants import (
 16    INTEGER_RE,
 17    TIMESTAMP_COLUMN_HINTS,
 18    TOKEN_CHAR_RATIO,
 19)
 20
# tiktoken (OpenAI's BPE tokenizer) is an optional dependency: when it is not
# installed, estimate_tokens() falls back to a character-count heuristic
# instead of an exact BPE token count.
try:
    import tiktoken
    _TIKTOKEN_AVAILABLE = True
except ImportError:
    _TIKTOKEN_AVAILABLE = False
 26
# Explicit public API of this module; star-imports and documentation tools
# pick up exactly these names.
__all__ = [
    "stringify_value",
    "format_datetime",
    "normalize_table_cell",
    "sanitize_filename",
    "build_datetime",
    "parse_int",
    "normalize_datetime",
    "parse_datetime_value",
    "looks_like_timestamp_column",
    "extract_row_datetime",
    "time_range_for_rows",
    "normalize_artifact_key",
    "unique_preserve_order",
    "truncate_for_prompt",
    "extract_url_host",
    "normalize_csv_row",
    "coerce_projection_columns",
    "emit_analysis_progress",
    "estimate_tokens",
    "is_dedup_safe_identifier_column",
    "normalize_os_type",
    "read_int_setting",
    "read_bool_setting",
    "read_path_setting",
]
 53
 54
 55# ---------------------------------------------------------------------------
 56# String helpers
 57# ---------------------------------------------------------------------------
 58
 59def stringify_value(value: Any) -> str:
 60    """Convert an arbitrary value to a stripped string.
 61
 62    Args:
 63        value: Any value (string, ``None``, number, etc.).
 64
 65    Returns:
 66        The stripped string representation, or an empty string for ``None``.
 67    """
 68    if value is None:
 69        return ""
 70    if isinstance(value, str):
 71        return value.strip()
 72    return str(value).strip()
 73
 74
 75def format_datetime(value: datetime | None) -> str:
 76    """Format a datetime as an ISO string, or ``"N/A"`` for ``None``.
 77
 78    Args:
 79        value: Datetime to format, or ``None``.
 80
 81    Returns:
 82        ISO-formatted datetime string or ``"N/A"``.
 83    """
 84    if value is None:
 85        return "N/A"
 86    return value.isoformat()
 87
 88
 89def normalize_table_cell(value: str, cell_limit: int) -> str:
 90    """Normalize and truncate a cell value for table/statistics display.
 91
 92    Replaces newlines and pipe characters, strips whitespace, and
 93    truncates with an ellipsis if the value exceeds *cell_limit*.
 94
 95    Args:
 96        value: Raw cell value string.
 97        cell_limit: Maximum character length for the output.
 98
 99    Returns:
100        The cleaned and possibly truncated string.
101    """
102    text = value.replace("\r", " ").replace("\n", " ").replace("|", r"\|").strip()
103    if len(text) <= cell_limit:
104        return text
105    if cell_limit <= 3:
106        return text[:cell_limit]
107    return f"{text[: cell_limit - 3]}..."
108
109
def sanitize_filename(value: str) -> str:
    """Sanitize a string for use as a safe filename.

    Runs of characters outside ``[A-Za-z0-9._-]`` collapse to a single
    underscore, and leading/trailing underscores are removed.

    Args:
        value: Raw string to sanitize.

    Returns:
        A filesystem-safe filename string, or ``"artifact"`` if nothing
        usable remains.
    """
    safe = re.sub(r"[^A-Za-z0-9._-]+", "_", value)
    safe = safe.strip("_")
    return safe if safe else "artifact"
121
122
def truncate_for_prompt(value: str, limit: int) -> str:
    """Truncate a string to fit within a character limit for prompt inclusion.

    Args:
        value: The string to truncate (``None`` is treated as empty).
        limit: Maximum allowed character count.

    Returns:
        The original (stripped) string if it fits, otherwise a truncated
        version whose length never exceeds *limit*.
    """
    suffix = " ... [truncated]"  # 16 characters
    text = str(value or "").strip()
    if len(text) <= limit:
        return text
    if limit <= 20:
        # Too small for a readable marker: hard cut.
        return text[:limit]
    # Reserve exactly len(suffix) characters for the marker.  The previous
    # code reserved only 14, so results could overshoot *limit* by 2.
    return f"{text[: limit - len(suffix)].rstrip()}{suffix}"
139
140
def unique_preserve_order(values: Iterable[str]) -> list[str]:
    """Deduplicate strings while preserving first-occurrence order.

    Each value is stripped, stripped of surrounding quotes/brackets, and
    trimmed of trailing punctuation; deduplication is case-insensitive
    while the returned values keep their original casing.

    Args:
        values: Iterable of raw string values to deduplicate.

    Returns:
        A list of cleaned, unique strings in first-seen order.
    """
    seen_keys: set[str] = set()
    result: list[str] = []
    for item in values:
        cleaned = str(item).strip().strip("\"'()[]{}<>").rstrip(".,;:")
        if not cleaned:
            continue
        folded = cleaned.lower()
        if folded not in seen_keys:
            seen_keys.add(folded)
            result.append(cleaned)
    return result
167
168
169# ---------------------------------------------------------------------------
170# Datetime helpers
171# ---------------------------------------------------------------------------
172
173def build_datetime(year: str, month: str, day: str) -> datetime | None:
174    """Construct a datetime from string year, month, and day components.
175
176    Args:
177        year: Year string (e.g., ``"2025"``).
178        month: Month string (``"1"`` through ``"12"``).
179        day: Day string (``"1"`` through ``"31"``).
180
181    Returns:
182        A ``datetime`` at midnight for the given date, or ``None``.
183    """
184    try:
185        return datetime(int(year), int(month), int(day))
186    except ValueError:
187        return None
188
189
def normalize_datetime(value: datetime) -> datetime:
    """Convert a datetime to a naive UTC datetime.

    Args:
        value: Datetime to normalize; naive values pass through unchanged.

    Returns:
        A naive ``datetime`` representing the same instant in UTC.
    """
    if value.tzinfo is not None:
        return value.astimezone(timezone.utc).replace(tzinfo=None)
    return value
202
203
def parse_int(value: str) -> int | None:
    """Extract and parse the first integer found in a string.

    Args:
        value: String that may contain an integer.

    Returns:
        The parsed integer, or ``None`` when the string is empty, contains
        no integer, or the matched text cannot be converted.
    """
    if not value:
        return None
    found = INTEGER_RE.search(value)
    if found is None:
        return None
    try:
        return int(found.group())
    except ValueError:
        return None
222
223
def parse_datetime_value(value: str, *, allow_epoch: bool = True) -> datetime | None:
    """Attempt to parse a string value into a naive UTC datetime.

    Tries ISO format first, then common date/time formats, and optionally
    epoch timestamps (seconds or milliseconds).

    Args:
        value: Raw string that may contain a date or timestamp.
        allow_epoch: If ``True`` (default), bare integers in the plausible
            epoch range are accepted.  Set to ``False`` when scanning
            columns that are not known to hold timestamps, to avoid
            misinterpreting numeric IDs or counters as dates.

    Returns:
        A naive ``datetime`` in UTC, or ``None`` if parsing fails.
    """
    text = stringify_value(value)
    if not text:
        return None

    # Only a *trailing* "Z" is the ISO-8601 UTC designator.  The previous
    # code replaced every "Z" in the string, which could corrupt values
    # that merely contain the letter before the format checks below ran.
    cleaned = f"{text[:-1]}+00:00" if text.endswith("Z") else text
    try:
        parsed = datetime.fromisoformat(cleaned)
        return normalize_datetime(parsed)
    except ValueError:
        pass

    for fmt in (
        "%Y-%m-%d %H:%M:%S.%f%z",
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%d-%m-%Y",
        "%d/%m/%Y",
        "%m/%d/%Y",
        "%B %d, %Y",
        "%b %d, %Y",
        "%B %d %Y",
        "%b %d %Y",
    ):
        try:
            parsed = datetime.strptime(cleaned, fmt)
            return normalize_datetime(parsed)
        except ValueError:
            continue

    if not allow_epoch:
        return None

    int_value = parse_int(cleaned)
    if int_value is not None:
        # Values above 1e12 are assumed to be epoch milliseconds.
        if int_value > 1_000_000_000_000:
            int_value //= 1000
        # Plausible epoch-seconds window: 2000-01-01 .. 2100-01-01.
        if 946684800 <= int_value <= 4_102_444_800:
            try:
                parsed = datetime.fromtimestamp(int_value, tz=timezone.utc)
                return normalize_datetime(parsed)
            except (ValueError, OSError):
                return None

    return None
286
287
def looks_like_timestamp_column(column_name: str) -> bool:
    """Check whether a column name suggests it contains timestamp data.

    Args:
        column_name: CSV column header name.

    Returns:
        ``True`` if the lowercased name contains any timestamp hint substring.
    """
    name = column_name.strip().lower()
    for hint in TIMESTAMP_COLUMN_HINTS:
        if hint in name:
            return True
    return False
299
300
def is_dedup_safe_identifier_column(column_name: str) -> bool:
    """Return True only for auto-incremented record IDs safe for dedup.

    Args:
        column_name: CSV column header name.

    Returns:
        ``True`` if the column is a safe dedup identifier.
    """
    # Imported lazily, matching the original code's import placement.
    from .constants import DEDUP_SAFE_IDENTIFIER_HINTS
    # Canonicalize: lowercase with dashes/spaces folded into underscores.
    canonical = column_name.strip().lower().translate(str.maketrans("- ", "__"))
    return canonical in DEDUP_SAFE_IDENTIFIER_HINTS
313
314
def extract_row_datetime(row: dict[str, str], columns: list[str] | None = None) -> datetime | None:
    """Extract the first parseable timestamp from a CSV row.

    Columns whose names look like timestamps are tried first with full
    parsing (including bare epoch integers).  The remaining columns are
    then tried accepting only string-format dates, so numeric IDs or
    counters are never misread as epoch timestamps.

    Args:
        row: Normalized row dict.
        columns: Optional column list to constrain the search.

    Returns:
        The first successfully parsed ``datetime``, or ``None``.
    """
    search_columns = columns if columns else list(row.keys())
    named = [col for col in search_columns if looks_like_timestamp_column(col)]

    # Pass 1: timestamp-named columns — full parsing including epochs.
    for col in named:
        when = parse_datetime_value(row.get(col, ""), allow_epoch=True)
        if when is not None:
            return when

    # Pass 2: everything else — string dates only, no epoch integers.
    named_set = set(named)
    for col in search_columns:
        if col in named_set:
            continue
        when = parse_datetime_value(row.get(col, ""), allow_epoch=False)
        if when is not None:
            return when

    return None
350
351
def time_range_for_rows(rows: Iterable[dict[str, str]]) -> tuple[datetime | None, datetime | None]:
    """Compute the earliest and latest timestamps across all rows.

    Args:
        rows: Iterable of row dicts to scan for timestamp values.

    Returns:
        A ``(min_time, max_time)`` tuple; both are ``None`` when no row
        yields a parseable timestamp.
    """
    earliest: datetime | None = None
    latest: datetime | None = None
    for row in rows:
        stamp = extract_row_datetime(row=row)
        if stamp is None:
            continue
        earliest = stamp if earliest is None else min(earliest, stamp)
        latest = stamp if latest is None else max(latest, stamp)
    return earliest, latest
372
373
374# ---------------------------------------------------------------------------
375# Artifact key normalisation
376# ---------------------------------------------------------------------------
377
# Re-export from the shared module so existing callers that import
# ``normalize_os_type`` from this module keep working; the redundant
# ``as`` alias marks the re-export as intentional for linters.
from ..os_utils import normalize_os_type as normalize_os_type  # noqa: F401, PLC0414
380
381
def normalize_artifact_key(artifact_key: str) -> str:
    """Normalize an artifact key to its canonical short form.

    Args:
        artifact_key: Raw artifact key string.

    Returns:
        The lowercased, normalized artifact key; unknown keys are returned
        lowercased and stripped but otherwise unchanged.
    """
    key = artifact_key.strip().lower()
    if key == "mft":
        return "mft"
    # Any mention of .evtx (or an evtx prefix) maps to the evtx artifact.
    if key.startswith("evtx") or ".evtx" in key:
        return "evtx"
    prefix_canonical = (
        "shimcache",
        "amcache",
        "prefetch",
        "services",
        "tasks",
        "userassist",
        "runkeys",
    )
    for canonical in prefix_canonical:
        if key.startswith(canonical):
            return canonical
    return key
411
412
413# ---------------------------------------------------------------------------
414# URL host extraction
415# ---------------------------------------------------------------------------
416
def extract_url_host(url: str) -> str:
    """Extract the lowercase hostname from a URL string.

    Args:
        url: A URL string.

    Returns:
        The lowercased hostname portion without scheme, port, or path.
        NOTE(review): bracketed IPv6 literals are not handled — the colon
        split truncates them; confirm callers never pass IPv6 URLs.
    """
    remainder = url.strip()
    scheme_sep = remainder.find("://")
    if scheme_sep != -1:
        remainder = remainder[scheme_sep + 3:]
    # Drop any path, then any port suffix.
    host = remainder.partition("/")[0].partition(":")[0]
    return host.lower().strip()
432
433
434# ---------------------------------------------------------------------------
435# CSV normalisation
436# ---------------------------------------------------------------------------
437
def normalize_csv_row(row: dict[str | None, str | None | list[str]], columns: list[str]) -> dict[str, str]:
    """Normalize a raw CSV DictReader row to a clean string-to-string dict.

    Args:
        row: Raw row dict from ``csv.DictReader``.  Overflow fields (rows
            with more cells than headers) appear under the ``None`` key.
        columns: Expected column names in the CSV.

    Returns:
        A normalized dict mapping column names to stripped string values,
        with any overflow cells joined under the ``"__extra__"`` key.
    """
    normalized = {column: stringify_value(row.get(column)) for column in columns}

    overflow = row.get(None)
    if overflow:
        cleaned_cells = (stringify_value(cell) for cell in overflow)
        normalized["__extra__"] = " | ".join(cell for cell in cleaned_cells if cell)

    return normalized
458
459
def coerce_projection_columns(value: Any) -> list[str]:
    """Coerce a raw YAML value into a deduplicated list of column names.

    Accepts a comma-separated string or a list of items; anything else
    yields an empty list.

    Args:
        value: Raw value from the YAML config (string, list, or other).

    Returns:
        A deduplicated list of non-empty column name strings, in
        first-occurrence order.
    """
    if isinstance(value, str):
        parts = (piece.strip() for piece in value.split(","))
    elif isinstance(value, list):
        parts = (str(item).strip() for item in value)
    else:
        return []

    # dict.fromkeys keeps insertion order, giving an order-preserving dedup.
    return list(dict.fromkeys(part for part in parts if part))
481
482
483# ---------------------------------------------------------------------------
484# Progress callback
485# ---------------------------------------------------------------------------
486
def emit_analysis_progress(
    progress_callback: Any,
    artifact_key: str,
    status: str,
    payload: dict[str, Any],
) -> None:
    """Emit a progress event to the frontend via the callback.

    Tries the three-argument calling convention first; a ``TypeError``
    (e.g. a callback that accepts a single event dict) triggers a retry
    with the single-dict convention.  All other callback failures are
    swallowed — progress reporting is best-effort.

    Args:
        progress_callback: The user-supplied progress callback.
        artifact_key: Artifact identifier for the event.
        status: Event status.
        payload: Event payload dict.
    """
    try:
        progress_callback(artifact_key, status, payload)
    except TypeError:
        # Signature mismatch: retry with the single-dict convention.
        try:
            progress_callback({
                "artifact_key": artifact_key,
                "status": status,
                "result": payload,
            })
        except Exception:
            pass
    except Exception:
        pass
517
518
519# ---------------------------------------------------------------------------
520# Token estimation
521# ---------------------------------------------------------------------------
522
def estimate_tokens(text: str, model_info: Mapping[str, str] | None = None) -> int:
    """Estimate the token count of a text string.

    When ``tiktoken`` is available and the provider is OpenAI-compatible,
    an exact BPE token count is returned.  Otherwise a character-based
    heuristic is used, weighting non-ASCII characters more heavily and
    adding a 10% safety margin.

    Args:
        text: The text to estimate token count for.
        model_info: Optional dict with ``provider`` and ``model`` keys.

    Returns:
        Estimated number of tokens (minimum 1).
    """
    if not text:
        return 1

    if _TIKTOKEN_AVAILABLE and model_info is not None:
        provider = str(model_info.get("provider", "")).lower()
        if provider in {"openai", "local", "custom"}:
            encoder = None
            try:
                encoder = tiktoken.encoding_for_model(str(model_info.get("model", "")))
            except KeyError:
                # Unknown model name: fall back to the common GPT-4 encoding.
                try:
                    encoder = tiktoken.get_encoding("cl100k_base")
                except Exception:
                    encoder = None
            if encoder is not None:
                try:
                    return max(1, len(encoder.encode(text)))
                except Exception:
                    pass  # fall through to the heuristic

    # Heuristic path: ASCII characters map to roughly TOKEN_CHAR_RATIO
    # characters per token; non-ASCII characters often split into several
    # tokens, so count them at 1.5 tokens each.
    ascii_count = sum(1 for ch in text if ord(ch) < 128)
    non_ascii_count = len(text) - ascii_count

    estimate = ascii_count / max(1, TOKEN_CHAR_RATIO) + non_ascii_count * 1.5
    return max(1, int(estimate * 1.1))
570
571
572# ---------------------------------------------------------------------------
573# Config setting readers
574# ---------------------------------------------------------------------------
575
576def read_int_setting(
577    analysis_config: Mapping[str, Any], key: str, default: int,
578    minimum: int = 1, maximum: int | None = None,
579) -> int:
580    """Read an integer setting with bounds clamping.
581
582    Args:
583        analysis_config: The ``analysis`` sub-dictionary.
584        key: Configuration key name.
585        default: Default value.
586        minimum: Lower bound (inclusive).
587        maximum: Optional upper bound (inclusive).
588
589    Returns:
590        The parsed and clamped integer value.
591    """
592    raw_value = analysis_config.get(key, default)
593    try:
594        parsed_value = int(raw_value)
595    except (TypeError, ValueError):
596        parsed_value = default
597    if parsed_value < minimum:
598        parsed_value = minimum
599    if maximum is not None and parsed_value > maximum:
600        parsed_value = maximum
601    return parsed_value
602
603
def read_bool_setting(analysis_config: Mapping[str, Any], key: str, default: bool) -> bool:
    """Read a boolean setting from the analysis config.

    Accepts real booleans, the usual true/false string spellings, and
    numeric truthiness; anything else yields *default*.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value.

    Returns:
        The parsed boolean value.
    """
    raw = analysis_config.get(key, default)
    # bool is checked before int/float because bool subclasses int.
    if isinstance(raw, bool):
        return raw
    if isinstance(raw, str):
        token = raw.strip().lower()
        if token in {"true", "1", "yes", "on"}:
            return True
        if token in {"false", "0", "no", "off"}:
            return False
        return default
    if isinstance(raw, (int, float)):
        return bool(raw)
    return default
627
628
def read_path_setting(analysis_config: Mapping[str, Any], key: str, default: str) -> str:
    """Read a file-path setting from the analysis config.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value, returned for missing, non-path-like, or
            blank values.

    Returns:
        The cleaned path string.
    """
    raw = analysis_config.get(key, default)
    if not isinstance(raw, (str, Path)):
        return default
    cleaned = str(raw).strip()
    return cleaned if cleaned else default
def stringify_value(value: Any) -> str:
60def stringify_value(value: Any) -> str:
61    """Convert an arbitrary value to a stripped string.
62
63    Args:
64        value: Any value (string, ``None``, number, etc.).
65
66    Returns:
67        The stripped string representation, or an empty string for ``None``.
68    """
69    if value is None:
70        return ""
71    if isinstance(value, str):
72        return value.strip()
73    return str(value).strip()

Convert an arbitrary value to a stripped string.

Arguments:
  • value: Any value (string, None, number, etc.).
Returns:

The stripped string representation, or an empty string for None.

def format_datetime(value: datetime | None) -> str:
76def format_datetime(value: datetime | None) -> str:
77    """Format a datetime as an ISO string, or ``"N/A"`` for ``None``.
78
79    Args:
80        value: Datetime to format, or ``None``.
81
82    Returns:
83        ISO-formatted datetime string or ``"N/A"``.
84    """
85    if value is None:
86        return "N/A"
87    return value.isoformat()

Format a datetime as an ISO string, or "N/A" for None.

Arguments:
  • value: Datetime to format, or None.
Returns:

ISO-formatted datetime string or "N/A".

def normalize_table_cell(value: str, cell_limit: int) -> str:
 90def normalize_table_cell(value: str, cell_limit: int) -> str:
 91    """Normalize and truncate a cell value for table/statistics display.
 92
 93    Replaces newlines and pipe characters, strips whitespace, and
 94    truncates with an ellipsis if the value exceeds *cell_limit*.
 95
 96    Args:
 97        value: Raw cell value string.
 98        cell_limit: Maximum character length for the output.
 99
100    Returns:
101        The cleaned and possibly truncated string.
102    """
103    text = value.replace("\r", " ").replace("\n", " ").replace("|", r"\|").strip()
104    if len(text) <= cell_limit:
105        return text
106    if cell_limit <= 3:
107        return text[:cell_limit]
108    return f"{text[: cell_limit - 3]}..."

Normalize and truncate a cell value for table/statistics display.

Replaces newlines and pipe characters, strips whitespace, and truncates with an ellipsis if the value exceeds cell_limit.

Arguments:
  • value: Raw cell value string.
  • cell_limit: Maximum character length for the output.
Returns:

The cleaned and possibly truncated string.

def sanitize_filename(value: str) -> str:
111def sanitize_filename(value: str) -> str:
112    """Sanitize a string for use as a safe filename.
113
114    Args:
115        value: Raw string to sanitize.
116
117    Returns:
118        A filesystem-safe filename string, or ``"artifact"`` if empty.
119    """
120    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_")
121    return cleaned or "artifact"

Sanitize a string for use as a safe filename.

Arguments:
  • value: Raw string to sanitize.
Returns:

A filesystem-safe filename string, or "artifact" if empty.

def build_datetime(year: str, month: str, day: str) -> datetime | None:
174def build_datetime(year: str, month: str, day: str) -> datetime | None:
175    """Construct a datetime from string year, month, and day components.
176
177    Args:
178        year: Year string (e.g., ``"2025"``).
179        month: Month string (``"1"`` through ``"12"``).
180        day: Day string (``"1"`` through ``"31"``).
181
182    Returns:
183        A ``datetime`` at midnight for the given date, or ``None``.
184    """
185    try:
186        return datetime(int(year), int(month), int(day))
187    except ValueError:
188        return None

Construct a datetime from string year, month, and day components.

Arguments:
  • year: Year string (e.g., "2025").
  • month: Month string ("1" through "12").
  • day: Day string ("1" through "31").
Returns:

A datetime at midnight for the given date, or None.

def parse_int(value: str) -> int | None:
205def parse_int(value: str) -> int | None:
206    """Extract and parse the first integer from a string.
207
208    Args:
209        value: String that may contain an integer.
210
211    Returns:
212        The parsed integer, or ``None``.
213    """
214    if not value:
215        return None
216    match = INTEGER_RE.search(value)
217    if not match:
218        return None
219    try:
220        return int(match.group())
221    except ValueError:
222        return None

Extract and parse the first integer from a string.

Arguments:
  • value: String that may contain an integer.
Returns:

The parsed integer, or None.

def normalize_datetime(value: datetime) -> datetime:
191def normalize_datetime(value: datetime) -> datetime:
192    """Convert a datetime to a naive UTC datetime.
193
194    Args:
195        value: Datetime to normalize.
196
197    Returns:
198        A naive ``datetime`` representing the same instant in UTC.
199    """
200    if value.tzinfo is None:
201        return value
202    return value.astimezone(timezone.utc).replace(tzinfo=None)

Convert a datetime to a naive UTC datetime.

Arguments:
  • value: Datetime to normalize.
Returns:

A naive datetime representing the same instant in UTC.

def parse_datetime_value(value: str, *, allow_epoch: bool = True) -> datetime | None:
225def parse_datetime_value(value: str, *, allow_epoch: bool = True) -> datetime | None:
226    """Attempt to parse a string value into a naive UTC datetime.
227
228    Tries ISO format first, then common date/time formats, and optionally
229    epoch timestamps (seconds or milliseconds).
230
231    Args:
232        value: Raw string that may contain a date or timestamp.
233        allow_epoch: If ``True`` (default), bare integers in the plausible
234            epoch range are accepted.  Set to ``False`` when scanning
235            columns that are not known to hold timestamps, to avoid
236            misinterpreting numeric IDs or counters as dates.
237
238    Returns:
239        A naive ``datetime`` in UTC, or ``None`` if parsing fails.
240    """
241    text = stringify_value(value)
242    if not text:
243        return None
244
245    cleaned = text.replace("Z", "+00:00")
246    try:
247        parsed = datetime.fromisoformat(cleaned)
248        return normalize_datetime(parsed)
249    except ValueError:
250        pass
251
252    for fmt in (
253        "%Y-%m-%d %H:%M:%S.%f%z",
254        "%Y-%m-%d %H:%M:%S%z",
255        "%Y-%m-%d %H:%M:%S.%f",
256        "%Y-%m-%d %H:%M:%S",
257        "%Y-%m-%d",
258        "%d-%m-%Y",
259        "%d/%m/%Y",
260        "%m/%d/%Y",
261        "%B %d, %Y",
262        "%b %d, %Y",
263        "%B %d %Y",
264        "%b %d %Y",
265    ):
266        try:
267            parsed = datetime.strptime(cleaned, fmt)
268            return normalize_datetime(parsed)
269        except ValueError:
270            continue
271
272    if not allow_epoch:
273        return None
274
275    int_value = parse_int(cleaned)
276    if int_value is not None:
277        if int_value > 1_000_000_000_000:
278            int_value //= 1000
279        if 946684800 <= int_value <= 4_102_444_800:
280            try:
281                parsed = datetime.fromtimestamp(int_value, tz=timezone.utc)
282                return normalize_datetime(parsed)
283            except (ValueError, OSError):
284                return None
285
286    return None

Attempt to parse a string value into a naive UTC datetime.

Tries ISO format first, then common date/time formats, and optionally epoch timestamps (seconds or milliseconds).

Arguments:
  • value: Raw string that may contain a date or timestamp.
  • allow_epoch: If True (default), bare integers in the plausible epoch range are accepted. Set to False when scanning columns that are not known to hold timestamps, to avoid misinterpreting numeric IDs or counters as dates.
Returns:

A naive datetime in UTC, or None if parsing fails.

def looks_like_timestamp_column(column_name: str) -> bool:
289def looks_like_timestamp_column(column_name: str) -> bool:
290    """Check whether a column name suggests it contains timestamp data.
291
292    Args:
293        column_name: CSV column header name.
294
295    Returns:
296        ``True`` if the lowercased name contains any timestamp hint substring.
297    """
298    lowered = column_name.strip().lower()
299    return any(hint in lowered for hint in TIMESTAMP_COLUMN_HINTS)

Check whether a column name suggests it contains timestamp data.

Arguments:
  • column_name: CSV column header name.
Returns:

True if the lowercased name contains any timestamp hint substring.

def extract_row_datetime(row: dict[str, str], columns: list[str] | None = None) -> datetime | None:
316def extract_row_datetime(row: dict[str, str], columns: list[str] | None = None) -> datetime | None:
317    """Extract the first parseable timestamp from a CSV row.
318
319    Prioritizes columns whose names look like timestamps (with full
320    parsing including epoch integers).  Falls back to remaining columns
321    but only accepts string-format dates — bare numeric values are
322    **not** treated as epoch timestamps in the fallback pass, to avoid
323    misinterpreting IDs or counters as dates.
324
325    Args:
326        row: Normalized row dict.
327        columns: Optional column list to constrain the search.
328
329    Returns:
330        The first successfully parsed ``datetime``, or ``None``.
331    """
332    all_columns = columns if columns else list(row.keys())
333    timestamp_columns = [c for c in all_columns if looks_like_timestamp_column(c)]
334
335    # Pass 1: timestamp-named columns — full parsing including epochs.
336    for column in timestamp_columns:
337        parsed = parse_datetime_value(row.get(column, ""), allow_epoch=True)
338        if parsed is not None:
339            return parsed
340
341    # Pass 2: remaining columns — string dates only, no epoch integers.
342    timestamp_set = set(timestamp_columns)
343    for column in all_columns:
344        if column in timestamp_set:
345            continue
346        parsed = parse_datetime_value(row.get(column, ""), allow_epoch=False)
347        if parsed is not None:
348            return parsed
349
350    return None

Extract the first parseable timestamp from a CSV row.

Prioritizes columns whose names look like timestamps (with full parsing including epoch integers). Falls back to remaining columns but only accepts string-format dates — bare numeric values are not treated as epoch timestamps in the fallback pass, to avoid misinterpreting IDs or counters as dates.

Arguments:
  • row: Normalized row dict.
  • columns: Optional column list to constrain the search.
Returns:

The first successfully parsed datetime, or None.

def time_range_for_rows(rows: Iterable[dict[str, str]]) -> tuple[datetime | None, datetime | None]:
def time_range_for_rows(rows: Iterable[dict[str, str]]) -> tuple[datetime | None, datetime | None]:
    """Compute the earliest and latest timestamps across all rows.

    Args:
        rows: Iterable of row dicts to scan for timestamp values.

    Returns:
        A ``(min_time, max_time)`` tuple; both entries are ``None`` when
        no row yields a parseable timestamp.
    """
    earliest: datetime | None = None
    latest: datetime | None = None
    for row in rows:
        stamp = extract_row_datetime(row=row)
        if stamp is None:
            continue
        earliest = stamp if earliest is None else min(earliest, stamp)
        latest = stamp if latest is None else max(latest, stamp)
    return earliest, latest

Compute the earliest and latest timestamps across all rows.

Arguments:
  • rows: Iterable of row dicts to scan for timestamp values.
Returns:

A (min_time, max_time) tuple.

def normalize_artifact_key(artifact_key: str) -> str:
def normalize_artifact_key(artifact_key: str) -> str:
    """Normalize an artifact key to its canonical short form.

    Args:
        artifact_key: Raw artifact key string.

    Returns:
        The lowercased, normalized artifact key.
    """
    key = artifact_key.strip().lower()
    if key == "mft":
        return "mft"
    # Any evtx-flavored key (prefix or embedded extension) maps to "evtx".
    if key.startswith("evtx") or ".evtx" in key:
        return "evtx"
    known_prefixes = (
        "shimcache",
        "amcache",
        "prefetch",
        "services",
        "tasks",
        "userassist",
        "runkeys",
    )
    for prefix in known_prefixes:
        if key.startswith(prefix):
            return prefix
    return key

Normalize an artifact key to its canonical short form.

Arguments:
  • artifact_key: Raw artifact key string.
Returns:

The lowercased, normalized artifact key.

def unique_preserve_order(values: Iterable[str]) -> list[str]:
def unique_preserve_order(values: Iterable[str]) -> list[str]:
    """Deduplicate strings case-insensitively, keeping first-seen order.

    Each value is whitespace-stripped, stripped of surrounding
    quote/bracket characters, and trimmed of trailing punctuation before
    the (lowercased) comparison.

    Args:
        values: Iterable of raw string values to deduplicate.

    Returns:
        A list of cleaned, unique strings in their original order.
    """
    # Keyed on the lowercased form; dict insertion order preserves the
    # first occurrence of each cleaned value.
    ordered: dict[str, str] = {}
    for raw in values:
        cleaned = str(raw).strip().strip("\"'()[]{}<>").rstrip(".,;:")
        if not cleaned:
            continue
        ordered.setdefault(cleaned.lower(), cleaned)
    return list(ordered.values())

Deduplicate strings while preserving first-occurrence order.

Values are stripped, surrounding quotes/brackets are removed, and trailing punctuation is trimmed before deduplication (case-insensitive).

Arguments:
  • values: Iterable of raw string values to deduplicate.
Returns:

A list of cleaned, unique strings in their original order.

def truncate_for_prompt(value: str, limit: int) -> str:
def truncate_for_prompt(value: str, limit: int) -> str:
    """Truncate a string to fit within a character limit for prompt inclusion.

    Args:
        value: The string to truncate.
        limit: Maximum allowed character count.

    Returns:
        The original string if it fits, or a truncated version ending in
        a ``" ... [truncated]"`` marker.  Very small limits (<= 20) get a
        hard slice with no marker.
    """
    text = str(value or "").strip()
    if len(text) <= limit:
        return text
    if limit <= 20:
        return text[:limit]
    suffix = " ... [truncated]"
    # Reserve exactly len(suffix) characters so the result can never
    # exceed ``limit`` (the previous hard-coded reservation of 14 was
    # two characters short of the 16-character marker).
    head = text[: limit - len(suffix)].rstrip()
    return f"{head}{suffix}"

Truncate a string to fit within a character limit for prompt inclusion.

Arguments:
  • value: The string to truncate.
  • limit: Maximum allowed character count.
Returns:

The original string if it fits, or a truncated version.

def extract_url_host(url: str) -> str:
def extract_url_host(url: str) -> str:
    """Extract the lowercase hostname from a URL string.

    Args:
        url: A URL string.

    Returns:
        The lowercased hostname portion without scheme, port, or path.
    """
    remainder = url.strip()
    scheme_sep = remainder.find("://")
    if scheme_sep != -1:
        remainder = remainder[scheme_sep + 3:]
    # Drop everything after the first path separator, then the port.
    for separator in ("/", ":"):
        remainder = remainder.partition(separator)[0]
    return remainder.lower().strip()

Extract the lowercase hostname from a URL string.

Arguments:
  • url: A URL string.
Returns:

The lowercased hostname portion without scheme, port, or path.

def normalize_csv_row(row: dict[str | None, str | None | list[str]], columns: list[str]) -> dict[str, str]:
def normalize_csv_row(row: dict[str | None, str | None | list[str]], columns: list[str]) -> dict[str, str]:
    """Normalize a raw CSV DictReader row to a clean string-to-string dict.

    Args:
        row: Raw row dict from ``csv.DictReader``.
        columns: Expected column names in the CSV.

    Returns:
        A normalized dict mapping column names to stripped string values,
        plus an ``__extra__`` entry when the row had overflow fields.
    """
    normalized = {column: stringify_value(row.get(column)) for column in columns}

    # csv.DictReader stores overflow cells (rows longer than the header)
    # under the ``None`` key; fold them into one pipe-joined string.
    overflow = row.get(None)
    if overflow:
        normalized["__extra__"] = " | ".join(
            text
            for text in (stringify_value(item) for item in overflow)
            if text
        )

    return normalized

Normalize a raw CSV DictReader row to a clean string-to-string dict.

Arguments:
  • row: Raw row dict from csv.DictReader.
  • columns: Expected column names in the CSV.
Returns:

A normalized dict mapping column names to stripped string values.

def coerce_projection_columns(value: Any) -> list[str]:
def coerce_projection_columns(value: Any) -> list[str]:
    """Coerce a raw YAML value into a deduplicated list of column names.

    Args:
        value: Raw value from the YAML config (string, list, or other).

    Returns:
        A deduplicated list of non-empty column name strings.
    """
    if isinstance(value, str):
        parts = (piece.strip() for piece in value.split(","))
    elif isinstance(value, list):
        parts = (str(item).strip() for item in value)
    else:
        return []
    # dict.fromkeys deduplicates while preserving first-occurrence order.
    return list(dict.fromkeys(part for part in parts if part))

Coerce a raw YAML value into a deduplicated list of column names.

Arguments:
  • value: Raw value from the YAML config (string, list, or other).
Returns:

A deduplicated list of non-empty column name strings.

def emit_analysis_progress(progress_callback: Any, artifact_key: str, status: str, payload: dict[str, Any]) -> None:
def emit_analysis_progress(
    progress_callback: Any,
    artifact_key: str,
    status: str,
    payload: dict[str, Any],
) -> None:
    """Emit a progress event to the frontend via the callback.

    Tries the three-argument calling convention first; a ``TypeError``
    is taken as a signature mismatch and triggers a retry with a single
    event-dict argument.  All other callback failures are swallowed so
    progress reporting can never break the analysis itself.

    NOTE(review): a ``TypeError`` raised *inside* a three-argument
    callback is indistinguishable from a signature mismatch and will
    also trigger the dict-style retry.

    Args:
        progress_callback: The user-supplied progress callback.
        artifact_key: Artifact identifier for the event.
        status: Event status.
        payload: Event payload dict.
    """
    try:
        progress_callback(artifact_key, status, payload)
    except TypeError:
        event = {
            "artifact_key": artifact_key,
            "status": status,
            "result": payload,
        }
        try:
            progress_callback(event)
        except Exception:
            pass
    except Exception:
        pass

Emit a progress event to the frontend via the callback.

Arguments:
  • progress_callback: The user-supplied progress callback.
  • artifact_key: Artifact identifier for the event.
  • status: Event status.
  • payload: Event payload dict.
def estimate_tokens(text: str, model_info: Mapping[str, str] | None = None) -> int:
def estimate_tokens(text: str, model_info: Mapping[str, str] | None = None) -> int:
    """Estimate the token count of a text string.

    When ``tiktoken`` is available and the provider is OpenAI-compatible,
    an exact BPE token count is returned.  Otherwise a character-based
    heuristic is used: ASCII characters are divided by the configured
    chars-per-token ratio, non-ASCII characters are weighted at 1.5
    tokens each, and a 10% safety margin is applied.

    Args:
        text: The text to estimate token count for.
        model_info: Optional dict with ``provider`` and ``model`` keys.

    Returns:
        Estimated number of tokens (minimum 1).
    """
    if not text:
        return 1

    if _TIKTOKEN_AVAILABLE and model_info is not None:
        provider_name = str(model_info.get("provider", "")).lower()
        if provider_name in {"openai", "local", "custom"}:
            model_name = str(model_info.get("model", ""))
            try:
                enc = tiktoken.encoding_for_model(model_name)
            except KeyError:
                # Unknown model name: fall back to the default BPE.
                try:
                    enc = tiktoken.get_encoding("cl100k_base")
                except Exception:
                    enc = None
            if enc is not None:
                try:
                    return max(1, len(enc.encode(text)))
                except Exception:
                    pass

    # Heuristic path.  Count ASCII characters directly instead of
    # materializing a list that was only used for its length.
    ascii_count = sum(1 for ch in text if ord(ch) < 128)
    non_ascii_count = len(text) - ascii_count

    ascii_tokens = ascii_count / max(1, TOKEN_CHAR_RATIO)
    non_ascii_tokens = non_ascii_count * 1.5
    with_margin = (ascii_tokens + non_ascii_tokens) * 1.1

    return max(1, int(with_margin))

Estimate the token count of a text string.

When tiktoken is available and the provider is OpenAI-compatible, an exact BPE token count is returned. Otherwise a heuristic is used.

Arguments:
  • text: The text to estimate token count for.
  • model_info: Optional dict with provider and model keys.
Returns:

Estimated number of tokens (minimum 1).

def is_dedup_safe_identifier_column(column_name: str) -> bool:
def is_dedup_safe_identifier_column(column_name: str) -> bool:
    """Return True only for auto-incremented record IDs safe for dedup.

    Args:
        column_name: CSV column header name.

    Returns:
        ``True`` if the column is a safe dedup identifier.
    """
    # Imported lazily, as in the rest of this module, to keep the
    # constants dependency out of the import-time path.
    from .constants import DEDUP_SAFE_IDENTIFIER_HINTS

    # Canonicalize separators to underscores in a single pass.
    canonical = column_name.strip().lower().translate(
        str.maketrans({"-": "_", " ": "_"})
    )
    return canonical in DEDUP_SAFE_IDENTIFIER_HINTS

Return True only for auto-incremented record IDs safe for dedup.

Arguments:
  • column_name: CSV column header name.
Returns:

True if the column is a safe dedup identifier.

def normalize_os_type(os_type: str | None) -> str:
24def normalize_os_type(os_type: str | None) -> str:
25    """Normalize an OS type identifier to its canonical lowercase form.
26
27    Args:
28        os_type: Operating system identifier (e.g. ``"windows"``,
29            ``"linux"``, ``"Linux "``).  ``None`` or empty values
30            default to ``"windows"``.
31
32    Returns:
33        The lowercased, stripped OS type string, defaulting to
34        ``"windows"`` when *os_type* is falsy.
35    """
36    return str(os_type).strip().lower() if os_type else "windows"

Normalize an OS type identifier to its canonical lowercase form.

Arguments:
  • os_type: Operating system identifier (e.g. "windows", "linux", "Linux "). None or empty values default to "windows".
Returns:

The lowercased, stripped OS type string, defaulting to "windows" when os_type is falsy.

def read_int_setting(analysis_config: Mapping[str, Any], key: str, default: int, minimum: int = 1, maximum: int | None = None) -> int:
577def read_int_setting(
578    analysis_config: Mapping[str, Any], key: str, default: int,
579    minimum: int = 1, maximum: int | None = None,
580) -> int:
581    """Read an integer setting with bounds clamping.
582
583    Args:
584        analysis_config: The ``analysis`` sub-dictionary.
585        key: Configuration key name.
586        default: Default value.
587        minimum: Lower bound (inclusive).
588        maximum: Optional upper bound (inclusive).
589
590    Returns:
591        The parsed and clamped integer value.
592    """
593    raw_value = analysis_config.get(key, default)
594    try:
595        parsed_value = int(raw_value)
596    except (TypeError, ValueError):
597        parsed_value = default
598    if parsed_value < minimum:
599        parsed_value = minimum
600    if maximum is not None and parsed_value > maximum:
601        parsed_value = maximum
602    return parsed_value

Read an integer setting with bounds clamping.

Arguments:
  • analysis_config: The analysis sub-dictionary.
  • key: Configuration key name.
  • default: Default value.
  • minimum: Lower bound (inclusive).
  • maximum: Optional upper bound (inclusive).
Returns:

The parsed and clamped integer value.

def read_bool_setting(analysis_config: Mapping[str, Any], key: str, default: bool) -> bool:
def read_bool_setting(analysis_config: Mapping[str, Any], key: str, default: bool) -> bool:
    """Read a boolean setting from the analysis config.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value.

    Returns:
        The parsed boolean value.
    """
    raw = analysis_config.get(key, default)
    if isinstance(raw, bool):
        return raw
    if isinstance(raw, str):
        token = raw.strip().lower()
        if token in {"true", "1", "yes", "on"}:
            return True
        if token in {"false", "0", "no", "off"}:
            return False
        # Unrecognized string tokens fall back to the default.
        return default
    if isinstance(raw, (int, float)):
        return bool(raw)
    return default

Read a boolean setting from the analysis config.

Arguments:
  • analysis_config: The analysis sub-dictionary.
  • key: Configuration key name.
  • default: Default value.
Returns:

The parsed boolean value.

def read_path_setting(analysis_config: Mapping[str, Any], key: str, default: str) -> str:
def read_path_setting(analysis_config: Mapping[str, Any], key: str, default: str) -> str:
    """Read a file-path setting from the analysis config.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value.

    Returns:
        The cleaned path string, or *default* when the configured value
        is missing, empty, or not path-like.
    """
    candidate = analysis_config.get(key, default)
    if not isinstance(candidate, (str, Path)):
        return default
    cleaned = str(candidate).strip()
    return cleaned if cleaned else default

Read a file-path setting from the analysis config.

Arguments:
  • analysis_config: The analysis sub-dictionary.
  • key: Configuration key name.
  • default: Default value.
Returns:

The cleaned path string.