diff --git a/evidence_graph.py b/evidence_graph.py index a25d420..e71b543 100644 --- a/evidence_graph.py +++ b/evidence_graph.py @@ -417,7 +417,14 @@ class Edge: @dataclass class Lead: - """An investigative lead that should be followed up by an agent.""" + """An investigative lead that should be followed up by an agent. + + Phase 1 agents create leads as "things outside my scope but worth chasing". + The strategist (DESIGN_STRATEGIST.md) also creates leads, and additionally + annotates each with the hypothesis it's meant to corroborate or refute plus + the kind of edge it expects to produce — so the orchestrator can later + measure "did this lead actually change any belief". + """ id: str target_agent: str @@ -426,13 +433,77 @@ class Lead: context: dict = field(default_factory=dict) status: str = "pending" # pending, assigned, completed, failed hypothesis_id: str | None = None + # Strategist-loop annotations. proposed_by names the agent that created + # the lead ("filesystem", "strategist", ...). motivating_hypothesis and + # expected_evidence_type let the orchestrator measure marginal yield. + # round_number is 0 for Phase 1 leads, ≥1 for strategist-produced leads. + proposed_by: str = "" + motivating_hypothesis: str = "" + expected_evidence_type: str = "" + round_number: int = 0 def to_dict(self) -> dict: return asdict(self) @classmethod def from_dict(cls, d: dict) -> Lead: - return cls(**d) + # Forward-compatible: old state files predate the strategist annotations. + known = set(cls.__dataclass_fields__) + return cls(**{k: v for k, v in d.items() if k in known}) + + +@dataclass +class InvestigationRound: + """One round of strategist-driven investigation. + + DESIGN_STRATEGIST.md §1.2: provenance for the strategist's decisions. Each + round records what hypothesis statuses looked like before vs. after, what + leads were proposed, which actually got executed, and how many new + phenomena/edges resulted. ``marginal_yield`` over recent rounds is what + the strategist consults to decide whether to keep digging or declare + complete. + """ + + id: str # "round-{nnn}" + round_number: int + started_at: str + completed_at: str = "" + strategist_action: str = "" # "propose_leads" | "declare_complete" + leads_proposed: list[str] = field(default_factory=list) + leads_executed: list[str] = field(default_factory=list) + hypothesis_status_snapshot_before: dict = field(default_factory=dict) + hypothesis_status_snapshot_after: dict = field(default_factory=dict) + phenomena_count_before: int = 0 + phenomena_count_after: int = 0 + edges_count_before: int = 0 + edges_count_after: int = 0 + decision_rationale: str = "" + + @property + def new_phenomena_count(self) -> int: + return max(0, self.phenomena_count_after - self.phenomena_count_before) + + @property + def new_edges_count(self) -> int: + return max(0, self.edges_count_after - self.edges_count_before) + + @property + def status_flips(self) -> int: + before = self.hypothesis_status_snapshot_before + after = self.hypothesis_status_snapshot_after + flips = 0 + for hid, after_status in after.items(): + if before.get(hid) and before.get(hid) != after_status: + flips += 1 + return flips + + def to_dict(self) -> dict: + return asdict(self) + + @classmethod + def from_dict(cls, d: dict) -> InvestigationRound: + known = set(cls.__dataclass_fields__) + return cls(**{k: v for k, v in d.items() if k in known}) @dataclass @@ -598,6 +669,17 @@ class EvidenceGraph: # claimed fact values against real tool outputs. self.tool_invocations: dict[str, ToolInvocation] = {} + # Investigation rounds — provenance for the strategist's per-round + # decisions (DESIGN_STRATEGIST.md). Empty for runs that don't reach + # Phase 3 or that disable the strategist via config. + self.investigation_rounds: list[InvestigationRound] = [] + + # Budget config + run-start monotonic clock. Set by the orchestrator + # when it boots; the budget_status strategy tool reads these. None + # means unbounded / not yet started. + self.budgets: dict[str, int] = {} + self.run_start_monotonic: float | None = None + # _current_agent / _current_task_id are exposed as @property below, # backed by module-level ContextVars (race-free under asyncio.gather). @@ -658,6 +740,9 @@ class EvidenceGraph: "tool_invocations": { iid: inv.to_dict() for iid, inv in self.tool_invocations.items() }, + "investigation_rounds": [ + r.to_dict() for r in self.investigation_rounds + ], "saved_at": datetime.now().isoformat(), } tmp = self._persist_path.with_suffix(".tmp") @@ -730,6 +815,10 @@ class EvidenceGraph: iid: ToolInvocation.from_dict(inv) for iid, inv in data.get("tool_invocations", {}).items() } + graph.investigation_rounds = [ + InvestigationRound.from_dict(r) + for r in data.get("investigation_rounds", []) + ] graph._rebuild_adjacency() logger.info( "EvidenceGraph restored: %d phenomena, %d hypotheses, %d entities, " @@ -1497,9 +1586,29 @@ class EvidenceGraph: priority: int = 5, context: dict | None = None, hypothesis_id: str | None = None, + proposed_by: str = "", + motivating_hypothesis: str = "", + expected_evidence_type: str = "", + round_number: int = 0, ) -> str: async with self._lock: lid = f"lead-{uuid.uuid4().hex[:8]}" + # Idempotency for strategist proposals: identical + # (motivating_hypothesis, expected_evidence_type, target_agent, + # source_id) triple should not be created twice — this guards + # against the "strategist loops on the same lead" failure mode. + if motivating_hypothesis and proposed_by == "strategist": + source_id = (context or {}).get("source_id", "") + for existing in self.leads: + if ( + existing.proposed_by == "strategist" + and existing.motivating_hypothesis == motivating_hypothesis + and existing.expected_evidence_type == expected_evidence_type + and existing.target_agent == target_agent + and (existing.context or {}).get("source_id", "") == source_id + and existing.status in ("pending", "assigned") + ): + return existing.id self.leads.append(Lead( id=lid, target_agent=target_agent, @@ -1507,10 +1616,89 @@ class EvidenceGraph: priority=priority, context=context or {}, hypothesis_id=hypothesis_id, + proposed_by=proposed_by, + motivating_hypothesis=motivating_hypothesis, + expected_evidence_type=expected_evidence_type, + round_number=round_number, )) self._auto_save() return lid + # ---- Investigation rounds (strategist loop) ---------------------------- + + async def start_investigation_round( + self, round_number: int, + ) -> str: + """Open a new investigation round + capture pre-round snapshot. + + Called by the orchestrator at the top of each strategist iteration. + The snapshot records hypothesis status, phenomena count, and edges + count so that ``complete_investigation_round`` can compute the + round's yield deltas. + """ + async with self._lock: + rid = f"round-{round_number:03d}" + snapshot_before = { + hid: h.status for hid, h in self.hypotheses.items() + } + self.investigation_rounds.append(InvestigationRound( + id=rid, + round_number=round_number, + started_at=datetime.now().isoformat(), + hypothesis_status_snapshot_before=snapshot_before, + phenomena_count_before=len(self.phenomena), + edges_count_before=len(self.edges), + )) + self._auto_save() + return rid + + async def complete_investigation_round( + self, + round_id: str, + strategist_action: str = "propose_leads", + leads_executed: list[str] | None = None, + decision_rationale: str = "", + ) -> InvestigationRound | None: + """Close a round, recording after-snapshot + which leads got executed. + + Idempotent on already-closed rounds (returns the existing record). + """ + async with self._lock: + for r in self.investigation_rounds: + if r.id != round_id: + continue + if r.completed_at: + return r + r.completed_at = datetime.now().isoformat() + r.strategist_action = strategist_action + r.leads_executed = list(leads_executed or []) + r.leads_proposed = [ + l.id for l in self.leads + if l.round_number == r.round_number + and l.proposed_by == "strategist" + ] + r.hypothesis_status_snapshot_after = { + hid: h.status for hid, h in self.hypotheses.items() + } + r.phenomena_count_after = len(self.phenomena) + r.edges_count_after = len(self.edges) + r.decision_rationale = decision_rationale + self._auto_save() + return r + return None + + def get_investigation_round(self, round_id: str) -> InvestigationRound | None: + for r in self.investigation_rounds: + if r.id == round_id: + return r + return None + + def latest_round(self) -> InvestigationRound | None: + return self.investigation_rounds[-1] if self.investigation_rounds else None + + def leads_from_round(self, round_number: int) -> list[Lead]: + return [l for l in self.leads if l.round_number == round_number] + async def get_pending_leads(self, agent_type: str | None = None) -> list[Lead]: async with self._lock: leads = [l for l in self.leads if l.status == "pending"] diff --git a/tests/test_optimizations.py b/tests/test_optimizations.py index 5a58a16..096d70f 100644 --- a/tests/test_optimizations.py +++ b/tests/test_optimizations.py @@ -9,6 +9,7 @@ import pytest from evidence_graph import ( EvidenceGraph, Phenomenon, Hypothesis, Lead, GroundingError, + InvestigationRound, _compute_quality_score, _jaccard_similarity, prob_to_log_odds, log_odds_to_prob, ) @@ -3036,3 +3037,180 @@ class TestOrchestratorMultiSource: assert Orchestrator._is_analysable(ok_media) assert not Orchestrator._is_analysable(no_path) + +# --------------------------------------------------------------------------- +# Strategist loop foundation (DESIGN_STRATEGIST.md §1) +# --------------------------------------------------------------------------- + +class TestLeadExtensionsForStrategist: + """Lead now carries strategist-loop annotations: proposed_by, + motivating_hypothesis, expected_evidence_type, round_number. Old state + files predate these fields — from_dict must accept them missing. + """ + + @pytest.mark.asyncio + async def test_add_lead_records_strategist_annotations(self): + graph = EvidenceGraph() + hid = await graph.add_hypothesis("h", "d") + lid = await graph.add_lead( + target_agent="filesystem", + description="Check Safari bookmarks for device-switching evidence", + proposed_by="strategist", + motivating_hypothesis=hid, + expected_evidence_type="supports", + round_number=2, + context={"source_id": "src-ios-chan"}, + ) + lead = next(l for l in graph.leads if l.id == lid) + assert lead.proposed_by == "strategist" + assert lead.motivating_hypothesis == hid + assert lead.expected_evidence_type == "supports" + assert lead.round_number == 2 + + @pytest.mark.asyncio + async def test_strategist_lead_idempotency(self): + """Same (motivating_hyp, expected_type, target_agent, source_id) from + the strategist should NOT create a duplicate while pending. + """ + graph = EvidenceGraph() + hid = await graph.add_hypothesis("h", "d") + first = await graph.add_lead( + target_agent="filesystem", description="probe X", + proposed_by="strategist", motivating_hypothesis=hid, + expected_evidence_type="supports", round_number=1, + context={"source_id": "src-A"}, + ) + again = await graph.add_lead( + target_agent="filesystem", description="probe X (rephrased)", + proposed_by="strategist", motivating_hypothesis=hid, + expected_evidence_type="supports", round_number=2, + context={"source_id": "src-A"}, + ) + assert first == again + assert len(graph.leads) == 1 + + @pytest.mark.asyncio + async def test_non_strategist_leads_not_deduped(self): + """Phase 1 worker leads (proposed_by != 'strategist') should NOT be + deduped — agents can legitimately propose the same kind of lead + multiple times from different contexts. + """ + graph = EvidenceGraph() + hid = await graph.add_hypothesis("h", "d") + a = await graph.add_lead( + target_agent="filesystem", description="x", proposed_by="filesystem", + motivating_hypothesis=hid, expected_evidence_type="supports", + ) + b = await graph.add_lead( + target_agent="filesystem", description="x", proposed_by="filesystem", + motivating_hypothesis=hid, expected_evidence_type="supports", + ) + assert a != b + assert len(graph.leads) == 2 + + @pytest.mark.asyncio + async def test_lead_from_old_state_file_loads_with_defaults(self, tmp_path): + """Forward-compat: a state file written before the strategist fields + existed must still load. The new fields take their defaults. + """ + legacy = { + "id": "lead-legacy01", + "target_agent": "filesystem", + "description": "old-style lead", + "priority": 5, + "context": {}, + "status": "pending", + "hypothesis_id": None, + } + lead = Lead.from_dict(legacy) + assert lead.proposed_by == "" + assert lead.motivating_hypothesis == "" + assert lead.round_number == 0 + + +class TestInvestigationRound: + """The InvestigationRound provenance node + start/complete lifecycle.""" + + @pytest.mark.asyncio + async def test_round_lifecycle_captures_before_and_after(self): + graph = EvidenceGraph() + h1 = await graph.add_hypothesis("h1", "d") + h2 = await graph.add_hypothesis("h2", "d") + rid = await graph.start_investigation_round(1) + r = graph.get_investigation_round(rid) + assert r is not None + assert r.round_number == 1 + assert r.hypothesis_status_snapshot_before == {h1: "active", h2: "active"} + assert r.phenomena_count_before == 0 + assert r.completed_at == "" + + pid, _ = await graph.add_phenomenon( + "fs", "filesystem", "found something", "interp", source_tool="t", + ) + await graph.update_hypothesis_confidence(h1, pid, "direct_evidence", "") + + closed = await graph.complete_investigation_round( + rid, strategist_action="propose_leads", + decision_rationale="found new evidence for h1", + ) + assert closed is not None + assert closed.completed_at != "" + assert closed.hypothesis_status_snapshot_after[h1] == "supported" + assert closed.hypothesis_status_snapshot_after[h2] == "active" + assert closed.new_phenomena_count == 1 + assert closed.new_edges_count == 1 + assert closed.status_flips == 1 + + @pytest.mark.asyncio + async def test_complete_round_is_idempotent(self): + graph = EvidenceGraph() + rid = await graph.start_investigation_round(1) + first = await graph.complete_investigation_round(rid) + await graph.add_phenomenon("fs", "x", "y", "z", source_tool="t") + second = await graph.complete_investigation_round(rid) + assert first is second + assert first.phenomena_count_after == 0 + + @pytest.mark.asyncio + async def test_leads_from_round_filters_correctly(self): + graph = EvidenceGraph() + hid = await graph.add_hypothesis("h", "d") + await graph.add_lead( + target_agent="filesystem", description="r1 lead", + proposed_by="strategist", motivating_hypothesis=hid, + expected_evidence_type="supports", round_number=1, + ) + await graph.add_lead( + target_agent="filesystem", description="r2 lead", + proposed_by="strategist", motivating_hypothesis=hid, + expected_evidence_type="supports", round_number=2, + context={"source_id": "src-different"}, + ) + await graph.add_lead( + target_agent="ios_artifact", description="phase 1 finding", + proposed_by="filesystem", round_number=0, + ) + r1 = graph.leads_from_round(1) + r2 = graph.leads_from_round(2) + r0 = graph.leads_from_round(0) + assert len(r1) == 1 and r1[0].description == "r1 lead" + assert len(r2) == 1 and r2[0].description == "r2 lead" + assert len(r0) == 1 and r0[0].proposed_by == "filesystem" + + @pytest.mark.asyncio + async def test_round_persistence_round_trip(self, tmp_path): + """Investigation rounds must survive save/load.""" + path = tmp_path / "state.json" + graph = EvidenceGraph(persist_path=path) + hid = await graph.add_hypothesis("h", "d") + rid = await graph.start_investigation_round(1) + await graph.complete_investigation_round( + rid, decision_rationale="probe complete", + ) + loaded = EvidenceGraph.load_state(path) + assert len(loaded.investigation_rounds) == 1 + r = loaded.investigation_rounds[0] + assert r.id == rid + assert r.decision_rationale == "probe complete" + assert hid in r.hypothesis_status_snapshot_before +