Files
MASForensic/tools/parsers.py
BattleTag 81ade8f7ac feat(refit): complete S1-S6 — case abstraction, grounding, log-odds, plugins, coref, multi-source
Consolidates the long-running refit work (DESIGN.md as authoritative spec)
into a single baseline commit. Six stages landed together:

  S1  Case + EvidenceSource abstraction; tools parameterised by source_id
      (case.py, main.py multi-source bootstrap, .bin extension support)
  S2  Grounding gateway in add_phenomenon: verified_facts cite real
      ToolInvocation ids; substring / normalised match enforced; agent +
      task scope checked. Phenomenon.description split into verified_facts
      (grounded) + interpretation (free text). [invocation: inv-xxx]
      prefix on every wrapped tool result so the LLM can cite.
  S3  Confidence as additive log-odds: edge_type → log10(LR) calibration
      table; commutative updates; supported / refuted thresholds derived
      from log_odds; hypothesis × evidence matrix view.
  S4  iOS plugin: unzip_archive + parse_plist / sqlite_tables /
      sqlite_query / parse_ios_keychain / read_idevice_info;
      IOSArtifactAgent; SOURCE_TYPE_AGENTS routing.
  S5  Cross-source entity resolution: typed identifiers on Entity,
      observe_identity gateway, auto coref hypothesis with shared /
      conflicting strong/weak LR edges, reversible same_as edges,
      actor_clusters() view.
  S6  Android partition probe + AndroidArtifactAgent; MediaAgent with
      OCR fallback; orchestrator Phase 1 iterates every analysable
      source; platform-aware get_triage_agent_type; ReportAgent renders
      actor clusters + per-source breakdown.

142 unit tests / 1 skipped — full coverage of the new gateway, log-odds
math, coref hypothesis fall-out, and orchestrator multi-source dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:12:10 -10:00

393 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Parsers for various forensic artifact formats."""
from __future__ import annotations
import asyncio
import logging
import os
import re
import struct
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
async def read_text_file(file_path: str, max_bytes: int = 8000) -> str:
    """Read the beginning of a text file, capped at ``max_bytes`` bytes.

    The file is opened in binary mode so that both the cap and the truncation
    notice are expressed in bytes.  (A text-mode ``read(n)`` counts
    *characters*, which over-reads multi-byte text and can label a file that
    was shown in full as truncated.)  The bytes are decoded as UTF-8;
    undecodable bytes -- including a multi-byte character cut in half by the
    cap -- are rendered as U+FFFD.  Line endings are returned as stored.

    Args:
        file_path: Path of the file to read.
        max_bytes: Maximum number of bytes to read from the start of the file.

    Returns:
        The decoded text, followed by a ``[Truncated: ...]`` notice when the
        file holds more bytes than were read.  Any failure is reported as an
        ``[Error reading ...]`` string instead of being raised.
    """
    try:
        with open(file_path, "rb") as f:
            raw = f.read(max_bytes)
        content = raw.decode("utf-8", errors="replace")
        size = os.path.getsize(file_path)
        # Compare against what was actually read so the notice is only
        # emitted when bytes were really left out.
        if size > len(raw):
            content += f"\n\n[Truncated: file is {size} bytes, showing first {len(raw)}]"
        return content
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"
async def read_binary_preview(file_path: str, max_bytes: int = 2000) -> str:
    """Render the first ``max_bytes`` bytes of a file as a hex + ASCII dump.

    Each dump line covers 16 bytes: the offset as 8 hex digits, the bytes as
    space-separated hex pairs (padded to 48 columns), and the same bytes as
    ASCII with every non-printable byte shown as ``.``.

    Args:
        file_path: Path of the file to preview.
        max_bytes: Maximum number of bytes to read and dump.

    Returns:
        A ``File: <path> (<size> bytes)`` header followed by the dump lines.
        When the file is larger than what was dumped, a ``[Truncated: ...]``
        notice (same wording as ``read_text_file``) is appended so the reader
        knows the dump is partial.  Any failure is reported as an
        ``[Error reading ...]`` string instead of being raised.
    """
    try:
        with open(file_path, "rb") as f:
            data = f.read(max_bytes)

        rows = []
        for offset in range(0, len(data), 16):
            chunk = data[offset:offset + 16]
            hex_part = " ".join(f"{b:02x}" for b in chunk)
            # Printable ASCII is 0x20..0x7e; everything else becomes a dot.
            ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
            rows.append(f"{offset:08x} {hex_part:<48} {ascii_part}")

        size = os.path.getsize(file_path)
        preview = f"File: {file_path} ({size} bytes)\n" + "\n".join(rows)
        if size > len(data):
            preview += f"\n\n[Truncated: file is {size} bytes, showing first {len(data)}]"
        return preview
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"
async def read_text_file_section(file_path: str, start: int = 0, max_bytes: int = 8000) -> str:
    """Read up to ``max_bytes`` bytes of a file starting at byte offset ``start``.

    The file is read in binary mode so that ``start``, ``max_bytes``, the
    offsets in the header and the "remaining" count are all exact byte
    quantities.  The section is then decoded as UTF-8; undecodable bytes
    (including a multi-byte character split at either edge of the window) are
    rendered as U+FFFD.

    Args:
        file_path: Path of the file to read.
        start: Byte offset to start reading from; values below 0 are treated
            as 0.
        max_bytes: Maximum number of bytes to read.

    Returns:
        A header ``[File: <path>, <size> bytes, showing offset <a>-<b>]``,
        the decoded section, and -- when bytes remain after the section -- a
        ``[<n> bytes remaining after this section]`` footer.  Any failure is
        reported as an ``[Error reading ...]`` string instead of being raised.
    """
    try:
        size = os.path.getsize(file_path)
        offset = max(start, 0)
        with open(file_path, "rb") as f:
            if offset > 0:
                f.seek(offset)
            raw = f.read(max_bytes)
        content = raw.decode("utf-8", errors="replace")
        end = offset + len(raw)
        remaining = size - end
        header = f"[File: {file_path}, {size} bytes, showing offset {offset}-{end}]"
        if remaining > 0:
            content += f"\n\n[{remaining} bytes remaining after this section]"
        return header + "\n" + content
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"
async def search_text_file(file_path: str, pattern: str, max_matches: int = 50) -> str:
    """Search a text file line by line and return the matching lines.

    ``pattern`` is compiled as a case-insensitive regular expression; if it
    is not a valid regex it is searched for as a literal string instead.

    Args:
        file_path: Path of the text file to search.
        pattern: Regular expression (or literal text) to look for.
        max_matches: Maximum number of matching lines to return.

    Returns:
        A header ``Search '<pattern>' in <path> (<size> bytes): <n> matches``
        followed by one ``<lineno>: <line>`` entry per match (each line capped
        at 200 characters).  ``<n>`` counts only real matches.  A
        ``[Truncated: more than <max_matches> matches]`` marker is appended
        only when at least one further match exists beyond the cap.  Any
        failure is reported as an ``[Error searching ...]`` string.
    """
    try:
        size = os.path.getsize(file_path)
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error:
            # Not a valid regex: fall back to a literal substring search.
            compiled = re.compile(re.escape(pattern), re.IGNORECASE)

        matches = []
        truncated = False
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            for lineno, line in enumerate(f, 1):
                if not compiled.search(line):
                    continue
                if len(matches) >= max_matches:
                    # One more match than we may show: now we know the result
                    # really is truncated, so stop scanning.
                    truncated = True
                    break
                matches.append(f" {lineno}: {line.rstrip()[:200]}")

        header = f"Search '{pattern}' in {file_path} ({size} bytes): {len(matches)} matches"
        if not matches and not truncated:
            return header + "\n (no matches)"
        body = list(matches)
        if truncated:
            body.append(f" [Truncated: more than {max_matches} matches]")
        return header + "\n" + "\n".join(body)
    except Exception as e:
        return f"[Error searching {file_path}: {e}]"
def _pcap_append_capped(lines: list, items: list, limit: int) -> None:
    """Append up to ``limit`` indented items to ``lines``, then a '(N more)' marker."""
    lines.extend(f" {item}" for item in items[:limit])
    if len(items) > limit:
        lines.append(f" ... ({len(items) - limit} more)")


async def _pcap_extract_strings(file_path: str) -> list:
    """Run ``srch_strings`` (falling back to ``strings``) and return its output lines.

    Raises:
        RuntimeError: the tool ran but exited with a non-zero status.
        FileNotFoundError: neither tool is installed.
    """
    for tool in ("srch_strings", "strings"):
        try:
            proc = await asyncio.create_subprocess_exec(
                tool, "-a", "-n", "8", file_path,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except FileNotFoundError:
            # This tool is not installed; try the next candidate.
            continue
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            detail = stderr.decode("utf-8", errors="replace").strip()
            raise RuntimeError(f"{tool} exited with code {proc.returncode}: {detail}")
        return stdout.decode("utf-8", errors="replace").splitlines()
    raise FileNotFoundError("neither 'srch_strings' nor 'strings' is available on PATH")


async def parse_pcap_strings(file_path: str) -> str:
    """Extract HTTP headers and other readable strings from a PCAP/capture file.

    Printable strings of at least 8 characters are extracted with the
    ``srch_strings`` command (``strings`` is used when ``srch_strings`` is not
    installed) and then sorted into buckets: ``Host:`` values, ``User-Agent:``
    values, ``Cookie:`` lines, request lines (GET/POST/PUT/DELETE/HEAD),
    lines starting with ``HTTP/`` (responses), and any other line mentioning
    password / login / username / email / set-cookie.

    Args:
        file_path: Path of the capture file to analyse.

    Returns:
        A multi-section text report.  Long sections are capped and end with a
        ``... (N more)`` marker.  Any failure -- including a missing file, a
        missing tool, or a non-zero exit status from the tool -- is reported
        as an ``[Error parsing PCAP strings: ...]`` string, so a failed
        extraction is never mistaken for a capture with no strings.
    """
    try:
        size = os.path.getsize(file_path)
        all_strings = await _pcap_extract_strings(file_path)

        hosts = set()
        user_agents = set()
        requests = []
        responses = []
        cookies = []
        other_interesting = []
        request_line = re.compile(r"^(GET|POST|PUT|DELETE|HEAD) /")
        keywords = ("password", "login", "username", "email", "set-cookie")

        for line in all_strings:
            stripped = line.strip()
            if stripped.startswith("Host: "):
                hosts.add(stripped[len("Host: "):])
            elif stripped.startswith("User-Agent: "):
                user_agents.add(stripped[len("User-Agent: "):])
            elif stripped.startswith("Cookie: "):
                cookies.append(stripped[:200])
            elif request_line.match(stripped):
                requests.append(stripped[:200])
            elif stripped.startswith("HTTP/"):
                responses.append(stripped[:200])
            else:
                lowered = stripped.lower()
                if any(kw in lowered for kw in keywords):
                    other_interesting.append(stripped[:200])

        lines = [f"=== PCAP String Analysis: {file_path} ({size} bytes) ==="]
        lines.append(f"Total printable strings (>=8 chars): {len(all_strings)}")

        lines.append(f"\nUnique Hosts ({len(hosts)}):")
        lines.extend(f" {h}" for h in sorted(hosts))

        lines.append(f"\nUser-Agent strings ({len(user_agents)}):")
        lines.extend(f" {ua}" for ua in sorted(user_agents))

        lines.append(f"\nHTTP Requests ({len(requests)}):")
        _pcap_append_capped(lines, requests, 30)

        lines.append(f"\nHTTP Responses ({len(responses)}):")
        _pcap_append_capped(lines, responses, 20)

        if cookies:
            lines.append(f"\nCookies ({len(cookies)}):")
            _pcap_append_capped(lines, cookies, 20)

        if other_interesting:
            lines.append(f"\nOther interesting strings ({len(other_interesting)}):")
            _pcap_append_capped(lines, other_interesting, 30)

        return "\n".join(lines)
    except Exception as e:
        return f"[Error parsing PCAP strings: {e}]"
async def parse_prefetch(file_path: str) -> str:
    """Parse an uncompressed Windows Prefetch (.pf) file to extract execution info.

    Reads the format version, the executable name (UTF-16LE, null-terminated,
    stored at 0x10..0x4C), the last execution time (a FILETIME) and the run
    count.  The offsets of the last two fields depend on the format version:

        version 17 (Windows XP/2003):  last run 0x78, run count 0x90
        version 23 (Windows Vista/7):  last run 0x80, run count 0x98
        version 26 (Windows 8/8.1):    last run 0x80, run count 0xD0

    Any other version is parsed with the version-17 offsets (the historical
    behaviour of this function) and the report carries a warning line, since
    the values may not be meaningful.  MAM-compressed files (Windows 10+) are
    rejected with an explicit message.

    Args:
        file_path: Path of the ``.pf`` file.

    Returns:
        A multi-line report (version, executable, last execution, run count,
        file size), or an ``[Error ...]`` string when the file is too small,
        is not a Prefetch file, or cannot be read.
    """
    # version -> (last-run FILETIME offset, run-count offset)
    layouts = {
        17: (0x78, 0x90),
        23: (0x80, 0x98),
        26: (0x80, 0xD0),
    }
    try:
        with open(file_path, "rb") as f:
            data = f.read()
        if len(data) < 0x94:
            return f"[Error: file too small for Prefetch format ({len(data)} bytes)]"

        if data[:3] == b"MAM":
            return (
                "[Error: MAM-compressed Prefetch file (Windows 10+) is not supported"
                " — decompress it before parsing]"
            )
        version = struct.unpack_from("<I", data, 0)[0]
        sig = data[4:8]
        if sig != b"SCCA":
            return f"[Error: not a Prefetch file — signature is {sig!r}, expected b'SCCA']"

        known_version = version in layouts
        time_off, count_off = layouts.get(version, layouts[17])
        if len(data) < count_off + 4:
            return f"[Error: file too small for Prefetch version {version} ({len(data)} bytes)]"

        # Filename: scan 2-byte-aligned code units for the terminator.  An
        # unaligned byte search would stop early on any character whose low
        # byte is zero (e.g. U+0100 after an ASCII character).
        raw_name = data[0x10:0x4C]
        name_end = len(raw_name)
        for pos in range(0, len(raw_name) - 1, 2):
            if raw_name[pos:pos + 2] == b"\x00\x00":
                name_end = pos
                break
        filename = raw_name[:name_end].decode("utf-16-le", errors="replace")

        # Last execution time: FILETIME = 100 ns ticks since 1601-01-01 UTC.
        ft = struct.unpack_from("<Q", data, time_off)[0]
        if ft > 0:
            epoch = datetime(1601, 1, 1, tzinfo=timezone.utc)
            try:
                last_run = epoch + timedelta(microseconds=ft // 10)
                last_run_str = last_run.strftime("%Y-%m-%d %H:%M:%S UTC")
            except OverflowError:
                # Corrupt timestamp: report it without losing the other fields.
                last_run_str = f"(invalid FILETIME 0x{ft:016x})"
        else:
            last_run_str = "(not available)"

        run_count = struct.unpack_from("<I", data, count_off)[0]

        lines = [
            f"=== Prefetch Analysis: {file_path} ===",
            f"Prefetch Version: {version}",
            f"Executable: {filename}",
            f"Last Execution: {last_run_str}",
            f"Run Count: {run_count}",
            f"File Size: {len(data)} bytes",
        ]
        if not known_version:
            lines.append(
                f"Warning: unrecognised Prefetch version {version}; version 17 offsets"
                " were assumed, so last execution and run count may be wrong."
            )
        return "\n".join(lines)
    except Exception as e:
        return f"[Error parsing Prefetch: {e}]"
async def list_extracted_dir(dir_path: str, max_entries: int = 200) -> str:
    """Summarise a (potentially huge) extracted directory tree.

    Instead of listing individual entries, the tree is walked once and a
    compact report is produced: total file count and size, the immediate
    sub-directories of ``dir_path`` with their recursive file counts and
    sizes (top 15 by file count), a per-extension breakdown (top 15 by file
    count) and the 10 largest files.  For targeted lookups (e.g. every
    ``*.sqlite`` under the tree) ``find_files`` is the better tool.

    Ordering is fully deterministic: every ranking breaks ties by name, so
    the report does not depend on the order in which the filesystem happens
    to enumerate entries.

    Args:
        dir_path: Directory to summarise.
        max_entries: Accepted for backward compatibility; the summary layout
            uses fixed caps and does not read this value.

    Returns:
        The multi-line summary, or an ``[Error ...]`` string when ``dir_path``
        is not a directory or the walk fails.  Files whose size cannot be read
        are skipped.
    """
    if not os.path.isdir(dir_path):
        return f"[Error: {dir_path} is not a directory]"

    def _human(n: float) -> str:
        """Format a byte count using B / KB / MB / GB / TB (1024-based)."""
        for unit in ("B", "KB", "MB", "GB"):
            if n < 1024:
                return f"{n:.1f}{unit}" if unit != "B" else f"{n}B"
            n /= 1024
        return f"{n:.1f}TB"

    largest_cap = 10
    listing_cap = 15
    try:
        total_files = 0
        total_bytes = 0
        ext_counts: dict[str, int] = {}
        ext_bytes: dict[str, int] = {}
        top_level_dirs: dict[str, dict] = {}
        biggest: list[tuple[int, str]] = []  # (size, relpath), best first

        dir_path_abs = os.path.abspath(dir_path)
        for root, dirs, files in os.walk(dir_path_abs):
            rel_root = os.path.relpath(root, dir_path_abs)
            if rel_root == ".":
                # Register every immediate child so empty ones are listed too.
                for d in dirs:
                    top_level_dirs.setdefault(d, {"files": 0, "bytes": 0})
                bucket = None  # files directly under the root belong to no child
            else:
                top_key = rel_root.split(os.sep, 1)[0]
                bucket = top_level_dirs.setdefault(top_key, {"files": 0, "bytes": 0})

            for name in files:
                full = os.path.join(root, name)
                try:
                    size = os.path.getsize(full)
                except OSError:
                    continue
                total_files += 1
                total_bytes += size
                ext = os.path.splitext(name)[1].lower() or "(no ext)"
                ext_counts[ext] = ext_counts.get(ext, 0) + 1
                ext_bytes[ext] = ext_bytes.get(ext, 0) + size
                if bucket is not None:
                    bucket["files"] += 1
                    bucket["bytes"] += size

                # Bounded top-N: only candidates that can still make the list
                # are inserted; ties on size are resolved by path so the
                # selection does not depend on walk order.
                if len(biggest) < largest_cap or size >= biggest[-1][0]:
                    biggest.append((size, os.path.relpath(full, dir_path_abs)))
                    biggest.sort(key=lambda entry: (-entry[0], entry[1]))
                    del biggest[largest_cap:]

        lines = [
            f"Directory: {dir_path}",
            f" Total: {total_files} file(s), {_human(total_bytes)}",
        ]

        # Top-level directory layout (immediate children, by file count).
        if top_level_dirs:
            lines.append(f"\nTop-level layout ({len(top_level_dirs)} dirs at root):")
            ranked_dirs = sorted(
                top_level_dirs.items(), key=lambda kv: (-kv[1]["files"], kv[0]),
            )
            for d, stats in ranked_dirs[:listing_cap]:
                lines.append(
                    f" {d}/ ({stats['files']} files, {_human(stats['bytes'])})"
                )
            if len(top_level_dirs) > listing_cap:
                lines.append(f" ... ({len(top_level_dirs) - listing_cap} more top-level dirs)")

        # Extension breakdown.
        if ext_counts:
            lines.append("\nExtension breakdown (top 15):")
            ranked_exts = sorted(ext_counts.items(), key=lambda kv: (-kv[1], kv[0]))
            for ext, count in ranked_exts[:listing_cap]:
                lines.append(
                    f" {ext}: {count} files, {_human(ext_bytes.get(ext, 0))}"
                )

        # Largest files (often the highest-value forensic targets).
        if biggest:
            lines.append("\nLargest files:")
            for size, rel in biggest:
                lines.append(f" {rel} ({_human(size)})")

        lines.append(
            "\nNext step: call find_files with a pattern like "
            "'**/*.plist' or '**/keychain-2.db' to locate specific artefacts."
        )
        return "\n".join(lines)
    except Exception as e:
        return f"[Error listing {dir_path}: {e}]"
def _find_files_glob_variants(pattern: str) -> list[str]:
    """Expand a ``**`` glob into the fnmatch patterns that together implement it.

    fnmatch has no notion of path segments: its ``*`` already matches ``/``.
    Rewriting ``**/`` to ``*/`` therefore covers "one or more directories",
    but never "zero directories".  Each ``**/`` that starts a path segment is
    thus emitted both as ``*/`` and as nothing; any other ``**`` collapses to
    ``*``.  The first variant returned is always the plain ``**`` -> ``*``
    rewrite.
    """
    import itertools

    # Split only on "**/" at the start of the pattern or right after a "/".
    parts = re.split(r"(?<![^/])\*\*/", pattern)
    gaps = len(parts) - 1
    if gaps > 4:
        # Avoid a combinatorial blow-up on pathological patterns: keep just
        # the "all present" and "all absent" forms.
        combos = [("*/",) * gaps, ("",) * gaps]
    else:
        combos = itertools.product(("*/", ""), repeat=gaps)

    variants: list[str] = []
    for combo in combos:
        candidate = parts[0] + "".join(
            sep + part for sep, part in zip(combo, parts[1:])
        )
        candidate = candidate.replace("**", "*")
        if candidate and candidate not in variants:
            variants.append(candidate)
    return variants


async def find_files(
    root: str,
    pattern: str,
    max_results: int = 500,
) -> str:
    """Recursively find files under *root* whose path matches *pattern*.

    *pattern* is an fnmatch-style glob that is tested against both the path
    relative to *root* and the bare file name.  ``*`` matches any run of
    characters, including ``/``.  ``**/`` means "any number of directories,
    including none", so ``**/sms.db`` also finds ``sms.db`` sitting directly
    under *root*.  Brace alternatives such as ``*.{plist,db}`` are NOT
    expanded -- issue one call per extension.  Examples:

    - ``**/sms.db`` — iOS SMS database
    - ``**/keychain-2.db`` — iOS keychain
    - ``**/ChatStorage.sqlite`` — WhatsApp app store
    - ``HomeDomain/Library/**`` — anchor at a known iOS domain root
    - ``**/*.plist`` — every plist, no matter how deep

    Args:
        root: Directory to search.
        pattern: Glob pattern as described above.
        max_results: Maximum number of hits to report.  The walk itself stops
            once ``4 * max_results`` hits have been collected.

    Returns:
        A report listing the hits sorted by size, largest first (marked
        ``(truncated)`` when hits were dropped or the walk stopped early),
        or an ``[Error ...]`` string when *root* is not a directory or the
        walk fails.  Files whose size cannot be read are listed as 0 bytes.
    """
    import fnmatch

    if not os.path.isdir(root):
        return f"[Error: {root} is not a directory]"
    root_abs = os.path.abspath(root)
    variants = _find_files_glob_variants(pattern)

    hits: list[tuple[int, str]] = []
    truncated = False
    try:
        for dirpath, _dirs, files in os.walk(root_abs):
            for f in files:
                full = os.path.join(dirpath, f)
                rel = os.path.relpath(full, root_abs)
                if not any(
                    fnmatch.fnmatch(rel, v) or fnmatch.fnmatch(f, v)
                    for v in variants
                ):
                    continue
                try:
                    size = os.path.getsize(full)
                except OSError:
                    size = 0
                hits.append((size, rel))
                if len(hits) >= max_results * 4:
                    # Hard upper bound to keep the walk cheap on huge trees.
                    truncated = True
                    break
            if truncated:
                break
    except Exception as e:
        return f"[Error searching {root}: {e}]"

    hits.sort(reverse=True)
    if len(hits) > max_results:
        truncated = True
        hits = hits[:max_results]

    lines = [
        f"find_files: pattern={pattern!r} under {root}",
        f" matches: {len(hits)}" + (" (truncated)" if truncated else ""),
    ]
    if not hits:
        lines.append(" (no matches)")
    else:
        for size, rel in hits:
            lines.append(f" {rel} ({size} bytes)")
    return "\n".join(lines)