feat(refit): complete S1-S6 — case abstraction, grounding, log-odds, plugins, coref, multi-source

Consolidates the long-running refit work (DESIGN.md as authoritative spec)
into a single baseline commit. Six stages landed together:

  S1  Case + EvidenceSource abstraction; tools parameterised by source_id
      (case.py, main.py multi-source bootstrap, .bin extension support)
  S2  Grounding gateway in add_phenomenon: verified_facts cite real
      ToolInvocation ids; substring / normalised match enforced; agent +
      task scope checked. Phenomenon.description split into verified_facts
      (grounded) + interpretation (free text). [invocation: inv-xxx]
      prefix on every wrapped tool result so the LLM can cite.
  S3  Confidence as additive log-odds: edge_type → log10(LR) calibration
      table; commutative updates; supported / refuted thresholds derived
      from log_odds; hypothesis × evidence matrix view.
  S4  iOS plugin: unzip_archive + parse_plist / sqlite_tables /
      sqlite_query / parse_ios_keychain / read_idevice_info;
      IOSArtifactAgent; SOURCE_TYPE_AGENTS routing.
  S5  Cross-source entity resolution: typed identifiers on Entity,
      observe_identity gateway, auto coref hypothesis with shared /
      conflicting strong/weak LR edges, reversible same_as edges,
      actor_clusters() view.
  S6  Android partition probe + AndroidArtifactAgent; MediaAgent with
      OCR fallback; orchestrator Phase 1 iterates every analysable
      source; platform-aware get_triage_agent_type; ReportAgent renders
      actor clusters + per-source breakdown.

142 unit tests / 1 skipped — full coverage of the new gateway, log-odds
math, coref hypothesis fall-out, and orchestrator multi-source dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
BattleTag
2026-05-21 02:12:10 -10:00
parent 444d58726a
commit 81ade8f7ac
24 changed files with 5137 additions and 244 deletions

226
case.py Normal file
View File

@@ -0,0 +1,226 @@
"""Case and evidence-source model — the foundation for multi-evidence analysis.
A :class:`Case` is a collection of :class:`EvidenceSource` entries. Each source
has a *type* (disk image, mobile extraction, archive, ...) and an *access mode*
that determines how forensic tools reach its contents:
- ``"image"`` — a block device / disk image, navigated by The Sleuth Kit via
inode addressing (raw, E01, dd, ...).
- ``"tree"`` — an already-mounted filesystem or unpacked extraction,
navigated by ordinary filesystem paths.
This module is pure data model + loading. Partition probing and interactive
selection live in ``main.py``.
"""
from __future__ import annotations
import logging
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
# Source types a case may declare, and the two ways a source can be accessed.
SOURCE_TYPES = {
    "archive",
    "disk_image",
    "media_collection",
    "mobile_extraction",
}
ACCESS_MODES = {"tree", "image"}

# File extensions treated as disk images during interactive discovery.
# P6 fix: ``.bin`` (along with vmdk/vhd) is listed because extension globbing
# previously missed raw block-device dumps such as ``blk0_sda.bin``.
DISK_IMAGE_EXTS = {
    ".001",
    ".bin",
    ".dd",
    ".e01",
    ".img",
    ".iso",
    ".raw",
    ".vhd",
    ".vmdk",
}

# Access mode assumed for a source type when the entry does not specify one.
_DEFAULT_ACCESS_MODE = {
    "disk_image": "image",
    "mobile_extraction": "tree",
    "archive": "tree",
    "media_collection": "tree",
}
def slugify(text: str) -> str:
    """Turn *text* into a lowercase, hyphen-joined slug suitable for IDs.

    Every run of characters outside ``a-z`` / ``0-9`` (after lowercasing)
    collapses to a single hyphen, and hyphens at either end are dropped.
    If nothing is left, the placeholder ``"src"`` is returned.
    """
    lowered = text.lower()
    hyphenated = re.sub(r"[^a-z0-9]+", "-", lowered)
    trimmed = hyphenated.strip("-")
    if trimmed:
        return trimmed
    return "src"
@dataclass
class EvidenceSource:
    """One piece of evidence within a :class:`Case`."""

    id: str                     # "src-<slug>"
    label: str                  # human-readable name
    type: str                   # one of SOURCE_TYPES
    path: str                   # filesystem path to the evidence
    access_mode: str            # "image" | "tree"
    owner: str = ""             # associated person, if known
    partition_offset: int = 0   # sector offset (image-mode sources only)
    meta: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return all fields as a plain dict (recursively copied by ``asdict``)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> EvidenceSource:
        """Reconstruct from a dict, ignoring unknown keys (forward-compatible).

        The ``meta`` mapping is copied rather than shared, so later changes to
        the returned source never leak back into *d*; a ``meta`` of ``None``
        becomes an empty dict.

        Raises:
            TypeError: if *d* lacks one of the fields that has no default.
        """
        known = cls.__dataclass_fields__
        kwargs = {key: value for key, value in d.items() if key in known}
        if "meta" in kwargs:
            # Own copy: without it the new object and the input would alias
            # one mutable dict.
            kwargs["meta"] = dict(kwargs["meta"] or {})
        return cls(**kwargs)

    def summary(self) -> str:
        """Return a one-line description of this source.

        The partition offset is shown only for image-mode sources with a
        non-zero offset; the owner is shown only when set.
        """
        show_offset = self.access_mode == "image" and self.partition_offset
        loc = f"@{self.partition_offset}" if show_offset else ""
        owner = f" owner={self.owner}" if self.owner else ""
        return f"[{self.id}] {self.label} ({self.type}/{self.access_mode}{loc}){owner}"
@dataclass
class Case:
    """A forensic case: a set of evidence sources plus metadata."""

    case_id: str
    name: str
    sources: list[EvidenceSource] = field(default_factory=list)
    meta: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a plain-dict form of the case; ``meta`` is shallow-copied."""
        return {
            "case_id": self.case_id,
            "name": self.name,
            "sources": [s.to_dict() for s in self.sources],
            "meta": dict(self.meta),
        }

    @classmethod
    def from_dict(cls, d: dict) -> Case:
        """Rebuild a case from a dict shaped like :meth:`to_dict` output.

        Missing keys fall back to empty values.  ``sources`` or ``meta`` set
        to ``None`` are treated as empty, and ``meta`` is copied so the new
        case does not share a mutable dict with *d*.
        """
        return cls(
            case_id=d.get("case_id", ""),
            name=d.get("name", ""),
            sources=[EvidenceSource.from_dict(s) for s in d.get("sources") or []],
            meta=dict(d.get("meta") or {}),
        )

    def get_source(self, source_id: str) -> EvidenceSource | None:
        """Return the first source whose ``id`` equals *source_id*, else None."""
        return next((s for s in self.sources if s.id == source_id), None)
# ---------------------------------------------------------------------------
# case.yaml loading
# ---------------------------------------------------------------------------
def _build_source(raw: dict, base_dir: Path, index: int) -> EvidenceSource:
    """Validate and normalise one source entry from case.yaml.

    Missing ``id`` is derived from the label; missing ``access_mode`` defaults
    by type; relative paths are resolved against *base_dir* (the case file's
    directory).

    Keys that are present but empty (YAML loads them as ``None``) are treated
    like missing keys.  Invalid values are logged and replaced by a default
    rather than raising, so one bad field does not abort loading the case.

    Args:
        raw: The mapping for a single entry of the ``sources`` list.
        base_dir: Directory against which relative ``path`` values resolve.
        index: Position of the entry, used for the fallback label.

    Returns:
        The normalised :class:`EvidenceSource`; ``path`` is ``""`` when the
        entry provides none.
    """
    label = str(raw.get("label") or raw.get("id") or f"source-{index}")

    src_type = str(raw.get("type") or "disk_image")
    if src_type not in SOURCE_TYPES:
        logger.warning("Unknown source type %r for %r — treating as disk_image",
                       src_type, label)
        src_type = "disk_image"

    access_mode = str(raw.get("access_mode") or _DEFAULT_ACCESS_MODE.get(src_type, "tree"))
    if access_mode not in ACCESS_MODES:
        logger.warning("Unknown access_mode %r for %r — defaulting", access_mode, label)
        access_mode = _DEFAULT_ACCESS_MODE.get(src_type, "tree")

    # Ids already carrying the "src-" prefix are kept verbatim; anything else
    # is slugified and prefixed.
    src_id = str(raw.get("id") or f"src-{slugify(label)}")
    if not src_id.startswith("src-"):
        src_id = f"src-{slugify(src_id)}"

    # ``or ""`` keeps an empty ``path:`` key from turning into the literal
    # string "None" and then into a bogus ``<base_dir>/None`` path.
    raw_path = str(raw.get("path") or "").strip()
    path = raw_path
    if raw_path:
        p = Path(raw_path).expanduser()
        if not p.is_absolute():
            p = base_dir / p
        path = str(p)

    owner_raw = raw.get("owner")
    owner = "" if owner_raw is None else str(owner_raw)

    offset_raw = raw.get("partition_offset") or 0
    try:
        partition_offset = int(offset_raw)
    except (TypeError, ValueError):
        logger.warning("Invalid partition_offset %r for %r — using 0",
                       offset_raw, label)
        partition_offset = 0

    try:
        meta = dict(raw.get("meta") or {})
    except (TypeError, ValueError):
        logger.warning("Ignoring non-mapping meta for %r", label)
        meta = {}

    return EvidenceSource(
        id=src_id,
        label=label,
        type=src_type,
        path=path,
        access_mode=access_mode,
        owner=owner,
        partition_offset=partition_offset,
        meta=meta,
    )
def build_case(data: dict, base_dir: Path | None = None) -> Case:
    """Build a validated :class:`Case` from a loosely-typed case.yaml dict.

    Entries of ``sources`` that are not mappings are skipped with a warning.
    Source ids are made unique within the case: a repeated id gets the
    entry's index appended, plus a further counter if that is taken as well.
    Sources without a path are kept, with a warning.

    Args:
        data: The parsed case.yaml mapping.
        base_dir: Directory for resolving relative source paths; defaults to
            the current working directory.

    Returns:
        The assembled :class:`Case`.  ``case_id`` defaults to ``"case"`` and
        ``name`` to ``"Untitled case"`` when missing or null.
    """
    base_dir = base_dir or Path.cwd()
    sources: list[EvidenceSource] = []
    seen_ids: set[str] = set()

    for i, raw in enumerate(data.get("sources", []) or []):
        if not isinstance(raw, dict):
            logger.warning("Skipping malformed source entry #%d", i)
            continue
        src = _build_source(raw, base_dir, i)
        if src.id in seen_ids:
            # The index suffix alone can clash with an id an earlier entry
            # declared explicitly (e.g. "src-a-2"), so keep bumping until the
            # candidate is really unused.
            base_id = src.id
            candidate = f"{base_id}-{i}"
            bump = 0
            while candidate in seen_ids:
                bump += 1
                candidate = f"{base_id}-{i}-{bump}"
            src.id = candidate
        seen_ids.add(src.id)
        if not src.path:
            logger.warning("Source %r has no path — keeping but it is not analysable",
                           src.label)
        sources.append(src)

    # Keys present but left empty in YAML load as None; treat them as missing
    # instead of producing the string "None" or failing in dict(None).
    case_id = data.get("case_id")
    name = data.get("name")
    return Case(
        case_id="case" if case_id is None else str(case_id),
        name="Untitled case" if name is None else str(name),
        sources=sources,
        meta=dict(data.get("meta") or {}),
    )
def load_case(path: str | Path = "case.yaml") -> Case | None:
    """Load a :class:`Case` from a case.yaml file.

    Args:
        path: Location of the case file.

    Returns:
        The loaded case, or ``None`` when the file does not exist, cannot be
        read or parsed, or its top level is not a YAML mapping.  The last two
        situations are logged as errors.
    """
    case_path = Path(path)
    if not case_path.exists():
        return None

    import yaml

    try:
        # Hand PyYAML the raw bytes so it performs its own encoding detection
        # (BOM-based, UTF-8 otherwise) instead of decoding with whatever the
        # platform's locale default happens to be.
        data = yaml.safe_load(case_path.read_bytes()) or {}
    except Exception as e:
        # Deliberate best-effort boundary: any read/parse failure means
        # "no usable case file".
        logger.error("Failed to parse %s: %s", case_path, e)
        return None

    if not isinstance(data, dict):
        logger.error("%s is not a YAML mapping", case_path)
        return None

    # Relative source paths are resolved against the case file's directory.
    case = build_case(data, base_dir=case_path.resolve().parent)
    logger.info("Loaded case %r with %d source(s) from %s",
                case.name, len(case.sources), case_path)
    return case
def single_source_case(
    image_path: str,
    partition_offset: int = 0,
    label: str | None = None,
) -> Case:
    """Build an ad-hoc Case holding exactly one disk image (interactive fallback).

    The source id comes from the image's file stem.  The source label and the
    case name are both *label* when given, otherwise the image's file name.
    """
    image = Path(image_path)
    display_name = label if label else image.name
    source_id = "src-" + slugify(image.stem)
    only_source = EvidenceSource(
        id=source_id,
        label=display_name,
        type="disk_image",
        path=image_path,
        access_mode="image",
        partition_offset=partition_offset,
    )
    return Case(case_id="adhoc", name=display_name, sources=[only_source])