fix: address agent boundary / JSON robustness / Phase 4 no-op from CFReDS run

Issues found running the system end-to-end on the NIST CFReDS Hacking Case
disk image (SCHARDT.001, Mr. Evil). Four interconnected fixes:

1. HypothesisAgent boundary leak (two layers)
   B.1 Tool set: BaseAgent._register_graph_tools was registering
       add_phenomenon / add_lead / link_to_entity for every agent. With
       an empty graph in Phase 2, HypothesisAgent "compensated" by
       inventing phenomena, dispatching leads, and linking entities.
   B.2 Prompt leak: BaseAgent's shared system prompt hard-coded "Call
       investigation tools (list_directory, parse_registry_key, etc.)".
       HypothesisAgent hallucinated list_directory and wasted 2 LLM
       rounds on 'unknown tool' errors before backing off.

   Fix:
   - Split _register_graph_tools into _register_graph_read_tools +
     _register_graph_write_tools.
   - HypothesisAgent, ReportAgent, TimelineAgent override
     _register_graph_tools to skip write tools.
   - HypothesisAgent and TimelineAgent override _build_system_prompt
     with focused, role-specific workflows (no Phase A-D investigation
     boilerplate).
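
For illustration, the read/write split above can be sketched as follows. This is a minimal stand-in, not the project's BaseAgent: the two-argument register_tool and the tools dict are hypothetical simplifications, and only tool names mentioned in this commit are used.

```python
class BaseAgent:
    """Simplified stand-in: registers read and write graph tools by default."""

    def __init__(self) -> None:
        # Hypothetical registry: tool name -> "read" or "write".
        self.tools: dict[str, str] = {}
        self._register_graph_tools()

    def register_tool(self, name: str, kind: str) -> None:
        self.tools[name] = kind

    def _register_graph_tools(self) -> None:
        # Default behaviour: full access. Subclasses override to restrict.
        self._register_graph_read_tools()
        self._register_graph_write_tools()

    def _register_graph_read_tools(self) -> None:
        for name in ("list_phenomena", "search_graph"):
            self.register_tool(name, "read")

    def _register_graph_write_tools(self) -> None:
        for name in ("add_phenomenon", "add_lead", "link_to_entity"):
            self.register_tool(name, "write")


class HypothesisAgent(BaseAgent):
    """Read-only graph access plus its single role-specific mutation tool."""

    def __init__(self) -> None:
        super().__init__()
        self.register_tool("add_hypothesis", "write")

    def _register_graph_tools(self) -> None:
        # Skip _register_graph_write_tools entirely.
        self._register_graph_read_tools()
```

Because the base constructor calls _register_graph_tools through self, the subclass override takes effect without any change to BaseAgent.__init__.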

2. JSON parse failures in Phase 3 lead generation (5/6 hypotheses lost)
   DeepSeek emits JSON with stray backslashes (Windows path references)
   and occasional minor syntax slips. The old single-stage sanitizer
   couldn't recover, and the per-hypothesis fallback silently swallowed
   each failure.

   Fix:
   - _safe_json_loads: progressive parse. Stage 0 parses as-is; stage 1
     escapes stray \X (any backslash not starting a valid JSON escape)
     and retries. On final failure it logs the raw input for diagnosis.
   - New _call_llm_for_json helper: on parse failure, append the error
     to the prompt and re-call the LLM (self-correcting retry, up to 2
     retries).
   - All 4 LLM-JSON call sites in the orchestrator refactored to use it.
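
The two layers described above can be sketched in a self-contained form as below. This is a hedged variation, not the code in this commit: it is synchronous, the ask callable is a hypothetical stand-in for the LLM client, and the sanitizer consumes valid escapes as whole pairs (instead of using a negative lookahead) so that an already-escaped \\ is left untouched.

```python
import json
import re
from typing import Any, Callable, Optional

# Matches either a valid JSON escape (consumed whole) or a lone backslash.
_ESCAPE_OR_STRAY = re.compile(r'\\(?:["\\/bfnrt]|u[0-9a-fA-F]{4})|\\')


def escape_stray_backslashes(text: str) -> str:
    """Double every backslash that does not begin a valid JSON escape."""
    def fix(match: re.Match) -> str:
        token = match.group(0)
        # A lone backslash has length 1; valid escapes are longer.
        return token if len(token) > 1 else "\\\\"
    return _ESCAPE_OR_STRAY.sub(fix, text)


def safe_json_loads(text: str) -> Any:
    """Stage 0: parse as-is. Stage 1: parse after escaping stray backslashes."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return json.loads(escape_stray_backslashes(text))


def call_for_json(ask: Callable[[str], str], prompt: str, max_retries: int = 2) -> Any:
    """Call ask(prompt); on parse failure, append the error and try again."""
    hint = ""
    last_err: Optional[Exception] = None
    for _ in range(max_retries + 1):
        reply = ask(prompt + hint)
        try:
            return safe_json_loads(reply)
        except json.JSONDecodeError as err:
            last_err = err
            hint = (
                f"\n\n[Previous response was not valid JSON: {err}] "
                "Output strict JSON only."
            )
    assert last_err is not None
    raise last_err
```

Note that a path segment beginning with a valid escape letter (for example \new) still parses, but as a newline followed by "ew", and no regex-level repair can detect that. This is presumably why the retry hint in the commit asks the model to avoid backslashes altogether.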

3. Phase 1 sometimes skipped add_phenomenon (LLM treated <answer> as deliverable)
   Strengthen BaseAgent's RECORDING REQUIREMENT — explicit "your <answer>
   is DISCARDED; only graph mutations propagate" plus a new rule:
   negative findings (searched X, found nothing) MUST also be recorded
   as phenomena, since they constrain the hypothesis space.

4. Phase 4 Timeline was a no-op
   TimelineAgent inherited BaseAgent's Phase A-D prompt and never called
   add_temporal_edge, so it produced 0 temporal edges. Override the prompt
   with a concrete workflow (build_filesystem_timeline ->
   get_timestamped_phenomena -> 15-40 add_temporal_edge calls) and
   restrict the tool set to read-only graph tools + its 3 temporal tools.
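
As a rough illustration of the relation labels the agent is asked to choose between, the rule could be written as below. This is only a sketch: in the commit the LLM picks the relation itself, and the 5-second window is an assumed value standing in for "within seconds of each other".

```python
from datetime import datetime, timedelta


def temporal_relation(
    source: datetime,
    target: datetime,
    window: timedelta = timedelta(seconds=5),  # assumed threshold
) -> str:
    """Return the relation of source to target: before, after, or concurrent."""
    if abs(target - source) <= window:
        return "concurrent"
    return "before" if source < target else "after"
```

Ordering each pair so that the earlier phenomenon is the source means "before" covers the common case, which matches the guidance given to the agent.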

Verified end-to-end: HypothesisAgent now has 8 tools (no writes),
ReportAgent has 13 (no graph writes), and TimelineAgent has 10 (read +
temporal + timeline). All 60 unit tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
BattleTag
2026-05-12 17:14:16 +08:00
parent 0a966d8476
commit 893f5b5de2
5 changed files with 251 additions and 82 deletions

@@ -1,7 +1,9 @@
"""Hypothesis Agent — generates investigative hypotheses from phenomena.
Generates hypotheses only. Phenomenon→Hypothesis linking is handled centrally
by Orchestrator._judge_new_phenomena, so all link logic lives in one place.
by Orchestrator._judge_new_phenomena. Tool set is restricted to read-only
graph queries + add_hypothesis to prevent the agent from creating phenomena,
leads, or entity links.
"""
from __future__ import annotations
@@ -27,6 +29,10 @@ class HypothesisAgent(BaseAgent):
super().__init__(llm, graph)
self._register_hypothesis_tools()
def _register_graph_tools(self) -> None:
"""Restrict to read-only graph tools. add_hypothesis is registered separately."""
self._register_graph_read_tools()
def _register_hypothesis_tools(self) -> None:
self.register_tool(
name="add_hypothesis",
@@ -51,6 +57,32 @@ class HypothesisAgent(BaseAgent):
executor=self._add_hypothesis,
)
def _build_system_prompt(self, task: str) -> str:
"""Focused prompt — no INVESTIGATE/RECORD/LINK workflow."""
return (
f"You are {self.name}, a forensic hypothesis analyst.\n"
f"Role: {self.role}\n\n"
f"Image: {self.graph.image_path}\n"
f"Current investigation state: {self.graph.stats_summary()}\n\n"
f"Your task: {task}\n\n"
f"WORKFLOW:\n"
f"1. Call list_phenomena and search_graph to review existing findings.\n"
f"2. For each hypothesis you want to record, call add_hypothesis (title + description).\n"
f"3. Wrap a short summary in <answer> when you have generated 3-7 hypotheses.\n\n"
f"STRICT BOUNDARIES:\n"
f"- Your only mutation tool is add_hypothesis. Do NOT attempt list_directory, "
f"parse_registry_key, extract_file, or any disk-image investigation tools — "
f"they are not yours and you will get 'unknown tool' errors.\n"
f"- You CANNOT create phenomena, leads, or entity links. The orchestrator handles "
f"all phenomenon↔hypothesis linking after you finish.\n"
f"- Each hypothesis must be specific and testable. Avoid generic templates like "
f"'Unauthorized Remote Access' or 'Malware Deployment' unless concrete phenomena "
f"in the graph already point to them.\n"
f"- If the graph is empty, generate broad starting hypotheses and mark them "
f"clearly as exploratory in their description so downstream agents know they "
f"still need evidence."
)
async def _add_hypothesis(self, title: str, description: str) -> str:
hid = await self.graph.add_hypothesis(
title=title,

@@ -25,6 +25,10 @@ class ReportAgent(BaseAgent):
super().__init__(llm, graph)
self._register_tools()
def _register_graph_tools(self) -> None:
"""Restrict to read-only graph tools. Report agent does not mutate state."""
self._register_graph_read_tools()
def _build_system_prompt(self, task: str) -> str:
"""Report agent gets a clean prompt — no Phase A/B/C/D workflow."""
return (

@@ -1,14 +1,21 @@
"""Timeline Agent — correlates evidence across time."""
"""Timeline Agent — connects existing phenomena with temporal edges.
Operates on phenomena already in the graph. Does NOT investigate the disk
image itself. The agent's only useful output is the temporal edges it
creates between phenomena.
"""
from __future__ import annotations
import json
import logging
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG
logger = logging.getLogger(__name__)
class TimelineAgent(BaseAgent):
name = "timeline"
@@ -22,24 +29,33 @@ class TimelineAgent(BaseAgent):
super().__init__(llm, graph)
self._register_tools()
def _register_graph_tools(self) -> None:
"""Restrict to read-only graph tools — Timeline does not add phenomena."""
self._register_graph_read_tools()
def _register_tools(self) -> None:
# Filesystem timeline tool from catalog
td = TOOL_CATALOG.get("build_filesystem_timeline")
if td:
self.register_tool(td.name, td.description, td.input_schema, td.executor)
# Custom tool to get all phenomena with timestamps for correlation
self.register_tool(
name="get_timestamped_phenomena",
description="Get all phenomena that have timestamps, sorted chronologically. Use for timeline correlation.",
description=(
"Get all phenomena that have timestamps, sorted chronologically. "
"Returns each phenomenon's id, category, title, and a short description "
"preview. Use this as your primary input for temporal correlation."
),
input_schema={"type": "object", "properties": {}},
executor=self._get_timestamped_phenomena,
)
# Tool to add temporal edges between phenomena
self.register_tool(
name="add_temporal_edge",
description="Add a temporal relationship between two phenomena (before, after, or concurrent).",
description=(
"Add a temporal relationship edge between two existing phenomena. "
"Use 'before' when source phenomenon happened before target, "
"'concurrent' when they occurred within seconds of each other."
),
input_schema={
"type": "object",
"properties": {
@@ -56,6 +72,42 @@ class TimelineAgent(BaseAgent):
executor=self._add_temporal_edge,
)
def _build_system_prompt(self, task: str) -> str:
"""Focused prompt — Timeline connects existing phenomena, doesn't investigate."""
return (
f"You are {self.name}, a forensic timeline correlation analyst.\n"
f"Role: {self.role}\n\n"
f"Image: {self.graph.image_path}\n"
f"Current state: {self.graph.stats_summary()}\n\n"
f"Your task: {task}\n\n"
f"WORKFLOW:\n"
f"1. Call build_filesystem_timeline once to materialize MAC times for the disk.\n"
f"2. Call get_timestamped_phenomena to see all phenomena with timestamps, "
f"sorted chronologically. THIS IS YOUR PRIMARY INPUT.\n"
f"3. For each meaningful temporal relationship between phenomena, call "
f"add_temporal_edge(source_id, target_id, relation). Use 'before' when "
f"source happened first (the common case); 'concurrent' for events within "
f"a few seconds of each other.\n"
f" Examples of meaningful connections:\n"
f" - 'Cain installer executed' (before) 'Cain.exe first execution'\n"
f" - 'WHOIS first lookup' (before) 'WHOIS second lookup'\n"
f" - 'Recon tool cluster' (before) 'Anti-forensics defrag'\n"
f" - 'Tool installation' (before) 'Tool execution'\n"
f"4. Aim for 15-40 temporal edges that connect the major events into a "
f"forensic story.\n"
f"5. Wrap a short summary in <answer> when done.\n\n"
f"STRICT BOUNDARIES:\n"
f"- Your job is to CONNECT existing phenomena, NOT to discover new ones. "
f"You CANNOT call add_phenomenon — the tool isn't yours.\n"
f"- Use ONLY phenomenon IDs returned by get_timestamped_phenomena or "
f"list_phenomena. NEVER fabricate IDs.\n"
f"- Connect events that tell a forensic story (recon -> exploit -> cover-up). "
f"Do not exhaustively pair every two phenomena; focus on causally-relevant "
f"sequences.\n"
f"- The orchestrator handles report writing in the next phase. Your only "
f"output that propagates is the temporal edges you create."
)
async def _get_timestamped_phenomena(self) -> str:
items = [
ph for ph in self.graph.phenomena.values()

@@ -93,12 +93,20 @@ class BaseAgent:
f" NEVER guess or fabricate a phenomenon ID. If an ID is not in list_phenomena output, it does not exist.\n\n"
f"Phase D — ANSWER:\n"
f" Only give your <answer> AFTER completing Phases B and C.\n\n"
f"IMPORTANT:\n"
f"- You MUST call add_phenomenon at least once before finishing\n"
f"- Complete each phase before starting the next\n"
f"- Other agents can ONLY see what you write to the graph\n"
f"- If you don't record findings, they are LOST\n"
f"- Include relevant file paths, inode numbers, timestamps, and raw data\n\n"
f"CRITICAL — RECORDING REQUIREMENT:\n"
f"- Your <answer> block is DISCARDED by the orchestrator. Only graph mutations propagate.\n"
f"- Other agents and the final report read ONLY the evidence graph "
f"(phenomena, entities, edges).\n"
f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you end.\n"
f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, "
f"a registry key) and found NOTHING, that absence IS evidence — call "
f"add_phenomenon with a 'No matches for X' title and the search scope in "
f"raw_data. Negative findings constrain the hypothesis space and prevent "
f"the next agent from wasting time re-searching.\n"
f"- If you produce <answer> without having called add_phenomenon at least once, "
f"the task is FAILED regardless of what you wrote in <answer>.\n"
f"- Include exact file paths, inode numbers, timestamps, and the source_tool "
f"that produced each finding.\n\n"
f"ANTI-HALLUCINATION RULES — STRICTLY ENFORCED:\n"
f"- ONLY record findings that appear VERBATIM in tool results you received\n"
f"- NEVER invent or guess timestamps, file paths, inode numbers, or program names\n"
@@ -145,9 +153,17 @@ class BaseAgent:
# ---- Graph interaction tools --------------------------------------------
def _register_graph_tools(self) -> None:
"""Register tools for querying and writing to the evidence graph."""
"""Register graph query + mutation tools.
# --- Read tools ---
Subclasses can override to restrict the toolset. For example, a
read-only agent (hypothesis, report) overrides this to skip
_register_graph_write_tools.
"""
self._register_graph_read_tools()
self._register_graph_write_tools()
def _register_graph_read_tools(self) -> None:
"""Register read-only graph + asset query tools."""
self.register_tool(
name="list_phenomena",
@@ -213,7 +229,49 @@ class BaseAgent:
executor=self._get_hypothesis_status,
)
# --- Write tools ---
self.register_tool(
name="list_assets",
description=(
"List all files extracted from the disk image. "
"Shows filename, category, size, local path, and inode. "
"Check this before calling extract_file to avoid re-extraction."
),
input_schema={
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": [
"registry_hive", "chat_log", "prefetch", "network_capture",
"config_file", "address_book", "recycle_bin", "executable",
"text_log", "other",
],
"description": "Filter by category. Omit to list all.",
},
},
},
executor=self._list_assets,
)
self.register_tool(
name="find_extracted_file",
description=(
"Find an already-extracted file by inode or filename. "
"Returns the local path so you can use it directly with "
"parse_registry_key, read_text_file, etc. without re-extracting."
),
input_schema={
"type": "object",
"properties": {
"inode": {"type": "string", "description": "Inode to look up."},
"filename": {"type": "string", "description": "Filename or partial name to search."},
},
},
executor=self._find_extracted_file,
)
def _register_graph_write_tools(self) -> None:
"""Register graph mutation tools (add_phenomenon, add_lead, link_to_entity)."""
self.register_tool(
name="add_phenomenon",
@@ -282,49 +340,6 @@ class BaseAgent:
executor=self._link_to_entity,
)
# --- Asset library tools ---
self.register_tool(
name="list_assets",
description=(
"List all files extracted from the disk image. "
"Shows filename, category, size, local path, and inode. "
"Check this before calling extract_file to avoid re-extraction."
),
input_schema={
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": [
"registry_hive", "chat_log", "prefetch", "network_capture",
"config_file", "address_book", "recycle_bin", "executable",
"text_log", "other",
],
"description": "Filter by category. Omit to list all.",
},
},
},
executor=self._list_assets,
)
self.register_tool(
name="find_extracted_file",
description=(
"Find an already-extracted file by inode or filename. "
"Returns the local path so you can use it directly with "
"parse_registry_key, read_text_file, etc. without re-extracting."
),
input_schema={
"type": "object",
"properties": {
"inode": {"type": "string", "description": "Inode to look up."},
"filename": {"type": "string", "description": "Filename or partial name to search."},
},
},
executor=self._find_extracted_file,
)
# ---- Tool executors -----------------------------------------------------
async def _list_phenomena(self, category: str | None = None) -> str:

@@ -17,6 +17,36 @@ from llm_client import LLMClient
logger = logging.getLogger(__name__)
def _safe_json_loads(text: str):
"""Parse JSON with progressive sanitization for LLM-produced output.
Tries: (0) as-is, (1) escape stray backslashes outside valid JSON
escapes (\\" \\\\ \\/ \\b \\f \\n \\r \\t \\uXXXX). On final failure,
logs raw input (first 600 chars) so we can diagnose what the model
actually emitted.
"""
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Escape backslashes not followed by a valid JSON escape character.
# NOTE: \\u must be followed by exactly 4 hex digits to be valid.
stage1 = re.sub(
r'\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})',
r'\\\\',
text,
)
try:
return json.loads(stage1)
except json.JSONDecodeError as e:
logger.warning(
"_safe_json_loads failed after sanitize (%s); raw head[:600]=%r",
e, text[:600],
)
raise
def _log(msg: str, **extra) -> None:
"""Emit a structured log message with extra fields for the terminal formatter."""
logger.info(msg, extra=extra)
@@ -216,6 +246,52 @@ class Orchestrator:
"The ultimate goal is to reconstruct a detailed timeline of what happened on this host."
)
# ---- LLM JSON helper -----------------------------------------------------
async def _call_llm_for_json(
self,
system: str,
user_prompt: str,
schema: str = "array",
max_retries: int = 2,
):
"""Call LLM expecting JSON output; self-correct on parse failure.
Two layers of safety:
1. _safe_json_loads escapes stray backslashes outside valid JSON escapes.
2. On any remaining parse error, append the error to the prompt and ask
the LLM to retry, up to max_retries additional attempts.
"""
error_hint = ""
last_err: Exception | None = None
pattern = r'\[.*?\]' if schema == "array" else r'\{.*?\}'
for attempt in range(max_retries + 1):
messages = [{"role": "user", "content": user_prompt + error_hint}]
response = await self.llm.chat(messages=messages, system=system)
match = re.search(pattern, response, re.DOTALL)
candidate = match.group() if match else response
try:
return _safe_json_loads(candidate)
except (json.JSONDecodeError, ValueError) as e:
last_err = e
if attempt < max_retries:
logger.info(
"JSON parse attempt %d/%d failed (%s); retrying with hint",
attempt + 1, max_retries + 1, e,
)
error_hint = (
f"\n\n[Your previous response could not be parsed as JSON: {e}]\n"
f"Output STRICT JSON only — no markdown fences, no code blocks. "
f"Do NOT include literal backslash characters in any string value; "
f"rephrase using forward slashes or describe paths in English "
f"(e.g. 'the Cain folder under Program Files' instead of "
f"'C:\\Program Files\\Cain'). "
f"The response must be a single JSON {schema}."
)
assert last_err is not None
raise last_err
# ---- Hypothesis-directed investigation -----------------------------------
async def _generate_hypothesis_leads(self) -> None:
@@ -252,15 +328,11 @@ class Orchestrator:
)
try:
response = await self.llm.chat(
messages=[{"role": "user", "content": prompt}],
tasks = await self._call_llm_for_json(
system=self._LEAD_GEN_SYSTEM,
user_prompt=prompt,
schema="array",
)
match = re.search(r'\[.*?\]', response, re.DOTALL)
if match:
tasks = json.loads(match.group())
else:
tasks = json.loads(response)
for task in tasks:
hyp_id = task.get("hypothesis_id", "")
@@ -300,12 +372,11 @@ class Orchestrator:
f'[{{"agent": "agent_type", "task": "what to investigate", "priority": 1-10}}]'
)
try:
response = await self.llm.chat(
messages=[{"role": "user", "content": prompt}],
tasks = await self._call_llm_for_json(
system=self._LEAD_GEN_SYSTEM,
user_prompt=prompt,
schema="array",
)
match = re.search(r'\[.*?\]', response, re.DOTALL)
tasks = json.loads(match.group()) if match else json.loads(response)
for task in tasks:
await self.graph.add_lead(
target_agent=task.get("agent", "filesystem"),
@@ -351,15 +422,11 @@ class Orchestrator:
)
try:
response = await self.llm.chat(
messages=[{"role": "user", "content": prompt}],
judgments = await self._call_llm_for_json(
system=self._JUDGE_SYSTEM,
user_prompt=prompt,
schema="array",
)
match = re.search(r'\[.*?\]', response, re.DOTALL)
if match:
judgments = json.loads(match.group())
else:
judgments = json.loads(response)
for j in judgments:
hyp_id = j.get("hypothesis_id", "")
@@ -402,12 +469,11 @@ class Orchestrator:
f'[{{"phenomenon_id": "ph-xxx", "edge_type": "supports|contradicts|...", "reason": "brief explanation"}}]'
)
try:
response = await self.llm.chat(
messages=[{"role": "user", "content": prompt}],
judgments = await self._call_llm_for_json(
system=self._JUDGE_SYSTEM,
user_prompt=prompt,
schema="array",
)
match = re.search(r'\[.*?\]', response, re.DOTALL)
judgments = json.loads(match.group()) if match else json.loads(response)
for j in judgments:
ph_id = j.get("phenomenon_id", "")
edge_type = j.get("edge_type", "")