Files
MASForensic/base_agent.py
BattleTag 444d58726a refactor: native tool calling + generic forced-retry + terminal exit
- llm_client: switch tool_call_loop from text-based <tool_call> regex
  to OpenAI-native tools=[...] / structured tool_calls field; accumulate
  delta.reasoning_content for DeepSeek thinking-mode echo-back; fold
  preserves system msg and aligns boundary to never orphan role:tool
- base_agent: generic forced-retry via mandatory_record_tools class attr
  (filesystem -> add_phenomenon, timeline -> add_temporal_edge,
  hypothesis -> add_hypothesis, report -> save_report); count via
  executor wrapper
- terminal_tools class attr + loop short-circuit: when a terminal tool
  is called, loop exits with its raw return as final_text. ReportAgent
  declares save_report as terminal - replaces the <answer>-tag stop
  signal that native tool calling broke
- _execute_*: return (raw, formatted) - terminal exit uses untruncated
  raw, conversation history uses 3000-char-capped formatted
- evidence_graph + orchestrator: LLM-derived InvestigationArea support
  (hypothesis-driven coverage check, replaces hardcoded _AREA_KEYWORDS /
  _AREA_TOOLS); manual yaml block kept as optional seed
- strip <answer> references from agent prompts (no longer load-bearing)

Verified on CFReDS image across 4 smoke runs: 0 JSON parse failures
(was 3); 22 temporal edges from Phase 4 (was 0); ReportAgent exits via
save_report (was max_iterations regression). 78/78 unit tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 13:51:19 +08:00

541 lines
23 KiB
Python

"""Base class for forensic analysis agents."""
from __future__ import annotations
import json
import logging
import time
from typing import Any
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
logger = logging.getLogger(__name__)
def _log(msg: str, **extra) -> None:
"""Emit a structured log message with extra fields."""
logger.info(msg, extra=extra)
class BaseAgent:
    """Base class for all forensic agents.

    Each agent has:
    - A name and role description
    - A set of tools it can use (registered through ``register_tool``)
    - Access to the shared EvidenceGraph
    - An LLM client for reasoning and tool-calling

    Subclasses customise behaviour through the class attributes below and may
    override ``_register_graph_tools`` to restrict the graph toolset.
    """

    name: str = "base"
    role: str = "A forensic analysis agent."

    # Tools the agent MUST invoke at least once for the run to count as productive.
    # If none of these were called when tool_call_loop returns, run() fires a
    # forced retry with an explicit "you forgot to record" instruction.
    # Subclasses override to declare their own recording responsibility
    # (timeline → add_temporal_edge, hypothesis → add_hypothesis, report → save_report).
    mandatory_record_tools: tuple[str, ...] = ("add_phenomenon",)

    # Tools whose invocation ends the run immediately. After any terminal tool
    # is called, tool_call_loop returns with that tool's result text as
    # final_text. Used by agents whose "completion" is a single explicit
    # action rather than "model decides to stop calling tools". For multi-call
    # agents (filesystem records many phenomena) leave empty.
    terminal_tools: tuple[str, ...] = ()

    def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
        """Bind the agent to its LLM client and the shared evidence graph."""
        self.llm = llm
        self.graph = graph
        self._tools: dict[str, dict] = {}  # name -> schema
        self._executors: dict[str, Any] = {}  # name -> async callable
        # Invocation counts of mandatory record tools; cleared at the start of run().
        self._record_call_counts: dict[str, int] = {}
        # One summary line per completed task; the last five feed the system prompt.
        self._work_log: list[str] = []
        # Lead passed to the current run(); forwarded when recording phenomena.
        self._current_lead_id: str | None = None

    def register_tool(
        self,
        name: str,
        description: str,
        input_schema: dict,
        executor: Any,
    ) -> None:
        """Register a tool that this agent can use.

        Args:
            name: Tool name; key of both the schema and the executor maps.
            description: Description stored in the tool schema.
            input_schema: JSON-schema dict describing the tool arguments.
            executor: Async callable that implements the tool.

        Registering an existing name replaces its schema and executor.
        Executors of tools listed in ``mandatory_record_tools`` are wrapped so
        their invocations are counted.
        """
        self._tools[name] = {
            "name": name,
            "description": description,
            "input_schema": input_schema,
        }
        if name in self.mandatory_record_tools:
            self._executors[name] = self._wrap_record_executor(name, executor)
        else:
            self._executors[name] = executor

    def _wrap_record_executor(self, name: str, executor: Any) -> Any:
        """Wrap a mandatory-record executor to count successful invocations.

        The counter is incremented only after the wrapped executor returns, so
        a call that raises is not counted.
        """

        async def wrapped(*args, **kwargs):
            result = await executor(*args, **kwargs)
            self._record_call_counts[name] = self._record_call_counts.get(name, 0) + 1
            return result

        return wrapped

    def get_tool_definitions(self) -> list[dict]:
        """Get tool definitions in Claude API format (name/description/input_schema)."""
        return list(self._tools.values())

    def _build_system_prompt(self, task: str) -> str:
        """Build the system prompt — lightweight stats, no full evidence dump.

        Includes the agent identity, the graph's image path and stats summary,
        up to the five most recent work-log entries, the task itself, and the
        fixed workflow / recording / anti-hallucination rules.
        """
        work_log_section = ""
        if self._work_log:
            entries = self._work_log[-5:]
            log_lines = "\n".join(f" {i+1}. {entry}" for i, entry in enumerate(entries))
            work_log_section = (
                f"\nYour prior work on this investigation:\n{log_lines}\n"
                f"Avoid repeating tools/approaches that already succeeded or failed. Build on prior findings.\n"
            )
        return (
            f"You are {self.name}, a specialized digital forensics agent.\n"
            f"Role: {self.role}\n\n"
            f"You are analyzing a disk image as part of a multi-agent forensic investigation.\n"
            f"Image: {self.graph.image_path}\n\n"
            f"Current investigation state:\n{self.graph.stats_summary()}\n"
            f"{work_log_section}\n"
            f"Your current task: {task}\n\n"
            f"CRITICAL WORKFLOW — you MUST follow these steps IN ORDER, one phase at a time:\n\n"
            f"Phase A — INVESTIGATE:\n"
            f" Use list_phenomena/search_graph to review existing findings.\n"
            f" Call list_assets to see what files are already extracted.\n"
            f" Call investigation tools (list_directory, parse_registry_key, etc.) to gather data.\n"
            f" Only extract_file for forensically relevant files (user data, logs, configs, hives) — NOT system DLLs or OS files.\n"
            f" Create add_lead for anything outside your expertise.\n\n"
            f"Phase B — RECORD PHENOMENA:\n"
            f" For EACH significant finding from Phase A, call add_phenomenon.\n"
            f" Do NOT call link_to_entity yet — just record all phenomena first.\n\n"
            f"Phase C — LINK ENTITIES:\n"
            f" FIRST call list_phenomena to get the current IDs — do NOT rely on memory.\n"
            f" Then call link_to_entity for each relevant phenomenon.\n"
            f" NEVER guess or fabricate a phenomenon ID. If an ID is not in list_phenomena output, it does not exist.\n\n"
            f"Phase D — STOP:\n"
            f" Once all phenomena are recorded and entities linked, you are DONE.\n"
            f" Do not call any more tools. The orchestrator picks up automatically.\n\n"
            f"CRITICAL — RECORDING REQUIREMENT:\n"
            f"- Only graph mutations propagate to other agents and the final report.\n"
            f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you stop.\n"
            f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, "
            f"a registry key) and found NOTHING, that absence IS evidence — call "
            f"add_phenomenon with a 'No matches for X' title and the search scope in "
            f"raw_data. Negative findings constrain the hypothesis space and prevent "
            f"the next agent from wasting time re-searching.\n"
            f"- If you stop without having called add_phenomenon at least once, the task "
            f"is FAILED and a forced retry will fire.\n"
            f"- Include exact file paths, inode numbers, timestamps, and the source_tool "
            f"that produced each finding.\n\n"
            f"ANTI-HALLUCINATION RULES — STRICTLY ENFORCED:\n"
            f"- ONLY record findings that appear VERBATIM in tool results you received\n"
            f"- NEVER invent or guess timestamps, file paths, inode numbers, or program names\n"
            f"- If tool output was truncated, state '[truncated]' — do NOT fill in the missing data\n"
            f"- If you are unsure whether something exists, call a tool to verify or create a lead — do NOT assume\n"
            f"- Quote exact strings from tool output when recording evidence descriptions\n"
            f"- Do NOT fabricate execution timestamps — only report timestamps returned by tools"
        )

    def _forced_record_prompt(self, missing: str) -> str:
        """Build the user message that forces a record-only retry.

        Args:
            missing: Slash-joined names of the mandatory record tools that
                were registered but never invoked.
        """
        return (
            f"STOP. You produced an answer without ever calling "
            f"{missing}. Your answer is DISCARDED — only graph "
            f"mutations propagate to other agents and the final "
            f"report.\n\n"
            f"You MUST now call {missing} for every significant "
            f"finding from your prior investigation, including "
            f"exact identifiers, timestamps, and the source_tool "
            f"that produced each finding. If you genuinely found "
            f"NOTHING noteworthy, call the recording tool ONCE "
            f"with a 'No significant findings' style entry "
            f"summarizing what you searched.\n\n"
            f"Do not run more investigation tools. Just record "
            f"what you already found. Then end."
        )

    async def run(self, task: str, lead_id: str | None = None) -> str:
        """Run this agent with a specific task.

        Args:
            task: Task description; used as the first user message and
                embedded in the system prompt.
            lead_id: Optional ID of the lead that triggered this run; it is
                forwarded to the graph when phenomena are recorded.

        Returns:
            The final text returned by the LLM tool-calling loop (from the
            forced retry, if one was needed).

        Raises:
            Any exception raised by the tool-calling loop is re-raised after
            the agent's status has been set to ``"failed"``.
        """
        _log(task, event="agent_start", agent=self.name)
        self.graph.agent_status[self.name] = "running"
        self.graph._current_agent = self.name
        self._current_lead_id = lead_id
        self._register_graph_tools()
        self._record_call_counts.clear()
        system = self._build_system_prompt(task)
        messages = [{"role": "user", "content": task}]
        t0 = time.monotonic()
        ph_before = len(self.graph.phenomena)
        try:
            final_text, conversation = await self.llm.tool_call_loop(
                messages=messages,
                tools=self.get_tool_definitions(),
                tool_executor=self._executors,
                system=system,
                terminal_tools=self.terminal_tools,
            )
            # Forced-record retry: if the agent has any mandatory recording
            # tools but never invoked any of them, force one more round with
            # an explicit "you forgot to record" instruction. The mandatory
            # set is declared on the class (see mandatory_record_tools). Only
            # tools that were actually registered are considered, so an agent
            # with an empty or unregistered mandatory set skips this branch.
            registered_mandatory = [
                t for t in self.mandatory_record_tools if t in self._executors
            ]
            recorded_any = any(
                self._record_call_counts.get(t, 0) > 0
                for t in registered_mandatory
            )
            if registered_mandatory and not recorded_any:
                missing = "/".join(registered_mandatory)
                logger.warning(
                    "[%s] finished without calling any of [%s] — forcing RECORD retry",
                    self.name, missing,
                )
                conversation.append({
                    "role": "user",
                    "content": self._forced_record_prompt(missing),
                })
                final_text, _ = await self.llm.tool_call_loop(
                    messages=conversation,
                    tools=self.get_tool_definitions(),
                    tool_executor=self._executors,
                    system=system,
                    max_iterations=10,
                    terminal_tools=self.terminal_tools,
                )
            self._work_log.append(f"[Task: {task[:80]}] -> {final_text[:150]}")
        except Exception:
            self.graph.agent_status[self.name] = "failed"
            logger.error("[%s] Failed during task execution", self.name, exc_info=True)
            raise
        except BaseException:
            # asyncio.CancelledError and KeyboardInterrupt are not Exception
            # subclasses; without this branch a cancelled run would leave the
            # agent's status stuck at "running".
            self.graph.agent_status[self.name] = "failed"
            raise
        self.graph.agent_status[self.name] = "completed"
        elapsed = time.monotonic() - t0
        new_ph = len(self.graph.phenomena) - ph_before
        _log(
            f"+{new_ph} phenomena, {len(final_text)} chars",
            event="agent_done",
            agent=self.name,
            elapsed=elapsed,
        )
        return final_text

    # ---- Graph interaction tools --------------------------------------------

    def _register_graph_tools(self) -> None:
        """Register graph query + mutation tools.

        Subclasses can override to restrict the toolset. For example, a
        read-only agent (hypothesis, report) overrides this to skip
        _register_graph_write_tools.
        """
        self._register_graph_read_tools()
        self._register_graph_write_tools()

    def _register_graph_read_tools(self) -> None:
        """Register read-only graph + asset query tools."""
        self.register_tool(
            name="list_phenomena",
            description=(
                "List all phenomena (evidence artifacts) on the graph. "
                "Returns one-line summaries with IDs. Use get_phenomenon(id) for full details."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "description": "Filter by category (filesystem, registry, email, network, timeline). Omit for all.",
                    },
                },
            },
            executor=self._list_phenomena,
        )
        self.register_tool(
            name="get_phenomenon",
            description="Get full details of a specific phenomenon by ID, including raw_data.",
            input_schema={
                "type": "object",
                "properties": {
                    "id": {"type": "string", "description": "Phenomenon ID (e.g. 'ph-a1b2c3d4')."},
                },
                "required": ["id"],
            },
            executor=self._get_phenomenon,
        )
        self.register_tool(
            name="search_graph",
            description="Search across phenomena, hypotheses, and entities by keyword. Returns matching summaries.",
            input_schema={
                "type": "object",
                "properties": {
                    "keyword": {"type": "string", "description": "Search keyword."},
                },
                "required": ["keyword"],
            },
            executor=self._search_graph,
        )
        self.register_tool(
            name="get_related",
            description="Get all nodes connected to a given node via edges. Returns summaries and edge types.",
            input_schema={
                "type": "object",
                "properties": {
                    "node_id": {"type": "string", "description": "Any node ID (ph-*, hyp-*, ent-*)."},
                },
                "required": ["node_id"],
            },
            executor=self._get_related,
        )
        self.register_tool(
            name="get_hypothesis_status",
            description="Get current status and confidence of all hypotheses being investigated.",
            input_schema={"type": "object", "properties": {}},
            executor=self._get_hypothesis_status,
        )
        self.register_tool(
            name="list_assets",
            description=(
                "List all files extracted from the disk image. "
                "Shows filename, category, size, local path, and inode. "
                "Check this before calling extract_file to avoid re-extraction."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": [
                            "registry_hive", "chat_log", "prefetch", "network_capture",
                            "config_file", "address_book", "recycle_bin", "executable",
                            "text_log", "other",
                        ],
                        "description": "Filter by category. Omit to list all.",
                    },
                },
            },
            executor=self._list_assets,
        )
        self.register_tool(
            name="find_extracted_file",
            description=(
                "Find an already-extracted file by inode or filename. "
                "Returns the local path so you can use it directly with "
                "parse_registry_key, read_text_file, etc. without re-extracting."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "inode": {"type": "string", "description": "Inode to look up."},
                    "filename": {"type": "string", "description": "Filename or partial name to search."},
                },
            },
            executor=self._find_extracted_file,
        )

    def _register_graph_write_tools(self) -> None:
        """Register graph mutation tools (add_phenomenon, add_lead, link_to_entity)."""
        self.register_tool(
            name="add_phenomenon",
            description=(
                "Record a forensic finding (phenomenon) on the evidence graph. "
                "You MUST specify source_tool: the name of the tool call that produced this finding."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "category": {"type": "string", "description": "Category of the finding."},
                    "title": {"type": "string", "description": "Short title."},
                    "description": {"type": "string", "description": "Detailed description. Quote exact data from tool output."},
                    "raw_data": {"type": "object", "description": "Structured raw data supporting this finding."},
                    "timestamp": {"type": "string", "description": "Timestamp if any. ONLY use timestamps from tool output."},
                    "source_tool": {"type": "string", "description": "Name of the tool that produced this (e.g. 'list_directory')."},
                },
                "required": ["category", "title", "description", "source_tool"],
            },
            executor=self._add_phenomenon,
        )
        self.register_tool(
            name="add_lead",
            description="Create an investigative lead for another agent to follow up on.",
            input_schema={
                "type": "object",
                "properties": {
                    "target_agent": {
                        "type": "string",
                        "enum": ["filesystem", "registry", "communication", "network", "timeline"],
                        "description": "Which agent should handle this lead.",
                    },
                    "description": {"type": "string", "description": "What should be investigated."},
                    "priority": {"type": "integer", "description": "Priority 1 (highest) to 10 (lowest). Default 5."},
                },
                "required": ["target_agent", "description"],
            },
            executor=self._add_lead,
        )
        self.register_tool(
            name="link_to_entity",
            description=(
                "Link a phenomenon to a named entity (person, program, host, etc). "
                "Creates the entity if it doesn't exist."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "phenomenon_id": {"type": "string", "description": "Phenomenon ID to link from."},
                    "entity_name": {"type": "string", "description": "Name of the entity (e.g. 'Mr. Evil', 'mIRC.exe')."},
                    "entity_type": {
                        "type": "string",
                        "enum": ["person", "program", "file", "host", "ip_address"],
                        "description": "Type of entity.",
                    },
                    "edge_type": {
                        "type": "string",
                        "enum": ["created_by", "executed_by", "owned_by", "targets", "associated_with", "found_on", "used_by"],
                        "description": "Relationship type.",
                    },
                },
                "required": ["phenomenon_id", "entity_name", "entity_type", "edge_type"],
            },
            executor=self._link_to_entity,
        )

    # ---- Tool executors -----------------------------------------------------

    async def _list_phenomena(self, category: str | None = None) -> str:
        """Return the graph's phenomenon summaries, one per line."""
        results = self.graph.list_phenomena(category)
        if not results:
            return "No phenomena recorded yet." if not category else f"No phenomena in category '{category}'."
        return "\n".join(results)

    async def _get_phenomenon(self, id: str) -> str:
        """Return one phenomenon as pretty-printed JSON, or a not-found message.

        The parameter is named ``id`` because it must match the key declared
        in the ``get_phenomenon`` tool schema.
        """
        data = self.graph.get_phenomenon(id)
        if data is None:
            return f"Phenomenon not found: {id}"
        return json.dumps(data, ensure_ascii=False, indent=2)

    async def _search_graph(self, keyword: str) -> str:
        """Return graph search hits for *keyword*, one per line."""
        results = self.graph.search_graph(keyword)
        if not results:
            return f"No matches for '{keyword}'."
        return "\n".join(results)

    async def _get_related(self, node_id: str) -> str:
        """Return the nodes connected to *node_id*, one formatted edge per line."""
        results = self.graph.get_related(node_id)
        if not results:
            return f"No connections found for {node_id}."
        return "\n".join(
            f" {r['direction']} [{r['edge_type']}] → {r['node']}" for r in results
        )

    async def _get_hypothesis_status(self) -> str:
        """Return the graph's hypothesis status lines, one per line."""
        results = self.graph.get_hypothesis_status()
        if not results:
            return "No hypotheses defined yet."
        return "\n".join(results)

    async def _add_phenomenon(
        self,
        category: str,
        title: str,
        description: str,
        raw_data: dict | None = None,
        timestamp: str | None = None,
        source_tool: str = "",
    ) -> str:
        """Record a phenomenon on the graph and report its ID.

        The phenomenon is attributed to this agent and to the lead of the
        current run. The returned message separates the ID from the title so
        the ID can be read back unambiguously.
        """
        pid, merged = await self.graph.add_phenomenon(
            source_agent=self.name,
            category=category,
            title=title,
            description=description,
            raw_data=raw_data,
            timestamp=timestamp,
            source_tool=source_tool,
            from_lead_id=self._current_lead_id,
        )
        if merged:
            return f"Phenomenon merged into existing: {pid} — {title} (corroboration boost)"
        return f"Phenomenon recorded: {pid} — {title}"

    async def _add_lead(
        self,
        target_agent: str,
        description: str,
        priority: int = 5,
    ) -> str:
        """Create a lead for *target_agent* on the graph and report its ID."""
        lid = await self.graph.add_lead(
            target_agent=target_agent,
            description=description,
            priority=priority,
        )
        return f"Lead created: {lid} — [{target_agent}] {description}"

    async def _link_to_entity(
        self,
        phenomenon_id: str,
        entity_name: str,
        entity_type: str,
        edge_type: str,
    ) -> str:
        """Link an existing phenomenon to an entity, creating the entity if needed.

        Returns an error message (without touching the graph) when the
        phenomenon ID is unknown.
        """
        # Validate phenomenon exists before creating entity.
        # NOTE(review): _node_exists presumably accepts any node ID, not only
        # phenomena — confirm against EvidenceGraph.
        if not self.graph._node_exists(phenomenon_id):
            return (
                f"Error: phenomenon '{phenomenon_id}' not found. "
                f"Call list_phenomena first to get valid IDs."
            )
        eid, existing = await self.graph.add_entity(entity_name, entity_type)
        await self.graph.add_edge(
            source_id=phenomenon_id,
            target_id=eid,
            edge_type=edge_type,
            created_by=self.name,
        )
        status = "linked to existing" if existing else "created and linked"
        return f"Entity {status}: {entity_name} ({entity_type}) ←[{edge_type}]— {phenomenon_id}"

    async def _list_assets(self, category: str | None = None) -> str:
        """Return the graph's extracted-asset summaries, one per line."""
        results = self.graph.list_assets(category)
        if not results:
            return "No files extracted yet." if not category else f"No assets in category '{category}'."
        return "\n".join(results)

    async def _find_extracted_file(
        self,
        inode: str | None = None,
        filename: str | None = None,
    ) -> str:
        """Look up an already-extracted file by inode and/or filename.

        The inode lookup is tried first. If it misses and a filename was also
        supplied, the filename search is used as a fallback and the inode miss
        is reported in the result, instead of ignoring the filename.
        """
        if inode:
            asset = self.graph.lookup_asset_by_inode(inode)
            if asset:
                return (
                    f"Found: {asset.local_path} "
                    f"({asset.size_bytes} bytes, {asset.category}, inode:{asset.inode})"
                )
            if not filename:
                return f"No extracted file with inode {inode}."
        if filename:
            # Non-empty only when an inode was given and its lookup missed.
            inode_note = f"No extracted file with inode {inode}. " if inode else ""
            results = self.graph.query_assets(filename_pattern=filename)
            if not results:
                return f"{inode_note}No extracted files matching '{filename}'."
            lines = [f"{inode_note}Found {len(results)} matching file(s):"]
            lines.extend(
                f" {a.local_path} (inode:{a.inode}, {a.size_bytes} bytes, {a.category})"
                for a in results
            )
            return "\n".join(lines)
        return "Provide either inode or filename to search."