- llm_client: switch tool_call_loop from text-based <tool_call> regex to OpenAI-native tools=[...] / structured tool_calls field; accumulate delta.reasoning_content for DeepSeek thinking-mode echo-back; fold preserves system msg and aligns boundary to never orphan role:tool
- base_agent: generic forced-retry via mandatory_record_tools class attr (filesystem -> add_phenomenon, timeline -> add_temporal_edge, hypothesis -> add_hypothesis, report -> save_report); count via executor wrapper
- terminal_tools class attr + loop short-circuit: when a terminal tool is called, loop exits with its raw return as final_text. ReportAgent declares save_report as terminal - replaces the <answer>-tag stop signal that native tool calling broke
- _execute_*: return (raw, formatted) - terminal exit uses untruncated raw, conversation history uses 3000-char-capped formatted
- evidence_graph + orchestrator: LLM-derived InvestigationArea support (hypothesis-driven coverage check, replaces hardcoded _AREA_KEYWORDS / _AREA_TOOLS); manual yaml block kept as optional seed
- strip <answer> references from agent prompts (no longer load-bearing)

Verified on CFReDS image across 4 smoke runs: 0 JSON parse failures (was 3); 22 temporal edges from Phase 4 (was 0); ReportAgent exits via save_report (was max_iterations regression). 78/78 unit tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
94 lines
4.0 KiB
Python
94 lines
4.0 KiB
Python
"""Hypothesis Agent — generates investigative hypotheses from phenomena.
|
|
|
|
Generates hypotheses only. Phenomenon→Hypothesis linking is handled centrally
|
|
by Orchestrator._judge_new_phenomena. Tool set is restricted to read-only
|
|
graph queries + add_hypothesis to prevent the agent from creating phenomena,
|
|
leads, or entity links.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from base_agent import BaseAgent
|
|
from evidence_graph import EvidenceGraph
|
|
from llm_client import LLMClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HypothesisAgent(BaseAgent):
    """Agent that formulates investigative hypotheses from recorded phenomena.

    The only tool this class registers itself is ``add_hypothesis``; graph
    access is limited to whatever ``_register_graph_read_tools`` (inherited,
    not shown in this file) provides. The system prompt tells the model that
    it cannot create phenomena, leads, or entity links, and that linking
    phenomena to hypotheses is done by the orchestrator afterwards.
    """

    name = "hypothesis"
    role = (
        "Hypothesis analyst. You review all phenomena discovered so far "
        "and formulate investigative hypotheses about what happened on this system. "
        "Your ultimate goal: build the most complete picture of events that occurred."
    )
    # Names of the tools this agent is expected to call to record its output.
    # NOTE(review): presumably consumed by BaseAgent's forced-retry logic,
    # which is defined outside this file — confirm there.
    mandatory_record_tools = ("add_hypothesis",)

    def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
        """Initialise the base agent, then register ``add_hypothesis``.

        Args:
            llm: Client forwarded unchanged to ``BaseAgent.__init__``.
            graph: Evidence graph forwarded to ``BaseAgent.__init__``; this
                class later reads it through ``self.graph``.
        """
        super().__init__(llm, graph)
        self._register_hypothesis_tools()

    def _register_graph_tools(self) -> None:
        """Restrict to read-only graph tools. add_hypothesis is registered separately."""
        # NOTE(review): presumably overrides a BaseAgent hook that would
        # otherwise register mutating graph tools as well — confirm.
        self._register_graph_read_tools()

    def _register_hypothesis_tools(self) -> None:
        """Register ``add_hypothesis`` with a schema requiring title and description."""
        self.register_tool(
            name="add_hypothesis",
            description=(
                "Create a new investigative hypothesis about what happened on the system. "
                "Each hypothesis should be a specific, testable claim."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Short title for the hypothesis.",
                    },
                    "description": {
                        "type": "string",
                        "description": "Detailed description of what this hypothesis claims.",
                    },
                },
                "required": ["title", "description"],
            },
            executor=self._add_hypothesis,
        )

    def _build_system_prompt(self, task: str) -> str:
        """Focused prompt — no INVESTIGATE/RECORD/LINK workflow.

        Args:
            task: Task text interpolated verbatim after ``Your task:``.

        Returns:
            The system prompt: agent name and role, the graph's image path
            and stats summary, the task, then the fixed WORKFLOW and
            STRICT BOUNDARIES sections.
        """
        # Only the lines that interpolate a value are f-strings; the static
        # lines are plain literals joined by implicit concatenation.
        return (
            f"You are {self.name}, a forensic hypothesis analyst.\n"
            f"Role: {self.role}\n\n"
            f"Image: {self.graph.image_path}\n"
            f"Current investigation state: {self.graph.stats_summary()}\n\n"
            f"Your task: {task}\n\n"
            "WORKFLOW:\n"
            "1. Call list_phenomena and search_graph to review existing findings.\n"
            "2. For each hypothesis you want to record, call add_hypothesis (title + description).\n"
            "3. STOP after you have generated 3-7 hypotheses. Do not call any more tools.\n\n"
            "STRICT BOUNDARIES:\n"
            "- Your only mutation tool is add_hypothesis. Do NOT attempt list_directory, "
            "parse_registry_key, extract_file, or any disk-image investigation tools — "
            "they are not yours and you will get 'unknown tool' errors.\n"
            "- You CANNOT create phenomena, leads, or entity links. The orchestrator handles "
            "all phenomenon↔hypothesis linking after you finish.\n"
            "- Each hypothesis must be specific and testable. Avoid generic templates like "
            "'Unauthorized Remote Access' or 'Malware Deployment' unless concrete phenomena "
            "in the graph already point to them.\n"
            "- If the graph is empty, generate broad starting hypotheses and mark them "
            "clearly as exploratory in their description so downstream agents know they "
            "still need evidence."
        )

    async def _add_hypothesis(self, title: str, description: str) -> str:
        """Executor for the ``add_hypothesis`` tool.

        Args:
            title: Short title for the hypothesis.
            description: Detailed description of what the hypothesis claims.

        Returns:
            A confirmation message containing the identifier returned by
            ``self.graph.add_hypothesis`` and the title.
        """
        hid = await self.graph.add_hypothesis(
            title=title,
            description=description,
            created_by=self.name,
        )
        # NOTE(review): the 0.50 below is a literal in the message, not a value
        # read back from the graph — presumably it mirrors the graph's default
        # starting confidence; verify against EvidenceGraph.add_hypothesis.
        return f"Hypothesis created: {hid} — {title} (confidence: 0.50)"
|