Issues found running the system end-to-end on the NIST CFReDS Hacking Case
disk image (SCHARDT.001, Mr. Evil). Four interconnected fixes:
1. HypothesisAgent boundary leak (two layers)
B.1 Tool set: BaseAgent._register_graph_tools was registering
add_phenomenon / add_lead / link_to_entity for every agent. With
an empty graph in Phase 2, HypothesisAgent "compensated" by
inventing phenomena, dispatching leads, and linking entities.
B.2 Prompt leak: BaseAgent's shared system prompt hard-coded "Call
investigation tools (list_directory, parse_registry_key, etc.)".
HypothesisAgent hallucinated list_directory and wasted 2 LLM
rounds on 'unknown tool' errors before backing off.
Fix:
- Split _register_graph_tools into _register_graph_read_tools +
_register_graph_write_tools.
- HypothesisAgent, ReportAgent, TimelineAgent override
_register_graph_tools to skip write tools.
- HypothesisAgent and TimelineAgent override _build_system_prompt
with focused, role-specific workflows (no Phase A-D investigation
boilerplate).
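The split described above can be sketched as follows. This is a minimal illustration, not the repository code: SketchBaseAgent, SketchHypothesisAgent, and tool_names are hypothetical stand-ins, and the one-argument register_tool is a simplification of the real four-argument signature.

```python
from typing import Dict, List


class SketchBaseAgent:
    """Hypothetical stand-in for BaseAgent; only the registration hooks are modeled."""

    def __init__(self) -> None:
        self._tools: Dict[str, dict] = {}

    def register_tool(self, name: str) -> None:
        # Simplified: the real register_tool also takes a description,
        # an input schema, and an executor.
        self._tools[name] = {"name": name}

    def _register_graph_read_tools(self) -> None:
        for name in ("list_phenomena", "get_phenomenon", "search_graph"):
            self.register_tool(name)

    def _register_graph_write_tools(self) -> None:
        for name in ("add_phenomenon", "add_lead", "link_to_entity"):
            self.register_tool(name)

    def _register_graph_tools(self) -> None:
        # Default: investigation agents get both read and write tools.
        self._register_graph_read_tools()
        self._register_graph_write_tools()


class SketchHypothesisAgent(SketchBaseAgent):
    """Read-only agent: overrides the hook so write tools are never registered."""

    def _register_graph_tools(self) -> None:
        self._register_graph_read_tools()


def tool_names(agent: SketchBaseAgent) -> List[str]:
    """Register the agent's graph tools and return their names, sorted."""
    agent._register_graph_tools()
    return sorted(agent._tools)
```

Because the model only sees tools that were registered, a read-only agent cannot call add_phenomenon even if its prompt accidentally mentions it.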
2. JSON parse failures in Phase 3 lead generation (5/6 hypotheses lost)
DeepSeek emits JSON with stray backslashes (Windows path references)
and occasional minor syntax slips. Old single-stage sanitize couldn't
recover; per-hypothesis fallback silently swallowed each failure.
Fix:
- _safe_json_loads: progressive — stage 0 as-is, stage 1 escape stray
\X (anything not in valid JSON escape set), log raw input on final
failure for diagnosis.
- New _call_llm_for_json helper: on parse failure, append the error
to the prompt and re-call LLM (self-correcting retry, up to 2).
- All 4 LLM-JSON callsites in orchestrator refactored to use it.
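A rough sketch of the two-stage parse and the self-correcting retry, under stated assumptions: safe_json_loads and call_llm_for_json here are simplified stand-ins for the helpers named above, the LLM is modeled as a plain synchronous callable (the real one is probably async), and the wording of the retry hint is invented for illustration.

```python
import json
import logging
import re
from typing import Any, Callable

logger = logging.getLogger(__name__)

# Matches a backslash plus what follows it: a full \uXXXX escape or any one char.
_ESCAPE = re.compile(r"\\(u[0-9a-fA-F]{4}|.)", re.DOTALL)
_VALID_SINGLE = set('"\\/bfnrt')


def _escape_stray(match: "re.Match[str]") -> str:
    body = match.group(1)
    if len(body) == 5 or body in _VALID_SINGLE:
        return match.group(0)  # already a valid JSON escape, keep as-is
    return "\\\\" + body  # stray backslash: double it so it parses literally


def safe_json_loads(text: str) -> Any:
    """Stage 0: parse as-is. Stage 1: escape stray backslashes and retry."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    try:
        # Caveat: a path like C:\temp still parses \t as a tab; that
        # ambiguity cannot be resolved at this level.
        return json.loads(_ESCAPE.sub(_escape_stray, text))
    except json.JSONDecodeError:
        logger.warning("JSON parse failed; raw input: %r", text)
        raise


def call_llm_for_json(llm_call: Callable[[str], str], prompt: str, max_retries: int = 2) -> Any:
    """Call the LLM; on a parse failure, re-call with the error appended."""
    current = prompt
    for attempt in range(max_retries + 1):
        raw = llm_call(current)
        try:
            return safe_json_loads(raw)
        except json.JSONDecodeError as exc:
            if attempt == max_retries:
                raise
            current = (
                f"{prompt}\n\nYour previous reply was not valid JSON ({exc}). "
                "Reply with valid JSON only."
            )
```

The regex consumes each backslash together with the character after it, so an already-escaped pair such as `\\W` is left alone rather than being doubled again.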
3. Phase 1 sometimes skipped add_phenomenon (LLM treated <answer> as deliverable)
Strengthen BaseAgent's RECORDING REQUIREMENT — explicit "your <answer>
is DISCARDED; only graph mutations propagate" plus a new rule:
negative findings (searched X, found nothing) MUST also be recorded
as phenomena, since they constrain the hypothesis space.
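For illustration, a negative finding could be packaged as add_phenomenon arguments roughly like this. The helper negative_finding_args, the raw_data keys, and the example path are hypothetical; only the field names (category, title, description, raw_data, source_tool) come from the add_phenomenon schema in this file.

```python
from typing import Any, Dict

# Required fields per the add_phenomenon input schema.
REQUIRED_FIELDS = ("category", "title", "description", "source_tool")


def negative_finding_args(
    category: str, searched: str, scope: str, source_tool: str
) -> Dict[str, Any]:
    """Build add_phenomenon arguments for a search that returned nothing."""
    args: Dict[str, Any] = {
        "category": category,
        "title": f"No matches for {searched}",
        "description": f"Searched {scope} for {searched}; the tool returned no results.",
        # raw_data keys are illustrative; the schema only requires an object.
        "raw_data": {"searched": searched, "scope": scope, "match_count": 0},
        "source_tool": source_tool,
    }
    missing = [key for key in REQUIRED_FIELDS if not args.get(key)]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return args


# Hypothetical example: the scope path is made up for this sketch.
example_args = negative_finding_args(
    category="filesystem",
    searched="*.pcap",
    scope="/hypothetical/dir",
    source_tool="list_directory",
)
```

Recording the search scope in raw_data is what lets a later agent see that the area was already covered.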
4. Phase 4 Timeline was a no-op
TimelineAgent inherited BaseAgent's Phase A-D prompt and never called
add_temporal_edge — produced 0 temporal edges. Override the prompt
with concrete workflow (build_filesystem_timeline ->
get_timestamped_phenomena -> 15-40 add_temporal_edge calls) and
restrict tool set to read-only + its 3 temporal tools.
Verified end-to-end: HypothesisAgent now 8 tools (no writes), ReportAgent
13 (no graph writes), TimelineAgent 10 (read + temporal + timeline).
All 60 unit tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
467 lines
19 KiB
Python
"""Base class for forensic analysis agents."""

from __future__ import annotations

import json
import logging
import time
from typing import Any

from evidence_graph import EvidenceGraph
from llm_client import LLMClient

logger = logging.getLogger(__name__)


def _log(msg: str, **extra) -> None:
    """Emit a structured log message with extra fields."""
    logger.info(msg, extra=extra)


class BaseAgent:
    """Base class for all forensic agents.

    Each agent has:
    - A name and role description
    - A set of tools it can use (registered as methods)
    - Access to the shared EvidenceGraph
    - An LLM client for reasoning and tool-calling
    """

    name: str = "base"
    role: str = "A forensic analysis agent."

    def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
        self.llm = llm
        self.graph = graph
        self._tools: dict[str, dict] = {}  # name -> schema
        self._executors: dict[str, Any] = {}  # name -> async callable
        self._work_log: list[str] = []
        self._current_lead_id: str | None = None

    def register_tool(
        self,
        name: str,
        description: str,
        input_schema: dict,
        executor: Any,
    ) -> None:
        """Register a tool that this agent can use."""
        self._tools[name] = {
            "name": name,
            "description": description,
            "input_schema": input_schema,
        }
        self._executors[name] = executor

    def get_tool_definitions(self) -> list[dict]:
        """Get tool definitions in Claude API format."""
        return list(self._tools.values())

    def _build_system_prompt(self, task: str) -> str:
        """Build the system prompt — lightweight stats, no full evidence dump."""
        work_log_section = ""
        if self._work_log:
            entries = self._work_log[-5:]
            log_lines = "\n".join(f" {i+1}. {entry}" for i, entry in enumerate(entries))
            work_log_section = (
                f"\nYour prior work on this investigation:\n{log_lines}\n"
                f"Avoid repeating tools/approaches that already succeeded or failed. Build on prior findings.\n"
            )

        return (
            f"You are {self.name}, a specialized digital forensics agent.\n"
            f"Role: {self.role}\n\n"
            f"You are analyzing a disk image as part of a multi-agent forensic investigation.\n"
            f"Image: {self.graph.image_path}\n\n"
            f"Current investigation state:\n{self.graph.stats_summary()}\n"
            f"{work_log_section}\n"
            f"Your current task: {task}\n\n"
            f"CRITICAL WORKFLOW — you MUST follow these steps IN ORDER, one phase at a time:\n\n"
            f"Phase A — INVESTIGATE:\n"
            f" Use list_phenomena/search_graph to review existing findings.\n"
            f" Call list_assets to see what files are already extracted.\n"
            f" Call investigation tools (list_directory, parse_registry_key, etc.) to gather data.\n"
            f" Only extract_file for forensically relevant files (user data, logs, configs, hives) — NOT system DLLs or OS files.\n"
            f" Create add_lead for anything outside your expertise.\n\n"
            f"Phase B — RECORD PHENOMENA:\n"
            f" For EACH significant finding from Phase A, call add_phenomenon.\n"
            f" Do NOT call link_to_entity yet — just record all phenomena first.\n\n"
            f"Phase C — LINK ENTITIES:\n"
            f" FIRST call list_phenomena to get the current IDs — do NOT rely on memory.\n"
            f" Then call link_to_entity for each relevant phenomenon.\n"
            f" NEVER guess or fabricate a phenomenon ID. If an ID is not in list_phenomena output, it does not exist.\n\n"
            f"Phase D — ANSWER:\n"
            f" Only give your <answer> AFTER completing Phases B and C.\n\n"
            f"CRITICAL — RECORDING REQUIREMENT:\n"
            f"- Your <answer> block is DISCARDED by the orchestrator. Only graph mutations propagate.\n"
            f"- Other agents and the final report read ONLY the evidence graph "
            f"(phenomena, entities, edges).\n"
            f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you end.\n"
            f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, "
            f"a registry key) and found NOTHING, that absence IS evidence — call "
            f"add_phenomenon with a 'No matches for X' title and the search scope in "
            f"raw_data. Negative findings constrain the hypothesis space and prevent "
            f"the next agent from wasting time re-searching.\n"
            f"- If you produce <answer> without having called add_phenomenon at least once, "
            f"the task is FAILED regardless of what you wrote in <answer>.\n"
            f"- Include exact file paths, inode numbers, timestamps, and the source_tool "
            f"that produced each finding.\n\n"
            f"ANTI-HALLUCINATION RULES — STRICTLY ENFORCED:\n"
            f"- ONLY record findings that appear VERBATIM in tool results you received\n"
            f"- NEVER invent or guess timestamps, file paths, inode numbers, or program names\n"
            f"- If tool output was truncated, state '[truncated]' — do NOT fill in the missing data\n"
            f"- If you are unsure whether something exists, call a tool to verify or create a lead — do NOT assume\n"
            f"- Quote exact strings from tool output when recording evidence descriptions\n"
            f"- Do NOT fabricate execution timestamps — only report timestamps returned by tools"
        )

    async def run(self, task: str, lead_id: str | None = None) -> str:
        """Run this agent with a specific task."""
        _log(task, event="agent_start", agent=self.name)
        self.graph.agent_status[self.name] = "running"
        self.graph._current_agent = self.name
        self._current_lead_id = lead_id

        self._register_graph_tools()

        system = self._build_system_prompt(task)
        messages = [{"role": "user", "content": task}]

        t0 = time.monotonic()
        ph_before = len(self.graph.phenomena)

        try:
            final_text, _ = await self.llm.tool_call_loop(
                messages=messages,
                tools=self.get_tool_definitions(),
                tool_executor=self._executors,
                system=system,
            )
            self._work_log.append(f"[Task: {task[:80]}] -> {final_text[:150]}")
        except Exception:
            self.graph.agent_status[self.name] = "failed"
            logger.error("[%s] Failed during task execution", self.name, exc_info=True)
            raise

        self.graph.agent_status[self.name] = "completed"
        elapsed = time.monotonic() - t0
        new_ph = len(self.graph.phenomena) - ph_before
        _log(f"+{new_ph} phenomena, {len(final_text)} chars", event="agent_done", agent=self.name, elapsed=elapsed)
        return final_text

    # ---- Graph interaction tools --------------------------------------------

    def _register_graph_tools(self) -> None:
        """Register graph query + mutation tools.

        Subclasses can override to restrict the toolset. For example, a
        read-only agent (hypothesis, report) overrides this to skip
        _register_graph_write_tools.
        """
        self._register_graph_read_tools()
        self._register_graph_write_tools()

    def _register_graph_read_tools(self) -> None:
        """Register read-only graph + asset query tools."""

        self.register_tool(
            name="list_phenomena",
            description=(
                "List all phenomena (evidence artifacts) on the graph. "
                "Returns one-line summaries with IDs. Use get_phenomenon(id) for full details."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "description": "Filter by category (filesystem, registry, email, network, timeline). Omit for all.",
                    },
                },
            },
            executor=self._list_phenomena,
        )

        self.register_tool(
            name="get_phenomenon",
            description="Get full details of a specific phenomenon by ID, including raw_data.",
            input_schema={
                "type": "object",
                "properties": {
                    "id": {"type": "string", "description": "Phenomenon ID (e.g. 'ph-a1b2c3d4')."},
                },
                "required": ["id"],
            },
            executor=self._get_phenomenon,
        )

        self.register_tool(
            name="search_graph",
            description="Search across phenomena, hypotheses, and entities by keyword. Returns matching summaries.",
            input_schema={
                "type": "object",
                "properties": {
                    "keyword": {"type": "string", "description": "Search keyword."},
                },
                "required": ["keyword"],
            },
            executor=self._search_graph,
        )

        self.register_tool(
            name="get_related",
            description="Get all nodes connected to a given node via edges. Returns summaries and edge types.",
            input_schema={
                "type": "object",
                "properties": {
                    "node_id": {"type": "string", "description": "Any node ID (ph-*, hyp-*, ent-*)."},
                },
                "required": ["node_id"],
            },
            executor=self._get_related,
        )

        self.register_tool(
            name="get_hypothesis_status",
            description="Get current status and confidence of all hypotheses being investigated.",
            input_schema={"type": "object", "properties": {}},
            executor=self._get_hypothesis_status,
        )

        self.register_tool(
            name="list_assets",
            description=(
                "List all files extracted from the disk image. "
                "Shows filename, category, size, local path, and inode. "
                "Check this before calling extract_file to avoid re-extraction."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": [
                            "registry_hive", "chat_log", "prefetch", "network_capture",
                            "config_file", "address_book", "recycle_bin", "executable",
                            "text_log", "other",
                        ],
                        "description": "Filter by category. Omit to list all.",
                    },
                },
            },
            executor=self._list_assets,
        )

        self.register_tool(
            name="find_extracted_file",
            description=(
                "Find an already-extracted file by inode or filename. "
                "Returns the local path so you can use it directly with "
                "parse_registry_key, read_text_file, etc. without re-extracting."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "inode": {"type": "string", "description": "Inode to look up."},
                    "filename": {"type": "string", "description": "Filename or partial name to search."},
                },
            },
            executor=self._find_extracted_file,
        )

    def _register_graph_write_tools(self) -> None:
        """Register graph mutation tools (add_phenomenon, add_lead, link_to_entity)."""

        self.register_tool(
            name="add_phenomenon",
            description=(
                "Record a forensic finding (phenomenon) on the evidence graph. "
                "You MUST specify source_tool: the name of the tool call that produced this finding."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "category": {"type": "string", "description": "Category of the finding."},
                    "title": {"type": "string", "description": "Short title."},
                    "description": {"type": "string", "description": "Detailed description. Quote exact data from tool output."},
                    "raw_data": {"type": "object", "description": "Structured raw data supporting this finding."},
                    "timestamp": {"type": "string", "description": "Timestamp if any. ONLY use timestamps from tool output."},
                    "source_tool": {"type": "string", "description": "Name of the tool that produced this (e.g. 'list_directory')."},
                },
                "required": ["category", "title", "description", "source_tool"],
            },
            executor=self._add_phenomenon,
        )

        self.register_tool(
            name="add_lead",
            description="Create an investigative lead for another agent to follow up on.",
            input_schema={
                "type": "object",
                "properties": {
                    "target_agent": {
                        "type": "string",
                        "enum": ["filesystem", "registry", "communication", "network", "timeline"],
                        "description": "Which agent should handle this lead.",
                    },
                    "description": {"type": "string", "description": "What should be investigated."},
                    "priority": {"type": "integer", "description": "Priority 1 (highest) to 10 (lowest). Default 5."},
                },
                "required": ["target_agent", "description"],
            },
            executor=self._add_lead,
        )

        self.register_tool(
            name="link_to_entity",
            description=(
                "Link a phenomenon to a named entity (person, program, host, etc). "
                "Creates the entity if it doesn't exist."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "phenomenon_id": {"type": "string", "description": "Phenomenon ID to link from."},
                    "entity_name": {"type": "string", "description": "Name of the entity (e.g. 'Mr. Evil', 'mIRC.exe')."},
                    "entity_type": {
                        "type": "string",
                        "enum": ["person", "program", "file", "host", "ip_address"],
                        "description": "Type of entity.",
                    },
                    "edge_type": {
                        "type": "string",
                        "enum": ["created_by", "executed_by", "owned_by", "targets", "associated_with", "found_on", "used_by"],
                        "description": "Relationship type.",
                    },
                },
                "required": ["phenomenon_id", "entity_name", "entity_type", "edge_type"],
            },
            executor=self._link_to_entity,
        )

    # ---- Tool executors -----------------------------------------------------

    async def _list_phenomena(self, category: str | None = None) -> str:
        results = self.graph.list_phenomena(category)
        if not results:
            return "No phenomena recorded yet." if not category else f"No phenomena in category '{category}'."
        return "\n".join(results)

    async def _get_phenomenon(self, id: str) -> str:
        data = self.graph.get_phenomenon(id)
        if data is None:
            return f"Phenomenon not found: {id}"
        return json.dumps(data, ensure_ascii=False, indent=2)

    async def _search_graph(self, keyword: str) -> str:
        results = self.graph.search_graph(keyword)
        if not results:
            return f"No matches for '{keyword}'."
        return "\n".join(results)

    async def _get_related(self, node_id: str) -> str:
        results = self.graph.get_related(node_id)
        if not results:
            return f"No connections found for {node_id}."
        lines = []
        for r in results:
            lines.append(f" {r['direction']} [{r['edge_type']}] → {r['node']}")
        return "\n".join(lines)

    async def _get_hypothesis_status(self) -> str:
        results = self.graph.get_hypothesis_status()
        if not results:
            return "No hypotheses defined yet."
        return "\n".join(results)

    async def _add_phenomenon(
        self,
        category: str,
        title: str,
        description: str,
        raw_data: dict | None = None,
        timestamp: str | None = None,
        source_tool: str = "",
    ) -> str:
        pid, merged = await self.graph.add_phenomenon(
            source_agent=self.name,
            category=category,
            title=title,
            description=description,
            raw_data=raw_data,
            timestamp=timestamp,
            source_tool=source_tool,
            from_lead_id=self._current_lead_id,
        )
        if merged:
            return f"Phenomenon merged into existing: {pid} — {title} (corroboration boost)"
        return f"Phenomenon recorded: {pid} — {title}"

    async def _add_lead(
        self,
        target_agent: str,
        description: str,
        priority: int = 5,
    ) -> str:
        lid = await self.graph.add_lead(
            target_agent=target_agent,
            description=description,
            priority=priority,
        )
        return f"Lead created: {lid} — [{target_agent}] {description}"

    async def _link_to_entity(
        self,
        phenomenon_id: str,
        entity_name: str,
        entity_type: str,
        edge_type: str,
    ) -> str:
        # Validate phenomenon exists before creating entity
        if not self.graph._node_exists(phenomenon_id):
            return (
                f"Error: phenomenon '{phenomenon_id}' not found. "
                f"Call list_phenomena first to get valid IDs."
            )
        eid, existing = await self.graph.add_entity(entity_name, entity_type)
        await self.graph.add_edge(
            source_id=phenomenon_id,
            target_id=eid,
            edge_type=edge_type,
            created_by=self.name,
        )
        status = "linked to existing" if existing else "created and linked"
        return f"Entity {status}: {entity_name} ({entity_type}) ←[{edge_type}]— {phenomenon_id}"

    async def _list_assets(self, category: str | None = None) -> str:
        results = self.graph.list_assets(category)
        if not results:
            return "No files extracted yet." if not category else f"No assets in category '{category}'."
        return "\n".join(results)

    async def _find_extracted_file(
        self,
        inode: str | None = None,
        filename: str | None = None,
    ) -> str:
        if inode:
            asset = self.graph.lookup_asset_by_inode(inode)
            if asset:
                return (
                    f"Found: {asset.local_path} "
                    f"({asset.size_bytes} bytes, {asset.category}, inode:{asset.inode})"
                )
            return f"No extracted file with inode {inode}."

        if filename:
            results = self.graph.query_assets(filename_pattern=filename)
            if not results:
                return f"No extracted files matching '{filename}'."
            lines = [f"Found {len(results)} matching file(s):"]
            for a in results:
                lines.append(f" {a.local_path} (inode:{a.inode}, {a.size_bytes} bytes, {a.category})")
            return "\n".join(lines)

        return "Provide either inode or filename to search."