feat(strategist) S2: graph_overview / source_coverage / marginal_yield / budget_status

DESIGN_STRATEGIST.md §2. Four read-only view tools the strategist uses
to ground its decision each round.

  graph_overview()      — hypotheses table (log_odds, conf, edges_in,
                          distinct_sources, recent_flip), sources table,
                          pending leads. distinct_sources is the
                          critical signal: a hypothesis with 23 edges
                          but only 1 distinct_source has fragile cross-
                          source independence and is a candidate for
                          a corroboration-seeking lead.
  source_coverage(src)  — per-source ✓/✗ against an expected-artefact
                          catalogue. Catalogue is heuristic hints,
                          NOT a forced checklist. Footer reminds the
                          strategist to investigate ✗ items only when
                          an active hypothesis depends on them — this
                          is the "应试能力存在但不被绑死" guardrail.
  marginal_yield(N)     — new phenomena / edges / status flips per
                          recent round. Two consecutive zero-yield
                          rounds = strong signal to declare complete.
  budget_status()       — usage vs caps (tool_calls, rounds, wall
                          clock). Pacing warnings at 70% / 90%.

tools/strategy.py also exports EXPECTED_ARTEFACTS, a per-source-type
table of (name, detector, value_for) entries. Detectors are
substring patterns on tool name + args; the matcher resolves at
call time against graph.tool_invocations. Catalogue covers iOS /
Android / Windows disk / media-collection / archive source types.

All four tools registered in tool_registry, listed as read-only in
llm_client.READ_ONLY_TOOLS for parallel execution. They go through
the invocation-logging wrapper so the strategist's reads are
themselves auditable (the wrapper does NOT cache them — graph
state changes between calls).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
BattleTag
2026-05-21 02:19:54 -10:00
parent ca96f29849
commit 6ebbc675c1
4 changed files with 660 additions and 0 deletions

485
tools/strategy.py Normal file
View File

@@ -0,0 +1,485 @@
"""Strategist-loop tools — read-only views over graph state that let the
InvestigationStrategist agent decide whether to keep investigating or to
declare the investigation complete.
DESIGN_STRATEGIST.md §2. Four read-only views:
graph_overview() → hypotheses + sources + pending leads snapshot
source_coverage(src_id) → which artefact categories on this source have
been touched vs are still ✗
marginal_yield(n_rounds) → how much information the last N rounds added
budget_status() → tool calls / rounds / wall-clock against caps
These are pure render functions over the graph — they MUST NOT mutate state.
The strategist never writes phenomena/edges directly; all graph mutations
happen through worker agents that the strategist dispatches via propose_lead
(which is registered separately in tool_registry).
"""
from __future__ import annotations
import time
from typing import Any
# ---------------------------------------------------------------------------
# Expected artefact catalogue (per source type)
#
# These are SOFT HINTS — items the strategist might want to check on a given
# source type if any active hypothesis depends on them. The catalogue is
# intentionally compact; expand it in-place when a new forensic specialty
# joins the toolset. Each entry:
#
# name human-readable artefact category
# detector how to recognise that this category has been touched — either
# a tool name OR a `<tool>@<path-substring>` pattern, joined with
# `|` for alternatives. The matcher is substring on the tool name
# and on the args' string representation.
# value_for one-line description of why this category might matter
# ---------------------------------------------------------------------------
EXPECTED_ARTEFACTS: dict[str, list[dict[str, str]]] = {
"disk_image+windows": [
{"name": "partition layout", "detector": "partition_info|mmls",
"value_for": "deleted files, hidden partitions"},
{"name": "filesystem walk", "detector": "list_directory|fls",
"value_for": "directory tree, recoverable deleted entries"},
{"name": "registry hives", "detector": "parse_registry_key|list_installed_software|get_user_activity",
"value_for": "installed software, user activity, timezone"},
{"name": "browser history", "detector": "list_directory@AppData|read_text_file@History|read_text_file@Bookmarks",
"value_for": "URL access, downloads, web search terms"},
{"name": "prefetch", "detector": "parse_prefetch|extract_file@Prefetch",
"value_for": "program execution evidence"},
{"name": "email/IM config", "detector": "get_email_config",
"value_for": "user accounts, configured mail/IM clients"},
{"name": "recycle bin", "detector": "list_directory@$Recycle|count_deleted_files",
"value_for": "deleted file metadata and recovery"},
],
"disk_image+android": [
{"name": "partition probe", "detector": "probe_android_partitions",
"value_for": "discover EFS / SYSTEM / USERDATA layout"},
{"name": "system properties", "detector": "read_text_file@build.prop|read_text_file@default.prop",
"value_for": "device model, OS version, CSC region"},
{"name": "app inventory", "detector": "list_directory@data/app|list_directory@data/data",
"value_for": "installed apps, package names"},
{"name": "user data dbs", "detector": "list_directory@data/data|sqlite_query",
"value_for": "messages, contacts, app-specific data"},
{"name": "device identity", "detector": "search_strings@imei|search_strings@serial|search_strings@DRI",
"value_for": "IMEI, serial, device fingerprint"},
],
"mobile_extraction": [
{"name": "device info", "detector": "read_idevice_info|read_text_file@iDevice_info",
"value_for": "model, iOS version, IMEI, ICCID, Bluetooth MAC, UDID"},
{"name": "AddressBook", "detector": "sqlite_query@AddressBook.sqlitedb",
"value_for": "contacts, owner identity"},
{"name": "SMS / iMessage", "detector": "sqlite_query@sms.db",
"value_for": "messaging content, OTP / verification codes"},
{"name": "WhatsApp messages", "detector": "sqlite_query@ChatStorage.sqlite|sqlite_query@WhatsApp",
"value_for": "WhatsApp content, group membership, call records"},
{"name": "WeChat", "detector": "sqlite_query@MM.sqlite|sqlite_query@wcdb|list_directory@WeChat",
"value_for": "WeChat IDs, messages, follow targets"},
{"name": "Call history", "detector": "sqlite_query@CallHistory|sqlite_query@call_history",
"value_for": "incoming/outgoing call log"},
{"name": "Safari history", "detector": "sqlite_query@History.db|read_text_file@Bookmarks.plist|parse_plist@Bookmarks",
"value_for": "URL access, bookmarks, search queries"},
{"name": "Photos library", "detector": "sqlite_query@Photos.sqlite|parse_plist@Photos",
"value_for": "photo metadata, EXIF, geolocation, source app"},
{"name": "iCloud accounts", "detector": "parse_plist@Accounts3|parse_ios_keychain",
"value_for": "Apple ID, registered services, authentication tokens"},
{"name": "app inventory", "detector": "list_directory@Bundle/Application|list_directory@Containers",
"value_for": "installed apps, app-specific containers"},
{"name": "Wi-Fi history", "detector": "parse_plist@com.apple.wifi|read_text_file@known_networks",
"value_for": "connected SSIDs, keys, first/last seen times"},
],
"media_collection": [
{"name": "archive unpack", "detector": "unzip_archive|list_directory",
"value_for": "extract images / docs for downstream analysis"},
{"name": "OCR text", "detector": "ocr_image",
"value_for": "screenshot text content (chat, transaction, IDs)"},
{"name": "metadata", "detector": "read_binary_preview|search_strings",
"value_for": "EXIF, embedded timestamps, device fingerprints"},
],
"archive": [
{"name": "archive unpack", "detector": "unzip_archive",
"value_for": "expose contents for further analysis"},
],
}
def _key_for_source(src) -> str:
"""Return the EXPECTED_ARTEFACTS key for a source: 'disk_image+platform'
when platform is set in meta, otherwise just the source type."""
src_type = getattr(src, "type", "")
if src_type == "disk_image":
platform = (getattr(src, "meta", {}) or {}).get("platform", "").lower()
if platform:
return f"disk_image+{platform}"
return src_type
def _detector_matches(detector: str, tool_name: str, args_str: str) -> bool:
"""Return True if any '|'-separated branch of `detector` matches.
A branch like ``sqlite_query@AddressBook.sqlitedb`` requires both the
tool name (substring) AND the args (substring) to match. A branch like
``parse_prefetch`` is a tool-name-only check.
"""
for branch in detector.split("|"):
branch = branch.strip()
if not branch:
continue
if "@" in branch:
t, sub = branch.split("@", 1)
if t in tool_name and sub.lower() in args_str.lower():
return True
else:
if branch in tool_name:
return True
return False
# ---------------------------------------------------------------------------
# graph_overview()
# ---------------------------------------------------------------------------
def graph_overview(graph) -> str:
"""Render hypotheses + sources + pending leads as the strategist's
primary decision view.
Annotates each hypothesis with the count of distinct sources that
contribute supporting (positive-LR) edges. A hypothesis with many edges
but only one source is a strategist signal to seek cross-source
corroboration.
"""
lines: list[str] = ["# Investigation State", ""]
# Hypotheses table.
if graph.hypotheses:
lines.append(f"## Hypotheses ({len(graph.hypotheses)})")
lines.append("")
lines.append(
"| id | title | L | conf | status | edges_in | distinct_sources | recent_flip |"
)
lines.append("|----|-------|---|------|--------|---------:|-----------------:|--------------|")
# Sort by absolute log-odds magnitude descending so the strategist
# sees the most decided hypotheses first; active ones float to the
# middle of the table where decisions matter most.
for hid, h in sorted(
graph.hypotheses.items(),
key=lambda kv: (kv[1].status != "active", -abs(kv[1].log_odds)),
):
in_edges = graph._adj_rev.get(hid, [])
edges_in = len(in_edges)
# Distinct sources contributing edges (looked up via source
# phenomenon's source_id; entity→entity edges have no source).
distinct_sources: set[str] = set()
for e in in_edges:
src_node = graph.phenomena.get(e.source_id)
if src_node is not None and src_node.source_id:
distinct_sources.add(src_node.source_id)
# Did this hypothesis's status change in the last 2 rounds?
recent = "no"
recent_rounds = graph.investigation_rounds[-2:]
for r in recent_rounds:
before = r.hypothesis_status_snapshot_before.get(hid)
after = r.hypothesis_status_snapshot_after.get(hid)
if before and after and before != after:
recent = f"yes ({before}{after} in R{r.round_number})"
break
title = (h.title or "")[:60].replace("|", "/")
lines.append(
f"| {hid[:14]} | {title} | {h.log_odds:+.2f} | "
f"{h.confidence:.2f} | {h.status} | {edges_in} | "
f"{len(distinct_sources)} | {recent} |"
)
lines.append("")
else:
lines.append("## Hypotheses\n\n_(none yet — Phase 2 has not produced any)_\n")
# Sources table.
if graph.case and graph.case.sources:
lines.append(f"## Sources ({len(graph.case.sources)})")
lines.append("")
lines.append(
"| id | type | phenomena | identities | last_touched_in_round |"
)
lines.append("|----|------|----------:|-----------:|----------------------|")
for src in graph.case.sources:
ph_count = sum(
1 for p in graph.phenomena.values() if p.source_id == src.id
)
id_count = sum(
1 for e in graph.entities.values()
for i in e.identifiers
if any(
p.source_id == src.id
for p in graph.phenomena.values()
if p.id == i.get("phenomenon_id")
)
)
# Latest round in which a tool invocation was made against this src.
last_r = ""
for r in reversed(graph.investigation_rounds):
if r.new_phenomena_count > 0:
# Heuristic: if any phenomenon created during this round
# was on this source, mark this round as the last touch.
in_round = [
p for p in graph.phenomena.values()
if p.source_id == src.id
and r.started_at <= p.created_at
and (not r.completed_at or p.created_at <= r.completed_at)
]
if in_round:
last_r = f"R{r.round_number}"
break
lines.append(
f"| {src.id} | {src.type} | {ph_count} | {id_count} | {last_r} |"
)
lines.append("")
# Pending leads.
pending = [l for l in graph.leads if l.status == "pending"]
if pending:
lines.append(f"## Pending Leads ({len(pending)})")
lines.append("")
lines.append("| id | from | target_agent | for_hypothesis | description |")
lines.append("|----|------|--------------|----------------|-------------|")
for l in pending[:20]:
desc = (l.description or "")[:80].replace("|", "/")
mh = l.motivating_hypothesis or l.hypothesis_id or ""
lines.append(
f"| {l.id} | {l.proposed_by or ''} | {l.target_agent} | "
f"{mh[:14] if mh != '' else ''} | {desc} |"
)
if len(pending) > 20:
lines.append(f"\n_(+{len(pending) - 20} more pending leads not shown)_")
lines.append("")
else:
lines.append("## Pending Leads\n\n_(none — no investigations queued)_\n")
# Interpretation hint at the end, plain English.
lines.append("---")
lines.append(
"**Interpretation hints**: A hypothesis with many edges but only one "
"distinct_source has fragile cross-source independence — a single "
"edge from a *different* source would do more for it than another "
"edge from the same source (harmonic damping makes repeats cheap). "
"Hypotheses in the active band (0.2 < conf < 0.8) are the ones a "
"well-targeted lead can flip. recent_flip = 'yes' means belief is "
"still moving on that hypothesis; 'no' across 2 rounds suggests "
"stability."
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# source_coverage(source_id)
# ---------------------------------------------------------------------------
def source_coverage(graph, source_id: str) -> str:
"""Render which expected artefact categories have been touched on
*source_id*, and which remain ✗.
Output is markdown. The closing paragraph reminds the strategist that
coverage hints are heuristics — investigate ✗ items only when an active
hypothesis depends on them. This is the design's central guardrail
against the system devolving into a fixed forensic checklist.
"""
src = graph.case.get_source(source_id) if graph.case else None
if src is None:
return f"Error: source_id {source_id!r} not found in case."
key = _key_for_source(src)
expected = EXPECTED_ARTEFACTS.get(key, [])
# Collect this source's invocation history.
invs = [
inv for inv in graph.tool_invocations.values()
if inv.source_id == source_id
]
# For each expected category, decide ✓ / ✗ + show example invocation if ✓.
rows: list[tuple[str, str, str, str]] = []
for entry in expected:
name = entry["name"]
detector = entry["detector"]
value_for = entry["value_for"]
matched: str | None = None
for inv in invs:
args_str = ""
try:
args_str = " ".join(f"{k}={v}" for k, v in (inv.args or {}).items())
except Exception:
args_str = str(inv.args)
if _detector_matches(detector, inv.tool, args_str):
matched = f"{inv.tool}({args_str[:60]})"
break
mark = "" if matched else ""
evidence = matched or ""
rows.append((mark, name, evidence, value_for))
lines: list[str] = [
f"# Coverage of source `{source_id}` ({src.label})",
"",
f"Source type: `{src.type}` / access_mode: `{src.access_mode}`",
f"Invocations made against this source: **{len(invs)}**",
"",
]
if not expected:
lines.append(
f"_(no expected-artefact catalogue entry for source type `{key}` — "
"coverage cannot be assessed against a baseline)_"
)
else:
lines.append(
"| ✓/✗ | category | example invocation | what it would tell us |"
)
lines.append("|-----|----------|---------------------|------------------------|")
for mark, name, evidence, value_for in rows:
lines.append(
f"| {mark} | {name} | {evidence[:70].replace('|','/')} | {value_for} |"
)
n_covered = sum(1 for r in rows if r[0] == "")
n_total = len(rows)
lines.append("")
lines.append(f"Coverage: **{n_covered}/{n_total}** ({n_covered*100//max(n_total,1)}%)")
# Other invocations on this source that didn't match any expected entry —
# could be genuine novel exploration; strategist might want to know.
lines.append("")
lines.append("---")
lines.append(
"**Coverage hints are heuristics, not requirements.** Skip an item if "
"the case theory makes it irrelevant — a financial-fraud case has no "
"reason to OCR every photo. Investigate ✗ items only when they could "
"materially affect an active hypothesis. If you propose a lead just "
"because something is ✗, the strategist prompt is being misused."
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# marginal_yield(last_n_rounds)
# ---------------------------------------------------------------------------
def marginal_yield(graph, last_n_rounds: int = 2) -> str:
"""Render the last N investigation rounds' yield deltas.
Yield columns:
- new_phenomena: phenomena created during the round
- new_edges: edges (any direction) added during the round
- status_flips: hypotheses whose status changed during the round
A row of zeros means that round didn't move the graph. Two consecutive
such rows is strong evidence of diminishing returns; the strategist
should consider declare_investigation_complete with reason
marginal_yield_zero.
"""
rounds = [r for r in graph.investigation_rounds if r.completed_at]
if not rounds:
return (
"# Marginal Yield\n\n"
"_(no completed investigation rounds yet — yield not applicable)_"
)
recent = rounds[-max(1, last_n_rounds):]
lines = [f"# Marginal Yield (last {len(recent)} of {len(rounds)} rounds)", ""]
lines.append("| round | new_phenomena | new_edges | status_flips |")
lines.append("|-------|--------------:|----------:|-------------:|")
yields: list[tuple[int, int, int]] = []
for r in recent:
yields.append((r.new_phenomena_count, r.new_edges_count, r.status_flips))
lines.append(
f"| R{r.round_number} | {r.new_phenomena_count} | "
f"{r.new_edges_count} | {r.status_flips} |"
)
# Trend interpretation aid.
lines.append("")
if all(y == (0, 0, 0) for y in yields):
trend = (
"Yield is zero across these rounds — diminishing returns are "
"confirmed. Strongly consider declare_investigation_complete "
"(reason: marginal_yield_zero)."
)
elif len(yields) >= 2:
first = yields[0][0] + yields[0][1] + yields[0][2]
last = yields[-1][0] + yields[-1][1] + yields[-1][2]
if last == 0 and first > 0:
trend = (
"Yield collapsed to zero in the most recent round. One more "
"well-targeted probe is reasonable; another zero-yield round "
"after that means stop."
)
elif last < first / 2 and first > 0:
trend = (
f"Decelerating ({last}/{first}"
f"{int(100*last/first)}% of the earlier round). Diminishing "
"returns are accumulating."
)
else:
trend = "Yield is still active — further investigation is paying off."
else:
trend = (
"Only one completed round — too early to call a trend. Run at "
"least one more before considering completion."
)
lines.append(f"**Trend**: {trend}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# budget_status()
# ---------------------------------------------------------------------------
def budget_status(graph, budgets: dict[str, Any] | None, start_time: float | None) -> str:
"""Render budget usage against config.yaml `budgets` block.
Counters:
- tool_calls: len(graph.tool_invocations)
- strategist_rounds: len(graph.investigation_rounds)
- wall_clock_minutes: now - start_time (when start_time is supplied)
"""
budgets = budgets or {}
tool_calls_used = len(graph.tool_invocations)
rounds_used = len(graph.investigation_rounds)
minutes_used: float | None = None
if start_time is not None:
minutes_used = (time.monotonic() - start_time) / 60.0
def _row(name: str, used: float, cap: Any) -> str:
if cap is None:
return f"| {name} | {used:g} | — | (unbounded) |"
pct = (used / cap) * 100 if cap else 0
return f"| {name} | {used:g} | {cap} | {pct:.0f}% |"
lines = ["# Budget Status", ""]
lines.append("| metric | used | cap | pct |")
lines.append("|--------|-----:|----:|----:|")
lines.append(_row("tool_calls", tool_calls_used, budgets.get("tool_calls_total")))
lines.append(_row("strategist_rounds", rounds_used, budgets.get("strategist_rounds_max")))
if minutes_used is not None:
lines.append(_row(
"wall_clock_minutes", round(minutes_used, 1),
budgets.get("wall_clock_minutes_max"),
))
# Pacing hint.
lines.append("")
flags = []
cap_calls = budgets.get("tool_calls_total")
cap_rounds = budgets.get("strategist_rounds_max")
if cap_calls and tool_calls_used / cap_calls >= 0.9:
flags.append("tool_calls budget ≥ 90% used — favour declare_complete")
if cap_rounds and rounds_used / cap_rounds >= 0.7:
flags.append("strategist rounds ≥ 70% used — only propose leads with high expected yield")
if flags:
lines.append("**Budget warnings**:")
for f in flags:
lines.append(f"- {f}")
else:
lines.append(
"Budget room remains. Standard rule: each propose_lead should "
"name a specific hypothesis it expects to move; otherwise skip it."
)
return "\n".join(lines)