"""MASForensics — Multi-Agent System for Digital Forensics."""

from __future__ import annotations

import asyncio
import json
import logging
import re
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path

import yaml

from agent_factory import AgentFactory
from case import (
    DISK_IMAGE_EXTS,
    Case,
    EvidenceSource,
    load_case,
    single_source_case,
)
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from log_config import setup_logging
from orchestrator import AnalysisAborted, Orchestrator
from tool_registry import register_all_tools
from tools.archive import unzip_archive_sync

RUNS_DIR = Path("runs")
IMAGE_DIR = Path("image")
# Persistent unpack cache for tree-mode sources (zip extractions). Lives
# at project root so multiple runs can reuse the same unpacked tree.
SOURCE_CACHE_DIR = Path(".cache/sources")

# One mmls table row: "<index>: <slot> <start> <end> <length> <description>".
_MMLS_ROW_RE = re.compile(r"\s*\d+:\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.*)")


def load_config(path: str = "config.yaml") -> dict:
    """Load the YAML configuration file at *path*.

    The file is read as UTF-8. An empty file yields an empty dict rather
    than ``None`` so callers can index the result directly.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


# ---------------------------------------------------------------------------
# Interactive image & partition selection
# ---------------------------------------------------------------------------

def _discover_images(search_dir: Path = IMAGE_DIR) -> list[Path]:
    """Find forensic disk image files under *search_dir* (case-insensitive ext).

    Only direct children of *search_dir* are considered. Returns a sorted
    list, or an empty list when *search_dir* is not a directory.
    """
    if not search_dir.is_dir():
        return []
    return sorted(
        p for p in search_dir.iterdir()
        if p.is_file() and p.suffix.lower() in DISK_IMAGE_EXTS
    )


def _parse_mmls(output: str) -> list[dict]:
    """Parse mmls output into a list of partition dicts.

    Returns only data partitions (skips Meta / Unallocated rows). Each dict
    has the keys ``slot``, ``start``, ``end``, ``length`` (ints, as printed
    by mmls) and ``description``.
    """
    partitions: list[dict] = []
    for line in output.splitlines():
        # Typical line: "002: 000:000 0000000063 0009510479 0009510417 NTFS / exFAT (0x07)"
        m = _MMLS_ROW_RE.match(line)
        if not m:
            continue
        slot, start, end, length, desc = m.groups()
        # Skip non-data rows
        if slot == "Meta" or slot.startswith("---"):
            continue
        partitions.append({
            "slot": slot,
            "start": int(start),
            "end": int(end),
            "length": int(length),
            "description": desc.strip(),
        })
    return partitions


def _run_mmls(image_path: str) -> list[dict]:
    """Run mmls and return parsed partition list.

    Exits the process when the ``mmls`` binary is missing. Returns an empty
    list when mmls exits non-zero or does not finish within the timeout, so
    the caller can fall back to a default offset.
    """
    try:
        result = subprocess.run(
            ["mmls", image_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except FileNotFoundError:
        print("Error: mmls not found. Is The Sleuth Kit installed?")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        # A hung mmls must not crash the CLI; treat it as "nothing detected".
        print("Warning: mmls timed out after 30s.")
        return []
    if result.returncode != 0:
        return []
    return _parse_mmls(result.stdout)


def _prompt_index(noun: str, count: int) -> int:
    """Prompt until the user enters a number in 1..count; return it 0-based."""
    while True:
        choice = input(f"Select {noun} [1-{count}]: ").strip()
        # isdecimal() (not isdigit()) so that every accepted string is
        # guaranteed to be parseable by int() — e.g. "²" is a digit but
        # int("²") raises ValueError.
        if choice.isdecimal() and 1 <= int(choice) <= count:
            return int(choice) - 1
        print("Invalid choice.")


def select_image_interactive(image_dir: Path | None = None) -> tuple[str, int]:
    """Interactively select a disk image and partition.

    If *image_dir* is None, prompts the user for the image folder path.
    Returns (image_path, partition_offset_in_sectors). Exits the process
    when no folder is given, the folder is invalid, or it holds no images.
    """
    # --- Image folder selection ---
    if image_dir is None:
        raw = input("Image folder path: ").strip()
        if not raw:
            print("No path provided.")
            sys.exit(1)
        image_dir = Path(raw).expanduser().resolve()
        if not image_dir.is_dir():
            print(f"Error: {image_dir} is not a directory.")
            sys.exit(1)

    # --- Image file selection ---
    images = _discover_images(image_dir)
    if not images:
        print(f"No disk images found in {image_dir}/")
        print("Supported extensions: " + ", ".join(sorted(DISK_IMAGE_EXTS)))
        sys.exit(1)

    if len(images) == 1:
        image_path = images[0]
        print(f"Found image: {image_path}")
    else:
        print("Available disk images:")
        for i, img in enumerate(images, 1):
            size_mb = img.stat().st_size / (1024 * 1024)
            print(f" [{i}] {img.name} ({size_mb:.0f} MB)")
        image_path = images[_prompt_index("image", len(images))]

    # --- Partition detection ---
    print(f"Detecting partitions ({image_path}) ...")
    partitions = _run_mmls(str(image_path))

    if not partitions:
        print("Warning: mmls could not detect partitions. Using offset 0.")
        return str(image_path), 0

    if len(partitions) == 1:
        p = partitions[0]
        print(f"Found partition: {p['description']} (offset={p['start']})")
        return str(image_path), p["start"]

    print("Partitions:")
    for i, p in enumerate(partitions, 1):
        # NOTE(review): size shown assumes 512-byte sectors.
        size_mb = p["length"] * 512 / (1024 * 1024)
        print(f" [{i}] {p['description']} (offset={p['start']}, {size_mb:.0f} MB)")
    p = partitions[_prompt_index("partition", len(partitions))]
    return str(image_path), p["start"]


def resolve_case() -> Case:
    """Resolve the Case to analyze.

    Priority: an explicit case file given as a CLI argument, then
    ./case.yaml in the working directory, then legacy interactive
    single-image selection.
    """
    # 1. Explicit case file passed on the command line
    if len(sys.argv) > 1 and sys.argv[1].lower().endswith((".yaml", ".yml")):
        case = load_case(sys.argv[1])
        if case is None:
            print(f"Error: could not load case file {sys.argv[1]}")
            sys.exit(1)
        print(f"Loaded case: {case.name} ({len(case.sources)} sources)")
        return case

    # 2. ./case.yaml in the working directory
    case = load_case()
    if case is not None:
        print(f"Loaded case: {case.name} ({len(case.sources)} sources)")
        return case

    # 3. Legacy interactive single-image selection. A non-YAML first CLI
    #    argument is treated as the image folder.
    cli_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else None
    image_path, partition_offset = select_image_interactive(cli_dir)
    return single_source_case(image_path, partition_offset)


def _is_analysable(src: EvidenceSource) -> bool:
    """A source is analysable when it has a path AND its mode has tooling.

    S4 lights up tree-mode iOS extractions; image-mode disks were already
    supported. Media-collection (screenshots) remain skipped until S6.
    """
    if not src.path:
        return False
    if src.access_mode == "image":
        return True
    if src.access_mode == "tree" and src.type in ("mobile_extraction", "archive"):
        return True
    return False


def list_analysable_sources(case: Case) -> list[EvidenceSource]:
    """Return every analysable source in the case (orchestrator iterates them).

    Pre-S6 main.py used to force-choose one source here; the multi-source
    orchestrator (Phase 1 per-source triage) now consumes the full list.
    Skipped sources are still reported for visibility. Exits the process
    when no source is analysable.
    """
    analysable: list[EvidenceSource] = []
    skipped: list[EvidenceSource] = []
    # Single pass so each source is classified exactly once.
    for src in case.sources:
        (analysable if _is_analysable(src) else skipped).append(src)

    if skipped:
        print(
            f"Note: {len(skipped)} source(s) not analysable in this build: "
            + ", ".join(f"{s.label} ({s.type})" for s in skipped)
        )
    if not analysable:
        print("No analysable sources in this case.")
        sys.exit(1)
    print(f"Analysing {len(analysable)} source(s) — orchestrator will triage each in Phase 1:")
    for s in analysable:
        print(f" - {s.summary()}")
    return analysable


def prepare_source(src: EvidenceSource) -> EvidenceSource:
    """Materialise a tree-mode source for analysis.

    Mobile / archive sources arrive as .zip files. We unpack once into a
    project-level cache (``.cache/sources/<src.id>/``) and rewrite
    ``src.path`` to point at the unpacked directory. Idempotent — a second
    run with the cache present is a no-op (unzip_archive_sync skips files
    that already exist with the matching size).

    Disk-image and already-tree sources pass through unchanged. The same
    (possibly mutated) *src* object is returned in every case.
    """
    if src.access_mode != "tree":
        return src

    p = Path(src.path)
    if p.is_dir():
        return src  # already a directory, nothing to do
    if not p.is_file():
        print(f"Warning: source path {src.path} does not exist; leaving as-is.")
        return src
    if p.suffix.lower() != ".zip":
        # Other archive types (tar, 7z, ...) — not handled yet.
        print(f"Warning: tree-mode source {src.id} is not a .zip "
              f"({p.suffix}); leaving as-is.")
        return src

    dest = SOURCE_CACHE_DIR / src.id
    dest.mkdir(parents=True, exist_ok=True)

    # Password-protected zips (e.g. CTF artefacts) carry their key in
    # case.yaml's meta.password — never logged, never persisted.
    password = (src.meta or {}).get("password")
    pw_note = " (password from meta)" if password else ""
    print(f"Unpacking {p.name} → {dest}{pw_note} (idempotent) ...")
    result = unzip_archive_sync(str(p), str(dest), password=password)
    first_line = result.split("\n", 1)[0]
    print(" " + first_line)
    if first_line.startswith("Error:"):
        # Surface the multi-line guidance from _do_extract verbatim.
        for extra in result.split("\n")[1:]:
            print(" " + extra)
        print(f" Source {src.id} stays unanalysable until this is resolved.")
        # Leave src.path unchanged so the source remains marked unanalysable.
        return src

    src.path = str(dest)
    src.access_mode = "tree"
    return src


def find_resumable_run() -> Path | None:
    """Find the most recent incomplete run with a saved graph state.

    Returns the path of that run's ``graph_state.json``, or None. Candidates
    are ordered by path in descending order, which matches recency for the
    timestamp-named directories created by ``setup_run_dir``.
    """
    if not RUNS_DIR.exists():
        return None
    candidates = sorted(RUNS_DIR.glob("*/graph_state.json"), reverse=True)
    for state_file in candidates:
        # Incomplete = has graph_state.json but no run_metadata.json (final archive)
        if not (state_file.parent / "run_metadata.json").exists():
            return state_file
    return None


def setup_run_dir() -> Path:
    """Create a timestamped run directory."""
    run_dir = RUNS_DIR / datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir


async def async_main() -> None:
    """Run one analysis: optionally resume, set up the run, drive the orchestrator.

    Loads ``config.yaml``, offers to resume the latest incomplete run,
    prepares the run directory / logging / LLM client / evidence graph, and
    then awaits the orchestrator. The LLM client is closed on every exit
    path once it has been created.
    """
    config = load_config()
    agent_cfg = config["agent"]

    # Check for resumable run
    resumable = find_resumable_run()
    resume_phase = 1
    run_dir: Path | None = None
    graph: EvidenceGraph | None = None

    if resumable:
        print(f"Found incomplete run: {resumable.parent.name}")
        # Best-effort preview of the saved state; a failure here must not
        # block the prompt, but the user should know the file looks off.
        try:
            data = json.loads(resumable.read_text())
            ph_count = len(data.get("phenomena", {}))
            hyp_count = len(data.get("hypotheses", {}))
        except Exception as exc:
            print(f" (could not summarise saved state: {exc})")
        else:
            print(f" ({ph_count} phenomena, {hyp_count} hypotheses)")
        print("Resume? [y/N] ", end="", flush=True)
        choice = input().strip().lower()
        if choice == "y":
            run_dir = resumable.parent
            graph = EvidenceGraph.load_state(resumable)
            # NOTE(review): Phase 3 is chosen only when at least one
            # hypothesis has a status other than "active"; a graph whose
            # hypotheses are all still "active" resumes at Phase 2 (or 1).
            # Confirm this is the intended reading of "hypotheses exist".
            if any(h.status != "active" for h in graph.hypotheses.values()):
                resume_phase = 3  # hypotheses exist, resume investigation
            elif graph.phenomena:
                resume_phase = 2  # have phenomena, generate hypotheses
            print(f"Resuming from Phase {resume_phase}...")

    if run_dir is None:
        run_dir = setup_run_dir()

    # Setup logging — colored terminal + detailed log file
    setup_logging(run_dir)

    # Save config snapshot
    shutil.copy2("config.yaml", run_dir / "config.yaml")

    # Initialize LLM client
    llm = LLMClient(
        base_url=agent_cfg["base_url"],
        api_key=agent_cfg["api_key"],
        model=agent_cfg["model"],
        max_tokens=agent_cfg.get("max_tokens", 4096),
        proxy=agent_cfg.get("proxy", "auto"),
        reasoning_effort=agent_cfg.get("reasoning_effort"),
        thinking_enabled=agent_cfg.get("thinking_enabled", False),
    )

    # Everything after client creation sits inside try/finally so the
    # client is closed even when setup exits early (e.g. sys.exit from
    # case resolution).
    try:
        # Initialize evidence graph
        if graph is None:
            case = resolve_case()
            # case_info derived from THIS case's meta (case.yaml), not from
            # config.yaml's legacy `cfreds_hacking_case` block. Without this,
            # the old CFReDS evidence MD5s would be embedded in reports for
            # every subsequent unrelated case.
            graph = EvidenceGraph(
                case_info=dict(case.meta or {}),
                persist_path=run_dir / "graph_state.json",
                edge_log_lr=config.get("hypothesis_log_lr"),
            )
            graph.case = case
            graph.extracted_dir = str(run_dir / "extracted")

            analysable = list_analysable_sources(case)
            # Prepare every analysable source up front (unzip tree-mode zips,
            # etc.). Idempotent on cache hits — second run is a no-op.
            prepared = [prepare_source(s) for s in analysable]
            # Seed the active source so tools that resolve lazily have a target
            # before Phase 1 begins; the orchestrator resets it per source.
            graph.set_active_source(prepared[0])
        else:
            graph._persist_path = run_dir / "graph_state.json"

        # Register all tools — they resolve the active evidence source at call time
        register_all_tools(graph)

        # Create agent factory
        factory = AgentFactory(llm, graph)

        # Run orchestrator
        orchestrator = Orchestrator(llm, graph, factory, config=config, run_dir=run_dir)

        try:
            report = await orchestrator.run(resume_phase=resume_phase)
            print("\n" + "=" * 60)
            print("FORENSIC ANALYSIS COMPLETE")
            print("=" * 60)
            print(f"Results archived to: {run_dir}")
            print(report)
        except AnalysisAborted:
            print("\n" + "=" * 60)
            print("ANALYSIS ABORTED — too many consecutive failures")
            print("=" * 60)
            print(f"Partial results saved to: {run_dir}")
            print("Run again to resume from saved state.")
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run() on Python 3.11+, Ctrl+C cancels the main
            # task, so the interrupt arrives here as CancelledError rather
            # than KeyboardInterrupt. Handle both the same way.
            print("\nInterrupted. State saved.")
            print(f"Partial results in: {run_dir}")
    finally:
        await llm.close()


def main() -> None:
    """Synchronous entry point: run ``async_main`` on a fresh event loop."""
    asyncio.run(async_main())


if __name__ == "__main__":
    main()