"""Base class for forensic analysis agents.""" from __future__ import annotations import json import logging import time import uuid from typing import Any from evidence_graph import EvidenceGraph from llm_client import LLMClient logger = logging.getLogger(__name__) def _log(msg: str, **extra) -> None: """Emit a structured log message with extra fields.""" logger.info(msg, extra=extra) class BaseAgent: """Base class for all forensic agents. Each agent has: - A name and role description - A set of tools it can use (registered as methods) - Access to the shared EvidenceGraph - An LLM client for reasoning and tool-calling """ name: str = "base" role: str = "A forensic analysis agent." # Tools the agent MUST invoke at least once for the run to count as productive. # If none of these were called when tool_call_loop returns, run() fires a # forced retry with an explicit "you forgot to record" instruction. # Subclasses override to declare their own recording responsibility # (timeline → add_temporal_edge, hypothesis → add_hypothesis, report → save_report). # observe_identity (S5) counts as a recording too — it writes through the # same grounding gateway and produces an identity_observation phenomenon. mandatory_record_tools: tuple[str, ...] = ("add_phenomenon", "observe_identity") # Tools whose invocation ends the run immediately. After any terminal tool # is called, tool_call_loop returns with that tool's result text as # final_text. Used by agents whose "completion" is a single explicit # action rather than "model decides to stop calling tools". For multi-call # agents (filesystem records many phenomena) leave empty. terminal_tools: tuple[str, ...] = () def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None: self.llm = llm self.graph = graph self._tools: dict[str, dict] = {} # name -> schema self._executors: dict[str, Any] = {} # name -> async callable self._record_call_counts: dict[str, int] = {} self._work_log: list[str] = [] self._current_lead_id: str | None = None def register_tool( self, name: str, description: str, input_schema: dict, executor: Any, ) -> None: """Register a tool that this agent can use.""" self._tools[name] = { "name": name, "description": description, "input_schema": input_schema, } if name in self.mandatory_record_tools: self._executors[name] = self._wrap_record_executor(name, executor) else: self._executors[name] = executor def _wrap_record_executor(self, name: str, executor: Any) -> Any: """Wrap a mandatory-record executor to count successful invocations.""" async def wrapped(*args, **kwargs): result = await executor(*args, **kwargs) self._record_call_counts[name] = self._record_call_counts.get(name, 0) + 1 return result return wrapped def get_tool_definitions(self) -> list[dict]: """Get tool definitions in Claude API format.""" return list(self._tools.values()) def _build_system_prompt(self, task: str) -> str: """Build the system prompt — lightweight stats, no full evidence dump.""" work_log_section = "" if self._work_log: entries = self._work_log[-5:] log_lines = "\n".join(f" {i+1}. {entry}" for i, entry in enumerate(entries)) work_log_section = ( f"\nYour prior work on this investigation:\n{log_lines}\n" f"Avoid repeating tools/approaches that already succeeded or failed. Build on prior findings.\n" ) return ( f"You are {self.name}, a specialized digital forensics agent.\n" f"Role: {self.role}\n\n" f"You are analyzing a disk image as part of a multi-agent forensic investigation.\n" f"Image: {self.graph.image_path}\n\n" f"Current investigation state:\n{self.graph.stats_summary()}\n" f"{work_log_section}\n" f"Your current task: {task}\n\n" f"CRITICAL WORKFLOW — you MUST follow these steps IN ORDER, one phase at a time:\n\n" f"Phase A — INVESTIGATE:\n" f" Use list_phenomena/search_graph to review existing findings.\n" f" Call list_assets to see what files are already extracted.\n" f" Call investigation tools (list_directory, parse_registry_key, etc.) to gather data.\n" f" Only extract_file for forensically relevant files (user data, logs, configs, hives) — NOT system DLLs or OS files.\n" f" Create add_lead for anything outside your expertise.\n\n" f"Phase B — RECORD PHENOMENA (GROUNDED):\n" f" For EACH significant finding from Phase A, call add_phenomenon with:\n" f" * interpretation: your analysis — free text, NOT verified.\n" f" * verified_facts: one entry per concrete atom (path, timestamp,\n" f" inode, hash, identifier, count) you want recorded as truth.\n" f" Each entry MUST have:\n" f" - type: e.g. 'path', 'timestamp', 'inode', 'hash', 'identifier', 'count'\n" f" - value: a VERBATIM substring from the tool output\n" f" - invocation_id: the inv-xxx ID from the '[invocation: inv-xxx]'\n" f" header at the top of the tool result that produced this value\n" f" IDENTIFIERS — call observe_identity (in ADDITION to add_phenomenon)\n" f" whenever you see an email, phone number, Apple ID, IMEI, wallet\n" f" address, MAC, UDID, persistent nickname, or display name. Same\n" f" grounding contract: value must be verbatim in the cited tool\n" f" output. This is HOW cross-source attribution gets built — without\n" f" it, we can't tell whether the Apple ID in keychain belongs to the\n" f" same person as the Windows account on the USB.\n" f" Do NOT call link_to_entity yet — just record all phenomena first.\n\n" f"Phase C — LINK ENTITIES:\n" f" FIRST call list_phenomena to get the current IDs — do NOT rely on memory.\n" f" Then call link_to_entity for each relevant phenomenon.\n" f" NEVER guess or fabricate a phenomenon ID. If an ID is not in list_phenomena output, it does not exist.\n\n" f"Phase D — STOP:\n" f" Once all phenomena are recorded and entities linked, you are DONE.\n" f" Do not call any more tools. The orchestrator picks up automatically.\n\n" f"CRITICAL — RECORDING REQUIREMENT:\n" f"- Only graph mutations propagate to other agents and the final report.\n" f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you stop.\n" f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, " f"a registry key) and found NOTHING, that absence IS evidence — call " f"add_phenomenon with a 'No matches for X' title, the search scope in " f"raw_data, and cite the search tool's invocation_id (verified_facts may " f"be empty for a true negative; the cited invocation in source_tool still " f"anchors it). Negative findings constrain the hypothesis space.\n" f"- If you stop without having called add_phenomenon at least once, the task " f"is FAILED and a forced retry will fire.\n\n" f"GROUNDING GATEWAY — STRUCTURALLY ENFORCED:\n" f"- Every tool result begins with '[invocation: inv-xxxxxxxx]' — that ID\n" f" is what you cite in each fact's invocation_id.\n" f"- fact.value must be a substring of the cited invocation's output.\n" f" Case, whitespace, and path-separator (/ ↔ \\) variants are tolerated;\n" f" anything else fabricated is REJECTED with a per-fact reason.\n" f"- On REJECTED: quote the literal text from the output (or drop the\n" f" fact), and put guesses / inferred paths / model names in\n" f" `interpretation` instead. Then call add_phenomenon again.\n" f"- You may cite ONLY invocations made within THIS task." ) async def run(self, task: str, lead_id: str | None = None) -> str: """Run this agent with a specific task.""" _log(task, event="agent_start", agent=self.name) self.graph.agent_status[self.name] = "running" self.graph._current_agent = self.name # Fresh task scope per agent run. Used by the grounding gateway to # check that facts in add_phenomenon cite invocations made *within # this run* — preventing the agent from forwarding stale IDs from # earlier work or another agent. self.graph._current_task_id = f"task-{uuid.uuid4().hex[:8]}" self._current_lead_id = lead_id self._register_graph_tools() self._record_call_counts.clear() system = self._build_system_prompt(task) messages = [{"role": "user", "content": task}] t0 = time.monotonic() ph_before = len(self.graph.phenomena) try: final_text, conversation = await self.llm.tool_call_loop( messages=messages, tools=self.get_tool_definitions(), tool_executor=self._executors, system=system, terminal_tools=self.terminal_tools, ) # Forced-record retry: if the agent has any mandatory recording # tools but never invoked any of them, force one more round with # an explicit "you forgot to record" instruction. The mandatory # set is declared on the class — Timeline → add_temporal_edge, # Hypothesis → add_hypothesis, ReportAgent → (). For agents with # empty mandatory_record_tools this branch is a no-op. registered_mandatory = [ t for t in self.mandatory_record_tools if t in self._executors ] recorded_any = any( self._record_call_counts.get(t, 0) > 0 for t in registered_mandatory ) if registered_mandatory and not recorded_any: missing = "/".join(registered_mandatory) logger.warning( "[%s] finished without calling any of [%s] — forcing RECORD retry", self.name, missing, ) conversation.append({ "role": "user", "content": ( f"STOP. You produced an answer without ever calling " f"{missing}. Your answer is DISCARDED — only graph " f"mutations propagate to other agents and the final " f"report.\n\n" f"You MUST now call {missing} for every significant " f"finding from your prior investigation, including " f"exact identifiers, timestamps, and the source_tool " f"that produced each finding. If you genuinely found " f"NOTHING noteworthy, call the recording tool ONCE " f"with a 'No significant findings' style entry " f"summarizing what you searched.\n\n" f"Do not run more investigation tools. Just record " f"what you already found. Then end." ), }) final_text, _ = await self.llm.tool_call_loop( messages=conversation, tools=self.get_tool_definitions(), tool_executor=self._executors, system=system, max_iterations=10, terminal_tools=self.terminal_tools, ) self._work_log.append(f"[Task: {task[:80]}] -> {final_text[:150]}") except Exception: self.graph.agent_status[self.name] = "failed" logger.error("[%s] Failed during task execution", self.name, exc_info=True) raise self.graph.agent_status[self.name] = "completed" elapsed = time.monotonic() - t0 new_ph = len(self.graph.phenomena) - ph_before _log(f"+{new_ph} phenomena, {len(final_text)} chars", event="agent_done", agent=self.name, elapsed=elapsed) return final_text # ---- Graph interaction tools -------------------------------------------- def _register_graph_tools(self) -> None: """Register graph query + mutation tools. Subclasses can override to restrict the toolset. For example, a read-only agent (hypothesis, report) overrides this to skip _register_graph_write_tools. """ self._register_graph_read_tools() self._register_graph_write_tools() def _register_graph_read_tools(self) -> None: """Register read-only graph + asset query tools.""" self.register_tool( name="list_phenomena", description=( "List all phenomena (evidence artifacts) on the graph. " "Returns one-line summaries with IDs. Use get_phenomenon(id) for full details." ), input_schema={ "type": "object", "properties": { "category": { "type": "string", "description": "Filter by category (filesystem, registry, email, network, timeline). Omit for all.", }, }, }, executor=self._list_phenomena, ) self.register_tool( name="get_phenomenon", description="Get full details of a specific phenomenon by ID, including raw_data.", input_schema={ "type": "object", "properties": { "id": {"type": "string", "description": "Phenomenon ID (e.g. 'ph-a1b2c3d4')."}, }, "required": ["id"], }, executor=self._get_phenomenon, ) self.register_tool( name="search_graph", description="Search across phenomena, hypotheses, and entities by keyword. Returns matching summaries.", input_schema={ "type": "object", "properties": { "keyword": {"type": "string", "description": "Search keyword."}, }, "required": ["keyword"], }, executor=self._search_graph, ) self.register_tool( name="get_related", description="Get all nodes connected to a given node via edges. Returns summaries and edge types.", input_schema={ "type": "object", "properties": { "node_id": {"type": "string", "description": "Any node ID (ph-*, hyp-*, ent-*)."}, }, "required": ["node_id"], }, executor=self._get_related, ) self.register_tool( name="get_hypothesis_status", description="Get current status and confidence of all hypotheses being investigated.", input_schema={"type": "object", "properties": {}}, executor=self._get_hypothesis_status, ) self.register_tool( name="list_assets", description=( "List all files extracted from the disk image. " "Shows filename, category, size, local path, and inode. " "Check this before calling extract_file to avoid re-extraction." ), input_schema={ "type": "object", "properties": { "category": { "type": "string", "enum": [ "registry_hive", "chat_log", "prefetch", "network_capture", "config_file", "address_book", "recycle_bin", "executable", "text_log", "other", ], "description": "Filter by category. Omit to list all.", }, }, }, executor=self._list_assets, ) self.register_tool( name="find_extracted_file", description=( "Find an already-extracted file by inode or filename. " "Returns the local path so you can use it directly with " "parse_registry_key, read_text_file, etc. without re-extracting." ), input_schema={ "type": "object", "properties": { "inode": {"type": "string", "description": "Inode to look up."}, "filename": {"type": "string", "description": "Filename or partial name to search."}, }, }, executor=self._find_extracted_file, ) def _register_graph_write_tools(self) -> None: """Register graph mutation tools (add_phenomenon, add_lead, link_to_entity).""" self.register_tool( name="add_phenomenon", description=( "Record a forensic finding on the evidence graph. The finding is " "split into provenance-bound atoms (verified_facts) and free-form " "analysis (interpretation). Each fact MUST cite the invocation_id " "of a tool call you made in THIS task — the gateway checks every " "fact's value against that call's real output, byte-for-byte. " "Any fact that fails grounding causes the whole record to be " "rejected with a list of failures; fix the facts and call again." ), input_schema={ "type": "object", "properties": { "category": {"type": "string", "description": "Category of the finding."}, "title": {"type": "string", "description": "Short title."}, "interpretation": { "type": "string", "description": ( "Free-form analysis text — your reasoning, why this " "matters, what it implies. NOT verified by the gateway. " "Rendered in reports as 'agent analysis', not truth." ), }, "verified_facts": { "type": "array", "description": ( "Atoms you want preserved as ground truth. Each must " "appear verbatim in the cited tool output." ), "items": { "type": "object", "properties": { "type": { "type": "string", "description": ( "Kind of fact: path, timestamp, inode, " "hash, identifier, count, raw, ..." ), }, "value": { "type": "string", "description": ( "Verbatim substring from the cited tool " "output. The gateway does a literal " "string-in-string check — no paraphrasing." ), }, "invocation_id": { "type": "string", "description": ( "ID from the '[invocation: inv-xxx]' header " "of the tool call that produced this value." ), }, }, "required": ["type", "value", "invocation_id"], }, }, "raw_data": {"type": "object", "description": "Structured raw data supporting this finding."}, "timestamp": {"type": "string", "description": "Timestamp if any. ONLY use timestamps from tool output."}, "source_tool": {"type": "string", "description": "Name of the tool that produced this (e.g. 'list_directory')."}, }, "required": ["category", "title", "source_tool"], }, executor=self._add_phenomenon, ) self.register_tool( name="add_lead", description="Create an investigative lead for another agent to follow up on.", input_schema={ "type": "object", "properties": { "target_agent": { "type": "string", "enum": ["filesystem", "registry", "communication", "network", "timeline"], "description": "Which agent should handle this lead.", }, "description": {"type": "string", "description": "What should be investigated."}, "priority": {"type": "integer", "description": "Priority 1 (highest) to 10 (lowest). Default 5."}, }, "required": ["target_agent", "description"], }, executor=self._add_lead, ) self.register_tool( name="link_to_entity", description=( "Link a phenomenon to a named entity (person, program, host, etc). " "Creates the entity if it doesn't exist." ), input_schema={ "type": "object", "properties": { "phenomenon_id": {"type": "string", "description": "Phenomenon ID to link from."}, "entity_name": {"type": "string", "description": "Name of the entity (e.g. 'Mr. Evil', 'mIRC.exe')."}, "entity_type": { "type": "string", "enum": ["person", "program", "file", "host", "ip_address"], "description": "Type of entity.", }, "edge_type": { "type": "string", "enum": ["created_by", "executed_by", "owned_by", "targets", "associated_with", "found_on", "used_by"], "description": "Relationship type.", }, }, "required": ["phenomenon_id", "entity_name", "entity_type", "edge_type"], }, executor=self._link_to_entity, ) self.register_tool( name="observe_identity", description=( "Record a typed identifier (email / phone / Apple ID / IMEI / " "wallet address / nickname / display name / …) for an entity. " "Goes through the same grounding gateway as add_phenomenon — " "value MUST be a verbatim substring of the cited tool output. " "After attachment, the engine automatically proposes / " "strengthens / weakens cross-source coreference hypotheses " "between this entity and any others carrying the same or " "conflicting identifiers. This is how 'is the Apple ID in iOS " "keychain the same person as the Windows login name?' gets " "answered. Call this in ADDITION to add_phenomenon for " "identifier-bearing findings." ), input_schema={ "type": "object", "properties": { "entity_name": {"type": "string", "description": "Human-readable entity name (e.g. 'LEUNG YL', 'alice@example.com')."}, "entity_type": { "type": "string", "enum": ["person", "program", "file", "host", "ip_address"], "description": "Kind of entity this identifier belongs to (usually 'person').", }, "identifier_type": { "type": "string", "description": ( "Strong (near-unique): email, phone_number, imei, " "imsi, apple_id, icloud_id, google_account, " "wallet_address, udid, mac_address, device_serial. " "Weak (free-form, may collide): nickname, " "display_name, username, screen_name." ), }, "value": { "type": "string", "description": ( "The identifier value, quoted VERBATIM from the " "tool output you cite in invocation_id." ), }, "invocation_id": { "type": "string", "description": ( "ID from the '[invocation: inv-xxx]' header of " "the tool call that surfaced this identifier." ), }, "source_tool": { "type": "string", "description": "Name of the tool that produced the identifier.", }, }, "required": [ "entity_name", "entity_type", "identifier_type", "value", "invocation_id", ], }, executor=self._observe_identity, ) # ---- Tool executors ----------------------------------------------------- async def _list_phenomena(self, category: str | None = None) -> str: results = self.graph.list_phenomena(category) if not results: return "No phenomena recorded yet." if not category else f"No phenomena in category '{category}'." return "\n".join(results) async def _get_phenomenon(self, id: str) -> str: data = self.graph.get_phenomenon(id) if data is None: return f"Phenomenon not found: {id}" return json.dumps(data, ensure_ascii=False, indent=2) async def _search_graph(self, keyword: str) -> str: results = self.graph.search_graph(keyword) if not results: return f"No matches for '{keyword}'." return "\n".join(results) async def _get_related(self, node_id: str) -> str: results = self.graph.get_related(node_id) if not results: return f"No connections found for {node_id}." lines = [] for r in results: lines.append(f" {r['direction']} [{r['edge_type']}] → {r['node']}") return "\n".join(lines) async def _get_hypothesis_status(self) -> str: results = self.graph.get_hypothesis_status() if not results: return "No hypotheses defined yet." return "\n".join(results) async def _add_phenomenon( self, category: str, title: str, interpretation: str = "", verified_facts: list[dict] | None = None, raw_data: dict | None = None, timestamp: str | None = None, source_tool: str = "", # Back-compat: older prompts (and accidental LLM emissions) may pass # ``description``; treat it as ``interpretation`` rather than failing. description: str | None = None, ) -> str: if description and not interpretation: interpretation = description # GroundingError propagates: llm_client._execute_single_tool turns # raised exceptions into "Error executing add_phenomenon: " tool # results the LLM sees, and _wrap_record_executor does NOT increment # the mandatory-record counter (the increment only runs after a # successful return), so the forced-retry mechanism still fires if # the agent never lands a grounded phenomenon. pid, merged = await self.graph.add_phenomenon( source_agent=self.name, category=category, title=title, interpretation=interpretation, verified_facts=verified_facts, raw_data=raw_data, timestamp=timestamp, source_tool=source_tool, from_lead_id=self._current_lead_id, ) if merged: return f"Phenomenon merged into existing: {pid} — {title} (corroboration boost)" return f"Phenomenon recorded: {pid} — {title}" async def _add_lead( self, target_agent: str, description: str, priority: int = 5, ) -> str: lid = await self.graph.add_lead( target_agent=target_agent, description=description, priority=priority, ) return f"Lead created: {lid} — [{target_agent}] {description}" async def _link_to_entity( self, phenomenon_id: str, entity_name: str, entity_type: str, edge_type: str, ) -> str: # Validate phenomenon exists before creating entity if not self.graph._node_exists(phenomenon_id): return ( f"Error: phenomenon '{phenomenon_id}' not found. " f"Call list_phenomena first to get valid IDs." ) eid, existing = await self.graph.add_entity(entity_name, entity_type) await self.graph.add_edge( source_id=phenomenon_id, target_id=eid, edge_type=edge_type, created_by=self.name, ) status = "linked to existing" if existing else "created and linked" return f"Entity {status}: {entity_name} ({entity_type}) ←[{edge_type}]— {phenomenon_id}" async def _observe_identity( self, entity_name: str, entity_type: str, identifier_type: str, value: str, invocation_id: str, source_tool: str = "", ) -> str: # GroundingError / ValueError propagate to llm_client's per-tool # exception handler, which formats them back to the LLM. That keeps # the mandatory-record counter honest — only a successful return # triggers the increment in _wrap_record_executor. result = await self.graph.observe_identity( entity_name=entity_name, entity_type=entity_type, identifier_type=identifier_type, value=value, source_agent=self.name, source_tool=source_tool, invocation_id=invocation_id, ) lines = [ f"Identity observed: {identifier_type}={value} " f"on entity {result['entity_id']} ({entity_name})." ] if result.get("new_identifier"): lines.append( f" Observation phenomenon: {result['phenomenon_id']}" ) else: lines.append(" (identifier already recorded on this entity — idempotent)") for prop in result.get("coref_proposals", []): lines.append( f" → Coref candidate: {prop['other_entity_id']} via " f"{prop['match']['edge_type']} (conf={prop['confidence']:.2f}, " f"hypothesis={prop['hypothesis_id']})" ) for c in prop.get("conflicts", []): lines.append( f" ⚠ conflict on {c['type']}: " f"{c['new_value']} vs {c['other_value']}" ) return "\n".join(lines) async def _list_assets(self, category: str | None = None) -> str: results = self.graph.list_assets(category) if not results: return "No files extracted yet." if not category else f"No assets in category '{category}'." return "\n".join(results) async def _find_extracted_file( self, inode: str | None = None, filename: str | None = None, ) -> str: if inode: asset = self.graph.lookup_asset_by_inode(inode) if asset: return ( f"Found: {asset.local_path} " f"({asset.size_bytes} bytes, {asset.category}, inode:{asset.inode})" ) return f"No extracted file with inode {inode}." if filename: results = self.graph.query_assets(filename_pattern=filename) if not results: return f"No extracted files matching '{filename}'." lines = [f"Found {len(results)} matching file(s):"] for a in results: lines.append(f" {a.local_path} (inode:{a.inode}, {a.size_bytes} bytes, {a.category})") return "\n".join(lines) return "Provide either inode or filename to search."