Consolidates the long-running refit work (DESIGN.md as authoritative spec)
into a single baseline commit. Six stages landed together:
S1 Case + EvidenceSource abstraction; tools parameterised by source_id
(case.py, main.py multi-source bootstrap, .bin extension support)
S2 Grounding gateway in add_phenomenon: verified_facts cite real
ToolInvocation ids; substring / normalised match enforced; agent +
task scope checked. Phenomenon.description split into verified_facts
(grounded) + interpretation (free text). [invocation: inv-xxx]
prefix on every wrapped tool result so the LLM can cite.
S3 Confidence as additive log-odds: edge_type → log10(LR) calibration
table; commutative updates; supported / refuted thresholds derived
from log_odds; hypothesis × evidence matrix view.
S4 iOS plugin: unzip_archive + parse_plist / sqlite_tables /
sqlite_query / parse_ios_keychain / read_idevice_info;
IOSArtifactAgent; SOURCE_TYPE_AGENTS routing.
S5 Cross-source entity resolution: typed identifiers on Entity,
observe_identity gateway, auto coref hypothesis with shared /
conflicting strong/weak LR edges, reversible same_as edges,
actor_clusters() view.
S6 Android partition probe + AndroidArtifactAgent; MediaAgent with
OCR fallback; orchestrator Phase 1 iterates every analysable
source; platform-aware get_triage_agent_type; ReportAgent renders
actor clusters + per-source breakdown.
142 unit tests / 1 skipped — full coverage of the new gateway, log-odds
math, coref hypothesis fall-out, and orchestrator multi-source dispatch.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
150 lines
6.5 KiB
Python
150 lines
6.5 KiB
Python
"""Timeline Agent — connects existing phenomena with temporal edges.
|
|
|
|
Operates on phenomena already in the graph. Does NOT investigate the disk
|
|
image itself. The agent's only useful output is the temporal edges it
|
|
creates between phenomena.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from base_agent import BaseAgent
|
|
from evidence_graph import EvidenceGraph
|
|
from llm_client import LLMClient
|
|
from tool_registry import TOOL_CATALOG
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TimelineAgent(BaseAgent):
|
|
name = "timeline"
|
|
role = (
|
|
"Timeline forensic analyst. You build chronological timelines from filesystem "
|
|
"MAC timestamps and correlate events across all phenomena categories in the "
|
|
"evidence graph to reconstruct the sequence of activities on the system."
|
|
)
|
|
mandatory_record_tools = ("add_temporal_edge",)
|
|
|
|
def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
|
|
super().__init__(llm, graph)
|
|
self._register_tools()
|
|
|
|
def _register_graph_tools(self) -> None:
|
|
"""Restrict to read-only graph tools — Timeline does not add phenomena."""
|
|
self._register_graph_read_tools()
|
|
|
|
def _register_tools(self) -> None:
|
|
td = TOOL_CATALOG.get("build_filesystem_timeline")
|
|
if td:
|
|
self.register_tool(td.name, td.description, td.input_schema, td.executor)
|
|
|
|
self.register_tool(
|
|
name="get_timestamped_phenomena",
|
|
description=(
|
|
"Get all phenomena that have timestamps, sorted chronologically. "
|
|
"Returns each phenomenon's id, category, title, and a short description "
|
|
"preview. Use this as your primary input for temporal correlation."
|
|
),
|
|
input_schema={"type": "object", "properties": {}},
|
|
executor=self._get_timestamped_phenomena,
|
|
)
|
|
|
|
self.register_tool(
|
|
name="add_temporal_edge",
|
|
description=(
|
|
"Add a temporal relationship edge between two existing phenomena. "
|
|
"Use 'before' when source phenomenon happened before target, "
|
|
"'concurrent' when they occurred within seconds of each other."
|
|
),
|
|
input_schema={
|
|
"type": "object",
|
|
"properties": {
|
|
"source_id": {"type": "string", "description": "ID of the earlier/source phenomenon."},
|
|
"target_id": {"type": "string", "description": "ID of the later/target phenomenon."},
|
|
"relation": {
|
|
"type": "string",
|
|
"enum": ["before", "after", "concurrent"],
|
|
"description": "Temporal relationship.",
|
|
},
|
|
},
|
|
"required": ["source_id", "target_id", "relation"],
|
|
},
|
|
executor=self._add_temporal_edge,
|
|
)
|
|
|
|
def _build_system_prompt(self, task: str) -> str:
|
|
"""Focused prompt — Timeline connects existing phenomena, doesn't investigate."""
|
|
return (
|
|
f"You are {self.name}, a forensic timeline correlation analyst.\n"
|
|
f"Role: {self.role}\n\n"
|
|
f"Image: {self.graph.image_path}\n"
|
|
f"Current state: {self.graph.stats_summary()}\n\n"
|
|
f"Your task: {task}\n\n"
|
|
f"WORKFLOW:\n"
|
|
f"1. Call build_filesystem_timeline once to materialize MAC times for the disk.\n"
|
|
f"2. Call get_timestamped_phenomena to see all phenomena with timestamps, "
|
|
f"sorted chronologically. THIS IS YOUR PRIMARY INPUT.\n"
|
|
f"3. For each meaningful temporal relationship between phenomena, call "
|
|
f"add_temporal_edge(source_id, target_id, relation). Use 'before' when "
|
|
f"source happened first (the common case); 'concurrent' for events within "
|
|
f"a few seconds of each other.\n"
|
|
f" Examples of meaningful connections:\n"
|
|
f" - 'Cain installer executed' (before) 'Cain.exe first execution'\n"
|
|
f" - 'WHOIS first lookup' (before) 'WHOIS second lookup'\n"
|
|
f" - 'Recon tool cluster' (before) 'Anti-forensics defrag'\n"
|
|
f" - 'Tool installation' (before) 'Tool execution'\n"
|
|
f"4. Aim for 15-40 temporal edges that connect the major events into a "
|
|
f"forensic story.\n"
|
|
f"5. STOP after recording all meaningful temporal edges. Do not call any more tools.\n\n"
|
|
f"STRICT BOUNDARIES:\n"
|
|
f"- Your job is to CONNECT existing phenomena, NOT to discover new ones. "
|
|
f"You CANNOT call add_phenomenon — the tool isn't yours.\n"
|
|
f"- Use ONLY phenomenon IDs returned by get_timestamped_phenomena or "
|
|
f"list_phenomena. NEVER fabricate IDs.\n"
|
|
f"- Connect events that tell a forensic story (recon -> exploit -> cover-up). "
|
|
f"Do not exhaustively pair every two phenomena; focus on causally-relevant "
|
|
f"sequences.\n"
|
|
f"- The orchestrator handles report writing in the next phase. Your only "
|
|
f"output that propagates is the temporal edges you create."
|
|
)
|
|
|
|
async def _get_timestamped_phenomena(self) -> str:
|
|
items = [
|
|
ph for ph in self.graph.phenomena.values()
|
|
if ph.timestamp
|
|
]
|
|
items.sort(key=lambda ph: ph.timestamp or "")
|
|
|
|
if not items:
|
|
return "No phenomena with timestamps found."
|
|
|
|
lines = []
|
|
for ph in items:
|
|
lines.append(f"{ph.timestamp} | [{ph.category}] {ph.title} ({ph.id})")
|
|
preview = ph.interpretation[:150] if ph.interpretation else ""
|
|
if ph.verified_facts:
|
|
fact_preview = ", ".join(
|
|
f"{f.get('type','?')}={str(f.get('value',''))[:40]}"
|
|
for f in ph.verified_facts[:3]
|
|
)
|
|
preview = f"{preview} [facts: {fact_preview}]" if preview else f"[facts: {fact_preview}]"
|
|
if preview:
|
|
lines.append(f" {preview}")
|
|
return "\n".join(lines)
|
|
|
|
async def _add_temporal_edge(
|
|
self, source_id: str, target_id: str, relation: str,
|
|
) -> str:
|
|
try:
|
|
await self.graph.add_edge(
|
|
source_id=source_id,
|
|
target_id=target_id,
|
|
edge_type="temporal",
|
|
metadata={"relation": relation},
|
|
created_by=self.name,
|
|
)
|
|
return f"Temporal edge added: {source_id} —[{relation}]→ {target_id}"
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|