diff --git a/orchestrator.py b/orchestrator.py
index 7821ad7..ed3cd94 100644
--- a/orchestrator.py
+++ b/orchestrator.py
@@ -119,6 +119,11 @@ class Orchestrator:
         self._failure_count = 0
         self._max_failures = 3
         self._start_time = datetime.now()
+        # Make budgets visible to strategy tools via the graph object. The
+        # budget_status tool reads graph.budgets / graph.run_start_monotonic
+        # directly so it does not need a back-reference to the orchestrator.
+        self.graph.budgets = dict(self.config.get("budgets", {}) or {})
+        self.graph.run_start_monotonic = time.monotonic()
 
     def _resolve_agent_type(self, agent_type: str) -> str:
         return AGENT_ALIASES.get(agent_type, agent_type)
@@ -195,6 +200,271 @@ class Orchestrator:
                 lead.context["retry"] = True
             await self._dispatch_leads_parallel(failed)
 
+    # ---- Phase 3: strategist loop (DESIGN_STRATEGIST.md §4) ------------------
+
+    def _budget_exceeded(self) -> bool:
+        """Hard budget enforcement, complementing strategist self-throttling.
+
+        Any of these triggers an immediate Phase 3 exit even if the
+        strategist hasn't called declare_investigation_complete. Each cap
+        is optional — leave it out of config to make it unbounded.
+
+        Returns:
+            True when the recorded tool-call count or the elapsed wall-clock
+            minutes has reached its configured cap, False otherwise.
+        """
+        b = self.graph.budgets or {}
+        # A falsy cap (missing, None or 0) disables that particular check.
+        tc_cap = b.get("tool_calls_total")
+        if tc_cap and len(self.graph.tool_invocations) >= tc_cap:
+            return True
+        wc_cap = b.get("wall_clock_minutes_max")
+        if wc_cap and self.graph.run_start_monotonic is not None:
+            elapsed_min = (time.monotonic() - self.graph.run_start_monotonic) / 60.0
+            if elapsed_min >= wc_cap:
+                return True
+        return False
+
+    async def _execute_strategist_lead(self, lead, round_num: int) -> None:
+        """Dispatch one strategist-proposed lead to its target worker.
+
+        Unlike the legacy bulk dispatcher this runs leads serially so each
+        worker run reads a graph that includes prior leads' findings — the
+        strategist's next round can see the cumulative effect of this round.
+
+        The outcome is recorded on the lead itself: ``status`` ends up as
+        "completed" or "failed", and a failure stores its reason under
+        ``context["failure_reason"]``. Worker exceptions are logged and
+        swallowed so one bad lead does not abort the round.
+        """
+        # The reads below already tolerated a missing context; normalise it
+        # once so the failure paths can also write to it without raising.
+        if lead.context is None:
+            lead.context = {}
+
+        agent_type = AGENT_ALIASES.get(lead.target_agent, lead.target_agent)
+        worker = self.factory.get_or_create_agent(agent_type)
+        if worker is None:
+            logger.warning(
+                "No worker registered for lead %s: target_agent=%s",
+                lead.id, agent_type,
+            )
+            lead.status = "failed"
+            lead.context["failure_reason"] = f"no worker for agent type '{agent_type}'"
+            self.graph._auto_save()
+            return
+
+        source_id = lead.context.get("source_id", "")
+        if source_id and self.graph.case is not None:
+            src = self.graph.case.get_source(source_id)
+            if src:
+                self.graph.set_active_source(src)
+
+        rationale = lead.context.get("rationale", "")
+        worker_task = (
+            f"Investigate this specific lead from the strategist:\n\n"
+            f"REQUEST: {lead.description}\n"
+            f"MOTIVATING HYPOTHESIS: {lead.motivating_hypothesis or '(unspecified)'}\n"
+            f"EXPECTED EVIDENCE TYPE: {lead.expected_evidence_type or '(unspecified)'}\n"
+            f"RATIONALE: {rationale or '(unspecified)'}\n\n"
+            f"After investigating, record findings via add_phenomenon AND "
+            f"link relevant phenomena to "
+            f"{lead.motivating_hypothesis or 'the motivating hypothesis'} via the "
+            f"appropriate edge_type. If your investigation produces no relevant "
+            f"finding, record that as a negative phenomenon so the strategist "
+            f"can see the gap was probed."
+        )
+        _log(
+            f"Round {round_num} dispatching: {lead.description[:80]}",
+            event="dispatch", agent=agent_type, lead=lead.id,
+        )
+        lead.status = "assigned"
+        self.graph._auto_save()
+        try:
+            await worker.run(worker_task, lead_id=lead.id)
+            lead.status = "completed"
+        except Exception as e:
+            logger.error("Strategist lead %s failed: %s", lead.id, e, exc_info=True)
+            lead.status = "failed"
+            lead.context["failure_reason"] = str(e)
+        finally:
+            self.graph._auto_save()
+
+    async def _phase3_strategist_loop(self) -> None:
+        """Belief-driven investigation: strategist proposes, workers execute,
+        repeat. Replaces the legacy fixed-round investigation loop.
+
+        The loop ends when the strategist declares completion, proposes no
+        leads or raises; when the configured number of consecutive rounds
+        yields nothing; when a hard budget is hit; or after ``max_rounds``.
+        """
+        _log("Phase 3: Strategist-Driven Investigation", event="phase")
+        strategist_cfg = self.config.get("strategist", {}) or {}
+        max_rounds = int(strategist_cfg.get("max_rounds", 10))
+        zero_yield_cap = int(strategist_cfg.get("hard_stop_marginal_yield_zero_rounds", 3))
+
+        strategist = self.factory.get_or_create_agent("strategist")
+        if strategist is None:
+            logger.error(
+                "InvestigationStrategist agent not registered — falling back "
+                "to legacy Phase 3 loop. Check agent_factory._AGENT_CLASSES."
+            )
+            await self._phase3_legacy_loop()
+            return
+
+        zero_yield_streak = 0
+
+        try:
+            for round_num in range(1, max_rounds + 1):
+                # Reset per-round flags so a previous round's declare_complete
+                # doesn't leak across iterations (defensive — strategist also
+                # only sets True, never False).
+                self.graph.strategist_complete_requested = False
+                self.graph.current_strategist_round = round_num
+                rid = await self.graph.start_investigation_round(round_num)
+                _log(
+                    f"Strategist Round {round_num}/{max_rounds}", event="phase",
+                    round=round_num,
+                )
+                t0 = time.monotonic()
+
+                try:
+                    await strategist.run(
+                        f"Review the current investigation state and decide the "
+                        f"next action. This is round {round_num}/{max_rounds}. "
+                        f"Use graph_overview / marginal_yield / budget_status / "
+                        f"source_coverage to ground your decision, then call "
+                        f"propose_lead 1-3 times OR declare_investigation_complete."
+                    )
+                except Exception as e:
+                    logger.error("Strategist round %d failed: %s", round_num, e, exc_info=True)
+                    await self.graph.complete_investigation_round(
+                        rid, decision_rationale=f"strategist crashed: {e}",
+                    )
+                    break
+
+                # Strategist declared complete → no leads execute, exit loop.
+                if self.graph.strategist_complete_requested:
+                    _log(
+                        f"Strategist declared complete at round {round_num}",
+                        event="progress", elapsed=time.monotonic() - t0,
+                    )
+                    await self.graph.complete_investigation_round(
+                        rid, strategist_action="declare_complete",
+                        decision_rationale="strategist declare_investigation_complete",
+                    )
+                    break
+
+                # Collect this round's leads (proposed_by=strategist + matching round).
+                new_leads = [
+                    lead for lead in self.graph.leads
+                    if lead.round_number == round_num
+                    and lead.proposed_by == "strategist"
+                    and lead.status == "pending"
+                ]
+                if not new_leads:
+                    _log(
+                        f"Round {round_num}: strategist proposed no new leads — exiting loop",
+                        event="progress", elapsed=time.monotonic() - t0,
+                    )
+                    await self.graph.complete_investigation_round(
+                        rid, strategist_action="no_leads",
+                        decision_rationale="strategist proposed no new leads",
+                    )
+                    break
+
+                # Dispatch each lead to its worker.
+                for lead in new_leads:
+                    await self._execute_strategist_lead(lead, round_num)
+
+                # After workers run, judge any new phenomena against existing
+                # hypotheses (so confidence updates happen before the next round
+                # of strategist reasoning).
+                if self.graph.phenomena and self.graph.hypotheses:
+                    await self._judge_new_phenomena()
+
+                closed = await self.graph.complete_investigation_round(
+                    rid, strategist_action="propose_leads",
+                    leads_executed=[lead.id for lead in new_leads],
+                )
+
+                # Show round outcome.
+                for h in self.graph.hypotheses.values():
+                    _log(f" {h.summary()}", event="hypothesis")
+                _log(
+                    _progress_summary(self.graph) + f" (yield: +{closed.new_phenomena_count}ph, +{closed.new_edges_count}edges, {closed.status_flips}flips)",
+                    event="progress", elapsed=time.monotonic() - t0,
+                )
+
+                # Marginal-yield hard stop. Distinct from strategist self-throttle:
+                # if the strategist insists on continuing through repeated dry
+                # rounds, force-stop. This protects against an over-eager
+                # strategist + a confused worker that produces no edges.
+                yield_total = (
+                    closed.new_phenomena_count
+                    + closed.new_edges_count
+                    + closed.status_flips
+                )
+                if yield_total == 0:
+                    zero_yield_streak += 1
+                    if zero_yield_streak >= zero_yield_cap:
+                        _log(
+                            f"Hard stop: {zero_yield_streak} consecutive "
+                            f"zero-yield rounds (cap {zero_yield_cap})",
+                            event="progress",
+                        )
+                        break
+                else:
+                    zero_yield_streak = 0
+
+                if self._budget_exceeded():
+                    _log(
+                        f"Budget exhausted after round {round_num} — exiting Phase 3",
+                        event="progress",
+                    )
+                    break
+            else:
+                _log(
+                    f"Strategist max_rounds={max_rounds} reached", event="progress",
+                )
+        finally:
+            # Reset the round counter on every exit path — including an
+            # exception escaping the round machinery — so subsequent runs
+            # don't inherit the last value.
+            self.graph.current_strategist_round = 0
+
+    async def _phase3_legacy_loop(self) -> None:
+        """Legacy fixed-round Phase 3 — preserved for fallback / regression.
+
+        Engaged when config has ``strategist.enabled: false`` or when the
+        strategist agent class is somehow not registered. Runs the same
+        per-round steps as the pre-DESIGN_STRATEGIST orchestrator: bounded
+        iteration, hypothesis-derived leads, parallel dispatch, judging.
+        Failed-lead retry and gap analysis are left to the caller.
+        """
+        max_rounds = self.config.get("max_investigation_rounds", 5)
+        for round_num in range(max_rounds):
+            _log(f"Phase 3: Investigation Round {round_num}", event="phase")
+            t0 = time.monotonic()
+
+            if self.graph.hypotheses_converged():
+                _log("All hypotheses converged — stopping", event="progress")
+                break
+
+            await self._generate_hypothesis_leads()
+
+            pending = await self.graph.get_pending_leads()
+            if not pending:
+                _log("No pending leads — round complete", event="progress")
+                break
+
+            await self._dispatch_leads_parallel(pending)
+            await self._judge_new_phenomena()
+
+            for h in self.graph.hypotheses.values():
+                _log(f" {h.summary()}", event="hypothesis")
+            _log(_progress_summary(self.graph), event="progress", elapsed=time.monotonic() - t0)
+
     # ---- Hypothesis generation -----------------------------------------------
 
     async def _generate_hypotheses_manual(self, hypotheses_config: list[dict]) -> None:
@@ -881,39 +1151,26 @@ class Orchestrator:
                 event="progress", elapsed=time.monotonic() - t0,
             )
 
-        # Phase 3: Hypothesis-directed investigation (iterative)
+        # Phase 3: Strategist-driven investigation (DESIGN_STRATEGIST.md)
         if resume_phase <= 3:
-            max_rounds = self.config.get("max_investigation_rounds", 5)
-            for round_num in range(max_rounds):
-                _log(f"Phase 3: Investigation Round {round_num}", event="phase")
-                t0 = time.monotonic()
+            strategist_cfg = self.config.get("strategist", {}) or {}
+            strategist_enabled = strategist_cfg.get("enabled", True)
+            if strategist_enabled:
+                await self._phase3_strategist_loop()
+            else:
+                # Legacy fallback — keep the old hypothesis-directed
+                # iterative loop available for runs that explicitly
+                # disable the strategist (debugging, regression
+                # comparison, or environments without the strategist
+                # agent registered).
+                await self._phase3_legacy_loop()
 
-                if self.graph.hypotheses_converged():
-                    _log("All hypotheses converged — stopping", event="progress")
-                    break
-
-                await self._generate_hypothesis_leads()
-
-                pending = await self.graph.get_pending_leads()
-                if not pending:
-                    _log("No pending leads — round complete", event="progress")
-                    break
-
-                await self._dispatch_leads_parallel(pending)
-                await self._judge_new_phenomena()
-
-                # Show hypothesis status update
-                for h in self.graph.hypotheses.values():
-                    _log(f" {h.summary()}", event="hypothesis")
-                _log(_progress_summary(self.graph), event="progress", elapsed=time.monotonic() - t0)
-
-            # Retry failed leads
+            # Retry failed leads + Gap Analysis run regardless of which
+            # Phase 3 variant was used — they operate on the leads/
+            # hypothesis graph the strategist loop leaves behind.
             await self._retry_failed_leads()
-
-            # Gap analysis
             _log("Phase 3: Gap Analysis", event="phase")
             await self._run_gap_analysis()
-
             self.graph.mark_remaining_inconclusive()
 
         # Phase 4: Timeline construction
diff --git a/tests/test_optimizations.py b/tests/test_optimizations.py
index f10592a..6b1b663 100644
--- a/tests/test_optimizations.py
+++ b/tests/test_optimizations.py
@@ -3407,6 +3407,202 @@ class TestInvestigationRound:
         assert "not in" in result
         assert graph.strategist_complete_requested is False
 
+    @pytest.mark.asyncio
+    async def test_strategist_loop_exits_on_declare_complete(self):
+        """Mock strategist that declares complete in round 1 — orchestrator
+        must exit the Phase 3 loop without dispatching any worker."""
+        from unittest.mock import AsyncMock
+        from orchestrator import Orchestrator
+
+        graph = EvidenceGraph()
+        llm = AsyncMock()
+        worker_runs: list[str] = []
+
+        class FakeStrategist:
+            name = "strategist"
+            async def run(self, task, lead_id=None):
+                graph.strategist_complete_requested = True
+                return "complete"
+
+        class FakeFactory:
+            def __init__(self):
+                self._instances = {"strategist": FakeStrategist()}
+            def get_or_create_agent(self, name):
+                return self._instances.get(name)
+
+        orch = Orchestrator(llm, graph, FakeFactory(), config={
+            "strategist": {"enabled": True, "max_rounds": 5},
+        })
+        await orch._phase3_strategist_loop()
+
+        assert len(graph.investigation_rounds) == 1
+        r = graph.investigation_rounds[0]
+        assert r.strategist_action == "declare_complete"
+        assert r.completed_at != ""
+        assert worker_runs == []
+
+    @pytest.mark.asyncio
+    async def test_strategist_loop_dispatches_lead_then_completes(self):
+        """Strategist proposes 1 lead in round 1, declares complete in round 2.
+        Loop must dispatch the worker for the lead, then exit cleanly.
+        """
+        from unittest.mock import AsyncMock
+        from orchestrator import Orchestrator
+        from case import Case, EvidenceSource
+
+        graph = EvidenceGraph()
+        src = EvidenceSource(id="src-A", label="A", type="disk_image",
+                             access_mode="image", path="/tmp/x")
+        graph.case = Case(case_id="c", name="n", sources=[src])
+        graph.set_active_source(src)
+        hid = await graph.add_hypothesis("h", "d")
+        llm = AsyncMock()
+        worker_calls: list[tuple[str, str]] = []
+
+        class FakeStrategist:
+            name = "strategist"
+            def __init__(self):
+                self.round = 0
+            async def run(self, task, lead_id=None):
+                self.round += 1
+                if self.round == 1:
+                    await graph.add_lead(
+                        target_agent="filesystem",
+                        description="probe X",
+                        proposed_by="strategist",
+                        motivating_hypothesis=hid,
+                        expected_evidence_type="supports",
+                        round_number=graph.current_strategist_round,
+                    )
+                else:
+                    graph.strategist_complete_requested = True
+                return "ok"
+
+        class FakeWorker:
+            name = "filesystem"
+            async def run(self, task, lead_id=None):
+                worker_calls.append((self.name, lead_id))
+                return "did the thing"
+
+        class FakeFactory:
+            def __init__(self):
+                self.s = FakeStrategist()
+                self.w = FakeWorker()
+            def get_or_create_agent(self, name):
+                if name == "strategist": return self.s
+                return self.w
+
+        orch = Orchestrator(llm, graph, FakeFactory(), config={
+            "strategist": {"enabled": True, "max_rounds": 5,
+                           "hard_stop_marginal_yield_zero_rounds": 99},
+        })
+        await orch._phase3_strategist_loop()
+
+        assert len(graph.investigation_rounds) == 2
+        assert graph.investigation_rounds[0].strategist_action == "propose_leads"
+        assert graph.investigation_rounds[1].strategist_action == "declare_complete"
+        assert len(worker_calls) == 1
+        assert worker_calls[0][0] == "filesystem"
+        leads = [l for l in graph.leads if l.proposed_by == "strategist"]
+        assert len(leads) == 1
+        assert leads[0].status == "completed"
+
+    @pytest.mark.asyncio
+    async def test_strategist_loop_hard_stop_on_zero_yield(self):
+        """If the strategist insists on more rounds but yield stays zero for
+        N consecutive rounds, the orchestrator force-stops as a safety net."""
+        from unittest.mock import AsyncMock
+        from orchestrator import Orchestrator
+
+        graph = EvidenceGraph()
+        llm = AsyncMock()
+
+        class FakeStrategist:
+            name = "strategist"
+            async def run(self, task, lead_id=None):
+                hid_local = next(iter(graph.hypotheses)) if graph.hypotheses else None
+                await graph.add_lead(
+                    target_agent="filesystem", description="probe",
+                    proposed_by="strategist",
+                    motivating_hypothesis=hid_local or "",
+                    expected_evidence_type="supports",
+                    round_number=graph.current_strategist_round,
+                )
+
+        class FakeWorker:
+            name = "filesystem"
+            async def run(self, task, lead_id=None):
+                return ""
+
+        class FakeFactory:
+            def __init__(self):
+                self.s = FakeStrategist()
+                self.w = FakeWorker()
+            def get_or_create_agent(self, name):
+                return self.s if name == "strategist" else self.w
+
+        hid = await graph.add_hypothesis("h", "d")
+        orch = Orchestrator(llm, graph, FakeFactory(), config={
+            "strategist": {
+                "enabled": True, "max_rounds": 20,
+                "hard_stop_marginal_yield_zero_rounds": 2,
+            },
+        })
+        await orch._phase3_strategist_loop()
+        assert len(graph.investigation_rounds) == 2
+
+    @pytest.mark.asyncio
+    async def test_strategist_loop_budget_exhaustion_stops_loop(self):
+        """Hard budget cap on tool_calls_total kills the loop even when the
+        strategist wants to continue."""
+        from unittest.mock import AsyncMock
+        from orchestrator import Orchestrator
+
+        graph = EvidenceGraph()
+        llm = AsyncMock()
+        # Pre-stuff the invocations log so we're already past the cap.
+        await graph.record_tool_invocation(
+            tool="probe", args={}, output="x",
+        )
+        await graph.record_tool_invocation(
+            tool="probe", args={}, output="y",
+        )
+
+        class FakeStrategist:
+            name = "strategist"
+            async def run(self, task, lead_id=None):
+                hid_local = next(iter(graph.hypotheses)) if graph.hypotheses else ""
+                await graph.add_lead(
+                    target_agent="filesystem", description="x",
+                    proposed_by="strategist",
+                    motivating_hypothesis=hid_local,
+                    expected_evidence_type="supports",
+                    round_number=graph.current_strategist_round,
+                )
+
+        class FakeWorker:
+            name = "filesystem"
+            async def run(self, task, lead_id=None):
+                await graph.record_tool_invocation(
+                    tool="probe", args={}, output="z",
+                )
+
+        class FakeFactory:
+            def __init__(self):
+                self.s = FakeStrategist()
+                self.w = FakeWorker()
+            def get_or_create_agent(self, name):
+                return self.s if name == "strategist" else self.w
+
+        hid = await graph.add_hypothesis("h", "d")
+        orch = Orchestrator(llm, graph, FakeFactory(), config={
+            "strategist": {"enabled": True, "max_rounds": 99,
+                           "hard_stop_marginal_yield_zero_rounds": 99},
+            "budgets": {"tool_calls_total": 2},
+        })
+        await orch._phase3_strategist_loop()
+        assert len(graph.investigation_rounds) == 1
+
     @pytest.mark.asyncio
     async def test_marginal_yield_after_two_rounds(self):
         """Verify marginal_yield captures phenomena/edge/status deltas."""