diff --git a/agents/hypothesis.py b/agents/hypothesis.py index e493cdb..6284d8f 100644 --- a/agents/hypothesis.py +++ b/agents/hypothesis.py @@ -1,7 +1,9 @@ """Hypothesis Agent — generates investigative hypotheses from phenomena. Generates hypotheses only. Phenomenon→Hypothesis linking is handled centrally -by Orchestrator._judge_new_phenomena, so all link logic lives in one place. +by Orchestrator._judge_new_phenomena. Tool set is restricted to read-only +graph queries + add_hypothesis to prevent the agent from creating phenomena, +leads, or entity links. """ from __future__ import annotations @@ -27,6 +29,10 @@ class HypothesisAgent(BaseAgent): super().__init__(llm, graph) self._register_hypothesis_tools() + def _register_graph_tools(self) -> None: + """Restrict to read-only graph tools. add_hypothesis is registered separately.""" + self._register_graph_read_tools() + def _register_hypothesis_tools(self) -> None: self.register_tool( name="add_hypothesis", @@ -51,6 +57,32 @@ class HypothesisAgent(BaseAgent): executor=self._add_hypothesis, ) + def _build_system_prompt(self, task: str) -> str: + """Focused prompt — no INVESTIGATE/RECORD/LINK workflow.""" + return ( + f"You are {self.name}, a forensic hypothesis analyst.\n" + f"Role: {self.role}\n\n" + f"Image: {self.graph.image_path}\n" + f"Current investigation state: {self.graph.stats_summary()}\n\n" + f"Your task: {task}\n\n" + f"WORKFLOW:\n" + f"1. Call list_phenomena and search_graph to review existing findings.\n" + f"2. For each hypothesis you want to record, call add_hypothesis (title + description).\n" + f"3. Wrap a short summary in when you have generated 3-7 hypotheses.\n\n" + f"STRICT BOUNDARIES:\n" + f"- Your only mutation tool is add_hypothesis. Do NOT attempt list_directory, " + f"parse_registry_key, extract_file, or any disk-image investigation tools — " + f"they are not yours and you will get 'unknown tool' errors.\n" + f"- You CANNOT create phenomena, leads, or entity links. The orchestrator handles " + f"all phenomenon↔hypothesis linking after you finish.\n" + f"- Each hypothesis must be specific and testable. Avoid generic templates like " + f"'Unauthorized Remote Access' or 'Malware Deployment' unless concrete phenomena " + f"in the graph already point to them.\n" + f"- If the graph is empty, generate broad starting hypotheses and mark them " + f"clearly as exploratory in their description so downstream agents know they " + f"still need evidence." + ) + async def _add_hypothesis(self, title: str, description: str) -> str: hid = await self.graph.add_hypothesis( title=title, diff --git a/agents/report.py b/agents/report.py index 552d070..0ce0b15 100644 --- a/agents/report.py +++ b/agents/report.py @@ -25,6 +25,10 @@ class ReportAgent(BaseAgent): super().__init__(llm, graph) self._register_tools() + def _register_graph_tools(self) -> None: + """Restrict to read-only graph tools. Report agent does not mutate state.""" + self._register_graph_read_tools() + def _build_system_prompt(self, task: str) -> str: """Report agent gets a clean prompt — no Phase A/B/C/D workflow.""" return ( diff --git a/agents/timeline.py b/agents/timeline.py index 0365510..752fb33 100644 --- a/agents/timeline.py +++ b/agents/timeline.py @@ -1,14 +1,21 @@ -"""Timeline Agent — correlates evidence across time.""" +"""Timeline Agent — connects existing phenomena with temporal edges. + +Operates on phenomena already in the graph. Does NOT investigate the disk +image itself. The agent's only useful output is the temporal edges it +creates between phenomena. +""" from __future__ import annotations -import json +import logging from base_agent import BaseAgent from evidence_graph import EvidenceGraph from llm_client import LLMClient from tool_registry import TOOL_CATALOG +logger = logging.getLogger(__name__) + class TimelineAgent(BaseAgent): name = "timeline" @@ -22,24 +29,33 @@ class TimelineAgent(BaseAgent): super().__init__(llm, graph) self._register_tools() + def _register_graph_tools(self) -> None: + """Restrict to read-only graph tools — Timeline does not add phenomena.""" + self._register_graph_read_tools() + def _register_tools(self) -> None: - # Filesystem timeline tool from catalog td = TOOL_CATALOG.get("build_filesystem_timeline") if td: self.register_tool(td.name, td.description, td.input_schema, td.executor) - # Custom tool to get all phenomena with timestamps for correlation self.register_tool( name="get_timestamped_phenomena", - description="Get all phenomena that have timestamps, sorted chronologically. Use for timeline correlation.", + description=( + "Get all phenomena that have timestamps, sorted chronologically. " + "Returns each phenomenon's id, category, title, and a short description " + "preview. Use this as your primary input for temporal correlation." + ), input_schema={"type": "object", "properties": {}}, executor=self._get_timestamped_phenomena, ) - # Tool to add temporal edges between phenomena self.register_tool( name="add_temporal_edge", - description="Add a temporal relationship between two phenomena (before, after, or concurrent).", + description=( + "Add a temporal relationship edge between two existing phenomena. " + "Use 'before' when source phenomenon happened before target, " + "'concurrent' when they occurred within seconds of each other." + ), input_schema={ "type": "object", "properties": { @@ -56,6 +72,42 @@ class TimelineAgent(BaseAgent): executor=self._add_temporal_edge, ) + def _build_system_prompt(self, task: str) -> str: + """Focused prompt — Timeline connects existing phenomena, doesn't investigate.""" + return ( + f"You are {self.name}, a forensic timeline correlation analyst.\n" + f"Role: {self.role}\n\n" + f"Image: {self.graph.image_path}\n" + f"Current state: {self.graph.stats_summary()}\n\n" + f"Your task: {task}\n\n" + f"WORKFLOW:\n" + f"1. Call build_filesystem_timeline once to materialize MAC times for the disk.\n" + f"2. Call get_timestamped_phenomena to see all phenomena with timestamps, " + f"sorted chronologically. THIS IS YOUR PRIMARY INPUT.\n" + f"3. For each meaningful temporal relationship between phenomena, call " + f"add_temporal_edge(source_id, target_id, relation). Use 'before' when " + f"source happened first (the common case); 'concurrent' for events within " + f"a few seconds of each other.\n" + f" Examples of meaningful connections:\n" + f" - 'Cain installer executed' (before) 'Cain.exe first execution'\n" + f" - 'WHOIS first lookup' (before) 'WHOIS second lookup'\n" + f" - 'Recon tool cluster' (before) 'Anti-forensics defrag'\n" + f" - 'Tool installation' (before) 'Tool execution'\n" + f"4. Aim for 15-40 temporal edges that connect the major events into a " + f"forensic story.\n" + f"5. Wrap a short summary in when done.\n\n" + f"STRICT BOUNDARIES:\n" + f"- Your job is to CONNECT existing phenomena, NOT to discover new ones. " + f"You CANNOT call add_phenomenon — the tool isn't yours.\n" + f"- Use ONLY phenomenon IDs returned by get_timestamped_phenomena or " + f"list_phenomena. NEVER fabricate IDs.\n" + f"- Connect events that tell a forensic story (recon -> exploit -> cover-up). " + f"Do not exhaustively pair every two phenomena; focus on causally-relevant " + f"sequences.\n" + f"- The orchestrator handles report writing in the next phase. Your only " + f"output that propagates is the temporal edges you create." + ) + async def _get_timestamped_phenomena(self) -> str: items = [ ph for ph in self.graph.phenomena.values() diff --git a/base_agent.py b/base_agent.py index b84dda5..41691bb 100644 --- a/base_agent.py +++ b/base_agent.py @@ -93,12 +93,20 @@ class BaseAgent: f" NEVER guess or fabricate a phenomenon ID. If an ID is not in list_phenomena output, it does not exist.\n\n" f"Phase D — ANSWER:\n" f" Only give your AFTER completing Phases B and C.\n\n" - f"IMPORTANT:\n" - f"- You MUST call add_phenomenon at least once before finishing\n" - f"- Complete each phase before starting the next\n" - f"- Other agents can ONLY see what you write to the graph\n" - f"- If you don't record findings, they are LOST\n" - f"- Include relevant file paths, inode numbers, timestamps, and raw data\n\n" + f"CRITICAL — RECORDING REQUIREMENT:\n" + f"- Your block is DISCARDED by the orchestrator. Only graph mutations propagate.\n" + f"- Other agents and the final report read ONLY the evidence graph " + f"(phenomena, entities, edges).\n" + f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you end.\n" + f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, " + f"a registry key) and found NOTHING, that absence IS evidence — call " + f"add_phenomenon with a 'No matches for X' title and the search scope in " + f"raw_data. Negative findings constrain the hypothesis space and prevent " + f"the next agent from wasting time re-searching.\n" + f"- If you produce without having called add_phenomenon at least once, " + f"the task is FAILED regardless of what you wrote in .\n" + f"- Include exact file paths, inode numbers, timestamps, and the source_tool " + f"that produced each finding.\n\n" f"ANTI-HALLUCINATION RULES — STRICTLY ENFORCED:\n" f"- ONLY record findings that appear VERBATIM in tool results you received\n" f"- NEVER invent or guess timestamps, file paths, inode numbers, or program names\n" @@ -145,9 +153,17 @@ class BaseAgent: # ---- Graph interaction tools -------------------------------------------- def _register_graph_tools(self) -> None: - """Register tools for querying and writing to the evidence graph.""" + """Register graph query + mutation tools. - # --- Read tools --- + Subclasses can override to restrict the toolset. For example, a + read-only agent (hypothesis, report) overrides this to skip + _register_graph_write_tools. + """ + self._register_graph_read_tools() + self._register_graph_write_tools() + + def _register_graph_read_tools(self) -> None: + """Register read-only graph + asset query tools.""" self.register_tool( name="list_phenomena", @@ -213,7 +229,49 @@ class BaseAgent: executor=self._get_hypothesis_status, ) - # --- Write tools --- + self.register_tool( + name="list_assets", + description=( + "List all files extracted from the disk image. " + "Shows filename, category, size, local path, and inode. " + "Check this before calling extract_file to avoid re-extraction." + ), + input_schema={ + "type": "object", + "properties": { + "category": { + "type": "string", + "enum": [ + "registry_hive", "chat_log", "prefetch", "network_capture", + "config_file", "address_book", "recycle_bin", "executable", + "text_log", "other", + ], + "description": "Filter by category. Omit to list all.", + }, + }, + }, + executor=self._list_assets, + ) + + self.register_tool( + name="find_extracted_file", + description=( + "Find an already-extracted file by inode or filename. " + "Returns the local path so you can use it directly with " + "parse_registry_key, read_text_file, etc. without re-extracting." + ), + input_schema={ + "type": "object", + "properties": { + "inode": {"type": "string", "description": "Inode to look up."}, + "filename": {"type": "string", "description": "Filename or partial name to search."}, + }, + }, + executor=self._find_extracted_file, + ) + + def _register_graph_write_tools(self) -> None: + """Register graph mutation tools (add_phenomenon, add_lead, link_to_entity).""" self.register_tool( name="add_phenomenon", @@ -282,49 +340,6 @@ class BaseAgent: executor=self._link_to_entity, ) - # --- Asset library tools --- - - self.register_tool( - name="list_assets", - description=( - "List all files extracted from the disk image. " - "Shows filename, category, size, local path, and inode. " - "Check this before calling extract_file to avoid re-extraction." - ), - input_schema={ - "type": "object", - "properties": { - "category": { - "type": "string", - "enum": [ - "registry_hive", "chat_log", "prefetch", "network_capture", - "config_file", "address_book", "recycle_bin", "executable", - "text_log", "other", - ], - "description": "Filter by category. Omit to list all.", - }, - }, - }, - executor=self._list_assets, - ) - - self.register_tool( - name="find_extracted_file", - description=( - "Find an already-extracted file by inode or filename. " - "Returns the local path so you can use it directly with " - "parse_registry_key, read_text_file, etc. without re-extracting." - ), - input_schema={ - "type": "object", - "properties": { - "inode": {"type": "string", "description": "Inode to look up."}, - "filename": {"type": "string", "description": "Filename or partial name to search."}, - }, - }, - executor=self._find_extracted_file, - ) - # ---- Tool executors ----------------------------------------------------- async def _list_phenomena(self, category: str | None = None) -> str: diff --git a/orchestrator.py b/orchestrator.py index 839d873..30a33ee 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -17,6 +17,36 @@ from llm_client import LLMClient logger = logging.getLogger(__name__) +def _safe_json_loads(text: str): + """Parse JSON with progressive sanitization for LLM-produced output. + + Tries: (0) as-is, (1) escape stray backslashes outside valid JSON + escapes (\\" \\\\ \\/ \\b \\f \\n \\r \\t \\uXXXX). On final failure, + logs raw input (first 600 chars) so we can diagnose what the model + actually emitted. + """ + try: + return json.loads(text) + except json.JSONDecodeError: + pass + + # Escape backslashes not followed by a valid JSON escape character. + # NOTE: \\u must be followed by exactly 4 hex digits to be valid. + stage1 = re.sub( + r'\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})', + r'\\\\', + text, + ) + try: + return json.loads(stage1) + except json.JSONDecodeError as e: + logger.warning( + "_safe_json_loads failed after sanitize (%s); raw head[:600]=%r", + e, text[:600], + ) + raise + + def _log(msg: str, **extra) -> None: """Emit a structured log message with extra fields for the terminal formatter.""" logger.info(msg, extra=extra) @@ -216,6 +246,52 @@ class Orchestrator: "The ultimate goal is to reconstruct a detailed timeline of what happened on this host." ) + # ---- LLM JSON helper ----------------------------------------------------- + + async def _call_llm_for_json( + self, + system: str, + user_prompt: str, + schema: str = "array", + max_retries: int = 2, + ): + """Call LLM expecting JSON output; self-correct on parse failure. + + Two layers of safety: + 1. _safe_json_loads escapes stray backslashes outside valid JSON escapes. + 2. On any remaining parse error, append the error to the prompt and ask + the LLM to retry, up to max_retries additional attempts. + """ + error_hint = "" + last_err: Exception | None = None + pattern = r'\[.*?\]' if schema == "array" else r'\{.*?\}' + + for attempt in range(max_retries + 1): + messages = [{"role": "user", "content": user_prompt + error_hint}] + response = await self.llm.chat(messages=messages, system=system) + match = re.search(pattern, response, re.DOTALL) + candidate = match.group() if match else response + try: + return _safe_json_loads(candidate) + except (json.JSONDecodeError, ValueError) as e: + last_err = e + if attempt < max_retries: + logger.info( + "JSON parse attempt %d/%d failed (%s); retrying with hint", + attempt + 1, max_retries + 1, e, + ) + error_hint = ( + f"\n\n[Your previous response could not be parsed as JSON: {e}]\n" + f"Output STRICT JSON only — no markdown fences, no code blocks. " + f"Do NOT include literal backslash characters in any string value; " + f"rephrase using forward slashes or describe paths in English " + f"(e.g. 'the Cain folder under Program Files' instead of " + f"'C:\\Program Files\\Cain'). " + f"The response must be a single JSON {schema}." + ) + assert last_err is not None + raise last_err + # ---- Hypothesis-directed investigation ----------------------------------- async def _generate_hypothesis_leads(self) -> None: @@ -252,15 +328,11 @@ class Orchestrator: ) try: - response = await self.llm.chat( - messages=[{"role": "user", "content": prompt}], + tasks = await self._call_llm_for_json( system=self._LEAD_GEN_SYSTEM, + user_prompt=prompt, + schema="array", ) - match = re.search(r'\[.*?\]', response, re.DOTALL) - if match: - tasks = json.loads(match.group()) - else: - tasks = json.loads(response) for task in tasks: hyp_id = task.get("hypothesis_id", "") @@ -300,12 +372,11 @@ class Orchestrator: f'[{{"agent": "agent_type", "task": "what to investigate", "priority": 1-10}}]' ) try: - response = await self.llm.chat( - messages=[{"role": "user", "content": prompt}], + tasks = await self._call_llm_for_json( system=self._LEAD_GEN_SYSTEM, + user_prompt=prompt, + schema="array", ) - match = re.search(r'\[.*?\]', response, re.DOTALL) - tasks = json.loads(match.group()) if match else json.loads(response) for task in tasks: await self.graph.add_lead( target_agent=task.get("agent", "filesystem"), @@ -351,15 +422,11 @@ class Orchestrator: ) try: - response = await self.llm.chat( - messages=[{"role": "user", "content": prompt}], + judgments = await self._call_llm_for_json( system=self._JUDGE_SYSTEM, + user_prompt=prompt, + schema="array", ) - match = re.search(r'\[.*?\]', response, re.DOTALL) - if match: - judgments = json.loads(match.group()) - else: - judgments = json.loads(response) for j in judgments: hyp_id = j.get("hypothesis_id", "") @@ -402,12 +469,11 @@ class Orchestrator: f'[{{"phenomenon_id": "ph-xxx", "edge_type": "supports|contradicts|...", "reason": "brief explanation"}}]' ) try: - response = await self.llm.chat( - messages=[{"role": "user", "content": prompt}], + judgments = await self._call_llm_for_json( system=self._JUDGE_SYSTEM, + user_prompt=prompt, + schema="array", ) - match = re.search(r'\[.*?\]', response, re.DOTALL) - judgments = json.loads(match.group()) if match else json.loads(response) for j in judgments: ph_id = j.get("phenomenon_id", "") edge_type = j.get("edge_type", "")