Files
MASForensic/agents/timeline.py
BattleTag 444d58726a refactor: native tool calling + generic forced-retry + terminal exit
- llm_client: switch tool_call_loop from text-based <tool_call> regex
  to OpenAI-native tools=[...] / structured tool_calls field; accumulate
  delta.reasoning_content for DeepSeek thinking-mode echo-back; fold
  preserves system msg and aligns boundary to never orphan role:tool
- base_agent: generic forced-retry via mandatory_record_tools class attr
  (filesystem -> add_phenomenon, timeline -> add_temporal_edge,
  hypothesis -> add_hypothesis, report -> save_report); count via
  executor wrapper
- terminal_tools class attr + loop short-circuit: when a terminal tool
  is called, loop exits with its raw return as final_text. ReportAgent
  declares save_report as terminal - replaces the <answer>-tag stop
  signal that native tool calling broke
- _execute_*: return (raw, formatted) - terminal exit uses untruncated
  raw, conversation history uses 3000-char-capped formatted
- evidence_graph + orchestrator: LLM-derived InvestigationArea support
  (hypothesis-driven coverage check, replaces hardcoded _AREA_KEYWORDS /
  _AREA_TOOLS); manual yaml block kept as optional seed
- strip <answer> references from agent prompts (no longer load-bearing)

Verified on CFReDS image across 4 smoke runs: 0 JSON parse failures
(was 3); 22 temporal edges from Phase 4 (was 0); ReportAgent exits via
save_report (was max_iterations regression). 78/78 unit tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 13:51:19 +08:00

142 lines
6.1 KiB
Python

"""Timeline Agent — connects existing phenomena with temporal edges.
Operates on phenomena already in the graph. Does NOT investigate the disk
image itself. The agent's only useful output is the temporal edges it
creates between phenomena.
"""
from __future__ import annotations
import logging
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TimelineAgent(BaseAgent):
    """Agent that links phenomena already in the evidence graph with temporal edges.

    It registers three tools of its own: the catalog's
    ``build_filesystem_timeline`` (when available), ``get_timestamped_phenomena``
    and ``add_temporal_edge``. It deliberately exposes only the read-only graph
    tools, so it cannot add new phenomena.
    """

    name = "timeline"
    role = (
        "Timeline forensic analyst. You build chronological timelines from filesystem "
        "MAC timestamps and correlate events across all phenomena categories in the "
        "evidence graph to reconstruct the sequence of activities on the system."
    )
    # Recording tool this agent is expected to call; consumed by BaseAgent
    # (its handling is not visible in this file).
    mandatory_record_tools = ("add_temporal_edge",)

    # Single source of truth for allowed relation values: feeds both the tool
    # schema's enum and the runtime validation in _add_temporal_edge.
    _TEMPORAL_RELATIONS = ("before", "after", "concurrent")

    def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
        """Initialise the base agent, then register the timeline-specific tools."""
        super().__init__(llm, graph)
        self._register_tools()

    def _register_graph_tools(self) -> None:
        """Restrict to read-only graph tools — Timeline does not add phenomena."""
        # NOTE(review): presumably an override of a BaseAgent hook — confirm there.
        self._register_graph_read_tools()

    def _register_tools(self) -> None:
        """Register the timeline builder, the phenomena listing and the edge recorder."""
        td = TOOL_CATALOG.get("build_filesystem_timeline")
        if td:
            self.register_tool(td.name, td.description, td.input_schema, td.executor)
        else:
            # The system prompt instructs the model to call this tool, so a
            # missing catalog entry should be visible rather than silent.
            logger.warning(
                "Tool %r is missing from TOOL_CATALOG; agent %r will run without it.",
                "build_filesystem_timeline",
                self.name,
            )

        self.register_tool(
            name="get_timestamped_phenomena",
            description=(
                "Get all phenomena that have timestamps, sorted chronologically. "
                "Returns each phenomenon's id, category, title, and a short description "
                "preview. Use this as your primary input for temporal correlation."
            ),
            input_schema={"type": "object", "properties": {}},
            executor=self._get_timestamped_phenomena,
        )

        self.register_tool(
            name="add_temporal_edge",
            description=(
                "Add a temporal relationship edge between two existing phenomena. "
                "Use 'before' when source phenomenon happened before target, "
                "'after' when source phenomenon happened after target, "
                "'concurrent' when they occurred within seconds of each other."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "source_id": {
                        "type": "string",
                        "description": (
                            "ID of the source phenomenon "
                            "(the earlier one when relation is 'before')."
                        ),
                    },
                    "target_id": {
                        "type": "string",
                        "description": (
                            "ID of the target phenomenon "
                            "(the later one when relation is 'before')."
                        ),
                    },
                    "relation": {
                        "type": "string",
                        "enum": list(self._TEMPORAL_RELATIONS),
                        "description": "Temporal relationship.",
                    },
                },
                "required": ["source_id", "target_id", "relation"],
            },
            executor=self._add_temporal_edge,
        )

    def _build_system_prompt(self, task: str) -> str:
        """Focused prompt — Timeline connects existing phenomena, doesn't investigate.

        Args:
            task: Task description interpolated into the prompt.

        Returns:
            The full system prompt, including the image path and the graph's
            current stats summary.
        """
        return (
            f"You are {self.name}, a forensic timeline correlation analyst.\n"
            f"Role: {self.role}\n\n"
            f"Image: {self.graph.image_path}\n"
            f"Current state: {self.graph.stats_summary()}\n\n"
            f"Your task: {task}\n\n"
            f"WORKFLOW:\n"
            f"1. Call build_filesystem_timeline once to materialize MAC times for the disk.\n"
            f"2. Call get_timestamped_phenomena to see all phenomena with timestamps, "
            f"sorted chronologically. THIS IS YOUR PRIMARY INPUT.\n"
            f"3. For each meaningful temporal relationship between phenomena, call "
            f"add_temporal_edge(source_id, target_id, relation). Use 'before' when "
            f"source happened first (the common case); 'concurrent' for events within "
            f"a few seconds of each other.\n"
            f" Examples of meaningful connections:\n"
            f" - 'Cain installer executed' (before) 'Cain.exe first execution'\n"
            f" - 'WHOIS first lookup' (before) 'WHOIS second lookup'\n"
            f" - 'Recon tool cluster' (before) 'Anti-forensics defrag'\n"
            f" - 'Tool installation' (before) 'Tool execution'\n"
            f"4. Aim for 15-40 temporal edges that connect the major events into a "
            f"forensic story.\n"
            f"5. STOP after recording all meaningful temporal edges. Do not call any more tools.\n\n"
            f"STRICT BOUNDARIES:\n"
            f"- Your job is to CONNECT existing phenomena, NOT to discover new ones. "
            f"You CANNOT call add_phenomenon — the tool isn't yours.\n"
            f"- Use ONLY phenomenon IDs returned by get_timestamped_phenomena or "
            f"list_phenomena. NEVER fabricate IDs.\n"
            f"- Connect events that tell a forensic story (recon -> exploit -> cover-up). "
            f"Do not exhaustively pair every two phenomena; focus on causally-relevant "
            f"sequences.\n"
            f"- The orchestrator handles report writing in the next phase. Your only "
            f"output that propagates is the temporal edges you create."
        )

    async def _get_timestamped_phenomena(self) -> str:
        """List every phenomenon that has a timestamp, ordered by that timestamp.

        Returns:
            Two lines per phenomenon: ``timestamp | [category] title (id)``
            followed by a preview of at most 150 characters of its description.
            A fixed message is returned when no phenomenon carries a timestamp.
        """
        # NOTE(review): ordering is chronological only if timestamps compare
        # chronologically (e.g. ISO-8601 strings) — confirm against the graph model.
        stamped = sorted(
            (ph for ph in self.graph.phenomena.values() if ph.timestamp),
            key=lambda ph: ph.timestamp,
        )
        if not stamped:
            return "No phenomena with timestamps found."

        lines = []
        for ph in stamped:
            lines.append(f"{ph.timestamp} | [{ph.category}] {ph.title} ({ph.id})")
            # Tolerate a missing description instead of failing the whole listing.
            lines.append(f" {(ph.description or '')[:150]}")
        return "\n".join(lines)

    async def _add_temporal_edge(
        self, source_id: str, target_id: str, relation: str,
    ) -> str:
        """Record a temporal edge between two phenomena in the graph.

        Args:
            source_id: ID of the source phenomenon.
            target_id: ID of the target phenomenon.
            relation: One of ``before``, ``after`` or ``concurrent``; case and
                surrounding whitespace are ignored.

        Returns:
            A confirmation message, or a message starting with ``Error:`` when
            the arguments are rejected here or the graph raises ``ValueError``.
        """
        # The schema enum is only advisory for the model, so validate here
        # before anything is written into the edge metadata.
        normalized = relation.strip().lower() if isinstance(relation, str) else relation
        if normalized not in self._TEMPORAL_RELATIONS:
            allowed = ", ".join(self._TEMPORAL_RELATIONS)
            return f"Error: invalid relation {relation!r}; expected one of: {allowed}"

        # An edge from a phenomenon to itself carries no temporal information.
        if source_id == target_id:
            return (
                f"Error: source_id and target_id must differ "
                f"(got {source_id!r} for both)"
            )

        try:
            await self.graph.add_edge(
                source_id=source_id,
                target_id=target_id,
                edge_type="temporal",
                metadata={"relation": normalized},
                created_by=self.name,
            )
        except ValueError as e:
            return f"Error: {e}"
        return f"Temporal edge added: {source_id} —[{normalized}]→ {target_id}"