"""MASForensics — Multi-Agent System for Digital Forensics."""

from __future__ import annotations

import asyncio
import json
import logging
import re
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path

import yaml

from agent_factory import AgentFactory
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from log_config import setup_logging
from orchestrator import AnalysisAborted, Orchestrator
from tool_registry import register_all_tools

RUNS_DIR = Path("runs")
IMAGE_DIR = Path("image")

# Common forensic image extensions (only first segment / single-file formats)
_IMAGE_GLOBS = ["*.001", "*.dd", "*.raw", "*.img", "*.E01", "*.iso"]

# One mmls table row, e.g.
# "002:  000:000   0000000063   0009510479   0009510417   NTFS / exFAT (0x07)"
# Groups: slot, start, end, length, description.
_MMLS_ROW_RE = re.compile(r"\s*\d+:\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.*)")

# Sector size used only to display partition sizes in MB.
# NOTE(review): assumes 512-byte sectors — confirm for 4Kn images.
_SECTOR_SIZE_BYTES = 512


def load_config(path: str = "config.yaml") -> dict:
    """Load the YAML configuration file at *path*.

    Returns the parsed mapping. An empty file yields an empty dict rather
    than ``None`` so the declared return type holds.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


# ---------------------------------------------------------------------------
# Interactive image & partition selection
# ---------------------------------------------------------------------------


def _discover_images(search_dir: Path = IMAGE_DIR) -> list[Path]:
    """Find forensic disk image files directly under *search_dir*.

    Matches every pattern in ``_IMAGE_GLOBS`` (non-recursive) and returns
    the de-duplicated paths in sorted order.
    """
    images: set[Path] = set()
    for pattern in _IMAGE_GLOBS:
        images.update(search_dir.glob(pattern))
    return sorted(images)


def _parse_mmls(output: str) -> list[dict]:
    """Parse mmls output into a list of partition dicts.

    Returns only data partitions (skips Meta / Unallocated rows). Each dict
    has the keys ``slot``, ``start``, ``end``, ``length`` (ints, as printed
    by mmls) and ``description``. Lines that do not look like a table row
    are ignored.
    """
    partitions: list[dict] = []
    for line in output.splitlines():
        m = _MMLS_ROW_RE.match(line)
        if not m:
            continue
        slot, start, end, length, desc = m.groups()
        # Skip non-data rows: "Meta" entries and the dashed slot mmls
        # prints for unallocated space.
        if slot == "Meta" or slot.startswith("---"):
            continue
        partitions.append({
            "slot": slot,
            "start": int(start),
            "end": int(end),
            "length": int(length),
            "description": desc.strip(),
        })
    return partitions


def _run_mmls(image_path: str) -> list[dict]:
    """Run mmls on *image_path* and return the parsed partition list.

    Returns an empty list when mmls exits non-zero or does not finish
    within 30 seconds; the caller treats that as "no partition table".
    Exits the process with status 1 if the ``mmls`` binary is missing.
    """
    try:
        result = subprocess.run(
            ["mmls", image_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except FileNotFoundError:
        print("Error: mmls not found. Is The Sleuth Kit installed?")
        sys.exit(1)
    except subprocess.TimeoutExpired:
        # A hung mmls must not crash the interactive flow; fall back to the
        # same "nothing detected" result as a non-zero exit code.
        print("Warning: mmls timed out after 30 seconds.")
        return []
    if result.returncode != 0:
        return []
    return _parse_mmls(result.stdout)


def select_image_interactive(image_dir: Path | None = None) -> tuple[str, int]:
    """Interactively select a disk image and partition.

    If *image_dir* is None, prompts the user for the image folder path.
    When exactly one image (or one partition) is found it is chosen
    automatically; otherwise the user picks from a numbered list.

    Returns (image_path, partition_offset_in_sectors). The offset is 0 when
    mmls reports no partitions. Exits the process with status 1 if no path
    is given, the path is not a directory, or no images are found.
    """
    # --- Image folder selection ---
    if image_dir is None:
        raw = input("Image folder path: ").strip()
        if not raw:
            print("No path provided.")
            sys.exit(1)
        image_dir = Path(raw).expanduser().resolve()

    if not image_dir.is_dir():
        print(f"Error: {image_dir} is not a directory.")
        sys.exit(1)

    # --- Image file selection ---
    images = _discover_images(image_dir)
    if not images:
        print(f"No disk images found in {image_dir}/")
        print("Supported formats: " + ", ".join(_IMAGE_GLOBS))
        sys.exit(1)

    if len(images) == 1:
        image_path = images[0]
        print(f"Found image: {image_path}")
    else:
        print("Available disk images:")
        for i, img in enumerate(images, 1):
            size_mb = img.stat().st_size / (1024 * 1024)
            print(f"  [{i}] {img.name}  ({size_mb:.0f} MB)")
        while True:
            choice = input(f"Select image [1-{len(images)}]: ").strip()
            if choice.isdigit() and 1 <= int(choice) <= len(images):
                image_path = images[int(choice) - 1]
                break
            print("Invalid choice.")

    # --- Partition detection ---
    print(f"Detecting partitions ({image_path}) ...")
    partitions = _run_mmls(str(image_path))

    if not partitions:
        print("Warning: mmls could not detect partitions. Using offset 0.")
        return str(image_path), 0

    if len(partitions) == 1:
        p = partitions[0]
        print(f"Found partition: {p['description']} (offset={p['start']})")
        return str(image_path), p["start"]

    print("Partitions:")
    for i, p in enumerate(partitions, 1):
        size_mb = p["length"] * _SECTOR_SIZE_BYTES / (1024 * 1024)
        print(f"  [{i}] {p['description']}  (offset={p['start']}, {size_mb:.0f} MB)")
    while True:
        choice = input(f"Select partition [1-{len(partitions)}]: ").strip()
        if choice.isdigit() and 1 <= int(choice) <= len(partitions):
            p = partitions[int(choice) - 1]
            return str(image_path), p["start"]
        print("Invalid choice.")


def find_resumable_run() -> Path | None:
    """Find the most recent incomplete run with a saved graph state.

    Returns the path to that run's ``graph_state.json``, or None when
    ``RUNS_DIR`` does not exist or every run is complete.
    """
    if not RUNS_DIR.exists():
        return None
    # Run directories are named by timestamp, so reverse path order puts the
    # newest run first.
    candidates = sorted(RUNS_DIR.glob("*/graph_state.json"), reverse=True)
    for state_file in candidates:
        # Incomplete = has graph_state.json but no run_metadata.json (final archive)
        if not (state_file.parent / "run_metadata.json").exists():
            return state_file
    return None


def setup_run_dir() -> Path:
    """Create a timestamped run directory under ``RUNS_DIR`` and return it."""
    run_dir = RUNS_DIR / datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir


async def async_main() -> None:
    """Run one forensic analysis session, optionally resuming a saved run.

    Loads ``config.yaml``, offers to resume the latest incomplete run,
    otherwise selects an image (folder from ``sys.argv[1]`` or an
    interactive prompt), then wires up the LLM client, evidence graph,
    tools and agent factory and runs the orchestrator. The LLM client is
    always closed on exit.
    """
    config = load_config()
    agent_cfg = config["agent"]

    # Check for resumable run
    resumable = find_resumable_run()
    resume_phase = 1
    run_dir: Path | None = None
    graph: EvidenceGraph | None = None

    if resumable:
        print(f"Found incomplete run: {resumable.parent.name}")
        try:
            data = json.loads(resumable.read_text())
            ph_count = len(data.get("phenomena", {}))
            hyp_count = len(data.get("hypotheses", {}))
            print(f"  ({ph_count} phenomena, {hyp_count} hypotheses)")
        except Exception as exc:
            # The summary is purely informational; a malformed or unreadable
            # state file must not block the resume prompt.
            print(f"  (could not summarise saved state: {exc})")
        print("Resume? [y/N] ", end="", flush=True)
        choice = input().strip().lower()
        if choice == "y":
            run_dir = resumable.parent
            graph = EvidenceGraph.load_state(resumable)
            # Pick the earliest phase whose output is still missing. The
            # previous check (any hypothesis *not* active) sent runs whose
            # hypotheses were all still active back to Phase 2.
            if graph.hypotheses:
                resume_phase = 3  # hypotheses exist, resume investigation
            elif graph.phenomena:
                resume_phase = 2  # have phenomena, generate hypotheses
            print(f"Resuming from Phase {resume_phase}...")

    if run_dir is None:
        run_dir = setup_run_dir()

    # Setup logging — colored terminal + detailed log file
    setup_logging(run_dir)

    # Save config snapshot
    shutil.copy2("config.yaml", run_dir / "config.yaml")

    # Initialize LLM client
    llm = LLMClient(
        base_url=agent_cfg["base_url"],
        api_key=agent_cfg["api_key"],
        model=agent_cfg["model"],
        max_tokens=agent_cfg.get("max_tokens", 4096),
        proxy=agent_cfg.get("proxy", "auto"),
    )

    # Initialize evidence graph
    if graph is None:
        # CLI arg takes priority, otherwise interactive prompt
        cli_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else None
        image_path, partition_offset = select_image_interactive(cli_dir)

        graph = EvidenceGraph(
            case_info=config.get("cfreds_hacking_case", {}),
            persist_path=run_dir / "graph_state.json",
        )
        graph.image_path = image_path
        graph.partition_offset = partition_offset
        graph.extracted_dir = str(run_dir / "extracted")
    else:
        # Resumed graph: keep persisting into the same run directory.
        graph._persist_path = run_dir / "graph_state.json"

    # Register all tools with bound image path
    register_all_tools(
        graph.image_path, graph.partition_offset, graph, graph.extracted_dir
    )

    # Create agent factory
    factory = AgentFactory(llm, graph)

    # Run orchestrator
    orchestrator = Orchestrator(llm, graph, factory, config=config, run_dir=run_dir)
    try:
        report = await orchestrator.run(resume_phase=resume_phase)
        print("\n" + "=" * 60)
        print("FORENSIC ANALYSIS COMPLETE")
        print("=" * 60)
        print(f"Results archived to: {run_dir}")
        print(report)
    except AnalysisAborted:
        print("\n" + "=" * 60)
        print("ANALYSIS ABORTED — too many consecutive failures")
        print("=" * 60)
        print(f"Partial results saved to: {run_dir}")
        print("Run again to resume from saved state.")
    except KeyboardInterrupt:
        # NOTE(review): on Python 3.11+ asyncio.run() turns Ctrl-C into a
        # cancellation of the main task, so this handler may never run
        # there — confirm on the supported interpreter versions.
        print("\nInterrupted. State saved.")
        print(f"Partial results in: {run_dir}")
    finally:
        await llm.close()


def main() -> None:
    """Synchronous entry point: run ``async_main`` in a new event loop."""
    asyncio.run(async_main())


if __name__ == "__main__":
    main()