Files
MASForensic/main.py
BattleTag 81ade8f7ac feat(refit): complete S1-S6 — case abstraction, grounding, log-odds, plugins, coref, multi-source
Consolidates the long-running refit work (DESIGN.md as authoritative spec)
into a single baseline commit. Six stages landed together:

  S1  Case + EvidenceSource abstraction; tools parameterised by source_id
      (case.py, main.py multi-source bootstrap, .bin extension support)
  S2  Grounding gateway in add_phenomenon: verified_facts cite real
      ToolInvocation ids; substring / normalised match enforced; agent +
      task scope checked. Phenomenon.description split into verified_facts
      (grounded) + interpretation (free text). [invocation: inv-xxx]
      prefix on every wrapped tool result so the LLM can cite.
  S3  Confidence as additive log-odds: edge_type → log10(LR) calibration
      table; commutative updates; supported / refuted thresholds derived
      from log_odds; hypothesis × evidence matrix view.
  S4  iOS plugin: unzip_archive + parse_plist / sqlite_tables /
      sqlite_query / parse_ios_keychain / read_idevice_info;
      IOSArtifactAgent; SOURCE_TYPE_AGENTS routing.
  S5  Cross-source entity resolution: typed identifiers on Entity,
      observe_identity gateway, auto coref hypothesis with shared /
      conflicting strong/weak LR edges, reversible same_as edges,
      actor_clusters() view.
  S6  Android partition probe + AndroidArtifactAgent; MediaAgent with
      OCR fallback; orchestrator Phase 1 iterates every analysable
      source; platform-aware get_triage_agent_type; ReportAgent renders
      actor clusters + per-source breakdown.

142 unit tests / 1 skipped — full coverage of the new gateway, log-odds
math, coref hypothesis fall-out, and orchestrator multi-source dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:12:10 -10:00

402 lines
14 KiB
Python

"""MASForensics — Multi-Agent System for Digital Forensics."""
from __future__ import annotations
import asyncio
import json
import logging
import re
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
import yaml
from agent_factory import AgentFactory
from case import (
DISK_IMAGE_EXTS, Case, EvidenceSource, load_case, single_source_case,
)
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from log_config import setup_logging
from orchestrator import AnalysisAborted, Orchestrator
from tool_registry import register_all_tools
from tools.archive import unzip_archive_sync
RUNS_DIR = Path("runs")
IMAGE_DIR = Path("image")
# Persistent unpack cache for tree-mode sources (zip extractions). Lives
# at project root so multiple runs can reuse the same unpacked tree.
SOURCE_CACHE_DIR = Path(".cache/sources")
def load_config(path: str = "config.yaml") -> dict:
with open(path) as f:
return yaml.safe_load(f)
# ---------------------------------------------------------------------------
# Interactive image & partition selection
# ---------------------------------------------------------------------------
def _discover_images(search_dir: Path = IMAGE_DIR) -> list[Path]:
"""Find forensic disk image files under *search_dir* (case-insensitive ext)."""
if not search_dir.is_dir():
return []
return sorted(
p for p in search_dir.iterdir()
if p.is_file() and p.suffix.lower() in DISK_IMAGE_EXTS
)
def _parse_mmls(output: str) -> list[dict]:
"""Parse mmls output into a list of partition dicts.
Returns only data partitions (skips Meta / Unallocated rows).
"""
partitions: list[dict] = []
for line in output.splitlines():
# Typical line: "002: 000:000 0000000063 0009510479 0009510417 NTFS / exFAT (0x07)"
m = re.match(
r"\s*\d+:\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.*)",
line,
)
if not m:
continue
slot, start, end, length, desc = m.groups()
# Skip non-data rows
if slot == "Meta" or slot.startswith("---"):
continue
partitions.append({
"slot": slot,
"start": int(start),
"end": int(end),
"length": int(length),
"description": desc.strip(),
})
return partitions
def _run_mmls(image_path: str) -> list[dict]:
"""Run mmls and return parsed partition list."""
try:
result = subprocess.run(
["mmls", image_path],
capture_output=True, text=True, timeout=30,
)
except FileNotFoundError:
print("Error: mmls not found. Is The Sleuth Kit installed?")
sys.exit(1)
if result.returncode != 0:
return []
return _parse_mmls(result.stdout)
def select_image_interactive(image_dir: Path | None = None) -> tuple[str, int]:
"""Interactively select a disk image and partition.
If *image_dir* is None, prompts the user for the image folder path.
Returns (image_path, partition_offset_in_sectors).
"""
# --- Image folder selection ---
if image_dir is None:
raw = input("Image folder path: ").strip()
if not raw:
print("No path provided.")
sys.exit(1)
image_dir = Path(raw).expanduser().resolve()
if not image_dir.is_dir():
print(f"Error: {image_dir} is not a directory.")
sys.exit(1)
# --- Image file selection ---
images = _discover_images(image_dir)
if not images:
print(f"No disk images found in {image_dir}/")
print("Supported extensions: " + ", ".join(sorted(DISK_IMAGE_EXTS)))
sys.exit(1)
if len(images) == 1:
image_path = images[0]
print(f"Found image: {image_path}")
else:
print("Available disk images:")
for i, img in enumerate(images, 1):
size_mb = img.stat().st_size / (1024 * 1024)
print(f" [{i}] {img.name} ({size_mb:.0f} MB)")
while True:
choice = input(f"Select image [1-{len(images)}]: ").strip()
if choice.isdigit() and 1 <= int(choice) <= len(images):
image_path = images[int(choice) - 1]
break
print("Invalid choice.")
# --- Partition detection ---
print(f"Detecting partitions ({image_path}) ...")
partitions = _run_mmls(str(image_path))
if not partitions:
print("Warning: mmls could not detect partitions. Using offset 0.")
return str(image_path), 0
if len(partitions) == 1:
p = partitions[0]
print(f"Found partition: {p['description']} (offset={p['start']})")
return str(image_path), p["start"]
print("Partitions:")
for i, p in enumerate(partitions, 1):
size_mb = p["length"] * 512 / (1024 * 1024)
print(f" [{i}] {p['description']} (offset={p['start']}, {size_mb:.0f} MB)")
while True:
choice = input(f"Select partition [1-{len(partitions)}]: ").strip()
if choice.isdigit() and 1 <= int(choice) <= len(partitions):
p = partitions[int(choice) - 1]
return str(image_path), p["start"]
print("Invalid choice.")
def resolve_case() -> Case:
"""Resolve the Case to analyze.
Priority: an explicit case file given as a CLI argument, then ./case.yaml
in the working directory, then legacy interactive single-image selection.
"""
# 1. Explicit case file passed on the command line
if len(sys.argv) > 1 and sys.argv[1].lower().endswith((".yaml", ".yml")):
case = load_case(sys.argv[1])
if case is None:
print(f"Error: could not load case file {sys.argv[1]}")
sys.exit(1)
print(f"Loaded case: {case.name} ({len(case.sources)} sources)")
return case
# 2. ./case.yaml in the working directory
case = load_case()
if case is not None:
print(f"Loaded case: {case.name} ({len(case.sources)} sources)")
return case
# 3. Legacy interactive single-image selection
cli_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else None
image_path, partition_offset = select_image_interactive(cli_dir)
return single_source_case(image_path, partition_offset)
def _is_analysable(src: EvidenceSource) -> bool:
"""A source is analysable when it has a path AND its mode has tooling.
S4 lights up tree-mode iOS extractions; image-mode disks were already
supported. Media-collection (screenshots) remain skipped until S6.
"""
if not src.path:
return False
if src.access_mode == "image":
return True
if src.access_mode == "tree" and src.type in ("mobile_extraction", "archive"):
return True
return False
def list_analysable_sources(case: Case) -> list[EvidenceSource]:
"""Return every analysable source in the case (orchestrator iterates them).
Pre-S6 main.py used to force-choose one source here; the multi-source
orchestrator (Phase 1 per-source triage) now consumes the full list.
Skipped sources are still reported for visibility.
"""
analysable = [s for s in case.sources if _is_analysable(s)]
skipped = [s for s in case.sources if not _is_analysable(s)]
if skipped:
print(
f"Note: {len(skipped)} source(s) not analysable in this build: "
+ ", ".join(f"{s.label} ({s.type})" for s in skipped)
)
if not analysable:
print("No analysable sources in this case.")
sys.exit(1)
print(f"Analysing {len(analysable)} source(s) — orchestrator will triage each in Phase 1:")
for s in analysable:
print(f" - {s.summary()}")
return analysable
def prepare_source(src: EvidenceSource) -> EvidenceSource:
"""Materialise a tree-mode source for analysis.
Mobile / archive sources arrive as .zip files. We unpack once into a
project-level cache (``.cache/sources/<src.id>/``) and rewrite
``src.path`` to point at the unpacked directory. Idempotent — a
second run with the cache present is a no-op (unzip_archive_sync
skips files that already exist with the matching size).
Disk-image and already-tree sources pass through unchanged.
"""
if src.access_mode != "tree":
return src
p = Path(src.path)
if p.is_dir():
return src # already a directory, nothing to do
if not p.is_file():
print(f"Warning: source path {src.path} does not exist; leaving as-is.")
return src
if p.suffix.lower() != ".zip":
# Other archive types (tar, 7z, ...) — not handled yet.
print(f"Warning: tree-mode source {src.id} is not a .zip "
f"({p.suffix}); leaving as-is.")
return src
dest = SOURCE_CACHE_DIR / src.id
dest.mkdir(parents=True, exist_ok=True)
# Password-protected zips (e.g. CTF artefacts) carry their key in
# case.yaml's meta.password — never logged, never persisted.
password = (src.meta or {}).get("password")
pw_note = " (password from meta)" if password else ""
print(f"Unpacking {p.name}{dest}{pw_note} (idempotent) ...")
result = unzip_archive_sync(str(p), str(dest), password=password)
first_line = result.split("\n", 1)[0]
print(" " + first_line)
if first_line.startswith("Error:"):
# Surface the multi-line guidance from _do_extract verbatim.
for extra in result.split("\n")[1:]:
print(" " + extra)
print(f" Source {src.id} stays unanalysable until this is resolved.")
# Leave src.path unchanged so the source remains marked unanalysable.
return src
src.path = str(dest)
src.access_mode = "tree"
return src
def find_resumable_run() -> Path | None:
"""Find the most recent incomplete run with a saved graph state."""
if not RUNS_DIR.exists():
return None
candidates = sorted(RUNS_DIR.glob("*/graph_state.json"), reverse=True)
for state_file in candidates:
# Incomplete = has graph_state.json but no run_metadata.json (final archive)
if not (state_file.parent / "run_metadata.json").exists():
return state_file
return None
def setup_run_dir() -> Path:
"""Create a timestamped run directory."""
run_dir = RUNS_DIR / datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
run_dir.mkdir(parents=True, exist_ok=True)
return run_dir
async def async_main() -> None:
config = load_config()
agent_cfg = config["agent"]
# Check for resumable run
resumable = find_resumable_run()
resume_phase = 1
run_dir: Path | None = None
graph: EvidenceGraph | None = None
if resumable:
print(f"Found incomplete run: {resumable.parent.name}")
try:
data = json.loads(resumable.read_text())
ph_count = len(data.get("phenomena", {}))
hyp_count = len(data.get("hypotheses", {}))
print(f" ({ph_count} phenomena, {hyp_count} hypotheses)")
except Exception:
pass
print("Resume? [y/N] ", end="", flush=True)
choice = input().strip().lower()
if choice == "y":
run_dir = resumable.parent
graph = EvidenceGraph.load_state(resumable)
status = graph.agent_status
if any(h.status != "active" for h in graph.hypotheses.values()):
resume_phase = 3 # hypotheses exist, resume investigation
elif graph.phenomena:
resume_phase = 2 # have phenomena, generate hypotheses
print(f"Resuming from Phase {resume_phase}...")
if run_dir is None:
run_dir = setup_run_dir()
# Setup logging — colored terminal + detailed log file
setup_logging(run_dir)
# Save config snapshot
shutil.copy2("config.yaml", run_dir / "config.yaml")
# Initialize LLM client
llm = LLMClient(
base_url=agent_cfg["base_url"],
api_key=agent_cfg["api_key"],
model=agent_cfg["model"],
max_tokens=agent_cfg.get("max_tokens", 4096),
proxy=agent_cfg.get("proxy", "auto"),
reasoning_effort=agent_cfg.get("reasoning_effort"),
thinking_enabled=agent_cfg.get("thinking_enabled", False),
)
# Initialize evidence graph
if graph is None:
case = resolve_case()
# case_info derived from THIS case's meta (case.yaml), not from
# config.yaml's legacy `cfreds_hacking_case` block. Without this,
# the old CFReDS evidence MD5s would be embedded in reports for
# every subsequent unrelated case.
graph = EvidenceGraph(
case_info=dict(case.meta or {}),
persist_path=run_dir / "graph_state.json",
edge_log_lr=config.get("hypothesis_log_lr"),
)
graph.case = case
graph.extracted_dir = str(run_dir / "extracted")
analysable = list_analysable_sources(case)
# Prepare every analysable source up front (unzip tree-mode zips,
# etc.). Idempotent on cache hits — second run is a no-op.
prepared = [prepare_source(s) for s in analysable]
# Seed the active source so tools that resolve lazily have a target
# before Phase 1 begins; the orchestrator resets it per source.
graph.set_active_source(prepared[0])
else:
graph._persist_path = run_dir / "graph_state.json"
# Register all tools — they resolve the active evidence source at call time
register_all_tools(graph)
# Create agent factory
factory = AgentFactory(llm, graph)
# Run orchestrator
orchestrator = Orchestrator(llm, graph, factory, config=config, run_dir=run_dir)
try:
report = await orchestrator.run(resume_phase=resume_phase)
print("\n" + "=" * 60)
print("FORENSIC ANALYSIS COMPLETE")
print("=" * 60)
print(f"Results archived to: {run_dir}")
print(report)
except AnalysisAborted:
print("\n" + "=" * 60)
print("ANALYSIS ABORTED — too many consecutive failures")
print("=" * 60)
print(f"Partial results saved to: {run_dir}")
print("Run again to resume from saved state.")
except KeyboardInterrupt:
print("\nInterrupted. State saved.")
print(f"Partial results in: {run_dir}")
finally:
await llm.close()
def main() -> None:
asyncio.run(async_main())
if __name__ == "__main__":
main()