app.analyzer.utils
Pure utility functions for the forensic analyzer pipeline.
Provides string manipulation, datetime parsing, CSV normalisation, filename sanitisation, token estimation, and other stateless helpers used across the analyzer sub-modules.
1"""Pure utility functions for the forensic analyzer pipeline. 2 3Provides string manipulation, datetime parsing, CSV normalisation, filename 4sanitisation, token estimation, and other stateless helpers used across the 5analyzer sub-modules. 6""" 7 8from __future__ import annotations 9 10import re 11from datetime import datetime, timezone 12from pathlib import Path 13from typing import Any, Iterable, Mapping 14 15from .constants import ( 16 INTEGER_RE, 17 TIMESTAMP_COLUMN_HINTS, 18 TOKEN_CHAR_RATIO, 19) 20 21try: 22 import tiktoken 23 _TIKTOKEN_AVAILABLE = True 24except ImportError: 25 _TIKTOKEN_AVAILABLE = False 26 27__all__ = [ 28 "stringify_value", 29 "format_datetime", 30 "normalize_table_cell", 31 "sanitize_filename", 32 "build_datetime", 33 "parse_int", 34 "normalize_datetime", 35 "parse_datetime_value", 36 "looks_like_timestamp_column", 37 "extract_row_datetime", 38 "time_range_for_rows", 39 "normalize_artifact_key", 40 "unique_preserve_order", 41 "truncate_for_prompt", 42 "extract_url_host", 43 "normalize_csv_row", 44 "coerce_projection_columns", 45 "emit_analysis_progress", 46 "estimate_tokens", 47 "is_dedup_safe_identifier_column", 48 "normalize_os_type", 49 "read_int_setting", 50 "read_bool_setting", 51 "read_path_setting", 52] 53 54 55# --------------------------------------------------------------------------- 56# String helpers 57# --------------------------------------------------------------------------- 58 59def stringify_value(value: Any) -> str: 60 """Convert an arbitrary value to a stripped string. 61 62 Args: 63 value: Any value (string, ``None``, number, etc.). 64 65 Returns: 66 The stripped string representation, or an empty string for ``None``. 67 """ 68 if value is None: 69 return "" 70 if isinstance(value, str): 71 return value.strip() 72 return str(value).strip() 73 74 75def format_datetime(value: datetime | None) -> str: 76 """Format a datetime as an ISO string, or ``"N/A"`` for ``None``. 
77 78 Args: 79 value: Datetime to format, or ``None``. 80 81 Returns: 82 ISO-formatted datetime string or ``"N/A"``. 83 """ 84 if value is None: 85 return "N/A" 86 return value.isoformat() 87 88 89def normalize_table_cell(value: str, cell_limit: int) -> str: 90 """Normalize and truncate a cell value for table/statistics display. 91 92 Replaces newlines and pipe characters, strips whitespace, and 93 truncates with an ellipsis if the value exceeds *cell_limit*. 94 95 Args: 96 value: Raw cell value string. 97 cell_limit: Maximum character length for the output. 98 99 Returns: 100 The cleaned and possibly truncated string. 101 """ 102 text = value.replace("\r", " ").replace("\n", " ").replace("|", r"\|").strip() 103 if len(text) <= cell_limit: 104 return text 105 if cell_limit <= 3: 106 return text[:cell_limit] 107 return f"{text[: cell_limit - 3]}..." 108 109 110def sanitize_filename(value: str) -> str: 111 """Sanitize a string for use as a safe filename. 112 113 Args: 114 value: Raw string to sanitize. 115 116 Returns: 117 A filesystem-safe filename string, or ``"artifact"`` if empty. 118 """ 119 cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_") 120 return cleaned or "artifact" 121 122 123def truncate_for_prompt(value: str, limit: int) -> str: 124 """Truncate a string to fit within a character limit for prompt inclusion. 125 126 Args: 127 value: The string to truncate. 128 limit: Maximum allowed character count. 129 130 Returns: 131 The original string if it fits, or a truncated version. 132 """ 133 text = str(value or "").strip() 134 if len(text) <= limit: 135 return text 136 if limit <= 20: 137 return text[:limit] 138 return f"{text[: limit - 14].rstrip()} ... [truncated]" 139 140 141def unique_preserve_order(values: Iterable[str]) -> list[str]: 142 """Deduplicate strings while preserving first-occurrence order. 
143 144 Values are stripped, surrounding quotes/brackets are removed, and 145 trailing punctuation is trimmed before deduplication (case-insensitive). 146 147 Args: 148 values: Iterable of raw string values to deduplicate. 149 150 Returns: 151 A list of cleaned, unique strings in their original order. 152 """ 153 unique: list[str] = [] 154 seen: set[str] = set() 155 for raw_value in values: 156 value = str(raw_value).strip() 157 value = value.strip("\"'()[]{}<>") 158 value = value.rstrip(".,;:") 159 if not value: 160 continue 161 key = value.lower() 162 if key in seen: 163 continue 164 seen.add(key) 165 unique.append(value) 166 return unique 167 168 169# --------------------------------------------------------------------------- 170# Datetime helpers 171# --------------------------------------------------------------------------- 172 173def build_datetime(year: str, month: str, day: str) -> datetime | None: 174 """Construct a datetime from string year, month, and day components. 175 176 Args: 177 year: Year string (e.g., ``"2025"``). 178 month: Month string (``"1"`` through ``"12"``). 179 day: Day string (``"1"`` through ``"31"``). 180 181 Returns: 182 A ``datetime`` at midnight for the given date, or ``None``. 183 """ 184 try: 185 return datetime(int(year), int(month), int(day)) 186 except ValueError: 187 return None 188 189 190def normalize_datetime(value: datetime) -> datetime: 191 """Convert a datetime to a naive UTC datetime. 192 193 Args: 194 value: Datetime to normalize. 195 196 Returns: 197 A naive ``datetime`` representing the same instant in UTC. 198 """ 199 if value.tzinfo is None: 200 return value 201 return value.astimezone(timezone.utc).replace(tzinfo=None) 202 203 204def parse_int(value: str) -> int | None: 205 """Extract and parse the first integer from a string. 206 207 Args: 208 value: String that may contain an integer. 209 210 Returns: 211 The parsed integer, or ``None``. 
212 """ 213 if not value: 214 return None 215 match = INTEGER_RE.search(value) 216 if not match: 217 return None 218 try: 219 return int(match.group()) 220 except ValueError: 221 return None 222 223 224def parse_datetime_value(value: str, *, allow_epoch: bool = True) -> datetime | None: 225 """Attempt to parse a string value into a naive UTC datetime. 226 227 Tries ISO format first, then common date/time formats, and optionally 228 epoch timestamps (seconds or milliseconds). 229 230 Args: 231 value: Raw string that may contain a date or timestamp. 232 allow_epoch: If ``True`` (default), bare integers in the plausible 233 epoch range are accepted. Set to ``False`` when scanning 234 columns that are not known to hold timestamps, to avoid 235 misinterpreting numeric IDs or counters as dates. 236 237 Returns: 238 A naive ``datetime`` in UTC, or ``None`` if parsing fails. 239 """ 240 text = stringify_value(value) 241 if not text: 242 return None 243 244 cleaned = text.replace("Z", "+00:00") 245 try: 246 parsed = datetime.fromisoformat(cleaned) 247 return normalize_datetime(parsed) 248 except ValueError: 249 pass 250 251 for fmt in ( 252 "%Y-%m-%d %H:%M:%S.%f%z", 253 "%Y-%m-%d %H:%M:%S%z", 254 "%Y-%m-%d %H:%M:%S.%f", 255 "%Y-%m-%d %H:%M:%S", 256 "%Y-%m-%d", 257 "%d-%m-%Y", 258 "%d/%m/%Y", 259 "%m/%d/%Y", 260 "%B %d, %Y", 261 "%b %d, %Y", 262 "%B %d %Y", 263 "%b %d %Y", 264 ): 265 try: 266 parsed = datetime.strptime(cleaned, fmt) 267 return normalize_datetime(parsed) 268 except ValueError: 269 continue 270 271 if not allow_epoch: 272 return None 273 274 int_value = parse_int(cleaned) 275 if int_value is not None: 276 if int_value > 1_000_000_000_000: 277 int_value //= 1000 278 if 946684800 <= int_value <= 4_102_444_800: 279 try: 280 parsed = datetime.fromtimestamp(int_value, tz=timezone.utc) 281 return normalize_datetime(parsed) 282 except (ValueError, OSError): 283 return None 284 285 return None 286 287 288def looks_like_timestamp_column(column_name: str) -> bool: 289 
"""Check whether a column name suggests it contains timestamp data. 290 291 Args: 292 column_name: CSV column header name. 293 294 Returns: 295 ``True`` if the lowercased name contains any timestamp hint substring. 296 """ 297 lowered = column_name.strip().lower() 298 return any(hint in lowered for hint in TIMESTAMP_COLUMN_HINTS) 299 300 301def is_dedup_safe_identifier_column(column_name: str) -> bool: 302 """Return True only for auto-incremented record IDs safe for dedup. 303 304 Args: 305 column_name: CSV column header name. 306 307 Returns: 308 ``True`` if the column is a safe dedup identifier. 309 """ 310 from .constants import DEDUP_SAFE_IDENTIFIER_HINTS 311 lowered = column_name.strip().lower().replace("-", "_").replace(" ", "_") 312 return lowered in DEDUP_SAFE_IDENTIFIER_HINTS 313 314 315def extract_row_datetime(row: dict[str, str], columns: list[str] | None = None) -> datetime | None: 316 """Extract the first parseable timestamp from a CSV row. 317 318 Prioritizes columns whose names look like timestamps (with full 319 parsing including epoch integers). Falls back to remaining columns 320 but only accepts string-format dates — bare numeric values are 321 **not** treated as epoch timestamps in the fallback pass, to avoid 322 misinterpreting IDs or counters as dates. 323 324 Args: 325 row: Normalized row dict. 326 columns: Optional column list to constrain the search. 327 328 Returns: 329 The first successfully parsed ``datetime``, or ``None``. 330 """ 331 all_columns = columns if columns else list(row.keys()) 332 timestamp_columns = [c for c in all_columns if looks_like_timestamp_column(c)] 333 334 # Pass 1: timestamp-named columns — full parsing including epochs. 335 for column in timestamp_columns: 336 parsed = parse_datetime_value(row.get(column, ""), allow_epoch=True) 337 if parsed is not None: 338 return parsed 339 340 # Pass 2: remaining columns — string dates only, no epoch integers. 
341 timestamp_set = set(timestamp_columns) 342 for column in all_columns: 343 if column in timestamp_set: 344 continue 345 parsed = parse_datetime_value(row.get(column, ""), allow_epoch=False) 346 if parsed is not None: 347 return parsed 348 349 return None 350 351 352def time_range_for_rows(rows: Iterable[dict[str, str]]) -> tuple[datetime | None, datetime | None]: 353 """Compute the earliest and latest timestamps across all rows. 354 355 Args: 356 rows: Iterable of row dicts to scan for timestamp values. 357 358 Returns: 359 A ``(min_time, max_time)`` tuple. 360 """ 361 min_time: datetime | None = None 362 max_time: datetime | None = None 363 for row in rows: 364 parsed = extract_row_datetime(row=row) 365 if parsed is None: 366 continue 367 if min_time is None or parsed < min_time: 368 min_time = parsed 369 if max_time is None or parsed > max_time: 370 max_time = parsed 371 return min_time, max_time 372 373 374# --------------------------------------------------------------------------- 375# Artifact key normalisation 376# --------------------------------------------------------------------------- 377 378# Re-export from the shared module so existing callers are not broken. 379from ..os_utils import normalize_os_type as normalize_os_type # noqa: F401, PLC0414 380 381 382def normalize_artifact_key(artifact_key: str) -> str: 383 """Normalize an artifact key to its canonical short form. 384 385 Args: 386 artifact_key: Raw artifact key string. 387 388 Returns: 389 The lowercased, normalized artifact key. 
390 """ 391 key = artifact_key.strip().lower() 392 if key == "mft": 393 return "mft" 394 if key.startswith("evtx") or key.endswith(".evtx") or ".evtx" in key: 395 return "evtx" 396 if key.startswith("shimcache"): 397 return "shimcache" 398 if key.startswith("amcache"): 399 return "amcache" 400 if key.startswith("prefetch"): 401 return "prefetch" 402 if key.startswith("services"): 403 return "services" 404 if key.startswith("tasks"): 405 return "tasks" 406 if key.startswith("userassist"): 407 return "userassist" 408 if key.startswith("runkeys"): 409 return "runkeys" 410 return key 411 412 413# --------------------------------------------------------------------------- 414# URL host extraction 415# --------------------------------------------------------------------------- 416 417def extract_url_host(url: str) -> str: 418 """Extract the lowercase hostname from a URL string. 419 420 Args: 421 url: A URL string. 422 423 Returns: 424 The lowercased hostname portion without scheme, port, or path. 425 """ 426 text = url.strip() 427 if "://" in text: 428 text = text.split("://", 1)[1] 429 text = text.split("/", 1)[0] 430 text = text.split(":", 1)[0] 431 return text.lower().strip() 432 433 434# --------------------------------------------------------------------------- 435# CSV normalisation 436# --------------------------------------------------------------------------- 437 438def normalize_csv_row(row: dict[str | None, str | None | list[str]], columns: list[str]) -> dict[str, str]: 439 """Normalize a raw CSV DictReader row to a clean string-to-string dict. 440 441 Args: 442 row: Raw row dict from ``csv.DictReader``. 443 columns: Expected column names in the CSV. 444 445 Returns: 446 A normalized dict mapping column names to stripped string values. 
447 """ 448 normalized: dict[str, str] = {} 449 for column in columns: 450 normalized[column] = stringify_value(row.get(column)) 451 452 extras = row.get(None) 453 if extras: 454 extra_values = [stringify_value(value) for value in extras] 455 normalized["__extra__"] = " | ".join(value for value in extra_values if value) 456 457 return normalized 458 459 460def coerce_projection_columns(value: Any) -> list[str]: 461 """Coerce a raw YAML value into a deduplicated list of column names. 462 463 Args: 464 value: Raw value from the YAML config (string, list, or other). 465 466 Returns: 467 A deduplicated list of non-empty column name strings. 468 """ 469 if isinstance(value, str): 470 candidates = [part.strip() for part in value.split(",")] 471 elif isinstance(value, list): 472 candidates = [str(item).strip() for item in value] 473 else: 474 return [] 475 476 deduplicated: list[str] = [] 477 for candidate in candidates: 478 if candidate and candidate not in deduplicated: 479 deduplicated.append(candidate) 480 return deduplicated 481 482 483# --------------------------------------------------------------------------- 484# Progress callback 485# --------------------------------------------------------------------------- 486 487def emit_analysis_progress( 488 progress_callback: Any, 489 artifact_key: str, 490 status: str, 491 payload: dict[str, Any], 492) -> None: 493 """Emit a progress event to the frontend via the callback. 494 495 Args: 496 progress_callback: The user-supplied progress callback. 497 artifact_key: Artifact identifier for the event. 498 status: Event status. 499 payload: Event payload dict. 
500 """ 501 try: 502 progress_callback(artifact_key, status, payload) 503 return 504 except TypeError: 505 pass 506 except Exception: 507 return 508 509 try: 510 progress_callback({ 511 "artifact_key": artifact_key, 512 "status": status, 513 "result": payload, 514 }) 515 except Exception: 516 return 517 518 519# --------------------------------------------------------------------------- 520# Token estimation 521# --------------------------------------------------------------------------- 522 523def estimate_tokens(text: str, model_info: Mapping[str, str] | None = None) -> int: 524 """Estimate the token count of a text string. 525 526 When ``tiktoken`` is available and the provider is OpenAI-compatible, 527 an exact BPE token count is returned. Otherwise a heuristic is used. 528 529 Args: 530 text: The text to estimate token count for. 531 model_info: Optional dict with ``provider`` and ``model`` keys. 532 533 Returns: 534 Estimated number of tokens (minimum 1). 535 """ 536 if not text: 537 return 1 538 539 if _TIKTOKEN_AVAILABLE and model_info is not None: 540 provider_name = str(model_info.get("provider", "")).lower() 541 if provider_name in {"openai", "local", "custom"}: 542 model_name = str(model_info.get("model", "")) 543 try: 544 enc = tiktoken.encoding_for_model(model_name) 545 except KeyError: 546 try: 547 enc = tiktoken.get_encoding("cl100k_base") 548 except Exception: 549 enc = None 550 if enc is not None: 551 try: 552 return max(1, len(enc.encode(text))) 553 except Exception: 554 pass 555 556 ascii_chars: list[str] = [] 557 non_ascii_count = 0 558 for ch in text: 559 if ord(ch) < 128: 560 ascii_chars.append(ch) 561 else: 562 non_ascii_count += 1 563 564 ascii_tokens = len(ascii_chars) / max(1, TOKEN_CHAR_RATIO) 565 non_ascii_tokens = non_ascii_count * 1.5 566 raw_estimate = ascii_tokens + non_ascii_tokens 567 with_margin = raw_estimate * 1.1 568 569 return max(1, int(with_margin)) 570 571 572# 
--------------------------------------------------------------------------- 573# Config setting readers 574# --------------------------------------------------------------------------- 575 576def read_int_setting( 577 analysis_config: Mapping[str, Any], key: str, default: int, 578 minimum: int = 1, maximum: int | None = None, 579) -> int: 580 """Read an integer setting with bounds clamping. 581 582 Args: 583 analysis_config: The ``analysis`` sub-dictionary. 584 key: Configuration key name. 585 default: Default value. 586 minimum: Lower bound (inclusive). 587 maximum: Optional upper bound (inclusive). 588 589 Returns: 590 The parsed and clamped integer value. 591 """ 592 raw_value = analysis_config.get(key, default) 593 try: 594 parsed_value = int(raw_value) 595 except (TypeError, ValueError): 596 parsed_value = default 597 if parsed_value < minimum: 598 parsed_value = minimum 599 if maximum is not None and parsed_value > maximum: 600 parsed_value = maximum 601 return parsed_value 602 603 604def read_bool_setting(analysis_config: Mapping[str, Any], key: str, default: bool) -> bool: 605 """Read a boolean setting from the analysis config. 606 607 Args: 608 analysis_config: The ``analysis`` sub-dictionary. 609 key: Configuration key name. 610 default: Default value. 611 612 Returns: 613 The parsed boolean value. 614 """ 615 raw_value = analysis_config.get(key, default) 616 if isinstance(raw_value, bool): 617 return raw_value 618 if isinstance(raw_value, str): 619 lowered = raw_value.strip().lower() 620 if lowered in {"true", "1", "yes", "on"}: 621 return True 622 if lowered in {"false", "0", "no", "off"}: 623 return False 624 if isinstance(raw_value, (int, float)): 625 return bool(raw_value) 626 return default 627 628 629def read_path_setting(analysis_config: Mapping[str, Any], key: str, default: str) -> str: 630 """Read a file-path setting from the analysis config. 631 632 Args: 633 analysis_config: The ``analysis`` sub-dictionary. 634 key: Configuration key name. 
635 default: Default value. 636 637 Returns: 638 The cleaned path string. 639 """ 640 raw_value = analysis_config.get(key, default) 641 if isinstance(raw_value, (str, Path)): 642 cleaned = str(raw_value).strip() 643 if cleaned: 644 return cleaned 645 return default
60def stringify_value(value: Any) -> str: 61 """Convert an arbitrary value to a stripped string. 62 63 Args: 64 value: Any value (string, ``None``, number, etc.). 65 66 Returns: 67 The stripped string representation, or an empty string for ``None``. 68 """ 69 if value is None: 70 return "" 71 if isinstance(value, str): 72 return value.strip() 73 return str(value).strip()
Convert an arbitrary value to a stripped string.
Arguments:
- value: Any value (string, None, number, etc.).
Returns:
The stripped string representation, or an empty string for None.
76def format_datetime(value: datetime | None) -> str: 77 """Format a datetime as an ISO string, or ``"N/A"`` for ``None``. 78 79 Args: 80 value: Datetime to format, or ``None``. 81 82 Returns: 83 ISO-formatted datetime string or ``"N/A"``. 84 """ 85 if value is None: 86 return "N/A" 87 return value.isoformat()
Format a datetime as an ISO string, or "N/A" for None.
Arguments:
- value: Datetime to format, or None.
Returns:
ISO-formatted datetime string or "N/A".
90def normalize_table_cell(value: str, cell_limit: int) -> str: 91 """Normalize and truncate a cell value for table/statistics display. 92 93 Replaces newlines and pipe characters, strips whitespace, and 94 truncates with an ellipsis if the value exceeds *cell_limit*. 95 96 Args: 97 value: Raw cell value string. 98 cell_limit: Maximum character length for the output. 99 100 Returns: 101 The cleaned and possibly truncated string. 102 """ 103 text = value.replace("\r", " ").replace("\n", " ").replace("|", r"\|").strip() 104 if len(text) <= cell_limit: 105 return text 106 if cell_limit <= 3: 107 return text[:cell_limit] 108 return f"{text[: cell_limit - 3]}..."
Normalize and truncate a cell value for table/statistics display.
Replaces newlines and pipe characters, strips whitespace, and truncates with an ellipsis if the value exceeds cell_limit.
Arguments:
- value: Raw cell value string.
- cell_limit: Maximum character length for the output.
Returns:
The cleaned and possibly truncated string.
111def sanitize_filename(value: str) -> str: 112 """Sanitize a string for use as a safe filename. 113 114 Args: 115 value: Raw string to sanitize. 116 117 Returns: 118 A filesystem-safe filename string, or ``"artifact"`` if empty. 119 """ 120 cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("_") 121 return cleaned or "artifact"
Sanitize a string for use as a safe filename.
Arguments:
- value: Raw string to sanitize.
Returns:
A filesystem-safe filename string, or "artifact" if empty.
174def build_datetime(year: str, month: str, day: str) -> datetime | None: 175 """Construct a datetime from string year, month, and day components. 176 177 Args: 178 year: Year string (e.g., ``"2025"``). 179 month: Month string (``"1"`` through ``"12"``). 180 day: Day string (``"1"`` through ``"31"``). 181 182 Returns: 183 A ``datetime`` at midnight for the given date, or ``None``. 184 """ 185 try: 186 return datetime(int(year), int(month), int(day)) 187 except ValueError: 188 return None
Construct a datetime from string year, month, and day components.
Arguments:
- year: Year string (e.g., "2025").
- month: Month string ("1" through "12").
- day: Day string ("1" through "31").
Returns:
A datetime at midnight for the given date, or None.
205def parse_int(value: str) -> int | None: 206 """Extract and parse the first integer from a string. 207 208 Args: 209 value: String that may contain an integer. 210 211 Returns: 212 The parsed integer, or ``None``. 213 """ 214 if not value: 215 return None 216 match = INTEGER_RE.search(value) 217 if not match: 218 return None 219 try: 220 return int(match.group()) 221 except ValueError: 222 return None
Extract and parse the first integer from a string.
Arguments:
- value: String that may contain an integer.
Returns:
The parsed integer, or None.
191def normalize_datetime(value: datetime) -> datetime: 192 """Convert a datetime to a naive UTC datetime. 193 194 Args: 195 value: Datetime to normalize. 196 197 Returns: 198 A naive ``datetime`` representing the same instant in UTC. 199 """ 200 if value.tzinfo is None: 201 return value 202 return value.astimezone(timezone.utc).replace(tzinfo=None)
Convert a datetime to a naive UTC datetime.
Arguments:
- value: Datetime to normalize.
Returns:
A naive datetime representing the same instant in UTC.
225def parse_datetime_value(value: str, *, allow_epoch: bool = True) -> datetime | None: 226 """Attempt to parse a string value into a naive UTC datetime. 227 228 Tries ISO format first, then common date/time formats, and optionally 229 epoch timestamps (seconds or milliseconds). 230 231 Args: 232 value: Raw string that may contain a date or timestamp. 233 allow_epoch: If ``True`` (default), bare integers in the plausible 234 epoch range are accepted. Set to ``False`` when scanning 235 columns that are not known to hold timestamps, to avoid 236 misinterpreting numeric IDs or counters as dates. 237 238 Returns: 239 A naive ``datetime`` in UTC, or ``None`` if parsing fails. 240 """ 241 text = stringify_value(value) 242 if not text: 243 return None 244 245 cleaned = text.replace("Z", "+00:00") 246 try: 247 parsed = datetime.fromisoformat(cleaned) 248 return normalize_datetime(parsed) 249 except ValueError: 250 pass 251 252 for fmt in ( 253 "%Y-%m-%d %H:%M:%S.%f%z", 254 "%Y-%m-%d %H:%M:%S%z", 255 "%Y-%m-%d %H:%M:%S.%f", 256 "%Y-%m-%d %H:%M:%S", 257 "%Y-%m-%d", 258 "%d-%m-%Y", 259 "%d/%m/%Y", 260 "%m/%d/%Y", 261 "%B %d, %Y", 262 "%b %d, %Y", 263 "%B %d %Y", 264 "%b %d %Y", 265 ): 266 try: 267 parsed = datetime.strptime(cleaned, fmt) 268 return normalize_datetime(parsed) 269 except ValueError: 270 continue 271 272 if not allow_epoch: 273 return None 274 275 int_value = parse_int(cleaned) 276 if int_value is not None: 277 if int_value > 1_000_000_000_000: 278 int_value //= 1000 279 if 946684800 <= int_value <= 4_102_444_800: 280 try: 281 parsed = datetime.fromtimestamp(int_value, tz=timezone.utc) 282 return normalize_datetime(parsed) 283 except (ValueError, OSError): 284 return None 285 286 return None
Attempt to parse a string value into a naive UTC datetime.
Tries ISO format first, then common date/time formats, and optionally epoch timestamps (seconds or milliseconds).
Arguments:
- value: Raw string that may contain a date or timestamp.
- allow_epoch: If True (default), bare integers in the plausible epoch range are accepted. Set to False when scanning columns that are not known to hold timestamps, to avoid misinterpreting numeric IDs or counters as dates.
Returns:
A naive datetime in UTC, or None if parsing fails.
289def looks_like_timestamp_column(column_name: str) -> bool: 290 """Check whether a column name suggests it contains timestamp data. 291 292 Args: 293 column_name: CSV column header name. 294 295 Returns: 296 ``True`` if the lowercased name contains any timestamp hint substring. 297 """ 298 lowered = column_name.strip().lower() 299 return any(hint in lowered for hint in TIMESTAMP_COLUMN_HINTS)
Check whether a column name suggests it contains timestamp data.
Arguments:
- column_name: CSV column header name.
Returns:
True if the lowercased name contains any timestamp hint substring.
316def extract_row_datetime(row: dict[str, str], columns: list[str] | None = None) -> datetime | None: 317 """Extract the first parseable timestamp from a CSV row. 318 319 Prioritizes columns whose names look like timestamps (with full 320 parsing including epoch integers). Falls back to remaining columns 321 but only accepts string-format dates — bare numeric values are 322 **not** treated as epoch timestamps in the fallback pass, to avoid 323 misinterpreting IDs or counters as dates. 324 325 Args: 326 row: Normalized row dict. 327 columns: Optional column list to constrain the search. 328 329 Returns: 330 The first successfully parsed ``datetime``, or ``None``. 331 """ 332 all_columns = columns if columns else list(row.keys()) 333 timestamp_columns = [c for c in all_columns if looks_like_timestamp_column(c)] 334 335 # Pass 1: timestamp-named columns — full parsing including epochs. 336 for column in timestamp_columns: 337 parsed = parse_datetime_value(row.get(column, ""), allow_epoch=True) 338 if parsed is not None: 339 return parsed 340 341 # Pass 2: remaining columns — string dates only, no epoch integers. 342 timestamp_set = set(timestamp_columns) 343 for column in all_columns: 344 if column in timestamp_set: 345 continue 346 parsed = parse_datetime_value(row.get(column, ""), allow_epoch=False) 347 if parsed is not None: 348 return parsed 349 350 return None
Extract the first parseable timestamp from a CSV row.
Prioritizes columns whose names look like timestamps (with full parsing including epoch integers). Falls back to remaining columns but only accepts string-format dates — bare numeric values are not treated as epoch timestamps in the fallback pass, to avoid misinterpreting IDs or counters as dates.
Arguments:
- row: Normalized row dict.
- columns: Optional column list to constrain the search.
Returns:
The first successfully parsed datetime, or None.
353def time_range_for_rows(rows: Iterable[dict[str, str]]) -> tuple[datetime | None, datetime | None]: 354 """Compute the earliest and latest timestamps across all rows. 355 356 Args: 357 rows: Iterable of row dicts to scan for timestamp values. 358 359 Returns: 360 A ``(min_time, max_time)`` tuple. 361 """ 362 min_time: datetime | None = None 363 max_time: datetime | None = None 364 for row in rows: 365 parsed = extract_row_datetime(row=row) 366 if parsed is None: 367 continue 368 if min_time is None or parsed < min_time: 369 min_time = parsed 370 if max_time is None or parsed > max_time: 371 max_time = parsed 372 return min_time, max_time
Compute the earliest and latest timestamps across all rows.
Arguments:
- rows: Iterable of row dicts to scan for timestamp values.
Returns:
A (min_time, max_time) tuple.
383def normalize_artifact_key(artifact_key: str) -> str: 384 """Normalize an artifact key to its canonical short form. 385 386 Args: 387 artifact_key: Raw artifact key string. 388 389 Returns: 390 The lowercased, normalized artifact key. 391 """ 392 key = artifact_key.strip().lower() 393 if key == "mft": 394 return "mft" 395 if key.startswith("evtx") or key.endswith(".evtx") or ".evtx" in key: 396 return "evtx" 397 if key.startswith("shimcache"): 398 return "shimcache" 399 if key.startswith("amcache"): 400 return "amcache" 401 if key.startswith("prefetch"): 402 return "prefetch" 403 if key.startswith("services"): 404 return "services" 405 if key.startswith("tasks"): 406 return "tasks" 407 if key.startswith("userassist"): 408 return "userassist" 409 if key.startswith("runkeys"): 410 return "runkeys" 411 return key
Normalize an artifact key to its canonical short form.
Arguments:
- artifact_key: Raw artifact key string.
Returns:
The lowercased, normalized artifact key.
142def unique_preserve_order(values: Iterable[str]) -> list[str]: 143 """Deduplicate strings while preserving first-occurrence order. 144 145 Values are stripped, surrounding quotes/brackets are removed, and 146 trailing punctuation is trimmed before deduplication (case-insensitive). 147 148 Args: 149 values: Iterable of raw string values to deduplicate. 150 151 Returns: 152 A list of cleaned, unique strings in their original order. 153 """ 154 unique: list[str] = [] 155 seen: set[str] = set() 156 for raw_value in values: 157 value = str(raw_value).strip() 158 value = value.strip("\"'()[]{}<>") 159 value = value.rstrip(".,;:") 160 if not value: 161 continue 162 key = value.lower() 163 if key in seen: 164 continue 165 seen.add(key) 166 unique.append(value) 167 return unique
Deduplicate strings while preserving first-occurrence order.
Values are stripped, surrounding quotes/brackets are removed, and trailing punctuation is trimmed before deduplication (case-insensitive).
Arguments:
- values: Iterable of raw string values to deduplicate.
Returns:
A list of cleaned, unique strings in their original order.
124def truncate_for_prompt(value: str, limit: int) -> str: 125 """Truncate a string to fit within a character limit for prompt inclusion. 126 127 Args: 128 value: The string to truncate. 129 limit: Maximum allowed character count. 130 131 Returns: 132 The original string if it fits, or a truncated version. 133 """ 134 text = str(value or "").strip() 135 if len(text) <= limit: 136 return text 137 if limit <= 20: 138 return text[:limit] 139 return f"{text[: limit - 14].rstrip()} ... [truncated]"
Truncate a string to fit within a character limit for prompt inclusion.
Arguments:
- value: The string to truncate.
- limit: Maximum allowed character count.
Returns:
The original string if it fits, or a truncated version.
418def extract_url_host(url: str) -> str: 419 """Extract the lowercase hostname from a URL string. 420 421 Args: 422 url: A URL string. 423 424 Returns: 425 The lowercased hostname portion without scheme, port, or path. 426 """ 427 text = url.strip() 428 if "://" in text: 429 text = text.split("://", 1)[1] 430 text = text.split("/", 1)[0] 431 text = text.split(":", 1)[0] 432 return text.lower().strip()
Extract the lowercase hostname from a URL string.
Arguments:
- url: A URL string.
Returns:
The lowercased hostname portion without scheme, port, or path.
def normalize_csv_row(row: dict[str | None, str | None | list[str]], columns: list[str]) -> dict[str, str]:
    """Normalize a raw CSV DictReader row to a clean string-to-string dict.

    Args:
        row: Raw row dict from ``csv.DictReader``.
        columns: Expected column names in the CSV.

    Returns:
        A normalized dict mapping column names to stripped string values,
        with any overflow cells joined under the ``"__extra__"`` key.
    """
    def _clean(cell: Any) -> str:
        # Inlined equivalent of ``stringify_value``: None -> "", else strip.
        return "" if cell is None else str(cell).strip()

    normalized = {column: _clean(row.get(column)) for column in columns}

    # ``csv.DictReader`` stores overflow cells under the ``None`` key.
    overflow = row.get(None)
    if overflow:
        normalized["__extra__"] = " | ".join(
            part for part in (_clean(cell) for cell in overflow) if part
        )

    return normalized
Normalize a raw CSV DictReader row to a clean string-to-string dict.
Arguments:
- row: Raw row dict from `csv.DictReader`.
- columns: Expected column names in the CSV.
Returns:
A normalized dict mapping column names to stripped string values.
def coerce_projection_columns(value: Any) -> list[str]:
    """Coerce a raw YAML value into a deduplicated list of column names.

    Args:
        value: Raw value from the YAML config (string, list, or other).

    Returns:
        A deduplicated list of non-empty column name strings, in
        first-occurrence order.
    """
    if isinstance(value, str):
        raw_parts = value.split(",")
    elif isinstance(value, list):
        raw_parts = [str(item) for item in value]
    else:
        return []

    # dict.fromkeys keeps insertion order, giving an order-preserving dedup.
    stripped = (part.strip() for part in raw_parts)
    return list(dict.fromkeys(name for name in stripped if name))
Coerce a raw YAML value into a deduplicated list of column names.
Arguments:
- value: Raw value from the YAML config (string, list, or other).
Returns:
A deduplicated list of non-empty column name strings.
def emit_analysis_progress(
    progress_callback: Any,
    artifact_key: str,
    status: str,
    payload: dict[str, Any],
) -> None:
    """Emit a progress event to the frontend via the callback.

    The three-argument calling convention is tried first; if that call
    raises ``TypeError``, a single-dict event form is attempted. All
    other callback errors are swallowed (progress is best-effort).

    Args:
        progress_callback: The user-supplied progress callback.
        artifact_key: Artifact identifier for the event.
        status: Event status.
        payload: Event payload dict.
    """
    try:
        progress_callback(artifact_key, status, payload)
    except TypeError:
        # Likely a signature mismatch: fall through to the one-argument form.
        pass
    except Exception:
        return
    else:
        return

    event = {
        "artifact_key": artifact_key,
        "status": status,
        "result": payload,
    }
    try:
        progress_callback(event)
    except Exception:
        return
Emit a progress event to the frontend via the callback.
Arguments:
- progress_callback: The user-supplied progress callback.
- artifact_key: Artifact identifier for the event.
- status: Event status.
- payload: Event payload dict.
def estimate_tokens(text: str, model_info: Mapping[str, str] | None = None) -> int:
    """Estimate the token count of a text string.

    When ``tiktoken`` is available and the provider is OpenAI-compatible,
    an exact BPE token count is returned. Otherwise a character-count
    heuristic based on ``TOKEN_CHAR_RATIO`` is used, with a surcharge for
    non-ASCII characters and a 10% safety margin.

    Args:
        text: The text to estimate token count for.
        model_info: Optional dict with ``provider`` and ``model`` keys.

    Returns:
        Estimated number of tokens (minimum 1).
    """
    if not text:
        return 1

    if _TIKTOKEN_AVAILABLE and model_info is not None:
        provider = str(model_info.get("provider", "")).lower()
        if provider in {"openai", "local", "custom"}:
            try:
                encoding = tiktoken.encoding_for_model(str(model_info.get("model", "")))
            except KeyError:
                # Unknown model name: fall back to the common cl100k base.
                try:
                    encoding = tiktoken.get_encoding("cl100k_base")
                except Exception:
                    encoding = None
            if encoding is not None:
                try:
                    return max(1, len(encoding.encode(text)))
                except Exception:
                    pass  # Fall through to the heuristic below.

    # Heuristic fallback: non-ASCII characters tend to consume more BPE
    # tokens, so each is weighted at 1.5 tokens.
    non_ascii_count = sum(1 for ch in text if ord(ch) >= 128)
    ascii_count = len(text) - non_ascii_count
    raw_estimate = ascii_count / max(1, TOKEN_CHAR_RATIO) + non_ascii_count * 1.5
    return max(1, int(raw_estimate * 1.1))
Estimate the token count of a text string.
When tiktoken is available and the provider is OpenAI-compatible,
an exact BPE token count is returned. Otherwise a heuristic is used.
Arguments:
- text: The text to estimate token count for.
- model_info: Optional dict with `provider` and `model` keys.
Returns:
Estimated number of tokens (minimum 1).
def is_dedup_safe_identifier_column(column_name: str) -> bool:
    """Return True only for auto-incremented record IDs safe for dedup.

    Args:
        column_name: CSV column header name.

    Returns:
        ``True`` if the column is a safe dedup identifier.
    """
    # NOTE(review): imported locally, presumably to avoid an import
    # cycle with ``constants`` — confirm before hoisting to module level.
    from .constants import DEDUP_SAFE_IDENTIFIER_HINTS

    # Canonicalize: lowercase, with hyphens and spaces folded to underscores.
    canonical = column_name.strip().lower().replace("-", "_").replace(" ", "_")
    return canonical in DEDUP_SAFE_IDENTIFIER_HINTS
Return True only for auto-incremented record IDs safe for dedup.
Arguments:
- column_name: CSV column header name.
Returns:
`True` if the column is a safe dedup identifier.
def normalize_os_type(os_type: str | None) -> str:
    """Normalize an OS type identifier to its canonical lowercase form.

    Args:
        os_type: Operating system identifier (e.g. ``"windows"``,
            ``"linux"``, ``"Linux "``). ``None`` or empty values
            default to ``"windows"``.

    Returns:
        The lowercased, stripped OS type string, defaulting to
        ``"windows"`` when *os_type* is falsy.
    """
    if not os_type:
        return "windows"
    return str(os_type).strip().lower()
Normalize an OS type identifier to its canonical lowercase form.
Arguments:
- os_type: Operating system identifier (e.g. `"windows"`, `"linux"`, `"Linux "`). `None` or empty values default to `"windows"`.
Returns:
The lowercased, stripped OS type string, defaulting to `"windows"` when *os_type* is falsy.
def read_int_setting(
    analysis_config: Mapping[str, Any], key: str, default: int,
    minimum: int = 1, maximum: int | None = None,
) -> int:
    """Read an integer setting with bounds clamping.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value.
        minimum: Lower bound (inclusive).
        maximum: Optional upper bound (inclusive).

    Returns:
        The parsed and clamped integer value.
    """
    try:
        value = int(analysis_config.get(key, default))
    except (TypeError, ValueError):
        # Unparseable value (None, non-numeric string, ...): use the default.
        value = default
    value = max(value, minimum)
    if maximum is not None:
        value = min(value, maximum)
    return value
Read an integer setting with bounds clamping.
Arguments:
- analysis_config: The `analysis` sub-dictionary.
- key: Configuration key name.
- default: Default value.
- minimum: Lower bound (inclusive).
- maximum: Optional upper bound (inclusive).
Returns:
The parsed and clamped integer value.
def read_bool_setting(analysis_config: Mapping[str, Any], key: str, default: bool) -> bool:
    """Read a boolean setting from the analysis config.

    Accepts real booleans, the usual truthy/falsy strings
    (``"true"``/``"1"``/``"yes"``/``"on"`` and their negatives), and
    numbers; anything else yields *default*.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value.

    Returns:
        The parsed boolean value.
    """
    value = analysis_config.get(key, default)
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        token = value.strip().lower()
        if token in {"true", "1", "yes", "on"}:
            return True
        if token in {"false", "0", "no", "off"}:
            return False
        return default
    if isinstance(value, (int, float)):
        return bool(value)
    return default
Read a boolean setting from the analysis config.
Arguments:
- analysis_config: The `analysis` sub-dictionary.
- key: Configuration key name.
- default: Default value.
Returns:
The parsed boolean value.
def read_path_setting(analysis_config: Mapping[str, Any], key: str, default: str) -> str:
    """Read a file-path setting from the analysis config.

    Args:
        analysis_config: The ``analysis`` sub-dictionary.
        key: Configuration key name.
        default: Default value.

    Returns:
        The cleaned path string, or *default* when the configured value
        is missing, empty, or not path-like (``str``/``Path``).
    """
    value = analysis_config.get(key, default)
    if not isinstance(value, (str, Path)):
        return default
    text = str(value).strip()
    return text if text else default
Read a file-path setting from the analysis config.
Arguments:
- analysis_config: The `analysis` sub-dictionary.
- key: Configuration key name.
- default: Default value.
Returns:
The cleaned path string.