From 6ebbc675c1b422ea7af43bfb963e0f459e62dc7d Mon Sep 17 00:00:00 2001 From: BattleTag Date: Thu, 21 May 2026 02:19:54 -1000 Subject: [PATCH] feat(strategist) S2: graph_overview / source_coverage / marginal_yield / budget_status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DESIGN_STRATEGIST.md §2. Four read-only view tools the strategist uses to ground its decision each round. graph_overview() — hypotheses table (log_odds, conf, edges_in, distinct_sources, recent_flip), sources table, pending leads. distinct_sources is the critical signal: a hypothesis with 23 edges but only 1 distinct_source has fragile cross-source independence and is a candidate for a corroboration-seeking lead. source_coverage(src) — per-source ✓/✗ against an expected-artefact catalogue. Catalogue is heuristic hints, NOT a forced checklist. Footer reminds the strategist to investigate ✗ items only when an active hypothesis depends on them — this is the "应试能力存在但不被绑死" ("the checklist-passing ability exists, but the strategist is not shackled to it") guardrail. marginal_yield(N) — new phenomena / edges / status flips per recent round. Two consecutive zero-yield rounds = strong signal to declare complete. budget_status() — usage vs caps (tool_calls, rounds, wall clock). Pacing warnings at 70% / 90%. tools/strategy.py also exports EXPECTED_ARTEFACTS, a per-source-type table of (name, detector, value_for) entries. Detectors are substring patterns on tool name + args; the matcher resolves at call time against graph.tool_invocations. Catalogue covers iOS / Android / Windows disk / media-collection / archive source types. All four tools registered in tool_registry, listed as read-only in llm_client.READ_ONLY_TOOLS for parallel execution. They go through the invocation-logging wrapper so the strategist's reads are themselves auditable (the wrapper does NOT cache them — graph state changes between calls). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- llm_client.py | 2 + tests/test_optimizations.py | 81 ++++++ tool_registry.py | 92 +++++++ tools/strategy.py | 485 ++++++++++++++++++++++++++++++++++++ 4 files changed, 660 insertions(+) create mode 100644 tools/strategy.py diff --git a/llm_client.py b/llm_client.py index cee06a9..b23876b 100644 --- a/llm_client.py +++ b/llm_client.py @@ -148,6 +148,8 @@ READ_ONLY_TOOLS: set[str] = { "parse_ios_keychain", "read_idevice_info", # Android + media reads (S6) — set_active_partition is NOT read-only. "probe_android_partitions", "ocr_image", + # Strategist view tools (DESIGN_STRATEGIST.md §2) — pure renders. + "graph_overview", "source_coverage", "marginal_yield", "budget_status", } diff --git a/tests/test_optimizations.py b/tests/test_optimizations.py index 096d70f..5d3152d 100644 --- a/tests/test_optimizations.py +++ b/tests/test_optimizations.py @@ -3214,3 +3214,84 @@ class TestInvestigationRound: assert r.decision_rationale == "probe complete" assert hid in r.hypothesis_status_snapshot_before + @pytest.mark.asyncio + async def test_strategy_tool_helpers(self): + """Smoke-test the strategy tool renders against a small graph. + Renders are markdown strings — we assert structural anchors rather + than full text so the test is robust to wording tweaks. + """ + from tools import strategy + from case import Case, EvidenceSource + + graph = EvidenceGraph() + src = EvidenceSource( + id="src-test", label="test iOS", type="mobile_extraction", + access_mode="tree", path="/tmp/x", + ) + graph.case = Case(case_id="c", name="c", sources=[src]) + graph.set_active_source(src) + graph._current_agent = "ios_artifact" + graph._current_task_id = "task-1" + + # graph_overview on empty graph still renders. + ov = strategy.graph_overview(graph) + assert "# Investigation State" in ov + assert "_(none yet" in ov # hypotheses section + assert "src-test" in ov + + # Source coverage — every entry should be ✗ initially. 
+ cov = strategy.source_coverage(graph, "src-test") + assert "Coverage: **0/" in cov + assert "✗" in cov + assert "Coverage hints are heuristics" in cov + + # Record one invocation that matches the AddressBook detector. + await graph.record_tool_invocation( + tool="sqlite_query", + args={"db_path": "/x/var/mobile/Library/AddressBook/AddressBook.sqlitedb"}, + output="contact list", + ) + cov2 = strategy.source_coverage(graph, "src-test") + assert "Coverage: **1/" in cov2 or "Coverage: **2/" in cov2 + + # marginal_yield: no rounds → empty render. + my = strategy.marginal_yield(graph) + assert "no completed investigation rounds" in my + + # budget_status with no budgets shows unbounded. + bs = strategy.budget_status(graph, None, None) + assert "tool_calls" in bs + assert "(unbounded)" in bs + + # budget_status with budgets + pacing hint. + bs2 = strategy.budget_status( + graph, + {"tool_calls_total": 1, "strategist_rounds_max": 1}, + None, + ) + assert "≥ 90%" in bs2 # already over 90% (1 of 1 tool calls used) + + @pytest.mark.asyncio + async def test_marginal_yield_after_two_rounds(self): + """Verify marginal_yield captures phenomena/edge/status deltas.""" + from tools import strategy + + graph = EvidenceGraph() + hid = await graph.add_hypothesis("h", "d") + + rid1 = await graph.start_investigation_round(1) + pid, _ = await graph.add_phenomenon( + "fs", "filesystem", "p1", "interp", source_tool="t", + ) + await graph.update_hypothesis_confidence(hid, pid, "direct_evidence", "") + await graph.complete_investigation_round(rid1) + + rid2 = await graph.start_investigation_round(2) + await graph.complete_investigation_round(rid2) + + out = strategy.marginal_yield(graph, last_n_rounds=2) + assert "R1" in out and "R2" in out + assert "Trend" in out + assert ("collapsed" in out or "Decelerating" in out + or "Diminishing" in out or "diminishing" in out) + diff --git a/tool_registry.py b/tool_registry.py index 87d5d67..676bd38 100644 --- a/tool_registry.py +++ 
b/tool_registry.py @@ -24,6 +24,7 @@ from tools import mobile_ios as ios from tools import parsers from tools import registry as reg from tools import sleuthkit as tsk +from tools import strategy as strat logger = logging.getLogger(__name__) @@ -985,6 +986,97 @@ def register_all_tools(graph: Any) -> None: tags=["media", "ocr", "image"], ) + # ---- Strategist-loop view tools (DESIGN_STRATEGIST.md §2) ---- + # Pure read-only renders over graph state. The strategist agent uses + # these to decide whether to keep investigating or to declare complete. + # They go through invocation logging like every other tool (so the + # strategist's reads are auditable) but are NOT cacheable — graph + # state changes between calls and a stale snapshot would mislead. + + async def _exec_graph_overview() -> str: + return strat.graph_overview(graph) + + TOOL_CATALOG["graph_overview"] = ToolDefinition( + name="graph_overview", + description=( + "Top-level investigation state: hypotheses (with log-odds, " + "confidence, edges_in, distinct_sources contributing, recent " + "status flips), sources (phenomena/identity counts, last-touched " + "round), and pending leads. Always call this first when deciding " + "the next strategist action." + ), + input_schema={"type": "object", "properties": {}}, + executor=_exec_graph_overview, + module="strategy", + tags=["strategy", "overview", "read-only"], + ) + + async def _exec_source_coverage(source_id: str) -> str: + return strat.source_coverage(graph, source_id) + + TOOL_CATALOG["source_coverage"] = ToolDefinition( + name="source_coverage", + description=( + "Per-source artefact coverage report: which expected categories " + "have been touched (✓) vs not (✗) on the given source. Coverage " + "items are heuristic hints, not requirements — investigate ✗ " + "items only when an active hypothesis depends on them." + ), + input_schema={ + "type": "object", + "properties": { + "source_id": {"type": "string", "description": "Source id, e.g. 
'src-ios-chan'."}, + }, + "required": ["source_id"], + }, + executor=_exec_source_coverage, + module="strategy", + tags=["strategy", "coverage", "read-only"], + ) + + async def _exec_marginal_yield(last_n_rounds: int = 2) -> str: + return strat.marginal_yield(graph, int(last_n_rounds)) + + TOOL_CATALOG["marginal_yield"] = ToolDefinition( + name="marginal_yield", + description=( + "How much information the last N investigation rounds added: " + "new phenomena, new edges, and hypothesis status flips per round. " + "Two consecutive zero-yield rounds means diminishing returns are " + "decisive — declare_investigation_complete with reason " + "marginal_yield_zero." + ), + input_schema={ + "type": "object", + "properties": { + "last_n_rounds": {"type": "integer", "description": "How many recent rounds to summarise (default 2)."}, + }, + }, + executor=_exec_marginal_yield, + module="strategy", + tags=["strategy", "yield", "read-only"], + ) + + async def _exec_budget_status() -> str: + return strat.budget_status( + graph, + getattr(graph, "budgets", None), + getattr(graph, "run_start_monotonic", None), + ) + + TOOL_CATALOG["budget_status"] = ToolDefinition( + name="budget_status", + description=( + "Budget vs caps: tool_calls, strategist_rounds, wall_clock_minutes. " + "Includes pacing hints when usage crosses 70% / 90% thresholds. " + "Use this to decide whether to keep proposing leads or to wind down." + ), + input_schema={"type": "object", "properties": {}}, + executor=_exec_budget_status, + module="strategy", + tags=["strategy", "budget", "read-only"], + ) + # ---- Wrap every executor with invocation logging (+ cache + auto-record) ---- # Must run AFTER all tools are registered. 
# NOTE: in the collapsed patch this physical line began with the tail of the
# preceding tool_registry.py hunk context:
#   "Every tool call now produces a ToolInvocation entry on the graph
#    (provenance for grounding), and"
# followed by the git header announcing this new file:
#   tools/strategy.py (new file, mode 100644)
"""Strategist-loop tools — read-only views over graph state that let the
InvestigationStrategist agent decide whether to keep investigating or to
declare the investigation complete.

DESIGN_STRATEGIST.md §2. Four read-only views:

  graph_overview()          → hypotheses + sources + pending leads snapshot
  source_coverage(src_id)   → which artefact categories on this source have
                              been touched vs are still ✗
  marginal_yield(n_rounds)  → how much information the last N rounds added
  budget_status()           → tool calls / rounds / wall-clock against caps

These are pure render functions over the graph — they MUST NOT mutate state.
The strategist never writes phenomena/edges directly; all graph mutations
happen through worker agents that the strategist dispatches via propose_lead
(which is registered separately in tool_registry).
"""

from __future__ import annotations

import time
from typing import Any


# ---------------------------------------------------------------------------
# Expected artefact catalogue (per source type)
#
# These are SOFT HINTS — items the strategist might want to check on a given
# source type if any active hypothesis depends on them. The catalogue is
# intentionally compact; expand it in-place when a new forensic specialty
# joins the toolset. Each entry:
#
#   name       human-readable artefact category
#   detector   how to recognise that this category has been touched — either
#              a bare tool name OR a `tool@arg_substring` pattern, joined
#              with `|` for alternatives. The tool part is a case-sensitive
#              substring of the invoked tool name; the part after `@` is a
#              case-insensitive substring of the args' string representation.
#   value_for  one-line description of why this category might matter
# ---------------------------------------------------------------------------

EXPECTED_ARTEFACTS: dict[str, list[dict[str, str]]] = {
    "disk_image+windows": [
        {"name": "partition layout", "detector": "partition_info|mmls",
         "value_for": "deleted files, hidden partitions"},
        {"name": "filesystem walk", "detector": "list_directory|fls",
         "value_for": "directory tree, recoverable deleted entries"},
        {"name": "registry hives", "detector": "parse_registry_key|list_installed_software|get_user_activity",
         "value_for": "installed software, user activity, timezone"},
        {"name": "browser history", "detector": "list_directory@AppData|read_text_file@History|read_text_file@Bookmarks",
         "value_for": "URL access, downloads, web search terms"},
        {"name": "prefetch", "detector": "parse_prefetch|extract_file@Prefetch",
         "value_for": "program execution evidence"},
        {"name": "email/IM config", "detector": "get_email_config",
         "value_for": "user accounts, configured mail/IM clients"},
        {"name": "recycle bin", "detector": "list_directory@$Recycle|count_deleted_files",
         "value_for": "deleted file metadata and recovery"},
    ],
    "disk_image+android": [
        {"name": "partition probe", "detector": "probe_android_partitions",
         "value_for": "discover EFS / SYSTEM / USERDATA layout"},
        {"name": "system properties", "detector": "read_text_file@build.prop|read_text_file@default.prop",
         "value_for": "device model, OS version, CSC region"},
        {"name": "app inventory", "detector": "list_directory@data/app|list_directory@data/data",
         "value_for": "installed apps, package names"},
        {"name": "user data dbs", "detector": "list_directory@data/data|sqlite_query",
         "value_for": "messages, contacts, app-specific data"},
        {"name": "device identity", "detector": "search_strings@imei|search_strings@serial|search_strings@DRI",
         "value_for": "IMEI, serial, device fingerprint"},
    ],
    "mobile_extraction": [
        {"name": "device info", "detector": "read_idevice_info|read_text_file@iDevice_info",
         "value_for": "model, iOS version, IMEI, ICCID, Bluetooth MAC, UDID"},
        {"name": "AddressBook", "detector": "sqlite_query@AddressBook.sqlitedb",
         "value_for": "contacts, owner identity"},
        {"name": "SMS / iMessage", "detector": "sqlite_query@sms.db",
         "value_for": "messaging content, OTP / verification codes"},
        {"name": "WhatsApp messages", "detector": "sqlite_query@ChatStorage.sqlite|sqlite_query@WhatsApp",
         "value_for": "WhatsApp content, group membership, call records"},
        {"name": "WeChat", "detector": "sqlite_query@MM.sqlite|sqlite_query@wcdb|list_directory@WeChat",
         "value_for": "WeChat IDs, messages, follow targets"},
        {"name": "Call history", "detector": "sqlite_query@CallHistory|sqlite_query@call_history",
         "value_for": "incoming/outgoing call log"},
        {"name": "Safari history", "detector": "sqlite_query@History.db|read_text_file@Bookmarks.plist|parse_plist@Bookmarks",
         "value_for": "URL access, bookmarks, search queries"},
        {"name": "Photos library", "detector": "sqlite_query@Photos.sqlite|parse_plist@Photos",
         "value_for": "photo metadata, EXIF, geolocation, source app"},
        {"name": "iCloud accounts", "detector": "parse_plist@Accounts3|parse_ios_keychain",
         "value_for": "Apple ID, registered services, authentication tokens"},
        {"name": "app inventory", "detector": "list_directory@Bundle/Application|list_directory@Containers",
         "value_for": "installed apps, app-specific containers"},
        {"name": "Wi-Fi history", "detector": "parse_plist@com.apple.wifi|read_text_file@known_networks",
         "value_for": "connected SSIDs, keys, first/last seen times"},
    ],
    "media_collection": [
        {"name": "archive unpack", "detector": "unzip_archive|list_directory",
         "value_for": "extract images / docs for downstream analysis"},
        {"name": "OCR text", "detector": "ocr_image",
         "value_for": "screenshot text content (chat, transaction, IDs)"},
        {"name": "metadata", "detector": "read_binary_preview|search_strings",
         "value_for": "EXIF, embedded timestamps, device fingerprints"},
    ],
    "archive": [
        {"name": "archive unpack", "detector": "unzip_archive",
         "value_for": "expose contents for further analysis"},
    ],
}

# Pacing levels shared by every budget metric, strongest first so the first
# hit wins. Each tuple is (usage ratio threshold, advice suffix).
_PACING_LEVELS: tuple[tuple[float, str], ...] = (
    (0.9, "≥ 90% used — favour declare_complete"),
    (0.7, "≥ 70% used — only propose leads with high expected yield"),
)

# Maximum number of pending leads rendered in graph_overview().
_MAX_LEADS_SHOWN = 20


def _cell(text: Any, limit: int | None = None) -> str:
    """Make *text* safe for a one-line markdown table cell.

    Truncates to *limit* characters first (when given), then replaces the
    column separator ``|`` with ``/`` and line breaks with spaces so a value
    can never split or add columns to the row.
    """
    s = str(text or "")
    if limit is not None:
        s = s[:limit]
    return s.replace("|", "/").replace("\r", " ").replace("\n", " ")


def _key_for_source(src: Any) -> str:
    """Return the EXPECTED_ARTEFACTS key for a source: 'disk_image+platform'
    when platform is set in meta, otherwise just the source type.

    A missing ``meta`` attribute, a ``None`` meta, or a ``None`` / empty
    platform value all fall back to the plain source type.
    """
    src_type = getattr(src, "type", "")
    if src_type == "disk_image":
        meta = getattr(src, "meta", None) or {}
        # `or ""` guards against an explicit platform=None in the metadata.
        platform = str(meta.get("platform") or "").strip().lower()
        if platform:
            return f"disk_image+{platform}"
    return src_type


def _detector_matches(detector: str, tool_name: str, args_str: str) -> bool:
    """Return True if any '|'-separated branch of `detector` matches.

    A branch like ``sqlite_query@AddressBook.sqlitedb`` requires both the
    tool name (case-sensitive substring) AND the args (case-insensitive
    substring) to match. A branch like ``parse_prefetch`` is a
    tool-name-only check. Empty branches are ignored.
    """
    args_lower = args_str.lower()  # hoisted: identical for every branch
    for branch in detector.split("|"):
        branch = branch.strip()
        if not branch:
            continue
        tool_part, has_arg, arg_part = branch.partition("@")
        if tool_part not in tool_name:
            continue
        if not has_arg or arg_part.lower() in args_lower:
            return True
    return False


def _format_args(args: Any) -> str:
    """Flatten an invocation's args into ``k=v k=v`` text for matching.

    Deliberately best-effort: args that are not a mapping (or whose values
    fail to format) fall back to ``str(args)`` rather than failing the
    whole coverage report.
    """
    try:
        return " ".join(f"{k}={v}" for k, v in (args or {}).items())
    except Exception:  # noqa: BLE001 — rendering must not crash on odd args
        return str(args)


# ---------------------------------------------------------------------------
# graph_overview()
# ---------------------------------------------------------------------------

def _render_hypotheses(graph: Any) -> list[str]:
    """Render the hypotheses table section of the overview."""
    if not graph.hypotheses:
        return ["## Hypotheses\n\n_(none yet — Phase 2 has not produced any)_\n"]

    out: list[str] = [
        f"## Hypotheses ({len(graph.hypotheses)})",
        "",
        "| id | title | L | conf | status | edges_in | distinct_sources | recent_flip |",
        "|----|-------|---|------|--------|---------:|-----------------:|--------------|",
    ]
    recent_rounds = graph.investigation_rounds[-2:]
    # Active hypotheses come first (they are the ones a lead can still
    # move); within each group the most decided (largest |log-odds|) lead.
    ordered = sorted(
        graph.hypotheses.items(),
        key=lambda kv: (kv[1].status != "active", -abs(kv[1].log_odds)),
    )
    for hid, h in ordered:
        in_edges = graph._adj_rev.get(hid, [])
        # Distinct sources contributing edges, resolved through the edge's
        # source phenomenon; edges whose source node is not a phenomenon
        # (e.g. entity→entity) carry no source and are not counted.
        distinct_sources: set[str] = set()
        for edge in in_edges:
            node = graph.phenomena.get(edge.source_id)
            if node is not None and node.source_id:
                distinct_sources.add(node.source_id)
        # Status change within the last 2 rounds — newest round first so the
        # most recent flip is the one reported.
        recent = "no"
        for rnd in reversed(recent_rounds):
            before = rnd.hypothesis_status_snapshot_before.get(hid)
            after = rnd.hypothesis_status_snapshot_after.get(hid)
            if before and after and before != after:
                recent = f"yes ({before}→{after} in R{rnd.round_number})"
                break
        out.append(
            f"| {hid[:14]} | {_cell(h.title, 60)} | {h.log_odds:+.2f} | "
            f"{h.confidence:.2f} | {h.status} | {len(in_edges)} | "
            f"{len(distinct_sources)} | {recent} |"
        )
    out.append("")
    return out


def _last_touched_round(rounds: list[Any], phenomena_on_source: list[Any]) -> str:
    """Return ``R<n>`` for the latest round that produced a phenomenon on the
    source, or ``—`` when none did.

    Heuristic: a round "touched" the source when it reports new phenomena
    and at least one of the source's phenomena was created inside the
    round's [started_at, completed_at] window (open-ended if not completed).
    """
    for rnd in reversed(rounds):
        if not rnd.new_phenomena_count > 0:
            continue
        for p in phenomena_on_source:
            if rnd.started_at <= p.created_at and (
                not rnd.completed_at or p.created_at <= rnd.completed_at
            ):
                return f"R{rnd.round_number}"
    return "—"


def _render_sources(graph: Any) -> list[str]:
    """Render the sources table section (empty when the case has none)."""
    sources = graph.case.sources if graph.case else None
    if not sources:
        return []

    # Index phenomena once instead of rescanning them per source/identifier.
    source_of: dict[Any, Any] = {}
    by_source: dict[Any, list[Any]] = {}
    for p in graph.phenomena.values():
        source_of[p.id] = p.source_id
        by_source.setdefault(p.source_id, []).append(p)

    # Identities per source: an identifier counts towards the source of the
    # phenomenon it references via "phenomenon_id".
    id_counts: dict[Any, int] = {}
    for entity in graph.entities.values():
        for ident in entity.identifiers:
            pid = ident.get("phenomenon_id")
            if pid in source_of:
                sid = source_of[pid]
                id_counts[sid] = id_counts.get(sid, 0) + 1

    out: list[str] = [
        f"## Sources ({len(sources)})",
        "",
        "| id | type | phenomena | identities | last_touched_in_round |",
        "|----|------|----------:|-----------:|----------------------|",
    ]
    for src in sources:
        on_source = by_source.get(src.id, [])
        last_r = _last_touched_round(graph.investigation_rounds, on_source)
        out.append(
            f"| {src.id} | {src.type} | {len(on_source)} | "
            f"{id_counts.get(src.id, 0)} | {last_r} |"
        )
    out.append("")
    return out


def _render_pending_leads(graph: Any) -> list[str]:
    """Render the pending-leads table section (first 20 leads only)."""
    pending = [lead for lead in graph.leads if lead.status == "pending"]
    if not pending:
        return ["## Pending Leads\n\n_(none — no investigations queued)_\n"]

    out: list[str] = [
        f"## Pending Leads ({len(pending)})",
        "",
        "| id | from | target_agent | for_hypothesis | description |",
        "|----|------|--------------|----------------|-------------|",
    ]
    for lead in pending[:_MAX_LEADS_SHOWN]:
        mh = lead.motivating_hypothesis or lead.hypothesis_id or "—"
        out.append(
            f"| {lead.id} | {lead.proposed_by or '—'} | {lead.target_agent} | "
            f"{mh[:14]} | {_cell(lead.description, 80)} |"
        )
    hidden = len(pending) - _MAX_LEADS_SHOWN
    if hidden > 0:
        out.append(f"\n_(+{hidden} more pending leads not shown)_")
    out.append("")
    return out


def graph_overview(graph: Any) -> str:
    """Render hypotheses + sources + pending leads as the strategist's
    primary decision view.

    Annotates each hypothesis with the number of incoming edges and the
    count of distinct sources whose phenomena feed those edges. A hypothesis
    with many edges but only one source is a strategist signal to seek
    cross-source corroboration.

    Args:
        graph: evidence graph exposing ``hypotheses``, ``_adj_rev``,
            ``phenomena``, ``entities``, ``investigation_rounds``, ``leads``
            and ``case``. It is only read, never mutated.

    Returns:
        A markdown document as a single string.
    """
    lines: list[str] = ["# Investigation State", ""]
    lines.extend(_render_hypotheses(graph))
    lines.extend(_render_sources(graph))
    lines.extend(_render_pending_leads(graph))

    # Interpretation hint at the end, plain English.
    lines.append("---")
    lines.append(
        "**Interpretation hints**: A hypothesis with many edges but only one "
        "distinct_source has fragile cross-source independence — a single "
        "edge from a *different* source would do more for it than another "
        "edge from the same source (harmonic damping makes repeats cheap). "
        "Hypotheses in the active band (0.2 < conf < 0.8) are the ones a "
        "well-targeted lead can flip. recent_flip = 'yes' means belief is "
        "still moving on that hypothesis; 'no' across 2 rounds suggests "
        "stability."
    )

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# source_coverage(source_id)
# ---------------------------------------------------------------------------

def source_coverage(graph: Any, source_id: str) -> str:
    """Render which expected artefact categories have been touched on
    *source_id*, and which remain ✗.

    Output is markdown. The closing paragraph reminds the strategist that
    coverage hints are heuristics — investigate ✗ items only when an active
    hypothesis depends on them. This is the design's central guardrail
    against the system devolving into a fixed forensic checklist.

    Args:
        graph: evidence graph exposing ``case`` and ``tool_invocations``.
        source_id: id of the source to report on.

    Returns:
        The markdown report, or a one-line ``Error: …`` string when the
        source id is unknown (or the graph has no case).
    """
    src = graph.case.get_source(source_id) if graph.case else None
    if src is None:
        return f"Error: source_id {source_id!r} not found in case."

    key = _key_for_source(src)
    expected = EXPECTED_ARTEFACTS.get(key, [])

    # This source's invocation history, with args flattened once per
    # invocation rather than once per (catalogue entry × invocation).
    invs = [
        (inv, _format_args(inv.args))
        for inv in graph.tool_invocations.values()
        if inv.source_id == source_id
    ]

    # For each expected category decide ✓ / ✗ and keep the first matching
    # invocation as the example. Every match is recorded so invocations
    # outside the catalogue can be reported afterwards.
    matched_idx: set[int] = set()
    rows: list[tuple[str, str, str, str]] = []
    for entry in expected:
        example: str | None = None
        for idx, (inv, args_str) in enumerate(invs):
            if _detector_matches(entry["detector"], inv.tool, args_str):
                matched_idx.add(idx)
                if example is None:
                    example = f"{inv.tool}({args_str[:60]})"
        rows.append((
            "✓" if example else "✗",
            entry["name"],
            example or "—",
            entry["value_for"],
        ))

    lines: list[str] = [
        f"# Coverage of source `{source_id}` ({src.label})",
        "",
        f"Source type: `{src.type}` / access_mode: `{src.access_mode}`",
        f"Invocations made against this source: **{len(invs)}**",
        "",
    ]
    if not expected:
        lines.append(
            f"_(no expected-artefact catalogue entry for source type `{key}` — "
            "coverage cannot be assessed against a baseline)_"
        )
    else:
        lines.append(
            "| ✓/✗ | category | example invocation | what it would tell us |"
        )
        lines.append("|-----|----------|---------------------|------------------------|")
        for mark, name, evidence, value_for in rows:
            lines.append(
                f"| {mark} | {name} | {_cell(evidence, 70)} | {value_for} |"
            )
        n_covered = sum(1 for r in rows if r[0] == "✓")
        n_total = len(rows)
        lines.append("")
        lines.append(
            f"Coverage: **{n_covered}/{n_total}** "
            f"({n_covered*100//max(n_total,1)}%)"
        )

    # Other invocations on this source that didn't match any expected entry —
    # could be genuine novel exploration; strategist might want to know.
    n_other = len(invs) - len(matched_idx)
    if n_other:
        other_tools = sorted(
            {str(inv.tool) for idx, (inv, _) in enumerate(invs)
             if idx not in matched_idx}
        )
        shown = ", ".join(f"`{_cell(t)}`" for t in other_tools[:10])
        if len(other_tools) > 10:
            shown += ", …"
        lines.append("")
        lines.append(
            f"Other invocations on this source outside the catalogue: "
            f"**{n_other}** ({shown})"
        )

    lines.append("")
    lines.append("---")
    lines.append(
        "**Coverage hints are heuristics, not requirements.** Skip an item if "
        "the case theory makes it irrelevant — a financial-fraud case has no "
        "reason to OCR every photo. Investigate ✗ items only when they could "
        "materially affect an active hypothesis. If you propose a lead just "
        "because something is ✗, the strategist prompt is being misused."
    )
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# marginal_yield(last_n_rounds)
# ---------------------------------------------------------------------------

def marginal_yield(graph: Any, last_n_rounds: int = 2) -> str:
    """Render the last N investigation rounds' yield deltas.

    Yield columns:
      - new_phenomena: phenomena created during the round
      - new_edges:     edges (any direction) added during the round
      - status_flips:  hypotheses whose status changed during the round

    A row of zeros means that round didn't move the graph. Two consecutive
    such rows is strong evidence of diminishing returns; the strategist
    should consider declare_investigation_complete with reason
    marginal_yield_zero. A single round is never enough to call a trend.

    Args:
        graph: evidence graph exposing ``investigation_rounds``.
        last_n_rounds: window size; values below 1 are treated as 1.

    Returns:
        A markdown document as a single string.
    """
    rounds = [r for r in graph.investigation_rounds if r.completed_at]
    if not rounds:
        return (
            "# Marginal Yield\n\n"
            "_(no completed investigation rounds yet — yield not applicable)_"
        )
    recent = rounds[-max(1, last_n_rounds):]
    lines = [f"# Marginal Yield (last {len(recent)} of {len(rounds)} rounds)", ""]
    lines.append("| round | new_phenomena | new_edges | status_flips |")
    lines.append("|-------|--------------:|----------:|-------------:|")
    totals: list[int] = []
    for r in recent:
        totals.append(r.new_phenomena_count + r.new_edges_count + r.status_flips)
        lines.append(
            f"| R{r.round_number} | {r.new_phenomena_count} | "
            f"{r.new_edges_count} | {r.status_flips} |"
        )

    # Trend interpretation aid.
    lines.append("")
    first, last = totals[0], totals[-1]
    if len(totals) < 2:
        # One row cannot confirm (or refute) diminishing returns.
        if len(rounds) == 1:
            trend = (
                "Only one completed round — too early to call a trend. Run at "
                "least one more before considering completion."
            )
        else:
            trend = (
                "Only one round in the requested window — too early to call "
                "a trend. Call again with last_n_rounds ≥ 2 to compare rounds."
            )
    elif not any(totals):
        trend = (
            "Yield is zero across these rounds — diminishing returns are "
            "confirmed. Strongly consider declare_investigation_complete "
            "(reason: marginal_yield_zero)."
        )
    elif last == 0:
        # Some earlier round in the window was productive, the latest wasn't.
        trend = (
            "Yield collapsed to zero in the most recent round. One more "
            "well-targeted probe is reasonable; another zero-yield round "
            "after that means stop."
        )
    elif first > 0 and last < first / 2:
        trend = (
            f"Decelerating ({last}/{first} ≈ "
            f"{int(100*last/first)}% of the earlier round). Diminishing "
            "returns are accumulating."
        )
    else:
        trend = "Yield is still active — further investigation is paying off."
    lines.append(f"**Trend**: {trend}")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# budget_status()
# ---------------------------------------------------------------------------

def _budget_row(name: str, used: float, cap: Any) -> str:
    """Render one ``| metric | used | cap | pct |`` table row.

    A cap of ``None`` is shown as unbounded; a falsy non-None cap (0) yields
    0% rather than dividing by zero.
    """
    if cap is None:
        return f"| {name} | {used:g} | — | (unbounded) |"
    pct = (used / cap) * 100 if cap else 0
    return f"| {name} | {used:g} | {cap} | {pct:.0f}% |"


def _pacing_flag(label: str, used: float | None, cap: Any) -> str | None:
    """Return the strongest pacing warning for one metric, or None.

    No warning is produced when the metric has no cap (None / 0) or no
    usage figure is available.
    """
    if not cap or used is None:
        return None
    ratio = used / cap
    for threshold, advice in _PACING_LEVELS:
        if ratio >= threshold:
            return f"{label} {advice}"
    return None


def budget_status(graph: Any, budgets: dict[str, Any] | None, start_time: float | None) -> str:
    """Render budget usage against config.yaml `budgets` block.

    Counters:
      - tool_calls:         len(graph.tool_invocations)
      - strategist_rounds:  len(graph.investigation_rounds)
      - wall_clock_minutes: now - start_time (when start_time is supplied)

    Every capped metric gets a pacing warning once usage reaches 70% of its
    cap, escalating at 90%.

    Args:
        graph: evidence graph exposing ``tool_invocations`` and
            ``investigation_rounds``.
        budgets: mapping with optional ``tool_calls_total``,
            ``strategist_rounds_max`` and ``wall_clock_minutes_max`` caps;
            ``None`` means no caps at all.
        start_time: ``time.monotonic()`` reading taken at run start, or
            ``None`` to omit the wall-clock row.

    Returns:
        A markdown document as a single string.
    """
    budgets = budgets or {}
    tool_calls_used = len(graph.tool_invocations)
    rounds_used = len(graph.investigation_rounds)
    minutes_used: float | None = None
    if start_time is not None:
        minutes_used = (time.monotonic() - start_time) / 60.0

    cap_calls = budgets.get("tool_calls_total")
    cap_rounds = budgets.get("strategist_rounds_max")
    cap_minutes = budgets.get("wall_clock_minutes_max")

    lines = ["# Budget Status", ""]
    lines.append("| metric | used | cap | pct |")
    lines.append("|--------|-----:|----:|----:|")
    lines.append(_budget_row("tool_calls", tool_calls_used, cap_calls))
    lines.append(_budget_row("strategist_rounds", rounds_used, cap_rounds))
    if minutes_used is not None:
        lines.append(_budget_row(
            "wall_clock_minutes", round(minutes_used, 1), cap_minutes,
        ))

    # Pacing hint.
    lines.append("")
    candidates = (
        _pacing_flag("tool_calls budget", tool_calls_used, cap_calls),
        _pacing_flag("strategist rounds", rounds_used, cap_rounds),
        _pacing_flag("wall_clock budget", minutes_used, cap_minutes),
    )
    flags = [flag for flag in candidates if flag]
    if flags:
        lines.append("**Budget warnings**:")
        lines.extend(f"- {flag}" for flag in flags)
    else:
        lines.append(
            "Budget room remains. Standard rule: each propose_lead should "
            "name a specific hypothesis it expects to move; otherwise skip it."
        )
    return "\n".join(lines)