"""Timeline Agent — connects existing phenomena with temporal edges. Operates on phenomena already in the graph. Does NOT investigate the disk image itself. The agent's only useful output is the temporal edges it creates between phenomena. """ from __future__ import annotations import logging from base_agent import BaseAgent from evidence_graph import EvidenceGraph from llm_client import LLMClient from tool_registry import TOOL_CATALOG logger = logging.getLogger(__name__) class TimelineAgent(BaseAgent): name = "timeline" role = ( "Timeline forensic analyst. You build chronological timelines from filesystem " "MAC timestamps and correlate events across all phenomena categories in the " "evidence graph to reconstruct the sequence of activities on the system." ) mandatory_record_tools = ("add_temporal_edge",) def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None: super().__init__(llm, graph) self._register_tools() def _register_graph_tools(self) -> None: """Restrict to read-only graph tools — Timeline does not add phenomena.""" self._register_graph_read_tools() def _register_tools(self) -> None: td = TOOL_CATALOG.get("build_filesystem_timeline") if td: self.register_tool(td.name, td.description, td.input_schema, td.executor) self.register_tool( name="get_timestamped_phenomena", description=( "Get all phenomena that have timestamps, sorted chronologically. " "Returns each phenomenon's id, category, title, and a short description " "preview. Use this as your primary input for temporal correlation." ), input_schema={"type": "object", "properties": {}}, executor=self._get_timestamped_phenomena, ) self.register_tool( name="add_temporal_edge", description=( "Add a temporal relationship edge between two existing phenomena. " "Use 'before' when source phenomenon happened before target, " "'concurrent' when they occurred within seconds of each other." ), input_schema={ "type": "object", "properties": { "source_id": {"type": "string", "description": "ID of the earlier/source phenomenon."}, "target_id": {"type": "string", "description": "ID of the later/target phenomenon."}, "relation": { "type": "string", "enum": ["before", "after", "concurrent"], "description": "Temporal relationship.", }, }, "required": ["source_id", "target_id", "relation"], }, executor=self._add_temporal_edge, ) def _build_system_prompt(self, task: str) -> str: """Focused prompt — Timeline connects existing phenomena, doesn't investigate.""" return ( f"You are {self.name}, a forensic timeline correlation analyst.\n" f"Role: {self.role}\n\n" f"Image: {self.graph.image_path}\n" f"Current state: {self.graph.stats_summary()}\n\n" f"Your task: {task}\n\n" f"WORKFLOW:\n" f"1. Call build_filesystem_timeline once to materialize MAC times for the disk.\n" f"2. Call get_timestamped_phenomena to see all phenomena with timestamps, " f"sorted chronologically. THIS IS YOUR PRIMARY INPUT.\n" f"3. For each meaningful temporal relationship between phenomena, call " f"add_temporal_edge(source_id, target_id, relation). Use 'before' when " f"source happened first (the common case); 'concurrent' for events within " f"a few seconds of each other.\n" f" Examples of meaningful connections:\n" f" - 'Cain installer executed' (before) 'Cain.exe first execution'\n" f" - 'WHOIS first lookup' (before) 'WHOIS second lookup'\n" f" - 'Recon tool cluster' (before) 'Anti-forensics defrag'\n" f" - 'Tool installation' (before) 'Tool execution'\n" f"4. Aim for 15-40 temporal edges that connect the major events into a " f"forensic story.\n" f"5. STOP after recording all meaningful temporal edges. Do not call any more tools.\n\n" f"STRICT BOUNDARIES:\n" f"- Your job is to CONNECT existing phenomena, NOT to discover new ones. " f"You CANNOT call add_phenomenon — the tool isn't yours.\n" f"- Use ONLY phenomenon IDs returned by get_timestamped_phenomena or " f"list_phenomena. NEVER fabricate IDs.\n" f"- Connect events that tell a forensic story (recon -> exploit -> cover-up). " f"Do not exhaustively pair every two phenomena; focus on causally-relevant " f"sequences.\n" f"- The orchestrator handles report writing in the next phase. Your only " f"output that propagates is the temporal edges you create." ) async def _get_timestamped_phenomena(self) -> str: items = [ ph for ph in self.graph.phenomena.values() if ph.timestamp ] items.sort(key=lambda ph: ph.timestamp or "") if not items: return "No phenomena with timestamps found." lines = [] for ph in items: lines.append(f"{ph.timestamp} | [{ph.category}] {ph.title} ({ph.id})") lines.append(f" {ph.description[:150]}") return "\n".join(lines) async def _add_temporal_edge( self, source_id: str, target_id: str, relation: str, ) -> str: try: await self.graph.add_edge( source_id=source_id, target_id=target_id, edge_type="temporal", metadata={"relation": relation}, created_by=self.name, ) return f"Temporal edge added: {source_id} —[{relation}]→ {target_id}" except ValueError as e: return f"Error: {e}"