refactor: lead provenance, unified link path, SSOT cleanup, configurable weights

Five interrelated cleanups:

1. Lead -> Phenomenon provenance
   - Phenomenon.from_lead_id field on the dataclass
   - BaseAgent.run(lead_id=...) writes self._current_lead_id
   - _add_phenomenon auto-injects from agent state (LLM unaware)
   - Orchestrator dispatch passes lead.id; Phase 1/2-auto/4/5 stay None
   - Merge path preserves the first non-None lead_id on collision

2. Unified Phenomenon <-> Hypothesis link path
   - HypothesisAgent only adds hypotheses, never links
   - link_phenomenon_to_hypothesis tool + executor removed
   - All links go through Orchestrator._judge_new_phenomena
   - Phase 2 unconditionally judges after hypothesis generation
   - Gap Analysis judges after each dispatch round
   (Three previously-missing judge calls now in place.)

3. SSOT in agent subclasses
   - Remove RoleTemplate dataclass, ROLE_TEMPLATES dict,
     _instantiate_from_template method
   - Each agent subclass owns name, role, and tool list
   - agent_factory.py shrinks from 299 to 153 lines
   - All 7 agents now route through _AGENT_CLASSES (filesystem,
     registry, communication, network, timeline were previously dead
     subclasses overridden by templates)

4. Configurable edge weights
   - HYPOTHESIS_EDGE_WEIGHTS -> _DEFAULT_EDGE_WEIGHTS (private default)
   - EvidenceGraph(edge_weights=...) override via config.yaml
   - hypothesis_edge_weights section in config.yaml (commented example)
   - main.py and regenerate_report.py read and pass through

5. regenerate_report.py auto-picks the latest run/*/graph_state.json
   when no CLI arg is given (was a hardcoded date path)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
BattleTag
2026-05-12 14:10:15 +08:00
parent fde96c7d9f
commit 74e6bde13a
7 changed files with 92 additions and 254 deletions

View File

@@ -1,150 +1,50 @@
"""Agent Factory — composes agents from tool registry and role templates.
"""Agent Factory — instantiates agents from registered classes.
Provides both pre-defined agent templates (filesystem, registry, etc.)
and LLM-driven dynamic agent composition for capability gaps.
Each agent type has a dedicated subclass under agents/ that owns its name,
role description, and tool list (single source of truth). The factory just
maps agent_type → class. Also supports LLM-driven dynamic composition for
capability gaps via create_specialized_agent().
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG, ToolDefinition
from tool_registry import TOOL_CATALOG
# Agent classes with custom tools — keyed by template name
_AGENT_CLASSES: dict[str, type] = {}
# Agent classes keyed by name. Populated lazily to avoid circular imports.
_AGENT_CLASSES: dict[str, type[BaseAgent]] = {}
def _load_agent_classes() -> None:
"""Lazy-import custom agent classes to avoid circular imports."""
"""Lazy-import agent classes to avoid circular imports."""
if _AGENT_CLASSES:
return
from agents.communication import CommunicationAgent
from agents.filesystem import FileSystemAgent
from agents.hypothesis import HypothesisAgent
from agents.network import NetworkAgent
from agents.registry import RegistryAgent
from agents.report import ReportAgent
from agents.timeline import TimelineAgent
_AGENT_CLASSES["filesystem"] = FileSystemAgent
_AGENT_CLASSES["registry"] = RegistryAgent
_AGENT_CLASSES["communication"] = CommunicationAgent
_AGENT_CLASSES["network"] = NetworkAgent
_AGENT_CLASSES["timeline"] = TimelineAgent
_AGENT_CLASSES["hypothesis"] = HypothesisAgent
_AGENT_CLASSES["report"] = ReportAgent
logger = logging.getLogger(__name__)
@dataclass
class RoleTemplate:
"""Pre-defined agent archetype."""
name: str
role: str
default_tools: list[str] # tool names from TOOL_CATALOG
tags: list[str] = field(default_factory=list)
# Pre-defined templates matching the original 6 agents + hypothesis agent.
ROLE_TEMPLATES: dict[str, RoleTemplate] = {
"filesystem": RoleTemplate(
name="filesystem",
role=(
"File system forensic analyst. You examine disk image partition layouts, "
"directory structures, file metadata, and recover deleted files. "
"You identify suspicious files, installed programs, and user data locations. "
"You also handle Recycle Bin forensics and Prefetch execution evidence."
),
default_tools=[
"partition_info", "filesystem_info", "list_directory",
"extract_file", "find_file", "search_strings",
"parse_prefetch", "count_deleted_files",
"read_text_file", "search_text_file", "read_binary_preview",
],
tags=["filesystem", "disk", "files", "deleted", "prefetch"],
),
"registry": RoleTemplate(
name="registry",
role=(
"Windows registry forensic analyst. You parse registry hive files "
"(SYSTEM, SOFTWARE, SAM, NTUSER.DAT) to extract system configuration, "
"user accounts, installed software, network settings, email accounts, "
"and other Windows artifacts."
),
default_tools=[
"extract_file", "list_directory",
"parse_registry_key", "list_installed_software",
"get_user_activity", "search_registry",
"get_system_info", "get_timezone_info", "get_computer_name",
"get_shutdown_time", "enumerate_users",
"get_network_interfaces", "get_email_config",
],
tags=["registry", "windows", "system", "user", "software"],
),
"communication": RoleTemplate(
name="communication",
role=(
"Communication forensic analyst. You analyze email files (.dbx, .pst), "
"IRC/mIRC chat logs, newsgroup data, and other messaging artifacts "
"to identify communication patterns and contacts."
),
default_tools=[
"list_directory", "extract_file",
"read_text_file", "read_binary_preview",
"list_extracted_dir", "search_strings",
"search_text_file", "read_text_file_section",
],
tags=["email", "chat", "irc", "messaging", "communication"],
),
"network": RoleTemplate(
name="network",
role=(
"Network forensic analyst. You analyze browser history, cookies, "
"network captures (PCAP), wireless artifacts, and other network-related "
"evidence to reconstruct online activities."
),
default_tools=[
"list_directory", "extract_file",
"read_text_file", "read_binary_preview",
"list_extracted_dir", "search_strings",
"search_text_file", "read_text_file_section",
"parse_pcap_strings",
],
tags=["network", "browser", "pcap", "http", "internet"],
),
"timeline": RoleTemplate(
name="timeline",
role=(
"Timeline correlation analyst. You build chronological timelines "
"by combining filesystem MAC times with evidence from other agents. "
"You identify temporal patterns and correlate events across categories."
),
default_tools=[
"build_filesystem_timeline",
],
tags=["timeline", "correlation", "temporal"],
),
"report": RoleTemplate(
name="report",
role=(
"Forensic report writer. You synthesize all evidence and hypotheses "
"into a comprehensive forensic analysis report with executive summary, "
"detailed findings organized by hypothesis, timeline of events, and conclusions."
),
default_tools=[], # Report agent uses only graph query tools
tags=["report", "summary", "writing"],
),
"hypothesis": RoleTemplate(
name="hypothesis",
role=(
"Hypothesis analyst. You review all phenomena discovered so far "
"and formulate investigative hypotheses about what happened on the system. "
"For each hypothesis, identify which existing phenomena support or contradict it."
),
default_tools=[], # Uses only graph query + hypothesis tools
tags=["hypothesis", "analysis", "reasoning"],
),
}
class AgentFactory:
"""Creates agents from templates or dynamically via LLM composition."""
"""Creates agents from registered classes or dynamically via LLM composition."""
def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
self.llm = llm
@@ -152,40 +52,20 @@ class AgentFactory:
self._cache: dict[str, BaseAgent] = {}
def get_or_create_agent(self, agent_type: str) -> BaseAgent | None:
"""Get a cached agent or create one from a template."""
"""Get a cached agent or instantiate one from its registered class."""
if agent_type in self._cache:
return self._cache[agent_type]
template = ROLE_TEMPLATES.get(agent_type)
if template is None:
logger.warning("No template for agent type: %s", agent_type)
return None
# Use custom agent class if one exists, otherwise BaseAgent
_load_agent_classes()
agent_cls = _AGENT_CLASSES.get(agent_type)
if agent_cls is not None:
agent = agent_cls(self.llm, self.graph)
else:
agent = self._instantiate_from_template(template)
if agent_cls is None:
logger.warning("No agent class for type: %s", agent_type)
return None
agent = agent_cls(self.llm, self.graph)
self._cache[agent_type] = agent
return agent
def _instantiate_from_template(self, template: RoleTemplate) -> BaseAgent:
"""Create a BaseAgent from a role template, registering tools from the catalog."""
agent = BaseAgent(self.llm, self.graph)
agent.name = template.name
agent.role = template.role
for tool_name in template.default_tools:
td = TOOL_CATALOG.get(tool_name)
if td is None:
logger.warning("Tool '%s' not in catalog (template: %s)", tool_name, template.name)
continue
agent.register_tool(td.name, td.description, td.input_schema, td.executor)
return agent
async def create_specialized_agent(
self,
hypothesis_title: str,
@@ -220,18 +100,15 @@ class AgentFactory:
messages=[{"role": "user", "content": prompt}],
)
# Parse response — try to extract JSON
try:
config = json.loads(response)
except json.JSONDecodeError:
# Try to find JSON in the response
import re
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
config = json.loads(match.group())
else:
logger.error("Failed to parse agent composition response: %s", response[:300])
# Fallback: create a generic agent with all tools
return self._create_fallback_agent(capability_gap)
agent_name = config.get("agent_name", "specialized")
@@ -239,13 +116,11 @@ class AgentFactory:
strategy = config.get("strategy", "")
tool_names = config.get("tools", [])
# Validate tool names against catalog
valid_tools = [t for t in tool_names if t in TOOL_CATALOG]
if not valid_tools:
logger.warning("No valid tools selected by LLM, using fallback")
return self._create_fallback_agent(capability_gap)
# Build agent
agent = BaseAgent(self.llm, self.graph)
agent.name = agent_name
agent.role = f"{role_text}\n\nInvestigation Strategy:\n{strategy}"

View File

@@ -1,12 +1,15 @@
"""Hypothesis Agent — analyzes phenomena and generates investigative hypotheses."""
"""Hypothesis Agent — generates investigative hypotheses from phenomena.
Generates hypotheses only. Phenomenon→Hypothesis linking is handled centrally
by Orchestrator._judge_new_phenomena, so all link logic lives in one place.
"""
from __future__ import annotations
import json
import logging
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph, HYPOTHESIS_EDGE_WEIGHTS
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
logger = logging.getLogger(__name__)
@@ -17,8 +20,7 @@ class HypothesisAgent(BaseAgent):
role = (
"Hypothesis analyst. You review all phenomena discovered so far "
"and formulate investigative hypotheses about what happened on this system. "
"Your ultimate goal: build the most complete picture of events that occurred. "
"For each hypothesis, identify which existing phenomena support or contradict it."
"Your ultimate goal: build the most complete picture of events that occurred."
)
def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
@@ -26,10 +28,6 @@ class HypothesisAgent(BaseAgent):
self._register_hypothesis_tools()
def _register_hypothesis_tools(self) -> None:
"""Register hypothesis-specific tools."""
valid_edge_types = list(HYPOTHESIS_EDGE_WEIGHTS.keys())
self.register_tool(
name="add_hypothesis",
description=(
@@ -53,44 +51,6 @@ class HypothesisAgent(BaseAgent):
executor=self._add_hypothesis,
)
self.register_tool(
name="link_phenomenon_to_hypothesis",
description=(
"Link an existing phenomenon to a hypothesis with a relationship type. "
f"Valid relationship types: {', '.join(valid_edge_types)}. "
"direct_evidence = the phenomenon IS the hypothesis. "
"supports = consistent with the hypothesis. "
"prerequisite_met = a necessary condition is satisfied. "
"consequence_observed = an expected result of the hypothesis is found. "
"contradicts = directly contradicts the hypothesis. "
"weakens = makes the hypothesis less likely."
),
input_schema={
"type": "object",
"properties": {
"phenomenon_id": {
"type": "string",
"description": "ID of the phenomenon (e.g. 'ph-a1b2c3d4').",
},
"hypothesis_id": {
"type": "string",
"description": "ID of the hypothesis (e.g. 'hyp-e5f6g7h8').",
},
"edge_type": {
"type": "string",
"enum": valid_edge_types,
"description": "The edge_type of the relationship.",
},
"reason": {
"type": "string",
"description": "The reason this relationship holds (1-2 sentences).",
},
},
"required": ["phenomenon_id", "hypothesis_id", "edge_type", "reason"],
},
executor=self._link_phenomenon_to_hypothesis,
)
async def _add_hypothesis(self, title: str, description: str) -> str:
hid = await self.graph.add_hypothesis(
title=title,
@@ -98,33 +58,3 @@ class HypothesisAgent(BaseAgent):
created_by=self.name,
)
return f"Hypothesis created: {hid}{title} (confidence: 0.50)"
async def _link_phenomenon_to_hypothesis(
self,
phenomenon_id: str,
hypothesis_id: str,
edge_type: str = "",
reason: str = "",
# Common LLM misnaming — accept as fallbacks
relationship: str = "",
note: str = "",
) -> str:
edge_type = edge_type or relationship
reason = reason or note
if not edge_type:
return "Error: edge_type is required."
try:
new_conf = await self.graph.update_hypothesis_confidence(
hyp_id=hypothesis_id,
phenomenon_id=phenomenon_id,
edge_type=edge_type,
reason=reason,
)
weight = HYPOTHESIS_EDGE_WEIGHTS[edge_type]
direction = "+" if weight > 0 else ""
return (
f"Linked: {phenomenon_id} —[{edge_type}]→ {hypothesis_id} "
f"(weight: {direction}{weight}, new confidence: {new_conf:.3f})"
)
except ValueError as e:
return f"Error linking: {e}"

View File

@@ -37,6 +37,7 @@ class BaseAgent:
self._tools: dict[str, dict] = {} # name -> schema
self._executors: dict[str, Any] = {} # name -> async callable
self._work_log: list[str] = []
self._current_lead_id: str | None = None
def register_tool(
self,
@@ -107,11 +108,12 @@ class BaseAgent:
f"- Do NOT fabricate execution timestamps — only report timestamps returned by tools"
)
async def run(self, task: str) -> str:
async def run(self, task: str, lead_id: str | None = None) -> str:
"""Run this agent with a specific task."""
_log(task, event="agent_start", agent=self.name)
self.graph.agent_status[self.name] = "running"
self.graph._current_agent = self.name
self._current_lead_id = lead_id
self._register_graph_tools()
@@ -375,6 +377,7 @@ class BaseAgent:
raw_data=raw_data,
timestamp=timestamp,
source_tool=source_tool,
from_lead_id=self._current_lead_id,
)
if merged:
return f"Phenomenon merged into existing: {pid}{title} (corroboration boost)"

View File

@@ -18,10 +18,12 @@ from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Predefined edge weights for Phenomenon → Hypothesis relationships.
# Default edge weights for Phenomenon → Hypothesis relationships.
# LLM only picks the edge type (categorical); the weight is looked up here.
# Override per-graph via EvidenceGraph(edge_weights=...) or config.yaml's
# `hypothesis_edge_weights` section.
# ---------------------------------------------------------------------------
HYPOTHESIS_EDGE_WEIGHTS: dict[str, float] = {
_DEFAULT_EDGE_WEIGHTS: dict[str, float] = {
"direct_evidence": +0.25,
"supports": +0.15,
"prerequisite_met": +0.10,
@@ -94,6 +96,7 @@ class Phenomenon:
confidence: float = 1.0
source_tool: str = ""
corroborating_agents: list[str] = field(default_factory=list)
from_lead_id: str | None = None
created_at: str = ""
def to_dict(self) -> dict:
@@ -239,8 +242,12 @@ class EvidenceGraph:
self,
case_info: dict | None = None,
persist_path: Path | None = None,
edge_weights: dict[str, float] | None = None,
) -> None:
self.case_info: dict = case_info or {}
self.edge_weights: dict[str, float] = (
dict(edge_weights) if edge_weights else dict(_DEFAULT_EDGE_WEIGHTS)
)
self.image_path: str = ""
self.partition_offset: int = 0
self.extracted_dir: str = "extracted"
@@ -304,12 +311,17 @@ class EvidenceGraph:
self._persist_path = old
@classmethod
def load_state(cls, path: Path) -> EvidenceGraph:
def load_state(
cls,
path: Path,
edge_weights: dict[str, float] | None = None,
) -> EvidenceGraph:
"""Restore an EvidenceGraph from a saved JSON state file."""
data = json.loads(path.read_text())
graph = cls(
case_info=data.get("case_info", {}),
persist_path=path,
edge_weights=edge_weights,
)
graph.image_path = data.get("image_path", "")
graph.partition_offset = data.get("partition_offset", 0)
@@ -403,6 +415,7 @@ class EvidenceGraph:
raw_data: dict | None = None,
timestamp: str | None = None,
source_tool: str = "",
from_lead_id: str | None = None,
) -> tuple[str, bool]:
"""Add a phenomenon. Returns (id, was_merged).
@@ -419,6 +432,8 @@ class EvidenceGraph:
for k, v in raw_data.items():
if k not in similar.raw_data:
similar.raw_data[k] = v
if from_lead_id and similar.from_lead_id is None:
similar.from_lead_id = from_lead_id
self._auto_save()
return similar.id, True
@@ -437,6 +452,7 @@ class EvidenceGraph:
timestamp=timestamp,
confidence=confidence,
source_tool=source_tool,
from_lead_id=from_lead_id,
created_at=datetime.now().isoformat(),
)
self.phenomena[pid] = ph
@@ -532,14 +548,14 @@ class EvidenceGraph:
) -> float:
"""Update hypothesis confidence based on a phenomenon linkage.
The edge_type must be one of HYPOTHESIS_EDGE_WEIGHTS keys.
Weight is looked up from the predefined table, NOT judged by LLM.
The edge_type must be one of self.edge_weights keys.
Weight is looked up from the configured table, NOT judged by LLM.
Returns the new confidence value.
"""
if edge_type not in HYPOTHESIS_EDGE_WEIGHTS:
if edge_type not in self.edge_weights:
raise ValueError(
f"Invalid hypothesis edge type: {edge_type}. "
f"Must be one of: {list(HYPOTHESIS_EDGE_WEIGHTS.keys())}"
f"Must be one of: {list(self.edge_weights.keys())}"
)
async with self._lock:
@@ -549,7 +565,7 @@ class EvidenceGraph:
if hyp is None:
raise ValueError(f"Hypothesis not found: {hyp_id}")
weight = HYPOTHESIS_EDGE_WEIGHTS[edge_type]
weight = self.edge_weights[edge_type]
old_conf = hyp.confidence
if weight > 0:

View File

@@ -229,6 +229,7 @@ async def async_main() -> None:
graph = EvidenceGraph(
case_info=config.get("cfreds_hacking_case", {}),
persist_path=run_dir / "graph_state.json",
edge_weights=config.get("hypothesis_edge_weights"),
)
graph.image_path = image_path
graph.partition_offset = partition_offset

View File

@@ -11,7 +11,7 @@ from datetime import datetime
from pathlib import Path
from agent_factory import AgentFactory
from evidence_graph import EvidenceGraph, HYPOTHESIS_EDGE_WEIGHTS
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
logger = logging.getLogger(__name__)
@@ -149,7 +149,8 @@ class Orchestrator:
await agent.run(
f"Investigate this lead: {lead.description}\n"
f"{hyp_line}"
f"Focus area: {lead.target_agent}"
f"Focus area: {lead.target_agent}",
lead_id=lead.id,
)
await self.graph.mark_lead_completed(lead.id)
self._failure_count = 0
@@ -209,11 +210,9 @@ class Orchestrator:
"1. Specific and testable\n"
"2. About a distinct aspect of activity (e.g., hacking tools, communication, "
"network attacks, data theft)\n\n"
"For each hypothesis:\n"
"- Call add_hypothesis to create it\n"
"- Then call link_phenomenon_to_hypothesis to link relevant existing phenomena\n"
"- Choose the relationship type carefully: direct_evidence, supports, "
"prerequisite_met, consequence_observed, contradicts, or weakens\n\n"
"Call add_hypothesis for each. The orchestrator will automatically link "
"relevant existing phenomena to each hypothesis after you finish — you do "
"not need to (and cannot) create those links yourself.\n\n"
"The ultimate goal is to reconstruct a detailed timeline of what happened on this host."
)
@@ -333,7 +332,7 @@ class Orchestrator:
if not unlinked:
return
valid_types = list(HYPOTHESIS_EDGE_WEIGHTS.keys())
valid_types = list(self.graph.edge_weights.keys())
hyp_section = "\n".join(
f" [{h.id}] {h.title}: {h.description}" for h in active
@@ -370,7 +369,7 @@ class Orchestrator:
if (
hyp_id in self.graph.hypotheses
and ph_id in self.graph.phenomena
and edge_type in HYPOTHESIS_EDGE_WEIGHTS
and edge_type in self.graph.edge_weights
):
await self.graph.update_hypothesis_confidence(
hyp_id=hyp_id,
@@ -413,7 +412,7 @@ class Orchestrator:
ph_id = j.get("phenomenon_id", "")
edge_type = j.get("edge_type", "")
reason = j.get("reason", "")
if ph_id in self.graph.phenomena and edge_type in HYPOTHESIS_EDGE_WEIGHTS:
if ph_id in self.graph.phenomena and edge_type in self.graph.edge_weights:
await self.graph.update_hypothesis_confidence(
hyp_id=hyp.id,
phenomenon_id=ph_id,
@@ -505,6 +504,7 @@ class Orchestrator:
break
_log(f"Gap fill round {round_num}: {len(pending)} leads", event="dispatch")
await self._dispatch_leads_parallel(pending)
await self._judge_new_phenomena()
# ---- Run archiving -------------------------------------------------------
@@ -604,11 +604,13 @@ class Orchestrator:
manual_hypotheses = self.config.get("hypotheses", [])
if manual_hypotheses:
await self._generate_hypotheses_manual(manual_hypotheses)
if self.graph.phenomena:
await self._judge_new_phenomena()
else:
await self._generate_hypotheses_auto()
# Unified judge step — link Phase 1 phenomena to newly-created hypotheses
if self.graph.phenomena and self.graph.hypotheses:
await self._judge_new_phenomena()
for h in self.graph.hypotheses.values():
_log(f" {h.summary()}", event="hypothesis")
_log(

View File

@@ -13,8 +13,16 @@ from tool_registry import register_all_tools
async def main() -> None:
# Find the run to regenerate from
run_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("runs/2026-04-02T15-11-25")
# Find the run: CLI arg, or latest run with a graph_state.json
if len(sys.argv) > 1:
run_dir = Path(sys.argv[1])
else:
states = sorted(Path("runs").glob("*/graph_state.json"), reverse=True)
if not states:
print("No runs found in runs/")
return
run_dir = states[0].parent
print(f"Using latest run: {run_dir.name}")
state_path = run_dir / "graph_state.json"
if not state_path.exists():
@@ -24,8 +32,11 @@ async def main() -> None:
config = yaml.safe_load(open("config.yaml"))
agent_cfg = config["agent"]
# Load graph
graph = EvidenceGraph.load_state(state_path)
# Load graph (edge_weights from config — applied to the loaded graph)
graph = EvidenceGraph.load_state(
state_path,
edge_weights=config.get("hypothesis_edge_weights"),
)
print(f"Loaded: {graph.stats_summary()}")
# LLM client with larger max_tokens for report