"""Base class for forensic analysis agents.""" from __future__ import annotations import json import logging import time from typing import Any from evidence_graph import EvidenceGraph from llm_client import LLMClient logger = logging.getLogger(__name__) def _log(msg: str, **extra) -> None: """Emit a structured log message with extra fields.""" logger.info(msg, extra=extra) class BaseAgent: """Base class for all forensic agents. Each agent has: - A name and role description - A set of tools it can use (registered as methods) - Access to the shared EvidenceGraph - An LLM client for reasoning and tool-calling """ name: str = "base" role: str = "A forensic analysis agent." # Tools the agent MUST invoke at least once for the run to count as productive. # If none of these were called when tool_call_loop returns, run() fires a # forced retry with an explicit "you forgot to record" instruction. # Subclasses override to declare their own recording responsibility # (timeline → add_temporal_edge, hypothesis → add_hypothesis, report → save_report). mandatory_record_tools: tuple[str, ...] = ("add_phenomenon",) # Tools whose invocation ends the run immediately. After any terminal tool # is called, tool_call_loop returns with that tool's result text as # final_text. Used by agents whose "completion" is a single explicit # action rather than "model decides to stop calling tools". For multi-call # agents (filesystem records many phenomena) leave empty. terminal_tools: tuple[str, ...] = () def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None: self.llm = llm self.graph = graph self._tools: dict[str, dict] = {} # name -> schema self._executors: dict[str, Any] = {} # name -> async callable self._record_call_counts: dict[str, int] = {} self._work_log: list[str] = [] self._current_lead_id: str | None = None def register_tool( self, name: str, description: str, input_schema: dict, executor: Any, ) -> None: """Register a tool that this agent can use.""" self._tools[name] = { "name": name, "description": description, "input_schema": input_schema, } if name in self.mandatory_record_tools: self._executors[name] = self._wrap_record_executor(name, executor) else: self._executors[name] = executor def _wrap_record_executor(self, name: str, executor: Any) -> Any: """Wrap a mandatory-record executor to count successful invocations.""" async def wrapped(*args, **kwargs): result = await executor(*args, **kwargs) self._record_call_counts[name] = self._record_call_counts.get(name, 0) + 1 return result return wrapped def get_tool_definitions(self) -> list[dict]: """Get tool definitions in Claude API format.""" return list(self._tools.values()) def _build_system_prompt(self, task: str) -> str: """Build the system prompt — lightweight stats, no full evidence dump.""" work_log_section = "" if self._work_log: entries = self._work_log[-5:] log_lines = "\n".join(f" {i+1}. {entry}" for i, entry in enumerate(entries)) work_log_section = ( f"\nYour prior work on this investigation:\n{log_lines}\n" f"Avoid repeating tools/approaches that already succeeded or failed. Build on prior findings.\n" ) return ( f"You are {self.name}, a specialized digital forensics agent.\n" f"Role: {self.role}\n\n" f"You are analyzing a disk image as part of a multi-agent forensic investigation.\n" f"Image: {self.graph.image_path}\n\n" f"Current investigation state:\n{self.graph.stats_summary()}\n" f"{work_log_section}\n" f"Your current task: {task}\n\n" f"CRITICAL WORKFLOW — you MUST follow these steps IN ORDER, one phase at a time:\n\n" f"Phase A — INVESTIGATE:\n" f" Use list_phenomena/search_graph to review existing findings.\n" f" Call list_assets to see what files are already extracted.\n" f" Call investigation tools (list_directory, parse_registry_key, etc.) to gather data.\n" f" Only extract_file for forensically relevant files (user data, logs, configs, hives) — NOT system DLLs or OS files.\n" f" Create add_lead for anything outside your expertise.\n\n" f"Phase B — RECORD PHENOMENA:\n" f" For EACH significant finding from Phase A, call add_phenomenon.\n" f" Do NOT call link_to_entity yet — just record all phenomena first.\n\n" f"Phase C — LINK ENTITIES:\n" f" FIRST call list_phenomena to get the current IDs — do NOT rely on memory.\n" f" Then call link_to_entity for each relevant phenomenon.\n" f" NEVER guess or fabricate a phenomenon ID. If an ID is not in list_phenomena output, it does not exist.\n\n" f"Phase D — STOP:\n" f" Once all phenomena are recorded and entities linked, you are DONE.\n" f" Do not call any more tools. The orchestrator picks up automatically.\n\n" f"CRITICAL — RECORDING REQUIREMENT:\n" f"- Only graph mutations propagate to other agents and the final report.\n" f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you stop.\n" f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, " f"a registry key) and found NOTHING, that absence IS evidence — call " f"add_phenomenon with a 'No matches for X' title and the search scope in " f"raw_data. Negative findings constrain the hypothesis space and prevent " f"the next agent from wasting time re-searching.\n" f"- If you stop without having called add_phenomenon at least once, the task " f"is FAILED and a forced retry will fire.\n" f"- Include exact file paths, inode numbers, timestamps, and the source_tool " f"that produced each finding.\n\n" f"ANTI-HALLUCINATION RULES — STRICTLY ENFORCED:\n" f"- ONLY record findings that appear VERBATIM in tool results you received\n" f"- NEVER invent or guess timestamps, file paths, inode numbers, or program names\n" f"- If tool output was truncated, state '[truncated]' — do NOT fill in the missing data\n" f"- If you are unsure whether something exists, call a tool to verify or create a lead — do NOT assume\n" f"- Quote exact strings from tool output when recording evidence descriptions\n" f"- Do NOT fabricate execution timestamps — only report timestamps returned by tools" ) async def run(self, task: str, lead_id: str | None = None) -> str: """Run this agent with a specific task.""" _log(task, event="agent_start", agent=self.name) self.graph.agent_status[self.name] = "running" self.graph._current_agent = self.name self._current_lead_id = lead_id self._register_graph_tools() self._record_call_counts.clear() system = self._build_system_prompt(task) messages = [{"role": "user", "content": task}] t0 = time.monotonic() ph_before = len(self.graph.phenomena) try: final_text, conversation = await self.llm.tool_call_loop( messages=messages, tools=self.get_tool_definitions(), tool_executor=self._executors, system=system, terminal_tools=self.terminal_tools, ) # Forced-record retry: if the agent has any mandatory recording # tools but never invoked any of them, force one more round with # an explicit "you forgot to record" instruction. The mandatory # set is declared on the class — Timeline → add_temporal_edge, # Hypothesis → add_hypothesis, ReportAgent → (). For agents with # empty mandatory_record_tools this branch is a no-op. registered_mandatory = [ t for t in self.mandatory_record_tools if t in self._executors ] recorded_any = any( self._record_call_counts.get(t, 0) > 0 for t in registered_mandatory ) if registered_mandatory and not recorded_any: missing = "/".join(registered_mandatory) logger.warning( "[%s] finished without calling any of [%s] — forcing RECORD retry", self.name, missing, ) conversation.append({ "role": "user", "content": ( f"STOP. You produced an answer without ever calling " f"{missing}. Your answer is DISCARDED — only graph " f"mutations propagate to other agents and the final " f"report.\n\n" f"You MUST now call {missing} for every significant " f"finding from your prior investigation, including " f"exact identifiers, timestamps, and the source_tool " f"that produced each finding. If you genuinely found " f"NOTHING noteworthy, call the recording tool ONCE " f"with a 'No significant findings' style entry " f"summarizing what you searched.\n\n" f"Do not run more investigation tools. Just record " f"what you already found. Then end." ), }) final_text, _ = await self.llm.tool_call_loop( messages=conversation, tools=self.get_tool_definitions(), tool_executor=self._executors, system=system, max_iterations=10, terminal_tools=self.terminal_tools, ) self._work_log.append(f"[Task: {task[:80]}] -> {final_text[:150]}") except Exception: self.graph.agent_status[self.name] = "failed" logger.error("[%s] Failed during task execution", self.name, exc_info=True) raise self.graph.agent_status[self.name] = "completed" elapsed = time.monotonic() - t0 new_ph = len(self.graph.phenomena) - ph_before _log(f"+{new_ph} phenomena, {len(final_text)} chars", event="agent_done", agent=self.name, elapsed=elapsed) return final_text # ---- Graph interaction tools -------------------------------------------- def _register_graph_tools(self) -> None: """Register graph query + mutation tools. Subclasses can override to restrict the toolset. For example, a read-only agent (hypothesis, report) overrides this to skip _register_graph_write_tools. """ self._register_graph_read_tools() self._register_graph_write_tools() def _register_graph_read_tools(self) -> None: """Register read-only graph + asset query tools.""" self.register_tool( name="list_phenomena", description=( "List all phenomena (evidence artifacts) on the graph. " "Returns one-line summaries with IDs. Use get_phenomenon(id) for full details." ), input_schema={ "type": "object", "properties": { "category": { "type": "string", "description": "Filter by category (filesystem, registry, email, network, timeline). Omit for all.", }, }, }, executor=self._list_phenomena, ) self.register_tool( name="get_phenomenon", description="Get full details of a specific phenomenon by ID, including raw_data.", input_schema={ "type": "object", "properties": { "id": {"type": "string", "description": "Phenomenon ID (e.g. 'ph-a1b2c3d4')."}, }, "required": ["id"], }, executor=self._get_phenomenon, ) self.register_tool( name="search_graph", description="Search across phenomena, hypotheses, and entities by keyword. Returns matching summaries.", input_schema={ "type": "object", "properties": { "keyword": {"type": "string", "description": "Search keyword."}, }, "required": ["keyword"], }, executor=self._search_graph, ) self.register_tool( name="get_related", description="Get all nodes connected to a given node via edges. Returns summaries and edge types.", input_schema={ "type": "object", "properties": { "node_id": {"type": "string", "description": "Any node ID (ph-*, hyp-*, ent-*)."}, }, "required": ["node_id"], }, executor=self._get_related, ) self.register_tool( name="get_hypothesis_status", description="Get current status and confidence of all hypotheses being investigated.", input_schema={"type": "object", "properties": {}}, executor=self._get_hypothesis_status, ) self.register_tool( name="list_assets", description=( "List all files extracted from the disk image. " "Shows filename, category, size, local path, and inode. " "Check this before calling extract_file to avoid re-extraction." ), input_schema={ "type": "object", "properties": { "category": { "type": "string", "enum": [ "registry_hive", "chat_log", "prefetch", "network_capture", "config_file", "address_book", "recycle_bin", "executable", "text_log", "other", ], "description": "Filter by category. Omit to list all.", }, }, }, executor=self._list_assets, ) self.register_tool( name="find_extracted_file", description=( "Find an already-extracted file by inode or filename. " "Returns the local path so you can use it directly with " "parse_registry_key, read_text_file, etc. without re-extracting." ), input_schema={ "type": "object", "properties": { "inode": {"type": "string", "description": "Inode to look up."}, "filename": {"type": "string", "description": "Filename or partial name to search."}, }, }, executor=self._find_extracted_file, ) def _register_graph_write_tools(self) -> None: """Register graph mutation tools (add_phenomenon, add_lead, link_to_entity).""" self.register_tool( name="add_phenomenon", description=( "Record a forensic finding (phenomenon) on the evidence graph. " "You MUST specify source_tool: the name of the tool call that produced this finding." ), input_schema={ "type": "object", "properties": { "category": {"type": "string", "description": "Category of the finding."}, "title": {"type": "string", "description": "Short title."}, "description": {"type": "string", "description": "Detailed description. Quote exact data from tool output."}, "raw_data": {"type": "object", "description": "Structured raw data supporting this finding."}, "timestamp": {"type": "string", "description": "Timestamp if any. ONLY use timestamps from tool output."}, "source_tool": {"type": "string", "description": "Name of the tool that produced this (e.g. 'list_directory')."}, }, "required": ["category", "title", "description", "source_tool"], }, executor=self._add_phenomenon, ) self.register_tool( name="add_lead", description="Create an investigative lead for another agent to follow up on.", input_schema={ "type": "object", "properties": { "target_agent": { "type": "string", "enum": ["filesystem", "registry", "communication", "network", "timeline"], "description": "Which agent should handle this lead.", }, "description": {"type": "string", "description": "What should be investigated."}, "priority": {"type": "integer", "description": "Priority 1 (highest) to 10 (lowest). Default 5."}, }, "required": ["target_agent", "description"], }, executor=self._add_lead, ) self.register_tool( name="link_to_entity", description=( "Link a phenomenon to a named entity (person, program, host, etc). " "Creates the entity if it doesn't exist." ), input_schema={ "type": "object", "properties": { "phenomenon_id": {"type": "string", "description": "Phenomenon ID to link from."}, "entity_name": {"type": "string", "description": "Name of the entity (e.g. 'Mr. Evil', 'mIRC.exe')."}, "entity_type": { "type": "string", "enum": ["person", "program", "file", "host", "ip_address"], "description": "Type of entity.", }, "edge_type": { "type": "string", "enum": ["created_by", "executed_by", "owned_by", "targets", "associated_with", "found_on", "used_by"], "description": "Relationship type.", }, }, "required": ["phenomenon_id", "entity_name", "entity_type", "edge_type"], }, executor=self._link_to_entity, ) # ---- Tool executors ----------------------------------------------------- async def _list_phenomena(self, category: str | None = None) -> str: results = self.graph.list_phenomena(category) if not results: return "No phenomena recorded yet." if not category else f"No phenomena in category '{category}'." return "\n".join(results) async def _get_phenomenon(self, id: str) -> str: data = self.graph.get_phenomenon(id) if data is None: return f"Phenomenon not found: {id}" return json.dumps(data, ensure_ascii=False, indent=2) async def _search_graph(self, keyword: str) -> str: results = self.graph.search_graph(keyword) if not results: return f"No matches for '{keyword}'." return "\n".join(results) async def _get_related(self, node_id: str) -> str: results = self.graph.get_related(node_id) if not results: return f"No connections found for {node_id}." lines = [] for r in results: lines.append(f" {r['direction']} [{r['edge_type']}] → {r['node']}") return "\n".join(lines) async def _get_hypothesis_status(self) -> str: results = self.graph.get_hypothesis_status() if not results: return "No hypotheses defined yet." return "\n".join(results) async def _add_phenomenon( self, category: str, title: str, description: str, raw_data: dict | None = None, timestamp: str | None = None, source_tool: str = "", ) -> str: pid, merged = await self.graph.add_phenomenon( source_agent=self.name, category=category, title=title, description=description, raw_data=raw_data, timestamp=timestamp, source_tool=source_tool, from_lead_id=self._current_lead_id, ) if merged: return f"Phenomenon merged into existing: {pid} — {title} (corroboration boost)" return f"Phenomenon recorded: {pid} — {title}" async def _add_lead( self, target_agent: str, description: str, priority: int = 5, ) -> str: lid = await self.graph.add_lead( target_agent=target_agent, description=description, priority=priority, ) return f"Lead created: {lid} — [{target_agent}] {description}" async def _link_to_entity( self, phenomenon_id: str, entity_name: str, entity_type: str, edge_type: str, ) -> str: # Validate phenomenon exists before creating entity if not self.graph._node_exists(phenomenon_id): return ( f"Error: phenomenon '{phenomenon_id}' not found. " f"Call list_phenomena first to get valid IDs." ) eid, existing = await self.graph.add_entity(entity_name, entity_type) await self.graph.add_edge( source_id=phenomenon_id, target_id=eid, edge_type=edge_type, created_by=self.name, ) status = "linked to existing" if existing else "created and linked" return f"Entity {status}: {entity_name} ({entity_type}) ←[{edge_type}]— {phenomenon_id}" async def _list_assets(self, category: str | None = None) -> str: results = self.graph.list_assets(category) if not results: return "No files extracted yet." if not category else f"No assets in category '{category}'." return "\n".join(results) async def _find_extracted_file( self, inode: str | None = None, filename: str | None = None, ) -> str: if inode: asset = self.graph.lookup_asset_by_inode(inode) if asset: return ( f"Found: {asset.local_path} " f"({asset.size_bytes} bytes, {asset.category}, inode:{asset.inode})" ) return f"No extracted file with inode {inode}." if filename: results = self.graph.query_assets(filename_pattern=filename) if not results: return f"No extracted files matching '{filename}'." lines = [f"Found {len(results)} matching file(s):"] for a in results: lines.append(f" {a.local_path} (inode:{a.inode}, {a.size_bytes} bytes, {a.category})") return "\n".join(lines) return "Provide either inode or filename to search."