feat(strategist) S3: propose_lead / declare_investigation_complete

DESIGN_STRATEGIST.md §2.5. The strategist's two write actions.

propose_lead validates motivating_hypothesis exists in the graph,
validates expected_evidence_type is a real edge type, validates
source_id refers to a real source in the case — fast specific
errors so the strategist gets fixable feedback rather than a
generic crash. On success, calls graph.add_lead with proposed_by=
"strategist" and round_number=graph.current_strategist_round so
the round-completion code can collect this round's leads.

declare_investigation_complete sets graph.strategist_complete_requested,
which the orchestrator inspects after each strategist run to decide
whether to break the loop. reason must come from a closed enum so
the audit log is consistent.

EvidenceGraph gains two transient run-context fields:
  current_strategist_round       — set by orchestrator at start of round
  strategist_complete_requested  — set to True by declare_investigation_complete

These are intentionally NOT persisted — they're per-run flags, not
graph state.

Both tools are required to be in
InvestigationStrategist.mandatory_record_tools (added in S4) so the
agent's forced-retry mechanism kicks in if it returns without taking a
documented decision.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
BattleTag
2026-05-21 02:21:13 -10:00
parent 6ebbc675c1
commit ff3a05d7ce
3 changed files with 279 additions and 0 deletions

View File

@@ -680,6 +680,15 @@ class EvidenceGraph:
self.budgets: dict[str, int] = {}
self.run_start_monotonic: float | None = None
# Current strategist round number. Set by the orchestrator at the
# top of each strategist loop iteration so propose_lead / declare_
# investigation_complete can tag their actions correctly. 0 when
# the strategist is not running.
self.current_strategist_round: int = 0
# Set to True by declare_investigation_complete so the orchestrator
# knows to break out of the strategist loop after this round.
self.strategist_complete_requested: bool = False
# _current_agent / _current_task_id are exposed as @property below,
# backed by module-level ContextVars (race-free under asyncio.gather).

View File

@@ -3271,6 +3271,102 @@ class TestInvestigationRound:
)
assert "≥ 90%" in bs2 # already over 90% (1 of 1 tool calls used)
@pytest.mark.asyncio
async def test_propose_lead_validates_hypothesis_id(self):
    """A lead citing an unregistered hypothesis id must be refused.

    Naming a hypothesis that was never added to the graph is treated as a
    strategist hallucination (comparable to citing a bogus invocation_id):
    the executor should answer with an error mentioning
    "not in graph.hypotheses" and must not record any lead.
    """
    from tool_registry import register_all_tools, TOOL_CATALOG

    # Fresh graph in strategist context; no hypothesis is registered.
    graph = EvidenceGraph()
    graph._current_agent = "strategist"
    graph._current_task_id = "task-strat-1"
    graph.current_strategist_round = 1
    register_all_tools(graph)

    lead_request = {
        "description": "probe X",
        "target_agent": "filesystem",
        "motivating_hypothesis": "hyp-does-not-exist",
        "expected_evidence_type": "supports",
    }
    outcome = await TOOL_CATALOG["propose_lead"].executor(**lead_request)

    assert "not in graph.hypotheses" in outcome
    assert not graph.leads
@pytest.mark.asyncio
async def test_propose_lead_creates_strategist_lead(self):
    """Happy path: a valid proposal is stored as a strategist lead.

    The stored lead must be attributed to "strategist", keep the
    motivating hypothesis and expected evidence type it was proposed
    with, and carry the round number that was current on the graph.
    """
    from tool_registry import register_all_tools, TOOL_CATALOG

    graph = EvidenceGraph()
    graph._current_agent = "strategist"
    graph._current_task_id = "task-strat-2"
    graph.current_strategist_round = 3
    # A real hypothesis so the executor's reference check passes.
    hypothesis_id = await graph.add_hypothesis("h", "d")
    register_all_tools(graph)

    propose_lead = TOOL_CATALOG["propose_lead"].executor
    outcome = await propose_lead(
        description="check Safari bookmarks",
        target_agent="ios_artifact",
        motivating_hypothesis=hypothesis_id,
        expected_evidence_type="supports",
        rationale="single-source hypothesis needs corroboration",
    )

    assert "proposed" in outcome
    recorded = graph.leads[0]
    assert recorded.proposed_by == "strategist"
    assert recorded.motivating_hypothesis == hypothesis_id
    assert recorded.round_number == 3
    assert recorded.expected_evidence_type == "supports"
@pytest.mark.asyncio
async def test_propose_lead_rejects_invalid_evidence_type(self):
    """An expected_evidence_type outside the allowed set is refused.

    The hypothesis itself is valid, so the only reason for rejection is
    the bogus edge type; the error must contain "not one of" and no lead
    may be recorded.
    """
    from tool_registry import register_all_tools, TOOL_CATALOG

    graph = EvidenceGraph()
    graph._current_agent = "strategist"
    graph._current_task_id = "task-strat-3"
    graph.current_strategist_round = 1
    hypothesis_id = await graph.add_hypothesis("h", "d")
    register_all_tools(graph)

    propose_lead = TOOL_CATALOG["propose_lead"].executor
    outcome = await propose_lead(
        description="x",
        target_agent="filesystem",
        motivating_hypothesis=hypothesis_id,
        expected_evidence_type="bogus_type",
    )

    assert "not one of" in outcome
    assert not graph.leads
@pytest.mark.asyncio
async def test_declare_complete_flips_request_flag(self):
    """A valid completion reason sets strategist_complete_requested.

    The flag starts out False, becomes True after the call, and the
    returned message echoes both the current round and the reason.
    """
    from tool_registry import register_all_tools, TOOL_CATALOG

    graph = EvidenceGraph()
    graph._current_agent = "strategist"
    graph._current_task_id = "task-strat-4"
    graph.current_strategist_round = 5
    register_all_tools(graph)

    declare_complete = TOOL_CATALOG["declare_investigation_complete"]
    # Precondition: nothing has requested completion yet.
    assert graph.strategist_complete_requested is False

    outcome = await declare_complete.executor(
        reason="marginal_yield_zero",
        rationale="two rounds with 0 yield",
    )

    assert graph.strategist_complete_requested is True
    assert "round 5" in outcome
    assert "marginal_yield_zero" in outcome
@pytest.mark.asyncio
async def test_declare_complete_rejects_bogus_reason(self):
    """A reason outside the closed set is refused and the flag stays False."""
    from tool_registry import register_all_tools, TOOL_CATALOG

    graph = EvidenceGraph()
    graph._current_agent = "strategist"
    graph._current_task_id = "task-strat-5"
    register_all_tools(graph)

    declare_complete = TOOL_CATALOG["declare_investigation_complete"].executor
    outcome = await declare_complete(reason="i_just_want_to_quit")

    assert "not in" in outcome
    assert graph.strategist_complete_requested is False
@pytest.mark.asyncio
async def test_marginal_yield_after_two_rounds(self):
"""Verify marginal_yield captures phenomena/edge/status deltas."""

View File

@@ -1077,6 +1077,180 @@ def register_all_tools(graph: Any) -> None:
tags=["strategy", "budget", "read-only"],
)
# ---- Strategist decision actions (DESIGN_STRATEGIST.md §2.5) ----
# propose_lead is the strategist's tool for "go deeper here";
# declare_investigation_complete is its tool for "we're done".
# Both are required to be in BaseAgent.mandatory_record_tools for the
# strategist subclass so the agent can't return without taking a
# documented decision.
# Edge types a strategist lead may declare as its expected outcome.
_ALLOWED_EVIDENCE_EDGE_TYPES = (
    "direct_evidence",
    "supports",
    "contradicts",
    "weakens",
    "prerequisite_met",
    "consequence_observed",
)


async def _exec_propose_lead(
    description: str,
    target_agent: str,
    motivating_hypothesis: str,
    expected_evidence_type: str,
    rationale: str = "",
    source_id: str = "",
) -> str:
    """Record a strategist-proposed lead on the graph.

    The references are checked first and every failure is reported as an
    "Error: ..." string rather than an exception:

    * a non-empty ``motivating_hypothesis`` must be in ``graph.hypotheses``
      (an empty value skips this check);
    * ``expected_evidence_type`` must be in ``_ALLOWED_EVIDENCE_EDGE_TYPES``;
    * a non-empty ``source_id`` must resolve through
      ``graph.case.get_source``.

    On success the lead is added via ``graph.add_lead`` with
    ``proposed_by="strategist"`` and
    ``round_number=graph.current_strategist_round``, and a one-line
    confirmation containing the new lead id is returned.

    NOTE(review): idempotence on (motivating_hypothesis,
    expected_evidence_type, target_agent, source_id) is presumably
    enforced inside ``graph.add_lead`` — nothing in this function
    de-duplicates; confirm against that method.
    """
    # Reference checks come first so the caller gets a specific message.
    if motivating_hypothesis:
        if motivating_hypothesis not in graph.hypotheses:
            return (
                f"Error: motivating_hypothesis {motivating_hypothesis!r} is "
                f"not in graph.hypotheses. Call graph_overview to see the "
                f"current hypothesis ids."
            )

    if expected_evidence_type not in _ALLOWED_EVIDENCE_EDGE_TYPES:
        return (
            f"Error: expected_evidence_type {expected_evidence_type!r} is "
            f"not one of {list(_ALLOWED_EVIDENCE_EDGE_TYPES)}."
        )

    if source_id:
        case = graph.case
        # With no case attached, every source_id counts as unknown.
        found = case.get_source(source_id) if case else None
        if found is None:
            known_ids = [s.id for s in (case.sources if case else [])]
            return (
                f"Error: source_id {source_id!r} is not in the case. "
                f"Valid ids: {known_ids}"
            )

    # Only attach a context payload when there is something to carry.
    lead_context = {}
    if source_id or rationale:
        lead_context = {"source_id": source_id, "rationale": rationale}

    lead_id = await graph.add_lead(
        target_agent=target_agent,
        description=description,
        proposed_by="strategist",
        motivating_hypothesis=motivating_hypothesis,
        expected_evidence_type=expected_evidence_type,
        round_number=graph.current_strategist_round,
        hypothesis_id=motivating_hypothesis or None,
        context=lead_context,
    )

    shown_source = source_id or ''
    return (
        f"Lead {lead_id} proposed: target_agent={target_agent}, "
        f"motivating_hypothesis={motivating_hypothesis}, "
        f"expected={expected_evidence_type}, source={shown_source}."
    )
# Register propose_lead in the tool catalog. The description doubles as
# the guidance text shown to the strategist; the JSON schema mirrors the
# executor's signature (rationale and source_id are optional there and are
# therefore absent from "required").
TOOL_CATALOG["propose_lead"] = ToolDefinition(
    name="propose_lead",
    description=(
        "Propose a specific investigation lead that will be dispatched "
        "after this strategist round. Each lead MUST name a motivating "
        "hypothesis it expects to move and the kind of edge it expects "
        "to produce. Do NOT propose a lead that just adds more same-"
        "direction evidence to an already-supported hypothesis — harmonic "
        "damping makes repeats cheap. DO propose leads when (a) a "
        "hypothesis is supported only by one source — get cross-source "
        "corroboration; (b) a hypothesis is in the active band — give it "
        "the deciding evidence; (c) a high-value artefact is uncovered on "
        "a source where an active hypothesis suggests it matters. "
        # The key below has four fields, so it is a tuple, not a triple.
        "Idempotent on (motivating_hypothesis, expected_evidence_type, "
        "target_agent, source_id) — re-proposing the same tuple while "
        "pending is a no-op that returns the existing lead's id."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "description": {
                "type": "string",
                "description": "1-2 sentence specific investigation request, including target source/artefact.",
            },
            "target_agent": {
                "type": "string",
                "enum": [
                    "filesystem", "registry", "communication", "network",
                    "ios_artifact", "android_artifact", "media",
                    "hypothesis", "timeline",
                ],
                "description": "Which worker agent should pick this up.",
            },
            "source_id": {
                "type": "string",
                "description": "Which evidence source to investigate (e.g. 'src-ios-chan'). Optional for cross-source leads.",
            },
            "motivating_hypothesis": {
                "type": "string",
                "description": "hyp-id this lead is meant to corroborate or refute.",
            },
            "expected_evidence_type": {
                "type": "string",
                # Same tuple the executor validates against, so the schema
                # and the runtime check cannot drift apart.
                "enum": list(_ALLOWED_EVIDENCE_EDGE_TYPES),
                "description": "What kind of P→H edge you expect this lead to produce.",
            },
            "rationale": {
                "type": "string",
                "description": "Why this fills a real gap — referenced in audit + worker prompt.",
            },
        },
        "required": [
            "description", "target_agent",
            "motivating_hypothesis", "expected_evidence_type",
        ],
    },
    executor=_exec_propose_lead,
    module="strategy",
    tags=["strategy", "lead", "decision"],
)
# Closed set of termination causes accepted by
# declare_investigation_complete.
_COMPLETE_REASONS = (
    "marginal_yield_zero",
    "budget_exhausted",
    "all_hypotheses_resolved",
    "coverage_saturated",
    "other",
)


async def _exec_declare_investigation_complete(
    reason: str, rationale: str = "",
) -> str:
    """Flag the investigation as complete on behalf of the strategist.

    ``reason`` must be one of ``_COMPLETE_REASONS``; otherwise an
    "Error: ..." string is returned and the graph is left untouched.
    On success ``graph.strategist_complete_requested`` is set to True and
    the returned message echoes the current strategist round, the reason
    and the rationale ("(none)" when no rationale was given).
    """
    if reason in _COMPLETE_REASONS:
        graph.strategist_complete_requested = True
        shown_rationale = rationale or '(none)'
        return (
            f"Investigation marked complete in round "
            f"{graph.current_strategist_round}. reason={reason}. "
            f"rationale={shown_rationale}. The orchestrator will exit "
            f"the strategist loop after this round."
        )

    # Unknown reason: report the accepted values and change nothing.
    return (
        f"Error: reason {reason!r} not in "
        f"{list(_COMPLETE_REASONS)}."
    )
# Schema properties for declare_investigation_complete: a mandatory
# reason drawn from _COMPLETE_REASONS plus an optional free-text rationale.
_declare_complete_properties = {
    "reason": {
        "type": "string",
        "enum": list(_COMPLETE_REASONS),
        "description": "Termination cause — picked from a closed set so the audit log is consistent.",
    },
    "rationale": {
        "type": "string",
        "description": "Free-text justification — quoted into the InvestigationRound's decision_rationale.",
    },
}

# Register the terminal strategist action in the tool catalog.
TOOL_CATALOG["declare_investigation_complete"] = ToolDefinition(
    name="declare_investigation_complete",
    description=(
        "Terminal strategist action. Call this when (a) marginal_yield "
        "shows zero across 2+ rounds, (b) budget is exhausted, (c) all "
        "active hypotheses are resolved, or (d) coverage is saturated "
        "with respect to the active hypotheses. After this call, the "
        "orchestrator finishes the strategist loop and proceeds to "
        "Phase 4 (timeline) and Phase 5 (report). The current round's "
        "in-flight work still completes."
    ),
    input_schema={
        "type": "object",
        "properties": _declare_complete_properties,
        "required": ["reason"],
    },
    executor=_exec_declare_investigation_complete,
    module="strategy",
    tags=["strategy", "terminal", "decision"],
)
# ---- Wrap every executor with invocation logging (+ cache + auto-record) ----
# Must run AFTER all tools are registered. Every tool call now produces
# a ToolInvocation entry on the graph (provenance for grounding), and