diff --git a/agent_factory.py b/agent_factory.py index e55a680..18582f6 100644 --- a/agent_factory.py +++ b/agent_factory.py @@ -1,150 +1,50 @@ -"""Agent Factory — composes agents from tool registry and role templates. +"""Agent Factory — instantiates agents from registered classes. -Provides both pre-defined agent templates (filesystem, registry, etc.) -and LLM-driven dynamic agent composition for capability gaps. +Each agent type has a dedicated subclass under agents/ that owns its name, +role description, and tool list (single source of truth). The factory just +maps agent_type → class. Also supports LLM-driven dynamic composition for +capability gaps via create_specialized_agent(). """ from __future__ import annotations import json import logging -from dataclasses import dataclass, field from base_agent import BaseAgent from evidence_graph import EvidenceGraph from llm_client import LLMClient -from tool_registry import TOOL_CATALOG, ToolDefinition +from tool_registry import TOOL_CATALOG -# Agent classes with custom tools — keyed by template name -_AGENT_CLASSES: dict[str, type] = {} +# Agent classes keyed by name. Populated lazily to avoid circular imports. +_AGENT_CLASSES: dict[str, type[BaseAgent]] = {} def _load_agent_classes() -> None: - """Lazy-import custom agent classes to avoid circular imports.""" + """Lazy-import agent classes to avoid circular imports.""" if _AGENT_CLASSES: return + from agents.communication import CommunicationAgent + from agents.filesystem import FileSystemAgent from agents.hypothesis import HypothesisAgent + from agents.network import NetworkAgent + from agents.registry import RegistryAgent from agents.report import ReportAgent + from agents.timeline import TimelineAgent + _AGENT_CLASSES["filesystem"] = FileSystemAgent + _AGENT_CLASSES["registry"] = RegistryAgent + _AGENT_CLASSES["communication"] = CommunicationAgent + _AGENT_CLASSES["network"] = NetworkAgent + _AGENT_CLASSES["timeline"] = TimelineAgent _AGENT_CLASSES["hypothesis"] = HypothesisAgent _AGENT_CLASSES["report"] = ReportAgent + logger = logging.getLogger(__name__) -@dataclass -class RoleTemplate: - """Pre-defined agent archetype.""" - - name: str - role: str - default_tools: list[str] # tool names from TOOL_CATALOG - tags: list[str] = field(default_factory=list) - - -# Pre-defined templates matching the original 6 agents + hypothesis agent. -ROLE_TEMPLATES: dict[str, RoleTemplate] = { - "filesystem": RoleTemplate( - name="filesystem", - role=( - "File system forensic analyst. You examine disk image partition layouts, " - "directory structures, file metadata, and recover deleted files. " - "You identify suspicious files, installed programs, and user data locations. " - "You also handle Recycle Bin forensics and Prefetch execution evidence." - ), - default_tools=[ - "partition_info", "filesystem_info", "list_directory", - "extract_file", "find_file", "search_strings", - "parse_prefetch", "count_deleted_files", - "read_text_file", "search_text_file", "read_binary_preview", - ], - tags=["filesystem", "disk", "files", "deleted", "prefetch"], - ), - "registry": RoleTemplate( - name="registry", - role=( - "Windows registry forensic analyst. You parse registry hive files " - "(SYSTEM, SOFTWARE, SAM, NTUSER.DAT) to extract system configuration, " - "user accounts, installed software, network settings, email accounts, " - "and other Windows artifacts." - ), - default_tools=[ - "extract_file", "list_directory", - "parse_registry_key", "list_installed_software", - "get_user_activity", "search_registry", - "get_system_info", "get_timezone_info", "get_computer_name", - "get_shutdown_time", "enumerate_users", - "get_network_interfaces", "get_email_config", - ], - tags=["registry", "windows", "system", "user", "software"], - ), - "communication": RoleTemplate( - name="communication", - role=( - "Communication forensic analyst. You analyze email files (.dbx, .pst), " - "IRC/mIRC chat logs, newsgroup data, and other messaging artifacts " - "to identify communication patterns and contacts." - ), - default_tools=[ - "list_directory", "extract_file", - "read_text_file", "read_binary_preview", - "list_extracted_dir", "search_strings", - "search_text_file", "read_text_file_section", - ], - tags=["email", "chat", "irc", "messaging", "communication"], - ), - "network": RoleTemplate( - name="network", - role=( - "Network forensic analyst. You analyze browser history, cookies, " - "network captures (PCAP), wireless artifacts, and other network-related " - "evidence to reconstruct online activities." - ), - default_tools=[ - "list_directory", "extract_file", - "read_text_file", "read_binary_preview", - "list_extracted_dir", "search_strings", - "search_text_file", "read_text_file_section", - "parse_pcap_strings", - ], - tags=["network", "browser", "pcap", "http", "internet"], - ), - "timeline": RoleTemplate( - name="timeline", - role=( - "Timeline correlation analyst. You build chronological timelines " - "by combining filesystem MAC times with evidence from other agents. " - "You identify temporal patterns and correlate events across categories." - ), - default_tools=[ - "build_filesystem_timeline", - ], - tags=["timeline", "correlation", "temporal"], - ), - "report": RoleTemplate( - name="report", - role=( - "Forensic report writer. You synthesize all evidence and hypotheses " - "into a comprehensive forensic analysis report with executive summary, " - "detailed findings organized by hypothesis, timeline of events, and conclusions." - ), - default_tools=[], # Report agent uses only graph query tools - tags=["report", "summary", "writing"], - ), - "hypothesis": RoleTemplate( - name="hypothesis", - role=( - "Hypothesis analyst. You review all phenomena discovered so far " - "and formulate investigative hypotheses about what happened on the system. " - "For each hypothesis, identify which existing phenomena support or contradict it." - ), - default_tools=[], # Uses only graph query + hypothesis tools - tags=["hypothesis", "analysis", "reasoning"], - ), -} - - class AgentFactory: - """Creates agents from templates or dynamically via LLM composition.""" + """Creates agents from registered classes or dynamically via LLM composition.""" def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None: self.llm = llm @@ -152,40 +52,20 @@ class AgentFactory: self._cache: dict[str, BaseAgent] = {} def get_or_create_agent(self, agent_type: str) -> BaseAgent | None: - """Get a cached agent or create one from a template.""" + """Get a cached agent or instantiate one from its registered class.""" if agent_type in self._cache: return self._cache[agent_type] - template = ROLE_TEMPLATES.get(agent_type) - if template is None: - logger.warning("No template for agent type: %s", agent_type) - return None - - # Use custom agent class if one exists, otherwise BaseAgent _load_agent_classes() agent_cls = _AGENT_CLASSES.get(agent_type) - if agent_cls is not None: - agent = agent_cls(self.llm, self.graph) - else: - agent = self._instantiate_from_template(template) + if agent_cls is None: + logger.warning("No agent class for type: %s", agent_type) + return None + + agent = agent_cls(self.llm, self.graph) self._cache[agent_type] = agent return agent - def _instantiate_from_template(self, template: RoleTemplate) -> BaseAgent: - """Create a BaseAgent from a role template, registering tools from the catalog.""" - agent = BaseAgent(self.llm, self.graph) - agent.name = template.name - agent.role = template.role - - for tool_name in template.default_tools: - td = TOOL_CATALOG.get(tool_name) - if td is None: - logger.warning("Tool '%s' not in catalog (template: %s)", tool_name, template.name) - continue - agent.register_tool(td.name, td.description, td.input_schema, td.executor) - - return agent - async def create_specialized_agent( self, hypothesis_title: str, @@ -220,18 +100,15 @@ class AgentFactory: messages=[{"role": "user", "content": prompt}], ) - # Parse response — try to extract JSON try: config = json.loads(response) except json.JSONDecodeError: - # Try to find JSON in the response import re match = re.search(r'\{.*\}', response, re.DOTALL) if match: config = json.loads(match.group()) else: logger.error("Failed to parse agent composition response: %s", response[:300]) - # Fallback: create a generic agent with all tools return self._create_fallback_agent(capability_gap) agent_name = config.get("agent_name", "specialized") @@ -239,13 +116,11 @@ class AgentFactory: strategy = config.get("strategy", "") tool_names = config.get("tools", []) - # Validate tool names against catalog valid_tools = [t for t in tool_names if t in TOOL_CATALOG] if not valid_tools: logger.warning("No valid tools selected by LLM, using fallback") return self._create_fallback_agent(capability_gap) - # Build agent agent = BaseAgent(self.llm, self.graph) agent.name = agent_name agent.role = f"{role_text}\n\nInvestigation Strategy:\n{strategy}" diff --git a/agents/hypothesis.py b/agents/hypothesis.py index 4e25f89..e493cdb 100644 --- a/agents/hypothesis.py +++ b/agents/hypothesis.py @@ -1,12 +1,15 @@ -"""Hypothesis Agent — analyzes phenomena and generates investigative hypotheses.""" +"""Hypothesis Agent — generates investigative hypotheses from phenomena. + +Generates hypotheses only. Phenomenon→Hypothesis linking is handled centrally +by Orchestrator._judge_new_phenomena, so all link logic lives in one place. +""" from __future__ import annotations -import json import logging from base_agent import BaseAgent -from evidence_graph import EvidenceGraph, HYPOTHESIS_EDGE_WEIGHTS +from evidence_graph import EvidenceGraph from llm_client import LLMClient logger = logging.getLogger(__name__) @@ -17,8 +20,7 @@ class HypothesisAgent(BaseAgent): role = ( "Hypothesis analyst. You review all phenomena discovered so far " "and formulate investigative hypotheses about what happened on this system. " - "Your ultimate goal: build the most complete picture of events that occurred. " - "For each hypothesis, identify which existing phenomena support or contradict it." + "Your ultimate goal: build the most complete picture of events that occurred." ) def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None: @@ -26,10 +28,6 @@ class HypothesisAgent(BaseAgent): self._register_hypothesis_tools() def _register_hypothesis_tools(self) -> None: - """Register hypothesis-specific tools.""" - - valid_edge_types = list(HYPOTHESIS_EDGE_WEIGHTS.keys()) - self.register_tool( name="add_hypothesis", description=( @@ -53,44 +51,6 @@ class HypothesisAgent(BaseAgent): executor=self._add_hypothesis, ) - self.register_tool( - name="link_phenomenon_to_hypothesis", - description=( - "Link an existing phenomenon to a hypothesis with a relationship type. " - f"Valid relationship types: {', '.join(valid_edge_types)}. " - "direct_evidence = the phenomenon IS the hypothesis. " - "supports = consistent with the hypothesis. " - "prerequisite_met = a necessary condition is satisfied. " - "consequence_observed = an expected result of the hypothesis is found. " - "contradicts = directly contradicts the hypothesis. " - "weakens = makes the hypothesis less likely." - ), - input_schema={ - "type": "object", - "properties": { - "phenomenon_id": { - "type": "string", - "description": "ID of the phenomenon (e.g. 'ph-a1b2c3d4').", - }, - "hypothesis_id": { - "type": "string", - "description": "ID of the hypothesis (e.g. 'hyp-e5f6g7h8').", - }, - "edge_type": { - "type": "string", - "enum": valid_edge_types, - "description": "The edge_type of the relationship.", - }, - "reason": { - "type": "string", - "description": "The reason this relationship holds (1-2 sentences).", - }, - }, - "required": ["phenomenon_id", "hypothesis_id", "edge_type", "reason"], - }, - executor=self._link_phenomenon_to_hypothesis, - ) - async def _add_hypothesis(self, title: str, description: str) -> str: hid = await self.graph.add_hypothesis( title=title, @@ -98,33 +58,3 @@ class HypothesisAgent(BaseAgent): created_by=self.name, ) return f"Hypothesis created: {hid} — {title} (confidence: 0.50)" - - async def _link_phenomenon_to_hypothesis( - self, - phenomenon_id: str, - hypothesis_id: str, - edge_type: str = "", - reason: str = "", - # Common LLM misnaming — accept as fallbacks - relationship: str = "", - note: str = "", - ) -> str: - edge_type = edge_type or relationship - reason = reason or note - if not edge_type: - return "Error: edge_type is required." - try: - new_conf = await self.graph.update_hypothesis_confidence( - hyp_id=hypothesis_id, - phenomenon_id=phenomenon_id, - edge_type=edge_type, - reason=reason, - ) - weight = HYPOTHESIS_EDGE_WEIGHTS[edge_type] - direction = "+" if weight > 0 else "" - return ( - f"Linked: {phenomenon_id} —[{edge_type}]→ {hypothesis_id} " - f"(weight: {direction}{weight}, new confidence: {new_conf:.3f})" - ) - except ValueError as e: - return f"Error linking: {e}" diff --git a/base_agent.py b/base_agent.py index a385f47..b84dda5 100644 --- a/base_agent.py +++ b/base_agent.py @@ -37,6 +37,7 @@ class BaseAgent: self._tools: dict[str, dict] = {} # name -> schema self._executors: dict[str, Any] = {} # name -> async callable self._work_log: list[str] = [] + self._current_lead_id: str | None = None def register_tool( self, @@ -107,11 +108,12 @@ class BaseAgent: f"- Do NOT fabricate execution timestamps — only report timestamps returned by tools" ) - async def run(self, task: str) -> str: + async def run(self, task: str, lead_id: str | None = None) -> str: """Run this agent with a specific task.""" _log(task, event="agent_start", agent=self.name) self.graph.agent_status[self.name] = "running" self.graph._current_agent = self.name + self._current_lead_id = lead_id self._register_graph_tools() @@ -375,6 +377,7 @@ class BaseAgent: raw_data=raw_data, timestamp=timestamp, source_tool=source_tool, + from_lead_id=self._current_lead_id, ) if merged: return f"Phenomenon merged into existing: {pid} — {title} (corroboration boost)" diff --git a/evidence_graph.py b/evidence_graph.py index efdad6d..99ac1cd 100644 --- a/evidence_graph.py +++ b/evidence_graph.py @@ -18,10 +18,12 @@ from pathlib import Path logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# Predefined edge weights for Phenomenon → Hypothesis relationships. +# Default edge weights for Phenomenon → Hypothesis relationships. # LLM only picks the edge type (categorical); the weight is looked up here. +# Override per-graph via EvidenceGraph(edge_weights=...) or config.yaml's +# `hypothesis_edge_weights` section. # --------------------------------------------------------------------------- -HYPOTHESIS_EDGE_WEIGHTS: dict[str, float] = { +_DEFAULT_EDGE_WEIGHTS: dict[str, float] = { "direct_evidence": +0.25, "supports": +0.15, "prerequisite_met": +0.10, @@ -94,6 +96,7 @@ class Phenomenon: confidence: float = 1.0 source_tool: str = "" corroborating_agents: list[str] = field(default_factory=list) + from_lead_id: str | None = None created_at: str = "" def to_dict(self) -> dict: @@ -239,8 +242,12 @@ class EvidenceGraph: self, case_info: dict | None = None, persist_path: Path | None = None, + edge_weights: dict[str, float] | None = None, ) -> None: self.case_info: dict = case_info or {} + self.edge_weights: dict[str, float] = ( + dict(edge_weights) if edge_weights else dict(_DEFAULT_EDGE_WEIGHTS) + ) self.image_path: str = "" self.partition_offset: int = 0 self.extracted_dir: str = "extracted" @@ -304,12 +311,17 @@ class EvidenceGraph: self._persist_path = old @classmethod - def load_state(cls, path: Path) -> EvidenceGraph: + def load_state( + cls, + path: Path, + edge_weights: dict[str, float] | None = None, + ) -> EvidenceGraph: """Restore an EvidenceGraph from a saved JSON state file.""" data = json.loads(path.read_text()) graph = cls( case_info=data.get("case_info", {}), persist_path=path, + edge_weights=edge_weights, ) graph.image_path = data.get("image_path", "") graph.partition_offset = data.get("partition_offset", 0) @@ -403,6 +415,7 @@ class EvidenceGraph: raw_data: dict | None = None, timestamp: str | None = None, source_tool: str = "", + from_lead_id: str | None = None, ) -> tuple[str, bool]: """Add a phenomenon. Returns (id, was_merged). @@ -419,6 +432,8 @@ class EvidenceGraph: for k, v in raw_data.items(): if k not in similar.raw_data: similar.raw_data[k] = v + if from_lead_id and similar.from_lead_id is None: + similar.from_lead_id = from_lead_id self._auto_save() return similar.id, True @@ -437,6 +452,7 @@ class EvidenceGraph: timestamp=timestamp, confidence=confidence, source_tool=source_tool, + from_lead_id=from_lead_id, created_at=datetime.now().isoformat(), ) self.phenomena[pid] = ph @@ -532,14 +548,14 @@ class EvidenceGraph: ) -> float: """Update hypothesis confidence based on a phenomenon linkage. - The edge_type must be one of HYPOTHESIS_EDGE_WEIGHTS keys. - Weight is looked up from the predefined table, NOT judged by LLM. + The edge_type must be one of self.edge_weights keys. + Weight is looked up from the configured table, NOT judged by LLM. Returns the new confidence value. """ - if edge_type not in HYPOTHESIS_EDGE_WEIGHTS: + if edge_type not in self.edge_weights: raise ValueError( f"Invalid hypothesis edge type: {edge_type}. " - f"Must be one of: {list(HYPOTHESIS_EDGE_WEIGHTS.keys())}" + f"Must be one of: {list(self.edge_weights.keys())}" ) async with self._lock: @@ -549,7 +565,7 @@ class EvidenceGraph: if hyp is None: raise ValueError(f"Hypothesis not found: {hyp_id}") - weight = HYPOTHESIS_EDGE_WEIGHTS[edge_type] + weight = self.edge_weights[edge_type] old_conf = hyp.confidence if weight > 0: diff --git a/main.py b/main.py index 060d826..b1b6a6a 100644 --- a/main.py +++ b/main.py @@ -229,6 +229,7 @@ async def async_main() -> None: graph = EvidenceGraph( case_info=config.get("cfreds_hacking_case", {}), persist_path=run_dir / "graph_state.json", + edge_weights=config.get("hypothesis_edge_weights"), ) graph.image_path = image_path graph.partition_offset = partition_offset diff --git a/orchestrator.py b/orchestrator.py index f6bf193..839d873 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -11,7 +11,7 @@ from datetime import datetime from pathlib import Path from agent_factory import AgentFactory -from evidence_graph import EvidenceGraph, HYPOTHESIS_EDGE_WEIGHTS +from evidence_graph import EvidenceGraph from llm_client import LLMClient logger = logging.getLogger(__name__) @@ -149,7 +149,8 @@ class Orchestrator: await agent.run( f"Investigate this lead: {lead.description}\n" f"{hyp_line}" - f"Focus area: {lead.target_agent}" + f"Focus area: {lead.target_agent}", + lead_id=lead.id, ) await self.graph.mark_lead_completed(lead.id) self._failure_count = 0 @@ -209,11 +210,9 @@ class Orchestrator: "1. Specific and testable\n" "2. About a distinct aspect of activity (e.g., hacking tools, communication, " "network attacks, data theft)\n\n" - "For each hypothesis:\n" - "- Call add_hypothesis to create it\n" - "- Then call link_phenomenon_to_hypothesis to link relevant existing phenomena\n" - "- Choose the relationship type carefully: direct_evidence, supports, " - "prerequisite_met, consequence_observed, contradicts, or weakens\n\n" + "Call add_hypothesis for each. The orchestrator will automatically link " + "relevant existing phenomena to each hypothesis after you finish — you do " + "not need to (and cannot) create those links yourself.\n\n" "The ultimate goal is to reconstruct a detailed timeline of what happened on this host." ) @@ -333,7 +332,7 @@ class Orchestrator: if not unlinked: return - valid_types = list(HYPOTHESIS_EDGE_WEIGHTS.keys()) + valid_types = list(self.graph.edge_weights.keys()) hyp_section = "\n".join( f" [{h.id}] {h.title}: {h.description}" for h in active @@ -370,7 +369,7 @@ class Orchestrator: if ( hyp_id in self.graph.hypotheses and ph_id in self.graph.phenomena - and edge_type in HYPOTHESIS_EDGE_WEIGHTS + and edge_type in self.graph.edge_weights ): await self.graph.update_hypothesis_confidence( hyp_id=hyp_id, @@ -413,7 +412,7 @@ class Orchestrator: ph_id = j.get("phenomenon_id", "") edge_type = j.get("edge_type", "") reason = j.get("reason", "") - if ph_id in self.graph.phenomena and edge_type in HYPOTHESIS_EDGE_WEIGHTS: + if ph_id in self.graph.phenomena and edge_type in self.graph.edge_weights: await self.graph.update_hypothesis_confidence( hyp_id=hyp.id, phenomenon_id=ph_id, @@ -505,6 +504,7 @@ class Orchestrator: break _log(f"Gap fill round {round_num}: {len(pending)} leads", event="dispatch") await self._dispatch_leads_parallel(pending) + await self._judge_new_phenomena() # ---- Run archiving ------------------------------------------------------- @@ -604,11 +604,13 @@ class Orchestrator: manual_hypotheses = self.config.get("hypotheses", []) if manual_hypotheses: await self._generate_hypotheses_manual(manual_hypotheses) - if self.graph.phenomena: - await self._judge_new_phenomena() else: await self._generate_hypotheses_auto() + # Unified judge step — link Phase 1 phenomena to newly-created hypotheses + if self.graph.phenomena and self.graph.hypotheses: + await self._judge_new_phenomena() + for h in self.graph.hypotheses.values(): _log(f" {h.summary()}", event="hypothesis") _log( diff --git a/regenerate_report.py b/regenerate_report.py index 4e01b42..18b9338 100644 --- a/regenerate_report.py +++ b/regenerate_report.py @@ -13,8 +13,16 @@ from tool_registry import register_all_tools async def main() -> None: - # Find the run to regenerate from - run_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("runs/2026-04-02T15-11-25") + # Find the run: CLI arg, or latest run with a graph_state.json + if len(sys.argv) > 1: + run_dir = Path(sys.argv[1]) + else: + states = sorted(Path("runs").glob("*/graph_state.json"), reverse=True) + if not states: + print("No runs found in runs/") + return + run_dir = states[0].parent + print(f"Using latest run: {run_dir.name}") state_path = run_dir / "graph_state.json" if not state_path.exists(): @@ -24,8 +32,11 @@ async def main() -> None: config = yaml.safe_load(open("config.yaml")) agent_cfg = config["agent"] - # Load graph - graph = EvidenceGraph.load_state(state_path) + # Load graph (edge_weights from config — applied to the loaded graph) + graph = EvidenceGraph.load_state( + state_path, + edge_weights=config.get("hypothesis_edge_weights"), + ) print(f"Loaded: {graph.stats_summary()}") # LLM client with larger max_tokens for report