From 388321ee301595aee4ab2f5d73993dfd339b422a Mon Sep 17 00:00:00 2001 From: BattleTag Date: Thu, 21 May 2026 02:27:05 -1000 Subject: [PATCH] feat(strategist) S7: strategist resume / open-round repair MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DESIGN_STRATEGIST.md §5. Support resume from a crash mid-strategist-loop. _resume_strategist_state inspects investigation_rounds for a tail entry without completed_at — an "open" round, i.e. one that started but never closed. Two repairs: 1. Mark the round closed with strategist_action="interrupted_resume" so the run history reflects what actually happened. 2. Walk that round's leads; any still in "assigned" state are re-marked as "failed" with failure_reason="interrupted before complete". The Retry-failed-leads + Gap-analysis passes that run after the strategist loop can pick them up. Returns max(round_number) + 1 — the round at which to resume the loop. On a clean graph (no prior rounds) returns 1 and makes no changes. _phase3_strategist_loop now calls this helper before the main for-loop and uses its return value as start_round, so a resume run lands at the right round number rather than restarting from R1. Co-Authored-By: Claude Opus 4.7 (1M context) --- orchestrator.py | 51 ++++++++++++++++++++++++++++++++++++- tests/test_optimizations.py | 48 ++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/orchestrator.py b/orchestrator.py index ed3cd94..fbc2ada 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -275,6 +275,45 @@ class Orchestrator: finally: self.graph._auto_save() + async def _resume_strategist_state(self) -> int: + """Repair any open InvestigationRound after a resume and return the + next round number to use. + + An "open" round is one with ``started_at`` set but ``completed_at`` + empty — interrupted before its complete step. Mark it as completed + with action=interrupted_resume so the run history is self-describing, + and mark any leads still in the "assigned" state from that round as + "failed" so the gap-analysis / retry paths can re-process them. + Returns the round number the strategist loop should start from + (1 + the highest existing round_number). + """ + if not self.graph.investigation_rounds: + return 1 + highest = max(r.round_number for r in self.graph.investigation_rounds) + last = self.graph.latest_round() + if last is not None and not last.completed_at: + assigned_in_round = [ + l for l in self.graph.leads + if l.round_number == last.round_number + and l.status == "assigned" + ] + for lead in assigned_in_round: + lead.status = "failed" + lead.context["failure_reason"] = "interrupted before complete" + await self.graph.complete_investigation_round( + last.id, strategist_action="interrupted_resume", + decision_rationale=( + f"resume repair: this round was interrupted before " + f"completion; {len(assigned_in_round)} assigned leads " + f"have been re-marked as failed." + ), + ) + logger.info( + "Strategist resume: repaired open round R%d (closed %d assigned leads)", + last.round_number, len(assigned_in_round), + ) + return highest + 1 + async def _phase3_strategist_loop(self) -> None: """Belief-driven investigation: strategist proposes, workers execute, repeat. Replaces the legacy fixed-round investigation loop. @@ -293,9 +332,19 @@ class Orchestrator: await self._phase3_legacy_loop() return + # Resume support: if we're restarting after an interruption, repair + # any half-open round and pick up at the next number. + start_round = await self._resume_strategist_state() + if start_round > 1: + _log( + f"Resuming strategist loop at round {start_round} " + f"(history: {len(self.graph.investigation_rounds)} prior rounds)", + event="progress", + ) + zero_yield_streak = 0 - for round_num in range(1, max_rounds + 1): + for round_num in range(start_round, max_rounds + 1): # Reset per-round flags so a previous round's declare_complete # doesn't leak across iterations (defensive — strategist also # only sets True, never False). diff --git a/tests/test_optimizations.py b/tests/test_optimizations.py index 6b1b663..9242686 100644 --- a/tests/test_optimizations.py +++ b/tests/test_optimizations.py @@ -3407,6 +3407,54 @@ class TestInvestigationRound: assert "not in" in result assert graph.strategist_complete_requested is False + @pytest.mark.asyncio + async def test_resume_repairs_open_round(self, tmp_path): + """Simulate a crash mid-round: half-open InvestigationRound + a + lead in 'assigned' state. _resume_strategist_state must close the + round and re-mark the lead as failed.""" + from unittest.mock import AsyncMock + from orchestrator import Orchestrator + graph = EvidenceGraph() + hid = await graph.add_hypothesis("h", "d") + rid = await graph.start_investigation_round(1) + lid = await graph.add_lead( + target_agent="filesystem", description="probe", + proposed_by="strategist", motivating_hypothesis=hid, + expected_evidence_type="supports", round_number=1, + ) + lead = next(l for l in graph.leads if l.id == lid) + lead.status = "assigned" + + llm = AsyncMock() + class FakeFactory: + def get_or_create_agent(self, name): + return AsyncMock() + orch = Orchestrator(llm, graph, FakeFactory(), config={}) + next_round = await orch._resume_strategist_state() + + round_ = graph.get_investigation_round(rid) + assert round_.completed_at != "" + assert round_.strategist_action == "interrupted_resume" + lead2 = next(l for l in graph.leads if l.id == lid) + assert lead2.status == "failed" + assert "interrupted" in lead2.context.get("failure_reason", "") + assert next_round == 2 + + @pytest.mark.asyncio + async def test_resume_state_is_idempotent_on_clean_graph(self): + """No prior rounds → resume returns 1, no changes.""" + from unittest.mock import AsyncMock + from orchestrator import Orchestrator + graph = EvidenceGraph() + llm = AsyncMock() + class FakeFactory: + def get_or_create_agent(self, name): + return AsyncMock() + orch = Orchestrator(llm, graph, FakeFactory(), config={}) + result = await orch._resume_strategist_state() + assert result == 1 + assert graph.investigation_rounds == [] + @pytest.mark.asyncio async def test_strategist_loop_exits_on_declare_complete(self): """Mock strategist that declares complete in round 1 — orchestrator