feat(strategist) S1: Lead extension + InvestigationRound model

DESIGN_STRATEGIST.md §1. Foundation for the Phase 3 strategist loop.

Lead now carries four annotations that let the orchestrator measure
marginal yield per lead and dedupe strategist proposals:
  - proposed_by         (agent that proposed it: "strategist", "filesystem", …)
  - motivating_hypothesis (hyp-id the lead is meant to corroborate/refute)
  - expected_evidence_type (edge type the lead's worker should produce)
  - round_number        (0 = Phase 1 lead, ≥1 = strategist-proposed)

add_lead idempotently dedupes strategist proposals on
(motivating_hypothesis, expected_evidence_type, target_agent, source_id)
to prevent the "strategist loops on the same lead" failure mode.

New InvestigationRound dataclass records per-round provenance: before/
after hypothesis status snapshots, phenomena + edge count deltas, and
the strategist's decision_rationale. ``new_phenomena_count``,
``new_edges_count``, ``status_flips`` are derived properties that the
marginal_yield tool will use.

start_investigation_round / complete_investigation_round /
get_investigation_round / latest_round / leads_from_round complete the
lifecycle. complete is idempotent on already-closed rounds.

Lead.from_dict is forward-compat for state files written before this
commit. InvestigationRound persists as a top-level list in
graph_state.json (auto-save + load_state both wired).

EvidenceGraph also gains graph.budgets and graph.run_start_monotonic
fields that the budget_status view (S2) will read; orchestrator
populates them in S5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
BattleTag
2026-05-21 02:18:35 -10:00
parent 8020c24776
commit ca96f29849
2 changed files with 368 additions and 2 deletions

View File

@@ -417,7 +417,14 @@ class Edge:
@dataclass
class Lead:
"""An investigative lead that should be followed up by an agent."""
"""An investigative lead that should be followed up by an agent.
Phase 1 agents create leads as "things outside my scope but worth chasing".
The strategist (DESIGN_STRATEGIST.md) also creates leads, and additionally
annotates each with the hypothesis it's meant to corroborate or refute plus
the kind of edge it expects to produce — so the orchestrator can later
measure "did this lead actually change any belief".
"""
id: str
target_agent: str
@@ -426,13 +433,77 @@ class Lead:
context: dict = field(default_factory=dict)
status: str = "pending" # pending, assigned, completed, failed
hypothesis_id: str | None = None
# Strategist-loop annotations. proposed_by names the agent that created
# the lead ("filesystem", "strategist", ...). motivating_hypothesis and
# expected_evidence_type let the orchestrator measure marginal yield.
# round_number is 0 for Phase 1 leads, ≥1 for strategist-produced leads.
proposed_by: str = ""
motivating_hypothesis: str = ""
expected_evidence_type: str = ""
round_number: int = 0
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, d: dict) -> Lead:
return cls(**d)
# Forward-compatible: old state files predate the strategist annotations.
known = set(cls.__dataclass_fields__)
return cls(**{k: v for k, v in d.items() if k in known})
@dataclass
class InvestigationRound:
"""One round of strategist-driven investigation.
DESIGN_STRATEGIST.md §1.2: provenance for the strategist's decisions. Each
round records what hypothesis statuses looked like before vs. after, what
leads were proposed, which actually got executed, and how many new
phenomena/edges resulted. ``marginal_yield`` over recent rounds is what
the strategist consults to decide whether to keep digging or declare
complete.
"""
id: str # "round-{nnn}"
round_number: int
started_at: str
completed_at: str = ""
strategist_action: str = "" # "propose_leads" | "declare_complete"
leads_proposed: list[str] = field(default_factory=list)
leads_executed: list[str] = field(default_factory=list)
hypothesis_status_snapshot_before: dict = field(default_factory=dict)
hypothesis_status_snapshot_after: dict = field(default_factory=dict)
phenomena_count_before: int = 0
phenomena_count_after: int = 0
edges_count_before: int = 0
edges_count_after: int = 0
decision_rationale: str = ""
@property
def new_phenomena_count(self) -> int:
return max(0, self.phenomena_count_after - self.phenomena_count_before)
@property
def new_edges_count(self) -> int:
return max(0, self.edges_count_after - self.edges_count_before)
@property
def status_flips(self) -> int:
before = self.hypothesis_status_snapshot_before
after = self.hypothesis_status_snapshot_after
flips = 0
for hid, after_status in after.items():
if before.get(hid) and before.get(hid) != after_status:
flips += 1
return flips
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, d: dict) -> InvestigationRound:
known = set(cls.__dataclass_fields__)
return cls(**{k: v for k, v in d.items() if k in known})
@dataclass
@@ -598,6 +669,17 @@ class EvidenceGraph:
# claimed fact values against real tool outputs.
self.tool_invocations: dict[str, ToolInvocation] = {}
# Investigation rounds — provenance for the strategist's per-round
# decisions (DESIGN_STRATEGIST.md). Empty for runs that don't reach
# Phase 3 or that disable the strategist via config.
self.investigation_rounds: list[InvestigationRound] = []
# Budget config + run-start monotonic clock. Set by the orchestrator
# when it boots; the budget_status strategy tool reads these. None
# means unbounded / not yet started.
self.budgets: dict[str, int] = {}
self.run_start_monotonic: float | None = None
# _current_agent / _current_task_id are exposed as @property below,
# backed by module-level ContextVars (race-free under asyncio.gather).
@@ -658,6 +740,9 @@ class EvidenceGraph:
"tool_invocations": {
iid: inv.to_dict() for iid, inv in self.tool_invocations.items()
},
"investigation_rounds": [
r.to_dict() for r in self.investigation_rounds
],
"saved_at": datetime.now().isoformat(),
}
tmp = self._persist_path.with_suffix(".tmp")
@@ -730,6 +815,10 @@ class EvidenceGraph:
iid: ToolInvocation.from_dict(inv)
for iid, inv in data.get("tool_invocations", {}).items()
}
graph.investigation_rounds = [
InvestigationRound.from_dict(r)
for r in data.get("investigation_rounds", [])
]
graph._rebuild_adjacency()
logger.info(
"EvidenceGraph restored: %d phenomena, %d hypotheses, %d entities, "
@@ -1497,9 +1586,29 @@ class EvidenceGraph:
priority: int = 5,
context: dict | None = None,
hypothesis_id: str | None = None,
proposed_by: str = "",
motivating_hypothesis: str = "",
expected_evidence_type: str = "",
round_number: int = 0,
) -> str:
async with self._lock:
lid = f"lead-{uuid.uuid4().hex[:8]}"
# Idempotency for strategist proposals: identical
# (motivating_hypothesis, expected_evidence_type, target_agent,
# source_id) triple should not be created twice — this guards
# against the "strategist loops on the same lead" failure mode.
if motivating_hypothesis and proposed_by == "strategist":
source_id = (context or {}).get("source_id", "")
for existing in self.leads:
if (
existing.proposed_by == "strategist"
and existing.motivating_hypothesis == motivating_hypothesis
and existing.expected_evidence_type == expected_evidence_type
and existing.target_agent == target_agent
and (existing.context or {}).get("source_id", "") == source_id
and existing.status in ("pending", "assigned")
):
return existing.id
self.leads.append(Lead(
id=lid,
target_agent=target_agent,
@@ -1507,10 +1616,89 @@ class EvidenceGraph:
priority=priority,
context=context or {},
hypothesis_id=hypothesis_id,
proposed_by=proposed_by,
motivating_hypothesis=motivating_hypothesis,
expected_evidence_type=expected_evidence_type,
round_number=round_number,
))
self._auto_save()
return lid
# ---- Investigation rounds (strategist loop) ----------------------------
async def start_investigation_round(
self, round_number: int,
) -> str:
"""Open a new investigation round + capture pre-round snapshot.
Called by the orchestrator at the top of each strategist iteration.
The snapshot records hypothesis status, phenomena count, and edges
count so that ``complete_investigation_round`` can compute the
round's yield deltas.
"""
async with self._lock:
rid = f"round-{round_number:03d}"
snapshot_before = {
hid: h.status for hid, h in self.hypotheses.items()
}
self.investigation_rounds.append(InvestigationRound(
id=rid,
round_number=round_number,
started_at=datetime.now().isoformat(),
hypothesis_status_snapshot_before=snapshot_before,
phenomena_count_before=len(self.phenomena),
edges_count_before=len(self.edges),
))
self._auto_save()
return rid
async def complete_investigation_round(
self,
round_id: str,
strategist_action: str = "propose_leads",
leads_executed: list[str] | None = None,
decision_rationale: str = "",
) -> InvestigationRound | None:
"""Close a round, recording after-snapshot + which leads got executed.
Idempotent on already-closed rounds (returns the existing record).
"""
async with self._lock:
for r in self.investigation_rounds:
if r.id != round_id:
continue
if r.completed_at:
return r
r.completed_at = datetime.now().isoformat()
r.strategist_action = strategist_action
r.leads_executed = list(leads_executed or [])
r.leads_proposed = [
l.id for l in self.leads
if l.round_number == r.round_number
and l.proposed_by == "strategist"
]
r.hypothesis_status_snapshot_after = {
hid: h.status for hid, h in self.hypotheses.items()
}
r.phenomena_count_after = len(self.phenomena)
r.edges_count_after = len(self.edges)
r.decision_rationale = decision_rationale
self._auto_save()
return r
return None
def get_investigation_round(self, round_id: str) -> InvestigationRound | None:
for r in self.investigation_rounds:
if r.id == round_id:
return r
return None
def latest_round(self) -> InvestigationRound | None:
return self.investigation_rounds[-1] if self.investigation_rounds else None
def leads_from_round(self, round_number: int) -> list[Lead]:
return [l for l in self.leads if l.round_number == round_number]
async def get_pending_leads(self, agent_type: str | None = None) -> list[Lead]:
async with self._lock:
leads = [l for l in self.leads if l.status == "pending"]

View File

@@ -9,6 +9,7 @@ import pytest
from evidence_graph import (
EvidenceGraph, Phenomenon, Hypothesis, Lead, GroundingError,
InvestigationRound,
_compute_quality_score, _jaccard_similarity,
prob_to_log_odds, log_odds_to_prob,
)
@@ -3036,3 +3037,180 @@ class TestOrchestratorMultiSource:
assert Orchestrator._is_analysable(ok_media)
assert not Orchestrator._is_analysable(no_path)
# ---------------------------------------------------------------------------
# Strategist loop foundation (DESIGN_STRATEGIST.md §1)
# ---------------------------------------------------------------------------
class TestLeadExtensionsForStrategist:
"""Lead now carries strategist-loop annotations: proposed_by,
motivating_hypothesis, expected_evidence_type, round_number. Old state
files predate these fields — from_dict must accept them missing.
"""
@pytest.mark.asyncio
async def test_add_lead_records_strategist_annotations(self):
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
lid = await graph.add_lead(
target_agent="filesystem",
description="Check Safari bookmarks for device-switching evidence",
proposed_by="strategist",
motivating_hypothesis=hid,
expected_evidence_type="supports",
round_number=2,
context={"source_id": "src-ios-chan"},
)
lead = next(l for l in graph.leads if l.id == lid)
assert lead.proposed_by == "strategist"
assert lead.motivating_hypothesis == hid
assert lead.expected_evidence_type == "supports"
assert lead.round_number == 2
@pytest.mark.asyncio
async def test_strategist_lead_idempotency(self):
"""Same (motivating_hyp, expected_type, target_agent, source_id) from
the strategist should NOT create a duplicate while pending.
"""
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
first = await graph.add_lead(
target_agent="filesystem", description="probe X",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=1,
context={"source_id": "src-A"},
)
again = await graph.add_lead(
target_agent="filesystem", description="probe X (rephrased)",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=2,
context={"source_id": "src-A"},
)
assert first == again
assert len(graph.leads) == 1
@pytest.mark.asyncio
async def test_non_strategist_leads_not_deduped(self):
"""Phase 1 worker leads (proposed_by != 'strategist') should NOT be
deduped — agents can legitimately propose the same kind of lead
multiple times from different contexts.
"""
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
a = await graph.add_lead(
target_agent="filesystem", description="x", proposed_by="filesystem",
motivating_hypothesis=hid, expected_evidence_type="supports",
)
b = await graph.add_lead(
target_agent="filesystem", description="x", proposed_by="filesystem",
motivating_hypothesis=hid, expected_evidence_type="supports",
)
assert a != b
assert len(graph.leads) == 2
@pytest.mark.asyncio
async def test_lead_from_old_state_file_loads_with_defaults(self, tmp_path):
"""Forward-compat: a state file written before the strategist fields
existed must still load. The new fields take their defaults.
"""
legacy = {
"id": "lead-legacy01",
"target_agent": "filesystem",
"description": "old-style lead",
"priority": 5,
"context": {},
"status": "pending",
"hypothesis_id": None,
}
lead = Lead.from_dict(legacy)
assert lead.proposed_by == ""
assert lead.motivating_hypothesis == ""
assert lead.round_number == 0
class TestInvestigationRound:
"""The InvestigationRound provenance node + start/complete lifecycle."""
@pytest.mark.asyncio
async def test_round_lifecycle_captures_before_and_after(self):
graph = EvidenceGraph()
h1 = await graph.add_hypothesis("h1", "d")
h2 = await graph.add_hypothesis("h2", "d")
rid = await graph.start_investigation_round(1)
r = graph.get_investigation_round(rid)
assert r is not None
assert r.round_number == 1
assert r.hypothesis_status_snapshot_before == {h1: "active", h2: "active"}
assert r.phenomena_count_before == 0
assert r.completed_at == ""
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", "found something", "interp", source_tool="t",
)
await graph.update_hypothesis_confidence(h1, pid, "direct_evidence", "")
closed = await graph.complete_investigation_round(
rid, strategist_action="propose_leads",
decision_rationale="found new evidence for h1",
)
assert closed is not None
assert closed.completed_at != ""
assert closed.hypothesis_status_snapshot_after[h1] == "supported"
assert closed.hypothesis_status_snapshot_after[h2] == "active"
assert closed.new_phenomena_count == 1
assert closed.new_edges_count == 1
assert closed.status_flips == 1
@pytest.mark.asyncio
async def test_complete_round_is_idempotent(self):
graph = EvidenceGraph()
rid = await graph.start_investigation_round(1)
first = await graph.complete_investigation_round(rid)
await graph.add_phenomenon("fs", "x", "y", "z", source_tool="t")
second = await graph.complete_investigation_round(rid)
assert first is second
assert first.phenomena_count_after == 0
@pytest.mark.asyncio
async def test_leads_from_round_filters_correctly(self):
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
await graph.add_lead(
target_agent="filesystem", description="r1 lead",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=1,
)
await graph.add_lead(
target_agent="filesystem", description="r2 lead",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=2,
context={"source_id": "src-different"},
)
await graph.add_lead(
target_agent="ios_artifact", description="phase 1 finding",
proposed_by="filesystem", round_number=0,
)
r1 = graph.leads_from_round(1)
r2 = graph.leads_from_round(2)
r0 = graph.leads_from_round(0)
assert len(r1) == 1 and r1[0].description == "r1 lead"
assert len(r2) == 1 and r2[0].description == "r2 lead"
assert len(r0) == 1 and r0[0].proposed_by == "filesystem"
@pytest.mark.asyncio
async def test_round_persistence_round_trip(self, tmp_path):
"""Investigation rounds must survive save/load."""
path = tmp_path / "state.json"
graph = EvidenceGraph(persist_path=path)
hid = await graph.add_hypothesis("h", "d")
rid = await graph.start_investigation_round(1)
await graph.complete_investigation_round(
rid, decision_rationale="probe complete",
)
loaded = EvidenceGraph.load_state(path)
assert len(loaded.investigation_rounds) == 1
r = loaded.investigation_rounds[0]
assert r.id == rid
assert r.decision_rationale == "probe complete"
assert hid in r.hypothesis_status_snapshot_before