Compare commits

..

10 Commits

Author SHA1 Message Date
BattleTag
8b964b5dec docs(strategist) S8/9: DESIGN.md updates + DESIGN_STRATEGIST.md spec
DESIGN_STRATEGIST.md §11. The strategist refit is the first sub-design
big enough to need its own document, so it lives as a sibling to
DESIGN.md rather than inline.

DESIGN_STRATEGIST.md (new, 543 lines) covers:
  §0  Scope, non-goals, invariants preserved
  §1  Data model (Lead extension, InvestigationRound)
  §2  Six tools (graph_overview / source_coverage / marginal_yield /
      budget_status / propose_lead / declare_investigation_complete)
      with full input_schema
  §3  InvestigationStrategist agent class
  §4  Orchestrator Phase 3 loop pseudocode
  §5  Persistence + resume strategy
  §6  config schema
  §7  Test plan (8 scenarios)
  §8  9-step build order (matches commit history)
  §9  Risks + mitigations
  §10 Open questions
  §11 Required DESIGN.md updates (applied here)
  §12 What this design does NOT solve (exam-test coverage, vision-
      capable LLM, blockchain explorer, etc.)

DESIGN.md updates per §11:
  §4.5  Note harmonic damping is now landed
  §4.9  Phase 3 table row now points at the strategist loop +
        inline summary
  §5    Lead + InvestigationRound rows added to the data-model
        summary table

This commit closes the strategist refit. All 174 tests pass / 1 skipped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:28:06 -10:00
BattleTag
388321ee30 feat(strategist) S7: strategist resume / open-round repair
DESIGN_STRATEGIST.md §5. Support resume from a crash mid-strategist-loop.

_resume_strategist_state inspects investigation_rounds for a tail entry
without completed_at — an "open" round, i.e. one that started but never
closed. Two repairs:

  1. Mark the round closed with strategist_action="interrupted_resume"
     so the run history reflects what actually happened.
  2. Walk that round's leads; any still in "assigned" state are
     re-marked as "failed" with failure_reason="interrupted before
     complete". The Retry-failed-leads + Gap-analysis passes that run
     after the strategist loop can pick them up.

Returns max(round_number) + 1 — the round at which to resume the loop.
On a clean graph (no prior rounds) returns 1 and makes no changes.

_phase3_strategist_loop now calls this helper before the main for-loop
and uses its return value as start_round, so a resume run lands at the
right round number rather than restarting from R1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:27:05 -10:00
BattleTag
093f3cec1f feat(strategist) S6: config.example.yaml schema for strategist + budgets
DESIGN_STRATEGIST.md §6. Document the strategist loop's tunables so
operators can override defaults without code changes.

config.yaml itself is gitignored (it carries the API key), so this
commit adds config.example.yaml as the tracked schema reference.
The runtime reads config.yaml; operators copy the example as a
starting point.

  strategist.enabled       — default true; false routes Phase 3 through
                             the legacy fixed-round loop instead.
  strategist.max_rounds    — orchestrator cap (default 10).
  strategist.hard_stop_marginal_yield_zero_rounds — safety net for
                             over-eager strategist + zero-yielding
                             workers (default 3).
  budgets.tool_calls_total — global tool-call hard cap.
  budgets.strategist_rounds_max — informational, surfaced via
                             budget_status (orchestrator enforces
                             via strategist.max_rounds instead).
  budgets.wall_clock_minutes_max — wall-clock hard cap.

Comment out any budget cap to make it unbounded — Orchestrator's
_budget_exceeded treats missing caps as no-op.

Legacy max_investigation_rounds is kept as the fallback used only when
strategist.enabled is false; documented inline.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:26:12 -10:00
BattleTag
a103c17bdb feat(strategist) S5: Phase 3 strategist loop in orchestrator
DESIGN_STRATEGIST.md §4. Replace the fixed-round hypothesis-directed
loop with a belief-driven strategist loop that runs the strategist
agent once per round and dispatches the leads it proposes.

New helpers on Orchestrator:
  _budget_exceeded()              hard budget caps (tool_calls,
                                  wall_clock_minutes), complementing
                                  strategist self-throttling.
  _execute_strategist_lead(lead)  dispatch one lead serially; the
                                  next strategist round sees the
                                  cumulative effect of this lead's
                                  graph mutations.
  _phase3_strategist_loop()       main loop. Open round, run strategist,
                                  exit on declare_complete or empty
                                  proposals, otherwise dispatch each
                                  lead, judge new phenomena, close round,
                                  apply yield/budget checks.
  _phase3_legacy_loop()           fallback when strategist.enabled is
                                  false. Identical to the
                                  pre-DESIGN_STRATEGIST behaviour.

The run() entry point branches on strategist_cfg.enabled (default
true) and always follows up with _retry_failed_leads() + Gap
Analysis + mark_remaining_inconclusive() regardless of variant.

Orchestrator.__init__ also wires graph.budgets and
graph.run_start_monotonic from config so the budget_status tool
sees real numbers.

Integration tests use a mock strategist + mock workers to verify
declare_complete, propose_lead -> worker dispatch, zero-yield-streak
hard stop, and budget-cap-stops-the-loop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:25:04 -10:00
BattleTag
65745d21dc feat(strategist) S4: InvestigationStrategist agent
DESIGN_STRATEGIST.md §3. The smallest possible agent — its entire
output per round is one decision: propose 1-3 leads (each citing a
real hypothesis it expects to move) OR declare the investigation
complete with a reason.

Constraint surface:
  mandatory_record_tools = ("propose_lead", "declare_investigation_complete")
  terminal_tools         = ("declare_investigation_complete",)

The agent inherits the BaseAgent forced-retry mechanism: if it returns
without calling either action tool, the orchestrator force-prompts a
RECORD-only retry. declare_complete being terminal means the
tool_call_loop short-circuits the moment the strategist decides
we're done.

_register_graph_tools overrides BaseAgent's default to skip
_register_graph_write_tools entirely — the strategist NEVER writes
phenomena, entities, edges, or hypotheses directly. All graph
mutations come from the workers it dispatches via leads. This keeps
the planning agent's responsibility surface narrow: read the graph,
choose what to do next, that's it.

Prompt walks through the workflow (call graph_overview / marginal_
yield / budget_status / source_coverage first, then take exactly
one terminal action) with decision criteria for propose vs stop.

Registered in agent_factory._AGENT_CLASSES["strategist"].

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:22:05 -10:00
BattleTag
ff3a05d7ce feat(strategist) S3: propose_lead / declare_investigation_complete
DESIGN_STRATEGIST.md §2.5. The strategist's two write actions.

propose_lead validates motivating_hypothesis exists in the graph,
validates expected_evidence_type is a real edge type, validates
source_id refers to a real source in the case — fast specific
errors so the strategist gets fixable feedback rather than a
generic crash. On success, calls graph.add_lead with proposed_by=
"strategist" and round_number=graph.current_strategist_round so
the round-completion code can collect this round's leads.

declare_investigation_complete sets graph.strategist_complete_requested
which the orchestrator inspects after each strategist run to decide
whether to break the loop. reason must come from a closed enum so
the audit log is consistent.

EvidenceGraph gains two transient run-context fields:
  current_strategist_round       — set by orchestrator at start of round
  strategist_complete_requested  — flipped by declare_complete

These are intentionally NOT persisted — they're per-run flags, not
graph state.

Both tools required to be in InvestigationStrategist.mandatory_record_
tools (added in S4) so the agent's forced-retry mechanism kicks in if
it returns without taking a documented decision.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:21:13 -10:00
BattleTag
6ebbc675c1 feat(strategist) S2: graph_overview / source_coverage / marginal_yield / budget_status
DESIGN_STRATEGIST.md §2. Four read-only view tools the strategist uses
to ground its decision each round.

  graph_overview()      — hypotheses table (log_odds, conf, edges_in,
                          distinct_sources, recent_flip), sources table,
                          pending leads. distinct_sources is the
                          critical signal: a hypothesis with 23 edges
                          but only 1 distinct_source has fragile cross-
                          source independence and is a candidate for
                          a corroboration-seeking lead.
  source_coverage(src)  — per-source ✓/✗ against an expected-artefact
                          catalogue. Catalogue is heuristic hints,
                          NOT a forced checklist. Footer reminds the
                          strategist to investigate ✗ items only when
                          an active hypothesis depends on them — this
                          is the "应试能力存在但不被绑死" guardrail.
  marginal_yield(N)     — new phenomena / edges / status flips per
                          recent round. Two consecutive zero-yield
                          rounds = strong signal to declare complete.
  budget_status()       — usage vs caps (tool_calls, rounds, wall
                          clock). Pacing warnings at 70% / 90%.

tools/strategy.py also exports EXPECTED_ARTEFACTS, a per-source-type
table of (name, detector, value_for) entries. Detectors are
substring patterns on tool name + args; the matcher resolves at
call time against graph.tool_invocations. Catalogue covers iOS /
Android / Windows disk / media-collection / archive source types.

All four tools registered in tool_registry, listed as read-only in
llm_client.READ_ONLY_TOOLS for parallel execution. They go through
the invocation-logging wrapper so the strategist's reads are
themselves auditable (the wrapper does NOT cache them — graph
state changes between calls).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:19:54 -10:00
BattleTag
ca96f29849 feat(strategist) S1: Lead extension + InvestigationRound model
DESIGN_STRATEGIST.md §1. Foundation for the Phase 3 strategist loop.

Lead now carries four annotations that let the orchestrator measure
marginal yield per lead and dedupe strategist proposals:
  - proposed_by         (agent that proposed it: "strategist", "filesystem", …)
  - motivating_hypothesis (hyp-id the lead is meant to corroborate/refute)
  - expected_evidence_type (edge type the lead's worker should produce)
  - round_number        (0 = Phase 1 lead, ≥1 = strategist-proposed)

add_lead idempotently dedupes strategist proposals on
(motivating_hypothesis, expected_evidence_type, target_agent, source_id)
to prevent the "strategist loops on the same lead" failure mode.

New InvestigationRound dataclass records per-round provenance: before/
after hypothesis status snapshots, phenomena + edge count deltas, and
the strategist's decision_rationale. ``new_phenomena_count``,
``new_edges_count``, ``status_flips`` are derived properties that the
marginal_yield tool will use.

start_investigation_round / complete_investigation_round /
get_investigation_round / latest_round / leads_from_round complete the
lifecycle. complete is idempotent on already-closed rounds.

Lead.from_dict is forward-compat for state files written before this
commit. InvestigationRound persists as a top-level list in
graph_state.json (auto-save + load_state both wired).

EvidenceGraph also gains graph.budgets and graph.run_start_monotonic
fields that the budget_status view (S2) will read; orchestrator
populates them in S5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:18:35 -10:00
BattleTag
8020c24776 fix(graph): harmonic damping for repeated same-edge_type evidence
First full-case run (runs/2026-05-20T20-15-04/) produced hypotheses
with log_odds +31 (8 direct_evidence + 15 supports). That's the
naive-Bayes independence assumption breaking down: 15 different
phenomena all "supporting" the same hypothesis from one source are
not 15 independent pieces of evidence, they're highly correlated.
DESIGN.md §4.5 last bullet flagged this as a "未实施旋钮" — this
commit implements it.

Rule: the k-th edge of a given (hyp_id, edge_type) contributes
log_lr_base / k instead of log_lr_base. Cumulative is harmonic
sum H_N, bounded by ~ ln N. Single-edge hypotheses unaffected
(k=1 → /1 → no change). Replaying the 2026-05-20 graph's 108
edges under the new rule pulls the top hypothesis from +31.0 →
+8.75; the smallest active hypothesis from +4.0 → +2.08.

Also adds rank + log_lr_base to confidence_log entries so the
math is auditable from the persisted graph.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:16:37 -10:00
BattleTag
f04ccd4bc7 fix(base_agent): forced-retry iter cap 10→30 + narrow tools to record+read
Timeline agent on the 2026-05-20 full run produced 0 phenomena: initial
round hit max_iterations=60 cap before recording, forced retry then hit
max_iterations=10 cap because every grounding-rejected call burns one
iteration in the new gateway. Two changes restore depth without re-
introducing the original "agent wanders off and never records" failure:

  1. Raise retry cap 10 → 30. With grounding auto-rescue (prev commit)
     most rejections heal on the first retry, but some still need 2-3
     turns; 10 is empirically too tight, 30 leaves headroom.

  2. Narrow the retry tool surface to RECORD + graph-write +
     read-only-graph-query tools. Investigation tools (list_directory,
     sqlite_query, parse_registry_key) are dropped on retry so the agent
     can't restart its search loop — the retry is explicitly "record
     what you already found, then stop".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:15:08 -10:00
12 changed files with 2861 additions and 50 deletions

View File

@@ -172,9 +172,11 @@ verified facts带重跑指令的引证与 interpretation明确标注的
- 阈值不变≥0.8 supported / ≤0.2 refuted只是改由 `L_post` 推出。
- `prior_prob` 成为可配置量(默认 0.5 → `L_prior=0`)。
- **简化假设说明**:多条边按独立处理(朴素贝叶斯)。同类证据反复出现并非
完全独立——加一个旋钮:同 `(hypothesis, edge_type)` 的边数封顶或衰减,避免
同一发现被多 agent 重复入图」虚高置信度(现有 Jaccard 去重已部分缓解)。
- **同类证据调和衰减**2026-05 落地):同 `(hypothesis, edge_type)` 的第 k 条边
贡献 `log_lr_base / k`。累计 = `log_lr_base · H_N`(调和级数,~ ln N
解决朴素贝叶斯独立性破产 + 同一发现被多 agent 重复入图导致 L=+31 的失控
2026-05-20 实战数据。单条边不变k=1, 衰减=1.0)。**结构信号**比绝对值
更重要strategist 看 `distinct_sources` 比看 confidence 数值更能判断证据厚度。
附带产出一个 **假设 × 证据矩阵**视图,供报告与线索选择使用。
@@ -235,11 +237,19 @@ network=浏览器/PCAP。改为按**调查职能**组织,并增加平台特
|---|---|
| Phase 1 | 「单镜像初勘」→ **逐源并行 triage**,每源派类型适配的 agent |
| Phase 2 | 假设跨源生成;身份共指假设在此首次登记 |
| Phase 3 | leads 派发到源感知 agent假设×证据矩阵实时更新 |
| Phase 3 | **Strategist 循环**LLM 元 agent 每轮看图决定 propose_lead 或 declare_completeworkers 执行 leadhypothesis 边重判 — 详见 `DESIGN_STRATEGIST.md` |
| Phase 4 | 跨源时间线合并,**按源做时区归一**iOS UTC vs 安卓本地时间) |
| Phase 5 | 一案一份综合报告:含假设结论、实体关联图、每条结论的 provenance 引证 |
断连恢复、运行归档逻辑保留,`graph_state.json` 增量纳入新字段
**Phase 3 的"LLM 决定深度"**2026-05 实战暴露 Phase 3 单轮触发 + log-odds 通胀致使 8 个 pending leads 一个未派发后落地):调度层从代码硬决策("max_rounds=N, converged→stop")转为 LLM 元 agent 驱动
- 新 agent `InvestigationStrategist``agents/strategist.py`每轮取一个动作propose 1-3 lead或 declare_investigation_complete
- 4 个只读视图工具:`graph_overview` / `source_coverage` / `marginal_yield` / `budget_status``tools/strategy.py`)让 LLM 看到调度信号
- 2 个写入决策工具:`propose_lead` / `declare_investigation_complete` 是 strategist 的 mandatory_record
- 编排器读 `config.yaml:strategist.*` + `config.yaml:budgets.*` 控制 max_rounds 和 hard caps
- 看 `[[DESIGN_STRATEGIST]]` 获取完整数据模型、prompt 设计、断连恢复、风险/缓解
断连恢复、运行归档逻辑保留;`graph_state.json` 新增 `investigation_rounds[]` 数组持久化 strategist 每轮决策。
---
@@ -252,8 +262,10 @@ network=浏览器/PCAP。改为按**调查职能**组织,并增加平台特
| `Phenomenon` | + `source_id`description 拆为 `verified_facts[]` + `interpretation`;澄清/移除语义含混的 `confidence`(默认 1.0),观测的可靠性由 grounding 表达 |
| `Hypothesis` | + `prior_prob`、`log_odds`(累加量);`confidence` 改为派生值 |
| `Entity` | + 类型化标识符集合;通过 `same_as` 边跨源连通 |
| Phenomenon→Hypothesis 边 | 携带 `edge_type`,映射到 `log₁₀(LR)`(替换 `_DEFAULT_EDGE_WEIGHTS` |
| Phenomenon→Hypothesis 边 | 携带 `edge_type`,映射到 `log₁₀(LR)`(替换 `_DEFAULT_EDGE_WEIGHTS`;同 `(hyp, edge_type)` 的第 k 条边按 `1/k` 调和衰减 |
| Entity→Entity 边 | **新增** `same_as`(由 coref 假设背书,可逆) |
| `Lead` | + `proposed_by` / `motivating_hypothesis` / `expected_evidence_type` / `round_number`strategist 注解) |
| `InvestigationRound` | **新增**strategist 每轮决策的 provenance + before/after 快照 + 收益指标 |
`evidence_graph.py` 的 `VALID_EDGE_TYPES`、序列化/反序列化、Jaccard 去重相应适配。

543
DESIGN_STRATEGIST.md Normal file
View File

@@ -0,0 +1,543 @@
# Strategist Loop —— Phase 3 信念驱动改造
> 这是 DESIGN.md 的补充设计文档,针对 §4.9 编排器 Phase 3 的具体重写。
>
> **触发动因**2026-05-20 第一次全 6-source 实战(`runs/2026-05-20T20-15-04/`
> 暴露 Phase 3 不工作——8 条 pending leads 一个都没派发,因为
> log-odds 通胀让所有 hypothesis 立即 converged。即使在「调和衰减」修复
> log-odds 数学后commit 在 `evidence_graph.py:update_hypothesis_confidence`
> Phase 3 在当前架构下仍然是「单轮触发、规则收敛」的机械流程——LLM
> 在调度层完全没有发言权。本设计把 Phase 3 改为 LLM 驱动的探索循环。
---
## 0. 范围
### 做什么
`orchestrator.py:Phase 3` 从「单轮、规则触发」改造为「strategist-loop、信念驱动」
新增一个 `InvestigationStrategist` agent + 4 个决策视图工具 + 2 个决策动作工具
+ 编排器循环改写。
### 不做什么
- 不改 Phase 1per-source triage 保持现状)
- 不改 Phase 2HypothesisAgent 不动strategist 可以**调用**它,但不替代)
- 不改 Phase 4/5timeline / report
- 不写专家级 per-source 检查清单(只在 `source_coverage` 工具里塞**软提示**清单)
- 不引入新的图节点类型leads 复用现有结构
### 保留的不变式
- DESIGN.md §4.3 grounding 网关,所有写入仍走它
- DESIGN.md §4.5 log-odds + 调和衰减
- DESIGN.md §4.4 verified_facts vs interpretation 划界
- 断连恢复(`graph_state.json` 序列化兼容)
### 设计原则
1. **"LLM 提议,代码裁决" 上移到调度层**DESIGN.md 第一原则现在只在事实层
grounding兑现调度层「该不该深入、深入哪里、何时停」目前是代码硬决策。
本设计让 LLM 持有调度决策权。
2. **应试能力存在但不被绑死**:系统的工具集和软提示清单覆盖应试场景所需的工件
类别;但是否查某个工件、查到什么深度,由 strategist 看具体案件性质决定,
不被预定义清单强制。
3. **可解释、可审计**:每一轮 strategist 决策、动机、产出收益都被记入持久化的
`InvestigationRound`,可事后复盘。
---
## 1. 数据模型变更
### 1.1 `Lead` 扩 4 字段
`evidence_graph.py:Lead` 现有 `(id, title, description, target_agent, source_id, status, …)`
新增:
```python
@dataclass
class Lead:
# ... existing fields
proposed_by: str = "" # "strategist" | "filesystem" | ... — 提案 agent
motivating_hypothesis: str = "" # hyp-id this lead is meant to corroborate/refute
expected_evidence_type: str = "" # one of edge_types — 期望产出的边类型
round_number: int = 0 # 哪一轮 strategist 产生
```
`motivating_hypothesis` 是关键——它把 lead 和 hypothesis 显式挂钩,让事后能算
"这条 lead 跑完到底有没有改变假设状态",即 strategist 的边际收益度量。
### 1.2 新增 `InvestigationRound` 节点
记录每一轮 strategist 的决策本身——provenance 也要可审计:
```python
@dataclass
class InvestigationRound:
id: str # "round-001"
round_number: int
started_at: str
completed_at: str = ""
strategist_action: str = "" # "propose_leads" | "declare_complete"
leads_proposed: list[str] = field(default_factory=list)
leads_executed: list[str] = field(default_factory=list)
hypothesis_status_snapshot_before: dict = field(default_factory=dict) # hyp_id → status
hypothesis_status_snapshot_after: dict = field(default_factory=dict)
new_phenomena_count: int = 0
new_edges_count: int = 0
decision_rationale: str = "" # strategist 自述
```
随 graph 序列化(加进 `to_dict`/`from_dict`)。
---
## 2. 新工具
放在新文件 `tools/strategy.py`。按现有 `TOOL_CATALOG` 注册模式登记。
### 2.1 `graph_overview()` — 全局态势(只读)
**Signature**: `graph_overview() -> str`
**输出**markdown比 JSON 更易 LLM 解读):
```markdown
# Investigation State
## Hypotheses (8)
| id | title | L | conf | status | edges_in | distinct_sources | flipped_in_last_2_rounds |
|----|-------|---|------|--------|----------|------------------|---------------------------|
| hyp-83db8748 | Multi-Device Composite | +8.75 | 0.99 | supported | 23 | 1 | no |
| hyp-daa7c704 | Multiple Identity Aliases | +9.21 | 0.99 | supported | 11 | 3 | no |
| hyp-7fa9b13e | Sunny.zip contains timer_a | +2.08 | 0.99 | supported | 4 | 1 | yes (active→supported in R2) |
| ...
## Sources (6)
| id | type | phenomena | identities | last_touched_in_round |
| src-usb-leung | disk_image | 8 | 1 | R1 |
| ...
## Pending Leads (3)
| id | from | targeting | for_hypothesis | reason |
| lead-aaa | filesystem | src-ios-chan/Safari | hyp-83db8748 | Safari history likely contains device-switching evidence |
```
**关键标注**`distinct_sources` 一栏暴露了"这个假设只靠一个源支撑"——strategist
看到 23 边都来自 android 源会自动判断"需要从别处独立证据"。
### 2.2 `source_coverage(source_id: str)` — 单源覆盖度(只读)
**Signature**: `source_coverage(source_id: str) -> str`
**实现**:扫 `graph.tool_invocations`,过滤 `source_id == 该源`,按工具名 + 主要 args
分组。然后跟 `EXPECTED_ARTEFACTS[source_type]` 比对,未触达项打 ✗。
```python
# tools/strategy.py
EXPECTED_ARTEFACTS: dict[str, list[dict]] = {
"disk_image+windows": [
{"name": "filesystem layout", "detector": "fls|mmls", "value_for": "deleted files, hidden partitions"},
{"name": "registry hives", "detector": "parse_registry_key", "value_for": "user activity, installed software"},
{"name": "browser history", "detector": "list_directory@AppData/.../History", "value_for": "URL access, downloads"},
{"name": "prefetch", "detector": "extract_file@Windows/Prefetch", "value_for": "program execution evidence"},
# ...
],
"mobile_extraction": [
{"name": "AddressBook", "detector": "sqlite_query@AddressBook.sqlitedb", "value_for": "contacts"},
{"name": "SMS messages", "detector": "sqlite_query@sms.db", "value_for": "messaging content"},
{"name": "WhatsApp messages", "detector": "sqlite_query@ChatStorage.sqlite", "value_for": "WhatsApp content"},
{"name": "Call history", "detector": "sqlite_query@CallHistoryDB", "value_for": "call records"},
{"name": "Safari history", "detector": "sqlite_query@History.db|read_text@Bookmarks.plist", "value_for": "web browsing"},
{"name": "Photos library", "detector": "sqlite_query@Photos.sqlite", "value_for": "photo metadata, EXIF, geolocation"},
{"name": "iCloud accounts", "detector": "parse_plist@Accounts3.sqlite|parse_keychain", "value_for": "Apple ID, services"},
{"name": "App inventory", "detector": "list_directory@var/containers/Bundle/Application", "value_for": "installed apps"},
],
"disk_image+android": [...],
"media_collection": [
{"name": "OCR text", "detector": "ocr_image", "value_for": "screenshot text"},
{"name": "EXIF metadata", "detector": "exif_image", "value_for": "device, timestamps, geolocation"},
],
}
```
**软提示语义**output 末尾必带一句:
> Coverage hints are heuristics, not requirements. Skip an item if the case theory
> makes it irrelevant. Investigate ✗ items only when they could materially affect
> an active hypothesis.
这一句是**"应试能力存在但不被绑死"的关键**——LLM 看到 ✗ 不会盲投,会先看
hypothesis 列表问"这个工件对当前任何 hypothesis 有意义吗"。
### 2.3 `marginal_yield(last_n_rounds: int = 2)` — 边际收益(只读)
**Signature**: `marginal_yield(last_n_rounds: int = 2) -> str`
**实现**:扫最近 N 个 `InvestigationRound`,统计:
- 每轮新增 phenomena 数
- 每轮新增 P→H 边数
- 每轮 hypothesis status flips 数active→supported / 反向)
**输出**
```markdown
# Marginal Yield (last 2 rounds)
| round | new_phenomena | new_edges | status_flips |
| R3 | 5 | 7 | 1 |
| R4 | 2 | 1 | 0 |
Trend: decelerating (R4 yield 33% of R3).
Recommendation interpretation aid: yield trending to zero suggests diminishing
returns; consider declare_complete after one more probe.
```
最后一行是 LLM-friendly heuristic prose不是强制信号。
### 2.4 `budget_status()` — 预算视图(只读)
**Signature**: `budget_status() -> str`
```markdown
# Budget Status
| metric | used | cap | pct |
| tool_calls | 1248 | 5000 | 25% |
| strategist_rounds | 3 | 10 | 30% |
| wall_clock_minutes | 142 | 360 | 39% |
Phase 1 used 89% of allocated. Phase 2 used 4%. Phase 3 (strategist) so far: 7%.
```
预算从 config.yaml 读,新增字段见 §6。无预算配置时进 unbounded 模式(仅靠
strategist 自宣 complete + hard safety cap
### 2.5 决策动作工具(写入)
注册到 strategist 的 `mandatory_record_tools`。Strategist 每轮必须 call 至少一个,
否则 forced-retry 触发(复用现有机制)。
**`propose_lead(...)`**
```python
{
"name": "propose_lead",
"input_schema": {
"type": "object",
"required": [
"description", "target_agent",
"motivating_hypothesis", "expected_evidence_type",
],
"properties": {
"description": {
"type": "string",
"description": "1-2 sentence specific investigation request, including target source/artefact",
},
"target_agent": {
"type": "string",
"enum": ["filesystem","registry","communication","network","ios_artifact","android_artifact","media"],
},
"source_id": {"type": "string", "description": "which source to investigate"},
"motivating_hypothesis": {
"type": "string",
"description": "hyp-id this lead is meant to corroborate or refute",
},
"expected_evidence_type": {
"type": "string",
"enum": ["direct_evidence","supports","contradicts","weakens","prerequisite_met","consequence_observed"],
},
"rationale": {"type": "string", "description": "why this fills a real gap"},
}
}
}
```
**`declare_investigation_complete(...)`**
```python
{
"name": "declare_investigation_complete",
"input_schema": {
"type": "object",
"required": ["reason"],
"properties": {
"reason": {
"type": "string",
"enum": [
"marginal_yield_zero",
"budget_exhausted",
"all_hypotheses_resolved",
"coverage_saturated",
"other",
],
},
"rationale": {"type": "string"},
}
}
}
```
Terminal tool —— 调用即结束循环(复用现有 `terminal_tools` 机制)。
---
## 3. `InvestigationStrategist` agent
新文件 `agents/strategist.py`,约 150 行。
```python
class InvestigationStrategist(BaseAgent):
name = "strategist"
role = (
"You are the investigation strategist. You do not run forensic tools yourself. "
"Your job is to read the current evidence graph and decide ONE of:\n"
" (a) propose 1-3 new investigation leads that would materially affect an active hypothesis, or\n"
" (b) declare the investigation complete.\n"
"\n"
"Use graph_overview / source_coverage / marginal_yield / budget_status to ground your judgment. "
"DO NOT propose a lead that just adds more same-direction evidence to an already-supported hypothesis "
"(harmonic damping makes it ~useless). DO propose leads when:\n"
" - A hypothesis is supported by edges from only ONE source — get cross-source corroboration.\n"
" - A hypothesis is in the active band (0.2 < conf < 0.8) — it needs the deciding evidence.\n"
" - A specific high-value artefact is uncovered on a source where the active hypotheses suggest it matters.\n"
"\n"
"Declare complete when marginal_yield is approaching zero AND no remaining active hypotheses have "
"obvious investigation paths."
)
mandatory_record_tools = ("propose_lead", "declare_investigation_complete")
terminal_tools = ("declare_investigation_complete",)
def _register_graph_tools(self):
# Read-only tools — strategist NEVER writes phenomena/edges directly.
# All graph writes happen via the workers it dispatches.
self._register_graph_read_tools()
# No graph_write_tools.
# Add strategy-specific tools:
for tool_name in (
"graph_overview", "source_coverage", "marginal_yield", "budget_status",
"propose_lead", "declare_investigation_complete",
):
td = TOOL_CATALOG[tool_name]
self.register_tool(td.name, td.description, td.input_schema, td.executor)
```
注册到 `agent_factory._AGENT_CLASSES["strategist"]`
---
## 4. 编排器改造
### 4.1 删除/替换:现在的 Phase 3
`orchestrator.py:Phase 3` 当前逻辑(约 150 行):检查 leads → 派 worker →
检查 converged → 退出。**删除**。
### 4.2 新 Phase 3strategist loop
```python
async def _phase3_strategist_loop(self, run_dir: Path) -> None:
"""Belief-driven investigation: strategist proposes, workers execute, repeat."""
_log("Phase 3: Strategist-Driven Investigation", event="phase")
strategist = self.factory.get_or_create_agent("strategist")
max_rounds = self.config.get("budgets", {}).get("strategist_rounds_max", 10)
for round_num in range(1, max_rounds + 1):
# 1. Record round start + snapshot
rid = await self.graph.start_investigation_round(round_num)
# 2. Strategist run
_log(f"Strategist Round {round_num}", event="phase")
await strategist.run(
f"Review the graph and decide the next investigation action. "
f"This is round {round_num}/{max_rounds}. Budget used so far: see budget_status."
)
# 3. Did strategist declare complete?
if self.graph.is_round_terminal(rid):
_log(f"Strategist declared complete at round {round_num}", event="progress")
break
# 4. Collect new leads proposed this round
new_leads = self.graph.leads_from_round(round_num)
if not new_leads:
_log(f"No leads proposed in round {round_num} — stopping", event="progress")
break
# 5. Dispatch each lead
for lead in new_leads:
await self._execute_lead(lead, round_num)
# 6. Close round + record yield
await self.graph.complete_investigation_round(rid)
# 7. Hard budget check
if self._budget_exceeded():
_log(f"Budget exhausted at round {round_num}", event="progress")
break
```
### 4.3 `_execute_lead` 复用现有 worker 派发逻辑
```python
async def _execute_lead(self, lead: Lead, round_num: int) -> None:
agent_type = AGENT_ALIASES.get(lead.target_agent, lead.target_agent)
worker = self.factory.get_or_create_agent(agent_type)
if worker is None:
logger.warning(f"No worker for lead {lead.id}: {agent_type}")
return
src = self.graph.case.get_source(lead.source_id) if lead.source_id else None
if src:
self.graph.set_active_source(src)
_log(
f"Round {round_num} dispatching: {lead.description}",
event="dispatch", agent=agent_type,
)
await worker.run(
f"Investigate this specific lead from the strategist:\n\n"
f"REQUEST: {lead.description}\n"
f"MOTIVATING HYPOTHESIS: {lead.motivating_hypothesis}\n"
f"EXPECTED EVIDENCE TYPE: {lead.expected_evidence_type}\n"
f"RATIONALE: {lead.rationale}\n\n"
f"After investigating, record findings via add_phenomenon AND link relevant phenomena "
f"to {lead.motivating_hypothesis} via the appropriate edge_type."
)
lead.status = "completed"
self.graph._auto_save()
```
### 4.4 自动 hypothesis 重生成(可选,建议加)
新增 phenomena 可能产生**新假设**(不只是更新现有假设)。让 strategist 用
`propose_lead(target_agent="hypothesis", description="re-examine recent phenomena for new hypotheses")`
显式触发——这是 strategist 自决定的,不是定时触发。一致性优于自动定时。
---
## 5. 状态持久化
`graph_state.json` 新增顶层 key `investigation_rounds: list[InvestigationRound]`
`save_state` / `load_state` 处理。**断连恢复**时:
- 找最近一个未 completed 的 round → 视为该 round 失败
- 从下一个 round 重新开始
- 已完成 round 的 phenomena / edges 自然保留
---
## 6. 配置
`config.yaml` 新增:
```yaml
strategist:
enabled: true # false = 走老 Phase 3 逻辑safety fallback
max_rounds: 10
hard_stop_marginal_yield_zero_rounds: 3 # 连续 3 轮 yield=0 强制停
budgets:
tool_calls_total: 5000
wall_clock_minutes_max: 480
```
---
## 7. 测试策略
新文件 `tests/test_strategist.py` 或加入 `test_optimizations.py`。最少要测:
1. Strategist 调 `declare_complete` 时 loop 立即退出
2. Strategist 调 `propose_lead` 时 lead 入 graph 且 round_number 正确
3. Round snapshot 正确捕获 before/after status
4. 预算耗尽时即使 strategist 还想继续也强制停
5. 断连恢复:中途中断后重启从下一 round 开始
6. `graph_overview` 输出包含 `distinct_sources` 标注
7. `source_coverage` 对未触达项标 ✗
8. `marginal_yield` 数字与 `confidence_log` 一致
不写 LLM 集成测试——strategist 行为通过 mock LLM 验证(已有这种模式见
`test_forced_record_retry_fires_when_zero_phenomena`)。
---
## 8. 实施顺序
按依赖排(**每步独立 commit**——结构性改造,单点回滚关键):
| 步 | 内容 | 依赖 | 工作量估算 |
|---|---|---|---|
| 1 | `Lead` 加 4 字段 + `InvestigationRound` 数据类 + 序列化 | — | 60 行 + 测试 |
| 2 | `graph_overview` / `source_coverage` / `marginal_yield` / `budget_status` 实现 | 1 | 250 行 + 测试 |
| 3 | `propose_lead` / `declare_investigation_complete` 工具 | 1 | 80 行 + 测试 |
| 4 | `InvestigationStrategist` agent class | 2, 3 | 120 行 + 测试 |
| 5 | 编排器 Phase 3 重写 | 4 | 150 行(替换 ~50 行旧)+ 测试 |
| 6 | config schema + 加载逻辑 | 5 | 30 行 |
| 7 | 断连恢复处理 | 5 | 40 行 + 测试 |
| 8 | 真实案件 smoke run小规模USB only | 7 | 0 代码 |
| 9 | 文档DESIGN.md §4.9 改写 + 本文件归档 | 8 | 文档 |
总:~800 行新代码 + 测试 + 文档。
---
## 9. 风险 + 缓解
| 风险 | 缓解 |
|---|---|
| Strategist 太保守(永远 declare_complete | 加 prompt 例子展示什么是"该深入的情况";测试时小样本验证 |
| Strategist 太激进(每轮都 propose 7+ leads | `propose_lead` 工具 schema 限制每轮最多 3-5 个prompt 强调"重质不重量" |
| 单 worker 跑不完 lead 导致预算雪崩 | worker 调用本身 max_iter 不变strategist 预算独立 |
| LLM 不理解 `distinct_sources` 这种暗示 | `graph_overview` 末尾加 1-2 句 plain-English 解读 "Hypothesis X has 23 edges but all from one source → cross-source corroboration would strengthen it" |
| Phase 1 触发产生的 leads 被 strategist 忽略 | strategist prompt 明确"先处理已有 pending leads再产新的" |
| 死循环strategist 反复产同样 lead | Lead 表上加 `(motivating_hyp, expected_type, source_id)` 三元组去重 |
| `EXPECTED_ARTEFACTS` 清单维护成本 | 故意保持"软提示"——清单不完整也不会破,只是某些深度需要更多 LLM 自觉 |
---
## 10. 开放问题
1. **InvestigationRound 该不该自己跑 hypothesis agent**
倾向 strategist 用 lead 显式触发(一致性更好),不做定时触发。
2. **预算超用怎么办——硬停 vs 软警告?**
当前设计硬停;可加 "strategist 看到 budget < 10% 时只能 declare_complete"
的 schema enforcement。
3. **跨 source 边的"独立性奖励"是否纳入 log-odds**
上次衰减用了 `1/k`,没区分跨源 vs 同源。如果要纳入,公式应改为
`1/k_within_source × bonus_for_distinct_sources`。这是后续单独工程。
4. **Strategist 输出的 `rationale` 该不该走 grounding**
它不会写 phenomena`rationale` 字段可能包含具体值
"based on inv-12345...")。倾向不强制——这是元层判断,不是事实落地。
5. **现 Phase 3 的 `max_investigation_rounds` config 留还是删?**
建议留作 `strategist.enabled=false` 时的 fallback 旋钮。
---
## 11. 与 DESIGN.md 的关系
本文档落地后DESIGN.md 需要的对应更新:
- **§4.5**:补一段「同时也要看 log_odds 的**结构**——edges_in 数 / distinct_sources
是 strategist 判断是否深入的关键信号,不只是 confidence 数值」
- **§4.9 Phase 3**表格内容从「leads 派发到源感知 agent」改为
「strategist 循环:看图、提案、执行、复盘、停 / 续」
- **§8**(设计取舍):新增第 6 条:「调度层 LLM 化的取舍——strategist 决定深度,
但每轮预算受 `budgets.*` 硬限制;这是"LLM 提议、代码裁决"原则在调度层的兑现」
---
## 12. 备忘:本设计**不解决**的问题
- 应试题 8% 命中率的根因是**工具集不全**(无 vision、无 ZIP 暴力破解、无 VeraCrypt
挂载、无 blockchain explorer不是调度问题。strategist 让现有工具被用得更狠,
但不会凭空多出工具。
- LLM 编造 `invocation_id`(已修补,见 `feedback_grounding_pending` memory
log-odds 通胀(已修补:调和衰减)是本设计的**前置依赖**,不在本设计范围内。
- Per-edge-type 的更精细贝叶斯建模(如跨源独立性 bonus是独立工程。

View File

@@ -33,6 +33,7 @@ def _load_agent_classes() -> None:
from agents.network import NetworkAgent
from agents.registry import RegistryAgent
from agents.report import ReportAgent
from agents.strategist import InvestigationStrategist
from agents.timeline import TimelineAgent
_AGENT_CLASSES["filesystem"] = FileSystemAgent
_AGENT_CLASSES["registry"] = RegistryAgent
@@ -44,6 +45,7 @@ def _load_agent_classes() -> None:
_AGENT_CLASSES["ios_artifact"] = IOSArtifactAgent
_AGENT_CLASSES["android_artifact"] = AndroidArtifactAgent
_AGENT_CLASSES["media"] = MediaAgent
_AGENT_CLASSES["strategist"] = InvestigationStrategist
# Triage agent per (source.type, platform). disk_image is ambiguous on its

134
agents/strategist.py Normal file
View File

@@ -0,0 +1,134 @@
"""InvestigationStrategist — the LLM that decides depth vs breadth.
DESIGN_STRATEGIST.md §3.
The strategist does NOT run forensic tools. Its job per round is exactly one
decision: propose 1-3 leads that would move an active hypothesis, OR declare
the investigation complete. It reads the graph through four read-only views
(graph_overview / source_coverage / marginal_yield / budget_status) and
expresses its decision through two write tools (propose_lead /
declare_investigation_complete).
This is the smallest possible agent in the system — the entire point is that
strategy decisions live in one agent so they're auditable and the rest of the
codebase doesn't carry implicit depth/breadth policy.
"""
from __future__ import annotations
import logging
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG
logger = logging.getLogger(__name__)
class InvestigationStrategist(BaseAgent):
name = "strategist"
role = (
"Investigation strategist. You do not run forensic tools yourself. "
"Each round you take ONE decision: propose 1-3 new investigation leads "
"that would materially affect an active hypothesis, OR declare the "
"investigation complete. Your judgment is grounded in the graph "
"(hypotheses, sources, coverage, marginal yield, budget) — never in "
"speculation."
)
# At least one of these must be called every round, otherwise BaseAgent's
# forced RECORD retry kicks in and re-prompts the strategist to take a
# documented decision.
mandatory_record_tools = ("propose_lead", "declare_investigation_complete")
# declare_complete is terminal — calling it short-circuits the tool loop,
# which is what we want (strategist returns immediately on "done").
terminal_tools = ("declare_investigation_complete",)
# Strategist-specific tools, plus the read-only graph queries inherited
# from BaseAgent. NO graph write tools (no add_phenomenon / link_to_entity
# / observe_identity); the strategist must NOT mutate evidence directly.
_STRATEGY_TOOLS = (
"graph_overview",
"source_coverage",
"marginal_yield",
"budget_status",
"propose_lead",
"declare_investigation_complete",
)
def _register_graph_tools(self) -> None:
"""Strategist gets read-only graph queries + the six strategy tools.
It does NOT get write tools (no add_phenomenon, observe_identity,
link_to_entity, add_temporal_edge). Every graph mutation must come
from a dispatched worker, not from the planner.
"""
self._register_graph_read_tools()
for tool_name in self._STRATEGY_TOOLS:
td = TOOL_CATALOG.get(tool_name)
if td is None:
logger.warning(
"Strategist could not find tool %s in TOOL_CATALOG — "
"register_all_tools must run before agent instantiation.",
tool_name,
)
continue
self.register_tool(td.name, td.description, td.input_schema, td.executor)
def _build_system_prompt(self, task: str) -> str:
"""Strategist-specific prompt. Replaces the BaseAgent default which
walks an INVESTIGATE→RECORD→LINK workflow that is wrong for a
planner agent.
"""
return (
f"You are {self.name}, the investigation strategist.\n"
f"Role: {self.role}\n\n"
f"Your task: {task}\n\n"
f"WORKFLOW (do this exactly):\n"
f" 1. Call graph_overview FIRST. Look at: which hypotheses are\n"
f" active (conf 0.2-0.8) vs already supported/refuted; which\n"
f" ones have many edges but only 1 distinct_source; which had\n"
f" a recent_flip vs none in two rounds.\n"
f" 2. Call marginal_yield to see if the last rounds produced anything.\n"
f" 3. Call budget_status to know your runway.\n"
f" 4. For each candidate lead direction, call source_coverage on\n"
f" the relevant source to see what's been touched.\n"
f" 5. Take exactly ONE of these terminal actions:\n"
f" (a) Call propose_lead 1-3 times for leads that would\n"
f" materially move an active hypothesis. STOP after this.\n"
f" (b) Call declare_investigation_complete with a specific\n"
f" reason. STOP after this.\n"
f"\n"
f"DECISION CRITERIA — when to propose vs when to stop:\n"
f" PROPOSE when:\n"
f" - A hypothesis is supported only by ONE source — get\n"
f" cross-source corroboration. Same-source repeats are\n"
f" cheap (harmonic damping).\n"
f" - A hypothesis is in the active band (0.2 < conf < 0.8) —\n"
f" it needs the deciding evidence.\n"
f" - A high-value artefact is ✗ on source_coverage AND an\n"
f" active hypothesis depends on the kind of evidence that\n"
f" artefact would produce.\n"
f" STOP (declare_complete) when:\n"
f" - marginal_yield shows zero across 2+ rounds.\n"
f" - budget_status warns ≥90% on tool_calls or rounds.\n"
f" - all active hypotheses are resolved (supported or refuted).\n"
f" - coverage saturation: every ✗ on every source is irrelevant\n"
f" to active hypotheses.\n"
f"\n"
f"HARD RULES:\n"
f" - You CANNOT call investigation tools (list_directory,\n"
f" sqlite_query, parse_registry_key, extract_file, etc.) — your\n"
f" job is to direct workers, not to investigate yourself.\n"
f" - You CANNOT call write tools (add_phenomenon, observe_identity,\n"
f" link_to_entity, add_hypothesis, add_temporal_edge). All\n"
f" evidence mutations come from the workers you dispatch.\n"
f" - Every propose_lead MUST cite a real hyp-id from\n"
f" graph_overview's table — fabricated ids will be rejected.\n"
f" - Don't propose more than 3 leads in one round. Quality over\n"
f" quantity — a 4th lead almost always means you're not really\n"
f" sure what would move the graph.\n"
f" - Don't re-propose a lead that's already pending. The system\n"
f" deduplicates (motivating_hyp, expected_type, agent, source)\n"
f" so duplicates silently no-op, but they waste your budget."
)

View File

@@ -228,12 +228,27 @@ class BaseAgent:
f"what you already found. Then end."
),
})
# Narrow the retry tool surface so the agent can't wander off
# to investigate again — only RECORD and read-only graph
# query tools survive. Each grounding-rejected call burns one
# iteration, so the cap is 30 (not the original 10): a
# Timeline agent writing ~10 temporal edges with one rejection
# apiece needs ~20 turns under the rewritten gateway.
retry_tool_names = set(registered_mandatory) | {
"list_phenomena", "list_assets", "search_graph",
"add_temporal_edge", "link_to_entity", "add_lead",
"add_hypothesis", "save_report",
}
retry_tools = [
td for td in self.get_tool_definitions()
if td["name"] in retry_tool_names
]
final_text, _ = await self.llm.tool_call_loop(
messages=conversation,
tools=self.get_tool_definitions(),
tools=retry_tools,
tool_executor=self._executors,
system=system,
max_iterations=10,
max_iterations=30,
terminal_tools=self.terminal_tools,
)

71
config.example.yaml Normal file
View File

@@ -0,0 +1,71 @@
# MASForensics Configuration — template.
#
# Copy this file to `config.yaml` and fill in your API key. config.yaml is
# git-ignored so secrets don't land in commits. The two files share schema;
# only this template is tracked.
agent:
base_url: "https://api.deepseek.com"
api_key: "YOUR-API-KEY-HERE"
model: "deepseek-v4-pro"
max_tokens: 16384
reasoning_effort: "high" # DeepSeek/o1-style reasoning depth; omit to disable
thinking_enabled: true # DeepSeek extra_body.thinking switch
# Maximum rounds of hypothesis-directed investigation (Phase 3).
# Only consulted when strategist.enabled is false (legacy fallback path).
max_investigation_rounds: 1
# Phase 3 strategist loop (DESIGN_STRATEGIST.md). When enabled, the
# InvestigationStrategist agent decides each round whether to propose new
# leads or declare the investigation complete. When disabled, the legacy
# fixed-round investigation loop runs instead.
strategist:
enabled: true
max_rounds: 10
# Safety net: if the strategist keeps proposing leads but yield (new
# phenomena + edges + status flips) is zero for this many consecutive
# rounds, the orchestrator force-stops Phase 3 regardless.
hard_stop_marginal_yield_zero_rounds: 3
# Hard caps that bound the whole run. The strategist's budget_status tool
# reads these to pace its proposals; the orchestrator also enforces them
# as hard stops (DESIGN_STRATEGIST.md §4.2 step 7). Comment out any cap
# to make it unbounded.
budgets:
tool_calls_total: 5000
strategist_rounds_max: 10
wall_clock_minutes_max: 480
# Optional: override the per-edge-type log₁₀(LR) calibration table.
# Confidence updates accumulate these in odds space (additive, order-
# independent), then map back to probability via sigmoid. Single edge
# magnitudes: ≥ +0.602 lifts confidence above the 0.8 supported threshold,
# ≤ 0.602 drops it below the 0.2 refuted threshold.
# If omitted, evidence_graph._DEFAULT_LOG_LR is used.
# hypothesis_log_lr:
# direct_evidence: 2.0
# supports: 1.0
# consequence_observed: 1.0
# prerequisite_met: 0.5
# weakens: -0.5
# contradicts: -2.0
# Optional: manually specify initial hypotheses. If omitted, the
# HypothesisAgent auto-generates them from Phase 1 findings.
# hypotheses:
# - title: "..."
# description: "..."
# Investigation areas — LLM-derived from active hypotheses after Phase 2.
# Each entry below acts as a MANUAL OVERRIDE: it is seeded into the graph
# before the LLM derives areas, so manual entries always survive (slug-based
# dedupe; LLM only augments keyword/tool lists, never overwrites).
#
# investigation_areas:
# - area: shutdown_time
# description: "Last recorded shutdown time"
# agent: registry
# priority: 3
# keywords: [shutdown, last shutdown]
# tools: [get_shutdown_time]

View File

@@ -417,7 +417,14 @@ class Edge:
@dataclass
class Lead:
"""An investigative lead that should be followed up by an agent."""
"""An investigative lead that should be followed up by an agent.
Phase 1 agents create leads as "things outside my scope but worth chasing".
The strategist (DESIGN_STRATEGIST.md) also creates leads, and additionally
annotates each with the hypothesis it's meant to corroborate or refute plus
the kind of edge it expects to produce — so the orchestrator can later
measure "did this lead actually change any belief".
"""
id: str
target_agent: str
@@ -426,13 +433,77 @@ class Lead:
context: dict = field(default_factory=dict)
status: str = "pending" # pending, assigned, completed, failed
hypothesis_id: str | None = None
# Strategist-loop annotations. proposed_by names the agent that created
# the lead ("filesystem", "strategist", ...). motivating_hypothesis and
# expected_evidence_type let the orchestrator measure marginal yield.
# round_number is 0 for Phase 1 leads, ≥1 for strategist-produced leads.
proposed_by: str = ""
motivating_hypothesis: str = ""
expected_evidence_type: str = ""
round_number: int = 0
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, d: dict) -> Lead:
return cls(**d)
# Forward-compatible: old state files predate the strategist annotations.
known = set(cls.__dataclass_fields__)
return cls(**{k: v for k, v in d.items() if k in known})
@dataclass
class InvestigationRound:
"""One round of strategist-driven investigation.
DESIGN_STRATEGIST.md §1.2: provenance for the strategist's decisions. Each
round records what hypothesis statuses looked like before vs. after, what
leads were proposed, which actually got executed, and how many new
phenomena/edges resulted. ``marginal_yield`` over recent rounds is what
the strategist consults to decide whether to keep digging or declare
complete.
"""
id: str # "round-{nnn}"
round_number: int
started_at: str
completed_at: str = ""
strategist_action: str = "" # "propose_leads" | "declare_complete"
leads_proposed: list[str] = field(default_factory=list)
leads_executed: list[str] = field(default_factory=list)
hypothesis_status_snapshot_before: dict = field(default_factory=dict)
hypothesis_status_snapshot_after: dict = field(default_factory=dict)
phenomena_count_before: int = 0
phenomena_count_after: int = 0
edges_count_before: int = 0
edges_count_after: int = 0
decision_rationale: str = ""
@property
def new_phenomena_count(self) -> int:
return max(0, self.phenomena_count_after - self.phenomena_count_before)
@property
def new_edges_count(self) -> int:
return max(0, self.edges_count_after - self.edges_count_before)
@property
def status_flips(self) -> int:
before = self.hypothesis_status_snapshot_before
after = self.hypothesis_status_snapshot_after
flips = 0
for hid, after_status in after.items():
if before.get(hid) and before.get(hid) != after_status:
flips += 1
return flips
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, d: dict) -> InvestigationRound:
known = set(cls.__dataclass_fields__)
return cls(**{k: v for k, v in d.items() if k in known})
@dataclass
@@ -598,6 +669,26 @@ class EvidenceGraph:
# claimed fact values against real tool outputs.
self.tool_invocations: dict[str, ToolInvocation] = {}
# Investigation rounds — provenance for the strategist's per-round
# decisions (DESIGN_STRATEGIST.md). Empty for runs that don't reach
# Phase 3 or that disable the strategist via config.
self.investigation_rounds: list[InvestigationRound] = []
# Budget config + run-start monotonic clock. Set by the orchestrator
# when it boots; the budget_status strategy tool reads these. None
# means unbounded / not yet started.
self.budgets: dict[str, int] = {}
self.run_start_monotonic: float | None = None
# Current strategist round number. Set by the orchestrator at the
# top of each strategist loop iteration so propose_lead / declare_
# investigation_complete can tag their actions correctly. 0 when
# the strategist is not running.
self.current_strategist_round: int = 0
# Set to True by declare_investigation_complete so the orchestrator
# knows to break out of the strategist loop after this round.
self.strategist_complete_requested: bool = False
# _current_agent / _current_task_id are exposed as @property below,
# backed by module-level ContextVars (race-free under asyncio.gather).
@@ -658,6 +749,9 @@ class EvidenceGraph:
"tool_invocations": {
iid: inv.to_dict() for iid, inv in self.tool_invocations.items()
},
"investigation_rounds": [
r.to_dict() for r in self.investigation_rounds
],
"saved_at": datetime.now().isoformat(),
}
tmp = self._persist_path.with_suffix(".tmp")
@@ -730,6 +824,10 @@ class EvidenceGraph:
iid: ToolInvocation.from_dict(inv)
for iid, inv in data.get("tool_invocations", {}).items()
}
graph.investigation_rounds = [
InvestigationRound.from_dict(r)
for r in data.get("investigation_rounds", [])
]
graph._rebuild_adjacency()
logger.info(
"EvidenceGraph restored: %d phenomena, %d hypotheses, %d entities, "
@@ -1020,9 +1118,20 @@ class EvidenceGraph:
**Idempotency**: if a ``(phenomenon, hypothesis, edge_type)`` edge
already exists, this is a no-op — the same agent re-recording the
same link (or two agents linking via the orchestrator's batch
judge and a manual override) does not double-count. Independent
evidence — *different* phenomena pointing the same way — still
accumulates fully.
judge and a manual override) does not double-count.
**Harmonic damping of repeated same-direction evidence** (added
post first full-case run, 2026-05-20): independent evidence —
different phenomena pointing the same way — still accumulates,
but with diminishing returns: the k-th edge of the same
``(hyp_id, edge_type)`` contributes ``log_lr_base / k``. After N
same-direction edges the cumulative contribution is
``log_lr_base · H_N`` (harmonic sum, grows as ln N). This
formalises the naive-Bayes-breakdown DESIGN.md §4.5 calls out:
"同一发现被多 agent 重复入图". Single-edge hypotheses are
unaffected (k=1, damping = 1.0). Cross-direction edges (supports
vs contradicts) keep their own independent counts so a strong
contradicting fact still bites against piled-on supports.
"""
if edge_type not in self.edge_log_lr:
raise ValueError(
@@ -1045,7 +1154,17 @@ class EvidenceGraph:
):
return hyp.confidence
log_lr = self.edge_log_lr[edge_type]
# Harmonic damping rank: count existing edges of the SAME
# edge_type already incident on this hypothesis. The new edge
# becomes the (rank+1)-th of its kind. _adj_rev is keyed by
# target so this is O(in-degree(hyp)) without scanning all edges.
existing_same_type = sum(
1 for e in self._adj_rev.get(hyp_id, [])
if e.edge_type == edge_type
)
rank = existing_same_type + 1
log_lr_base = self.edge_log_lr[edge_type]
log_lr = log_lr_base / rank
old_log_odds = hyp.log_odds
old_conf = hyp.confidence
new_log_odds = old_log_odds + log_lr
@@ -1065,7 +1184,9 @@ class EvidenceGraph:
"timestamp": datetime.now().isoformat(),
"phenomenon_id": phenomenon_id,
"edge_type": edge_type,
"log_lr": log_lr,
"log_lr_base": log_lr_base,
"rank": rank,
"log_lr": round(log_lr, 4),
"old_log_odds": round(old_log_odds, 4),
"new_log_odds": round(new_log_odds, 4),
"old_confidence": round(old_conf, 4),
@@ -1474,9 +1595,29 @@ class EvidenceGraph:
priority: int = 5,
context: dict | None = None,
hypothesis_id: str | None = None,
proposed_by: str = "",
motivating_hypothesis: str = "",
expected_evidence_type: str = "",
round_number: int = 0,
) -> str:
async with self._lock:
lid = f"lead-{uuid.uuid4().hex[:8]}"
# Idempotency for strategist proposals: identical
# (motivating_hypothesis, expected_evidence_type, target_agent,
# source_id) triple should not be created twice — this guards
# against the "strategist loops on the same lead" failure mode.
if motivating_hypothesis and proposed_by == "strategist":
source_id = (context or {}).get("source_id", "")
for existing in self.leads:
if (
existing.proposed_by == "strategist"
and existing.motivating_hypothesis == motivating_hypothesis
and existing.expected_evidence_type == expected_evidence_type
and existing.target_agent == target_agent
and (existing.context or {}).get("source_id", "") == source_id
and existing.status in ("pending", "assigned")
):
return existing.id
self.leads.append(Lead(
id=lid,
target_agent=target_agent,
@@ -1484,10 +1625,89 @@ class EvidenceGraph:
priority=priority,
context=context or {},
hypothesis_id=hypothesis_id,
proposed_by=proposed_by,
motivating_hypothesis=motivating_hypothesis,
expected_evidence_type=expected_evidence_type,
round_number=round_number,
))
self._auto_save()
return lid
# ---- Investigation rounds (strategist loop) ----------------------------
async def start_investigation_round(
self, round_number: int,
) -> str:
"""Open a new investigation round + capture pre-round snapshot.
Called by the orchestrator at the top of each strategist iteration.
The snapshot records hypothesis status, phenomena count, and edges
count so that ``complete_investigation_round`` can compute the
round's yield deltas.
"""
async with self._lock:
rid = f"round-{round_number:03d}"
snapshot_before = {
hid: h.status for hid, h in self.hypotheses.items()
}
self.investigation_rounds.append(InvestigationRound(
id=rid,
round_number=round_number,
started_at=datetime.now().isoformat(),
hypothesis_status_snapshot_before=snapshot_before,
phenomena_count_before=len(self.phenomena),
edges_count_before=len(self.edges),
))
self._auto_save()
return rid
async def complete_investigation_round(
self,
round_id: str,
strategist_action: str = "propose_leads",
leads_executed: list[str] | None = None,
decision_rationale: str = "",
) -> InvestigationRound | None:
"""Close a round, recording after-snapshot + which leads got executed.
Idempotent on already-closed rounds (returns the existing record).
"""
async with self._lock:
for r in self.investigation_rounds:
if r.id != round_id:
continue
if r.completed_at:
return r
r.completed_at = datetime.now().isoformat()
r.strategist_action = strategist_action
r.leads_executed = list(leads_executed or [])
r.leads_proposed = [
l.id for l in self.leads
if l.round_number == r.round_number
and l.proposed_by == "strategist"
]
r.hypothesis_status_snapshot_after = {
hid: h.status for hid, h in self.hypotheses.items()
}
r.phenomena_count_after = len(self.phenomena)
r.edges_count_after = len(self.edges)
r.decision_rationale = decision_rationale
self._auto_save()
return r
return None
def get_investigation_round(self, round_id: str) -> InvestigationRound | None:
for r in self.investigation_rounds:
if r.id == round_id:
return r
return None
def latest_round(self) -> InvestigationRound | None:
return self.investigation_rounds[-1] if self.investigation_rounds else None
def leads_from_round(self, round_number: int) -> list[Lead]:
return [l for l in self.leads if l.round_number == round_number]
async def get_pending_leads(self, agent_type: str | None = None) -> list[Lead]:
async with self._lock:
leads = [l for l in self.leads if l.status == "pending"]

View File

@@ -148,6 +148,8 @@ READ_ONLY_TOOLS: set[str] = {
"parse_ios_keychain", "read_idevice_info",
# Android + media reads (S6) — set_active_partition is NOT read-only.
"probe_android_partitions", "ocr_image",
# Strategist view tools (DESIGN_STRATEGIST.md §2) — pure renders.
"graph_overview", "source_coverage", "marginal_yield", "budget_status",
}

View File

@@ -119,6 +119,11 @@ class Orchestrator:
self._failure_count = 0
self._max_failures = 3
self._start_time = datetime.now()
# Make budgets visible to strategy tools via the graph object. The
# budget_status tool reads graph.budgets / graph.run_start_monotonic
# directly so it does not need a back-reference to the orchestrator.
self.graph.budgets = dict(self.config.get("budgets", {}) or {})
self.graph.run_start_monotonic = time.monotonic()
def _resolve_agent_type(self, agent_type: str) -> str:
return AGENT_ALIASES.get(agent_type, agent_type)
@@ -195,6 +200,298 @@ class Orchestrator:
lead.context["retry"] = True
await self._dispatch_leads_parallel(failed)
# ---- Phase 3: strategist loop (DESIGN_STRATEGIST.md §4) ------------------
def _budget_exceeded(self) -> bool:
"""Hard budget enforcement, complementing strategist self-throttling.
Any of these triggers an immediate Phase 3 exit even if the
strategist hasn't called declare_investigation_complete. Each cap
is optional — leave it out of config to make it unbounded.
"""
b = self.graph.budgets or {}
tc_cap = b.get("tool_calls_total")
if tc_cap and len(self.graph.tool_invocations) >= tc_cap:
return True
wc_cap = b.get("wall_clock_minutes_max")
if wc_cap and self.graph.run_start_monotonic is not None:
elapsed_min = (time.monotonic() - self.graph.run_start_monotonic) / 60.0
if elapsed_min >= wc_cap:
return True
return False
async def _execute_strategist_lead(self, lead, round_num: int) -> None:
"""Dispatch one strategist-proposed lead to its target worker.
Unlike the legacy bulk dispatcher this runs leads serially so each
worker run reads a graph that includes prior leads' findings — the
strategist's next round can see the cumulative effect of this round.
"""
agent_type = AGENT_ALIASES.get(lead.target_agent, lead.target_agent)
worker = self.factory.get_or_create_agent(agent_type)
if worker is None:
logger.warning(
"No worker registered for lead %s: target_agent=%s",
lead.id, agent_type,
)
lead.status = "failed"
lead.context["failure_reason"] = f"no worker for agent type '{agent_type}'"
self.graph._auto_save()
return
source_id = (lead.context or {}).get("source_id", "")
if source_id and self.graph.case is not None:
src = self.graph.case.get_source(source_id)
if src:
self.graph.set_active_source(src)
rationale = (lead.context or {}).get("rationale", "")
worker_task = (
f"Investigate this specific lead from the strategist:\n\n"
f"REQUEST: {lead.description}\n"
f"MOTIVATING HYPOTHESIS: {lead.motivating_hypothesis or '(unspecified)'}\n"
f"EXPECTED EVIDENCE TYPE: {lead.expected_evidence_type or '(unspecified)'}\n"
f"RATIONALE: {rationale or '(unspecified)'}\n\n"
f"After investigating, record findings via add_phenomenon AND "
f"link relevant phenomena to "
f"{lead.motivating_hypothesis or 'the motivating hypothesis'} via the "
f"appropriate edge_type. If your investigation produces no relevant "
f"finding, record that as a negative phenomenon so the strategist "
f"can see the gap was probed."
)
_log(
f"Round {round_num} dispatching: {lead.description[:80]}",
event="dispatch", agent=agent_type, lead=lead.id,
)
lead.status = "assigned"
self.graph._auto_save()
try:
await worker.run(worker_task, lead_id=lead.id)
lead.status = "completed"
except Exception as e:
logger.error("Strategist lead %s failed: %s", lead.id, e, exc_info=True)
lead.status = "failed"
lead.context["failure_reason"] = str(e)
finally:
self.graph._auto_save()
async def _resume_strategist_state(self) -> int:
"""Repair any open InvestigationRound after a resume and return the
next round number to use.
An "open" round is one with ``started_at`` set but ``completed_at``
empty — interrupted before its complete step. Mark it as completed
with action=interrupted_resume so the run history is self-describing,
and mark any leads still in the "assigned" state from that round as
"failed" so the gap-analysis / retry paths can re-process them.
Returns the round number the strategist loop should start from
(1 + the highest existing round_number).
"""
if not self.graph.investigation_rounds:
return 1
highest = max(r.round_number for r in self.graph.investigation_rounds)
last = self.graph.latest_round()
if last is not None and not last.completed_at:
assigned_in_round = [
l for l in self.graph.leads
if l.round_number == last.round_number
and l.status == "assigned"
]
for lead in assigned_in_round:
lead.status = "failed"
lead.context["failure_reason"] = "interrupted before complete"
await self.graph.complete_investigation_round(
last.id, strategist_action="interrupted_resume",
decision_rationale=(
f"resume repair: this round was interrupted before "
f"completion; {len(assigned_in_round)} assigned leads "
f"have been re-marked as failed."
),
)
logger.info(
"Strategist resume: repaired open round R%d (closed %d assigned leads)",
last.round_number, len(assigned_in_round),
)
return highest + 1
async def _phase3_strategist_loop(self) -> None:
"""Belief-driven investigation: strategist proposes, workers execute,
repeat. Replaces the legacy fixed-round investigation loop.
"""
_log("Phase 3: Strategist-Driven Investigation", event="phase")
strategist_cfg = self.config.get("strategist", {}) or {}
max_rounds = int(strategist_cfg.get("max_rounds", 10))
zero_yield_cap = int(strategist_cfg.get("hard_stop_marginal_yield_zero_rounds", 3))
strategist = self.factory.get_or_create_agent("strategist")
if strategist is None:
logger.error(
"InvestigationStrategist agent not registered — falling back "
"to legacy Phase 3 loop. Check agent_factory._AGENT_CLASSES."
)
await self._phase3_legacy_loop()
return
# Resume support: if we're restarting after an interruption, repair
# any half-open round and pick up at the next number.
start_round = await self._resume_strategist_state()
if start_round > 1:
_log(
f"Resuming strategist loop at round {start_round} "
f"(history: {len(self.graph.investigation_rounds)} prior rounds)",
event="progress",
)
zero_yield_streak = 0
for round_num in range(start_round, max_rounds + 1):
# Reset per-round flags so a previous round's declare_complete
# doesn't leak across iterations (defensive — strategist also
# only sets True, never False).
self.graph.strategist_complete_requested = False
self.graph.current_strategist_round = round_num
rid = await self.graph.start_investigation_round(round_num)
_log(
f"Strategist Round {round_num}/{max_rounds}", event="phase",
round=round_num,
)
t0 = time.monotonic()
try:
await strategist.run(
f"Review the current investigation state and decide the "
f"next action. This is round {round_num}/{max_rounds}. "
f"Use graph_overview / marginal_yield / budget_status / "
f"source_coverage to ground your decision, then call "
f"propose_lead 1-3 times OR declare_investigation_complete."
)
except Exception as e:
logger.error("Strategist round %d failed: %s", round_num, e, exc_info=True)
await self.graph.complete_investigation_round(
rid, decision_rationale=f"strategist crashed: {e}",
)
break
# Strategist declared complete → no leads execute, exit loop.
if self.graph.strategist_complete_requested:
_log(
f"Strategist declared complete at round {round_num}",
event="progress", elapsed=time.monotonic() - t0,
)
await self.graph.complete_investigation_round(
rid, strategist_action="declare_complete",
decision_rationale="strategist declare_investigation_complete",
)
break
# Collect this round's leads (proposed_by=strategist + matching round).
new_leads = [
l for l in self.graph.leads
if l.round_number == round_num
and l.proposed_by == "strategist"
and l.status == "pending"
]
if not new_leads:
_log(
f"Round {round_num}: strategist proposed no new leads — exiting loop",
event="progress", elapsed=time.monotonic() - t0,
)
await self.graph.complete_investigation_round(
rid, strategist_action="no_leads",
decision_rationale="strategist proposed no new leads",
)
break
# Dispatch each lead to its worker.
for lead in new_leads:
await self._execute_strategist_lead(lead, round_num)
# After workers run, judge any new phenomena against existing
# hypotheses (so confidence updates happen before the next round
# of strategist reasoning).
if self.graph.phenomena and self.graph.hypotheses:
await self._judge_new_phenomena()
closed = await self.graph.complete_investigation_round(
rid, strategist_action="propose_leads",
leads_executed=[l.id for l in new_leads],
)
# Show round outcome.
for h in self.graph.hypotheses.values():
_log(f" {h.summary()}", event="hypothesis")
_log(
_progress_summary(self.graph) + f" (yield: +{closed.new_phenomena_count}ph, +{closed.new_edges_count}edges, {closed.status_flips}flips)",
event="progress", elapsed=time.monotonic() - t0,
)
# Marginal-yield hard stop. Distinct from strategist self-throttle:
# if the strategist insists on continuing through repeated dry
# rounds, force-stop. This protects against an over-eager
# strategist + a confused worker that produces no edges.
yield_total = (
closed.new_phenomena_count
+ closed.new_edges_count
+ closed.status_flips
)
if yield_total == 0:
zero_yield_streak += 1
if zero_yield_streak >= zero_yield_cap:
_log(
f"Hard stop: {zero_yield_streak} consecutive "
f"zero-yield rounds (cap {zero_yield_cap})",
event="progress",
)
break
else:
zero_yield_streak = 0
if self._budget_exceeded():
_log(
f"Budget exhausted after round {round_num} — exiting Phase 3",
event="progress",
)
break
else:
_log(
f"Strategist max_rounds={max_rounds} reached", event="progress",
)
# Always reset the round counter on exit so subsequent runs don't
# inherit the last value.
self.graph.current_strategist_round = 0
async def _phase3_legacy_loop(self) -> None:
"""Legacy fixed-round Phase 3 — preserved for fallback / regression.
Engaged when config has ``strategist.enabled: false`` or when the
strategist agent class is somehow not registered. Behaves identically
to the pre-DESIGN_STRATEGIST orchestrator: bounded iteration,
hypothesis-derived leads, parallel dispatch, gap analysis.
"""
max_rounds = self.config.get("max_investigation_rounds", 5)
for round_num in range(max_rounds):
_log(f"Phase 3: Investigation Round {round_num}", event="phase")
t0 = time.monotonic()
if self.graph.hypotheses_converged():
_log("All hypotheses converged — stopping", event="progress")
break
await self._generate_hypothesis_leads()
pending = await self.graph.get_pending_leads()
if not pending:
_log("No pending leads — round complete", event="progress")
break
await self._dispatch_leads_parallel(pending)
await self._judge_new_phenomena()
for h in self.graph.hypotheses.values():
_log(f" {h.summary()}", event="hypothesis")
_log(_progress_summary(self.graph), event="progress", elapsed=time.monotonic() - t0)
# ---- Hypothesis generation -----------------------------------------------
async def _generate_hypotheses_manual(self, hypotheses_config: list[dict]) -> None:
@@ -881,39 +1178,26 @@ class Orchestrator:
event="progress", elapsed=time.monotonic() - t0,
)
# Phase 3: Hypothesis-directed investigation (iterative)
# Phase 3: Strategist-driven investigation (DESIGN_STRATEGIST.md)
if resume_phase <= 3:
max_rounds = self.config.get("max_investigation_rounds", 5)
for round_num in range(max_rounds):
_log(f"Phase 3: Investigation Round {round_num}", event="phase")
t0 = time.monotonic()
strategist_cfg = self.config.get("strategist", {}) or {}
strategist_enabled = strategist_cfg.get("enabled", True)
if strategist_enabled:
await self._phase3_strategist_loop()
else:
# Legacy fallback — keep the old hypothesis-directed
# iterative loop available for runs that explicitly
# disable the strategist (debugging, regression
# comparison, or environments without the strategist
# agent registered).
await self._phase3_legacy_loop()
if self.graph.hypotheses_converged():
_log("All hypotheses converged — stopping", event="progress")
break
await self._generate_hypothesis_leads()
pending = await self.graph.get_pending_leads()
if not pending:
_log("No pending leads — round complete", event="progress")
break
await self._dispatch_leads_parallel(pending)
await self._judge_new_phenomena()
# Show hypothesis status update
for h in self.graph.hypotheses.values():
_log(f" {h.summary()}", event="hypothesis")
_log(_progress_summary(self.graph), event="progress", elapsed=time.monotonic() - t0)
# Retry failed leads
# Retry failed leads + Gap Analysis run regardless of which
# Phase 3 variant was used — they operate on the leads/
# hypothesis graph the strategist loop leaves behind.
await self._retry_failed_leads()
# Gap analysis
_log("Phase 3: Gap Analysis", event="phase")
await self._run_gap_analysis()
self.graph.mark_remaining_inconclusive()
# Phase 4: Timeline construction

View File

@@ -9,6 +9,7 @@ import pytest
from evidence_graph import (
EvidenceGraph, Phenomenon, Hypothesis, Lead, GroundingError,
InvestigationRound,
_compute_quality_score, _jaccard_similarity,
prob_to_log_odds, log_odds_to_prob,
)
@@ -1332,6 +1333,69 @@ class TestInvestigationAreaDerivation:
assert edge_added["n"] == 1
assert agent._record_call_counts["add_temporal_edge"] == 1
@pytest.mark.asyncio
async def test_forced_retry_uses_higher_cap_and_narrowed_tools(self):
"""The forced RECORD retry must (a) get a generous iter cap so that
grounding-rejected retries don't blow the budget, and (b) hand the
LLM a tool surface restricted to RECORD + read-only graph tools so
it can't wander back into investigation.
"""
from unittest.mock import AsyncMock
from base_agent import BaseAgent
graph = EvidenceGraph()
llm = AsyncMock()
# Capture per-call kwargs of tool_call_loop so we can assert what
# the retry round received.
call_kwargs: list[dict] = []
async def real_add_edge(**kw):
return None
class TimelineLike(BaseAgent):
mandatory_record_tools = ("add_temporal_edge",)
agent = TimelineLike(llm, graph)
agent.name = "timeline_like"
agent._register_graph_tools = lambda: None
agent.register_tool("add_temporal_edge", "", {}, real_add_edge)
# An investigation-style tool the retry must NOT expose.
async def real_inv(**kw): return ""
agent.register_tool("list_directory", "", {}, real_inv)
# A read-only graph query — should remain available in retry.
async def real_ro(**kw): return ""
agent.register_tool("list_phenomena", "", {}, real_ro)
async def fake_tool_call_loop(messages, tools, tool_executor, system, max_iterations=40, terminal_tools=()):
call_kwargs.append({
"tools": [t["name"] for t in tools],
"max_iterations": max_iterations,
})
already_retrying = any(
"STOP." in (m.get("content", "") if isinstance(m, dict) else "")
for m in messages
)
if not already_retrying:
return "no record", list(messages)
await tool_executor["add_temporal_edge"]()
return "recorded.", []
llm.tool_call_loop = fake_tool_call_loop
await agent.run("build timeline")
assert len(call_kwargs) == 2
first_call, retry_call = call_kwargs
# First call: full tool surface, default iter cap.
assert "list_directory" in first_call["tools"]
# Retry call: investigation tool dropped, mandatory + read-only kept.
assert "list_directory" not in retry_call["tools"]
assert "add_temporal_edge" in retry_call["tools"]
assert "list_phenomena" in retry_call["tools"]
# Iter cap on the retry is now generous — 10 was empirically too tight
# because grounding-rejected calls burn iterations.
assert retry_call["max_iterations"] >= 30
# ---- terminal_tools: real LLMClient.tool_call_loop short-circuit -----
@pytest.mark.asyncio
@@ -1957,8 +2021,9 @@ class TestLogOddsConfidence:
# All three orderings must agree exactly.
assert confs[0] == pytest.approx(confs[1])
assert confs[1] == pytest.approx(confs[2])
# And the value should be 1 + 1 0.5 = 1.5 → sigmoid ≈ 0.9694
assert confs[0] == pytest.approx(0.9694, abs=1e-3)
# With harmonic damping: 2 supports → 1.0 + 0.5 = +1.5, 1 weakens
# → 0.5, net log_odds = +1.0 → sigmoid 1/(1+10^-1) ≈ 0.9091.
assert confs[0] == pytest.approx(0.9091, abs=1e-3)
@pytest.mark.asyncio
async def test_each_edge_type_calibrated(self, graph):
@@ -2019,17 +2084,91 @@ class TestLogOddsConfidence:
assert sum(1 for e in graph.edges if e.edge_type == "supports") == first_edges
@pytest.mark.asyncio
async def test_independent_evidence_accumulates(self, graph):
"""Distinct phenomena with same edge_type DO accumulate (independent)."""
async def test_repeated_same_direction_evidence_dampens_harmonically(self, graph):
"""Distinct phenomena with same edge_type accumulate WITH harmonic
damping (post 2026-05-20 full-case-run fix). The k-th edge of a
given (hyp, edge_type) contributes log_lr_base/k; after 3 supports
edges the total is 1.0 · (1 + 1/2 + 1/3) ≈ 1.833, not 3.0. This
formalises the breakdown of the naive-Bayes independence
assumption when multiple agents pile on the same finding.
"""
hid = await graph.add_hypothesis("h", "d")
for i in range(3):
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", f"ph {i}", f"d {i}", source_tool="t",
)
await graph.update_hypothesis_confidence(hid, pid, "supports", "")
# 3 × +1.0 = +3.0 log_odds → conf ≈ 0.999
assert graph.hypotheses[hid].log_odds == pytest.approx(3.0)
assert graph.hypotheses[hid].confidence > 0.99
# 1.0 · H_3 = 1.0 + 0.5 + 0.3333... = 1.8333
expected = 1.0 + 0.5 + (1.0 / 3.0)
assert graph.hypotheses[hid].log_odds == pytest.approx(expected, abs=1e-6)
# Still above the 0.8 supported threshold (good — 3 independent
# observations remain strong evidence) but well below the runaway
# confidence pre-fix produced.
assert graph.hypotheses[hid].status == "supported"
@pytest.mark.asyncio
async def test_first_edge_undamped(self, graph):
"""Damping is rank-1 → /1 → no change. A hypothesis with a single
edge must contribute the full calibrated log_lr value (otherwise the
whole calibration table would be off by a factor)."""
hid = await graph.add_hypothesis("h", "d")
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", "lone", "d", source_tool="t",
)
await graph.update_hypothesis_confidence(hid, pid, "direct_evidence", "")
assert graph.hypotheses[hid].log_odds == pytest.approx(2.0)
@pytest.mark.asyncio
async def test_damping_independent_per_edge_type(self, graph):
"""Damping rank is keyed per (hyp, edge_type) — supports' counter
does NOT advance direct_evidence's counter."""
hid = await graph.add_hypothesis("h", "d")
for i in range(5):
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", f"s{i}", "d", source_tool="t",
)
await graph.update_hypothesis_confidence(hid, pid, "supports", "")
log_after_supports = graph.hypotheses[hid].log_odds
# H_5 = 1 + 1/2 + 1/3 + 1/4 + 1/5 = 2.2833
assert log_after_supports == pytest.approx(2.2833, abs=1e-3)
pid_de, _ = await graph.add_phenomenon(
"fs", "filesystem", "de", "d", source_tool="t",
)
await graph.update_hypothesis_confidence(hid, pid_de, "direct_evidence", "")
delta = graph.hypotheses[hid].log_odds - log_after_supports
assert delta == pytest.approx(2.0, abs=1e-6)
@pytest.mark.asyncio
async def test_damping_independent_per_hypothesis(self, graph):
"""The rank counter is per (hyp, edge_type), so two hypotheses
each receiving 'supports' edges accumulate independently."""
h1 = await graph.add_hypothesis("h1", "d")
h2 = await graph.add_hypothesis("h2", "d")
for i in range(3):
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", f"to-h1 {i}", "d", source_tool="t",
)
await graph.update_hypothesis_confidence(h1, pid, "supports", "")
pid_h2, _ = await graph.add_phenomenon(
"fs", "filesystem", "to-h2", "d", source_tool="t",
)
await graph.update_hypothesis_confidence(h2, pid_h2, "supports", "")
assert graph.hypotheses[h2].log_odds == pytest.approx(1.0)
@pytest.mark.asyncio
async def test_damping_rank_persists_in_confidence_log(self, graph):
"""The rank used for damping must be recorded so the math is
auditable from the persisted graph (no need to recompute)."""
hid = await graph.add_hypothesis("h", "d")
for i in range(2):
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", f"p {i}", "d", source_tool="t",
)
await graph.update_hypothesis_confidence(hid, pid, "supports", "")
entries = graph.hypotheses[hid].confidence_log
assert entries[0]["rank"] == 1 and entries[0]["log_lr"] == pytest.approx(1.0)
assert entries[1]["rank"] == 2 and entries[1]["log_lr"] == pytest.approx(0.5)
@pytest.mark.asyncio
async def test_prior_prob_shifts_starting_log_odds(self, graph):
# prior 0.9 → log_odds ≈ +0.954
@@ -2898,3 +3037,641 @@ class TestOrchestratorMultiSource:
assert Orchestrator._is_analysable(ok_media)
assert not Orchestrator._is_analysable(no_path)
# ---------------------------------------------------------------------------
# Strategist loop foundation (DESIGN_STRATEGIST.md §1)
# ---------------------------------------------------------------------------
class TestLeadExtensionsForStrategist:
"""Lead now carries strategist-loop annotations: proposed_by,
motivating_hypothesis, expected_evidence_type, round_number. Old state
files predate these fields — from_dict must accept them missing.
"""
@pytest.mark.asyncio
async def test_add_lead_records_strategist_annotations(self):
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
lid = await graph.add_lead(
target_agent="filesystem",
description="Check Safari bookmarks for device-switching evidence",
proposed_by="strategist",
motivating_hypothesis=hid,
expected_evidence_type="supports",
round_number=2,
context={"source_id": "src-ios-chan"},
)
lead = next(l for l in graph.leads if l.id == lid)
assert lead.proposed_by == "strategist"
assert lead.motivating_hypothesis == hid
assert lead.expected_evidence_type == "supports"
assert lead.round_number == 2
@pytest.mark.asyncio
async def test_strategist_lead_idempotency(self):
"""Same (motivating_hyp, expected_type, target_agent, source_id) from
the strategist should NOT create a duplicate while pending.
"""
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
first = await graph.add_lead(
target_agent="filesystem", description="probe X",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=1,
context={"source_id": "src-A"},
)
again = await graph.add_lead(
target_agent="filesystem", description="probe X (rephrased)",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=2,
context={"source_id": "src-A"},
)
assert first == again
assert len(graph.leads) == 1
@pytest.mark.asyncio
async def test_non_strategist_leads_not_deduped(self):
"""Phase 1 worker leads (proposed_by != 'strategist') should NOT be
deduped — agents can legitimately propose the same kind of lead
multiple times from different contexts.
"""
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
a = await graph.add_lead(
target_agent="filesystem", description="x", proposed_by="filesystem",
motivating_hypothesis=hid, expected_evidence_type="supports",
)
b = await graph.add_lead(
target_agent="filesystem", description="x", proposed_by="filesystem",
motivating_hypothesis=hid, expected_evidence_type="supports",
)
assert a != b
assert len(graph.leads) == 2
@pytest.mark.asyncio
async def test_lead_from_old_state_file_loads_with_defaults(self, tmp_path):
"""Forward-compat: a state file written before the strategist fields
existed must still load. The new fields take their defaults.
"""
legacy = {
"id": "lead-legacy01",
"target_agent": "filesystem",
"description": "old-style lead",
"priority": 5,
"context": {},
"status": "pending",
"hypothesis_id": None,
}
lead = Lead.from_dict(legacy)
assert lead.proposed_by == ""
assert lead.motivating_hypothesis == ""
assert lead.round_number == 0
class TestInvestigationRound:
"""The InvestigationRound provenance node + start/complete lifecycle."""
@pytest.mark.asyncio
async def test_round_lifecycle_captures_before_and_after(self):
graph = EvidenceGraph()
h1 = await graph.add_hypothesis("h1", "d")
h2 = await graph.add_hypothesis("h2", "d")
rid = await graph.start_investigation_round(1)
r = graph.get_investigation_round(rid)
assert r is not None
assert r.round_number == 1
assert r.hypothesis_status_snapshot_before == {h1: "active", h2: "active"}
assert r.phenomena_count_before == 0
assert r.completed_at == ""
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", "found something", "interp", source_tool="t",
)
await graph.update_hypothesis_confidence(h1, pid, "direct_evidence", "")
closed = await graph.complete_investigation_round(
rid, strategist_action="propose_leads",
decision_rationale="found new evidence for h1",
)
assert closed is not None
assert closed.completed_at != ""
assert closed.hypothesis_status_snapshot_after[h1] == "supported"
assert closed.hypothesis_status_snapshot_after[h2] == "active"
assert closed.new_phenomena_count == 1
assert closed.new_edges_count == 1
assert closed.status_flips == 1
@pytest.mark.asyncio
async def test_complete_round_is_idempotent(self):
graph = EvidenceGraph()
rid = await graph.start_investigation_round(1)
first = await graph.complete_investigation_round(rid)
await graph.add_phenomenon("fs", "x", "y", "z", source_tool="t")
second = await graph.complete_investigation_round(rid)
assert first is second
assert first.phenomena_count_after == 0
@pytest.mark.asyncio
async def test_leads_from_round_filters_correctly(self):
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
await graph.add_lead(
target_agent="filesystem", description="r1 lead",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=1,
)
await graph.add_lead(
target_agent="filesystem", description="r2 lead",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=2,
context={"source_id": "src-different"},
)
await graph.add_lead(
target_agent="ios_artifact", description="phase 1 finding",
proposed_by="filesystem", round_number=0,
)
r1 = graph.leads_from_round(1)
r2 = graph.leads_from_round(2)
r0 = graph.leads_from_round(0)
assert len(r1) == 1 and r1[0].description == "r1 lead"
assert len(r2) == 1 and r2[0].description == "r2 lead"
assert len(r0) == 1 and r0[0].proposed_by == "filesystem"
@pytest.mark.asyncio
async def test_round_persistence_round_trip(self, tmp_path):
"""Investigation rounds must survive save/load."""
path = tmp_path / "state.json"
graph = EvidenceGraph(persist_path=path)
hid = await graph.add_hypothesis("h", "d")
rid = await graph.start_investigation_round(1)
await graph.complete_investigation_round(
rid, decision_rationale="probe complete",
)
loaded = EvidenceGraph.load_state(path)
assert len(loaded.investigation_rounds) == 1
r = loaded.investigation_rounds[0]
assert r.id == rid
assert r.decision_rationale == "probe complete"
assert hid in r.hypothesis_status_snapshot_before
@pytest.mark.asyncio
async def test_strategy_tool_helpers(self):
"""Smoke-test the strategy tool renders against a small graph.
Renders are markdown strings — we assert structural anchors rather
than full text so the test is robust to wording tweaks.
"""
from tools import strategy
from case import Case, EvidenceSource
graph = EvidenceGraph()
src = EvidenceSource(
id="src-test", label="test iOS", type="mobile_extraction",
access_mode="tree", path="/tmp/x",
)
graph.case = Case(case_id="c", name="c", sources=[src])
graph.set_active_source(src)
graph._current_agent = "ios_artifact"
graph._current_task_id = "task-1"
# graph_overview on empty graph still renders.
ov = strategy.graph_overview(graph)
assert "# Investigation State" in ov
assert "_(none yet" in ov # hypotheses section
assert "src-test" in ov
# Source coverage — every entry should be ✗ initially.
cov = strategy.source_coverage(graph, "src-test")
assert "Coverage: **0/" in cov
assert "" in cov
assert "Coverage hints are heuristics" in cov
# Record one invocation that matches the AddressBook detector.
await graph.record_tool_invocation(
tool="sqlite_query",
args={"db_path": "/x/var/mobile/Library/AddressBook/AddressBook.sqlitedb"},
output="contact list",
)
cov2 = strategy.source_coverage(graph, "src-test")
assert "Coverage: **1/" in cov2 or "Coverage: **2/" in cov2
# marginal_yield: no rounds → empty render.
my = strategy.marginal_yield(graph)
assert "no completed investigation rounds" in my
# budget_status with no budgets shows unbounded.
bs = strategy.budget_status(graph, None, None)
assert "tool_calls" in bs
assert "(unbounded)" in bs
# budget_status with budgets + pacing hint.
bs2 = strategy.budget_status(
graph,
{"tool_calls_total": 1, "strategist_rounds_max": 1},
None,
)
assert "≥ 90%" in bs2 # already over 90% (1 of 1 tool calls used)
@pytest.mark.asyncio
async def test_strategist_agent_registers_correct_toolset(self):
"""Strategist gets read-only graph queries + the 6 strategy tools;
crucially NO graph-write tools (no add_phenomenon, observe_identity,
link_to_entity, add_hypothesis, add_temporal_edge).
"""
from tool_registry import register_all_tools
from agent_factory import AgentFactory
from llm_client import LLMClient
graph = EvidenceGraph()
register_all_tools(graph)
llm = LLMClient.__new__(LLMClient)
factory = AgentFactory(llm, graph)
agent = factory.get_or_create_agent("strategist")
agent._register_graph_tools()
registered = set(agent._tools.keys())
assert {
"graph_overview", "source_coverage", "marginal_yield",
"budget_status", "propose_lead", "declare_investigation_complete",
} <= registered
assert {"list_phenomena", "get_phenomenon", "search_graph"} <= registered
forbidden = {
"add_phenomenon", "observe_identity", "link_to_entity",
"add_hypothesis", "add_temporal_edge", "add_lead",
}
leaked = registered & forbidden
assert not leaked, f"Strategist must not have write tools: {leaked}"
def test_strategist_terminal_tool_is_declare_complete(self):
"""The strategist class declares declare_investigation_complete as
its terminal tool — the tool_call_loop must short-circuit on that
call (verified at the LLM client level by an existing test).
"""
from agents.strategist import InvestigationStrategist
assert InvestigationStrategist.terminal_tools == ("declare_investigation_complete",)
assert "propose_lead" in InvestigationStrategist.mandatory_record_tools
assert "declare_investigation_complete" in InvestigationStrategist.mandatory_record_tools
@pytest.mark.asyncio
async def test_propose_lead_validates_hypothesis_id(self):
"""propose_lead must reject leads whose motivating_hypothesis isn't
actually a registered hypothesis — that's a strategist hallucination
analogous to citing a bogus invocation_id.
"""
from tool_registry import register_all_tools, TOOL_CATALOG
graph = EvidenceGraph()
graph._current_agent = "strategist"
graph._current_task_id = "task-strat-1"
graph.current_strategist_round = 1
register_all_tools(graph)
td = TOOL_CATALOG["propose_lead"]
result = await td.executor(
description="probe X",
target_agent="filesystem",
motivating_hypothesis="hyp-does-not-exist",
expected_evidence_type="supports",
)
assert "not in graph.hypotheses" in result
assert not graph.leads
@pytest.mark.asyncio
async def test_propose_lead_creates_strategist_lead(self):
"""propose_lead happy path writes a strategist-attributed lead
tagged with the current round_number."""
from tool_registry import register_all_tools, TOOL_CATALOG
graph = EvidenceGraph()
graph._current_agent = "strategist"
graph._current_task_id = "task-strat-2"
graph.current_strategist_round = 3
hid = await graph.add_hypothesis("h", "d")
register_all_tools(graph)
td = TOOL_CATALOG["propose_lead"]
result = await td.executor(
description="check Safari bookmarks",
target_agent="ios_artifact",
motivating_hypothesis=hid,
expected_evidence_type="supports",
rationale="single-source hypothesis needs corroboration",
)
assert "proposed" in result
lead = graph.leads[0]
assert lead.proposed_by == "strategist"
assert lead.motivating_hypothesis == hid
assert lead.round_number == 3
assert lead.expected_evidence_type == "supports"
@pytest.mark.asyncio
async def test_propose_lead_rejects_invalid_evidence_type(self):
from tool_registry import register_all_tools, TOOL_CATALOG
graph = EvidenceGraph()
graph._current_agent = "strategist"
graph._current_task_id = "task-strat-3"
graph.current_strategist_round = 1
hid = await graph.add_hypothesis("h", "d")
register_all_tools(graph)
td = TOOL_CATALOG["propose_lead"]
result = await td.executor(
description="x", target_agent="filesystem",
motivating_hypothesis=hid,
expected_evidence_type="bogus_type",
)
assert "not one of" in result
assert not graph.leads
@pytest.mark.asyncio
async def test_declare_complete_flips_request_flag(self):
from tool_registry import register_all_tools, TOOL_CATALOG
graph = EvidenceGraph()
graph._current_agent = "strategist"
graph._current_task_id = "task-strat-4"
graph.current_strategist_round = 5
register_all_tools(graph)
td = TOOL_CATALOG["declare_investigation_complete"]
assert graph.strategist_complete_requested is False
result = await td.executor(
reason="marginal_yield_zero",
rationale="two rounds with 0 yield",
)
assert graph.strategist_complete_requested is True
assert "round 5" in result
assert "marginal_yield_zero" in result
@pytest.mark.asyncio
async def test_declare_complete_rejects_bogus_reason(self):
from tool_registry import register_all_tools, TOOL_CATALOG
graph = EvidenceGraph()
graph._current_agent = "strategist"
graph._current_task_id = "task-strat-5"
register_all_tools(graph)
td = TOOL_CATALOG["declare_investigation_complete"]
result = await td.executor(reason="i_just_want_to_quit")
assert "not in" in result
assert graph.strategist_complete_requested is False
@pytest.mark.asyncio
async def test_resume_repairs_open_round(self, tmp_path):
"""Simulate a crash mid-round: half-open InvestigationRound + a
lead in 'assigned' state. _resume_strategist_state must close the
round and re-mark the lead as failed."""
from unittest.mock import AsyncMock
from orchestrator import Orchestrator
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
rid = await graph.start_investigation_round(1)
lid = await graph.add_lead(
target_agent="filesystem", description="probe",
proposed_by="strategist", motivating_hypothesis=hid,
expected_evidence_type="supports", round_number=1,
)
lead = next(l for l in graph.leads if l.id == lid)
lead.status = "assigned"
llm = AsyncMock()
class FakeFactory:
def get_or_create_agent(self, name):
return AsyncMock()
orch = Orchestrator(llm, graph, FakeFactory(), config={})
next_round = await orch._resume_strategist_state()
round_ = graph.get_investigation_round(rid)
assert round_.completed_at != ""
assert round_.strategist_action == "interrupted_resume"
lead2 = next(l for l in graph.leads if l.id == lid)
assert lead2.status == "failed"
assert "interrupted" in lead2.context.get("failure_reason", "")
assert next_round == 2
@pytest.mark.asyncio
async def test_resume_state_is_idempotent_on_clean_graph(self):
"""No prior rounds → resume returns 1, no changes."""
from unittest.mock import AsyncMock
from orchestrator import Orchestrator
graph = EvidenceGraph()
llm = AsyncMock()
class FakeFactory:
def get_or_create_agent(self, name):
return AsyncMock()
orch = Orchestrator(llm, graph, FakeFactory(), config={})
result = await orch._resume_strategist_state()
assert result == 1
assert graph.investigation_rounds == []
@pytest.mark.asyncio
async def test_strategist_loop_exits_on_declare_complete(self):
"""Mock strategist that declares complete in round 1 — orchestrator
must exit the Phase 3 loop without dispatching any worker."""
from unittest.mock import AsyncMock
from orchestrator import Orchestrator
graph = EvidenceGraph()
llm = AsyncMock()
worker_runs: list[str] = []
class FakeStrategist:
name = "strategist"
async def run(self, task, lead_id=None):
graph.strategist_complete_requested = True
return "complete"
class FakeFactory:
def __init__(self):
self._instances = {"strategist": FakeStrategist()}
def get_or_create_agent(self, name):
return self._instances.get(name)
orch = Orchestrator(llm, graph, FakeFactory(), config={
"strategist": {"enabled": True, "max_rounds": 5},
})
await orch._phase3_strategist_loop()
assert len(graph.investigation_rounds) == 1
r = graph.investigation_rounds[0]
assert r.strategist_action == "declare_complete"
assert r.completed_at != ""
assert worker_runs == []
@pytest.mark.asyncio
async def test_strategist_loop_dispatches_lead_then_completes(self):
"""Strategist proposes 1 lead in round 1, declares complete in round 2.
Loop must dispatch the worker for the lead, then exit cleanly.
"""
from unittest.mock import AsyncMock
from orchestrator import Orchestrator
from case import Case, EvidenceSource
graph = EvidenceGraph()
src = EvidenceSource(id="src-A", label="A", type="disk_image",
access_mode="image", path="/tmp/x")
graph.case = Case(case_id="c", name="n", sources=[src])
graph.set_active_source(src)
hid = await graph.add_hypothesis("h", "d")
llm = AsyncMock()
worker_calls: list[tuple[str, str]] = []
class FakeStrategist:
name = "strategist"
def __init__(self):
self.round = 0
async def run(self, task, lead_id=None):
self.round += 1
if self.round == 1:
await graph.add_lead(
target_agent="filesystem",
description="probe X",
proposed_by="strategist",
motivating_hypothesis=hid,
expected_evidence_type="supports",
round_number=graph.current_strategist_round,
)
else:
graph.strategist_complete_requested = True
return "ok"
class FakeWorker:
name = "filesystem"
async def run(self, task, lead_id=None):
worker_calls.append((self.name, lead_id))
return "did the thing"
class FakeFactory:
def __init__(self):
self.s = FakeStrategist()
self.w = FakeWorker()
def get_or_create_agent(self, name):
if name == "strategist": return self.s
return self.w
orch = Orchestrator(llm, graph, FakeFactory(), config={
"strategist": {"enabled": True, "max_rounds": 5,
"hard_stop_marginal_yield_zero_rounds": 99},
})
await orch._phase3_strategist_loop()
assert len(graph.investigation_rounds) == 2
assert graph.investigation_rounds[0].strategist_action == "propose_leads"
assert graph.investigation_rounds[1].strategist_action == "declare_complete"
assert len(worker_calls) == 1
assert worker_calls[0][0] == "filesystem"
leads = [l for l in graph.leads if l.proposed_by == "strategist"]
assert len(leads) == 1
assert leads[0].status == "completed"
@pytest.mark.asyncio
async def test_strategist_loop_hard_stop_on_zero_yield(self):
"""If the strategist insists on more rounds but yield stays zero for
N consecutive rounds, the orchestrator force-stops as a safety net."""
from unittest.mock import AsyncMock
from orchestrator import Orchestrator
graph = EvidenceGraph()
llm = AsyncMock()
class FakeStrategist:
name = "strategist"
async def run(self, task, lead_id=None):
hid_local = next(iter(graph.hypotheses)) if graph.hypotheses else None
await graph.add_lead(
target_agent="filesystem", description="probe",
proposed_by="strategist",
motivating_hypothesis=hid_local or "",
expected_evidence_type="supports",
round_number=graph.current_strategist_round,
)
class FakeWorker:
name = "filesystem"
async def run(self, task, lead_id=None):
return ""
class FakeFactory:
def __init__(self):
self.s = FakeStrategist()
self.w = FakeWorker()
def get_or_create_agent(self, name):
return self.s if name == "strategist" else self.w
hid = await graph.add_hypothesis("h", "d")
orch = Orchestrator(llm, graph, FakeFactory(), config={
"strategist": {
"enabled": True, "max_rounds": 20,
"hard_stop_marginal_yield_zero_rounds": 2,
},
})
await orch._phase3_strategist_loop()
assert len(graph.investigation_rounds) == 2
@pytest.mark.asyncio
async def test_strategist_loop_budget_exhaustion_stops_loop(self):
"""Hard budget cap on tool_calls_total kills the loop even when the
strategist wants to continue."""
from unittest.mock import AsyncMock
from orchestrator import Orchestrator
graph = EvidenceGraph()
llm = AsyncMock()
# Pre-stuff the invocations log so we're already past the cap.
await graph.record_tool_invocation(
tool="probe", args={}, output="x",
)
await graph.record_tool_invocation(
tool="probe", args={}, output="y",
)
class FakeStrategist:
name = "strategist"
async def run(self, task, lead_id=None):
hid_local = next(iter(graph.hypotheses)) if graph.hypotheses else ""
await graph.add_lead(
target_agent="filesystem", description="x",
proposed_by="strategist",
motivating_hypothesis=hid_local,
expected_evidence_type="supports",
round_number=graph.current_strategist_round,
)
class FakeWorker:
name = "filesystem"
async def run(self, task, lead_id=None):
await graph.record_tool_invocation(
tool="probe", args={}, output="z",
)
class FakeFactory:
def __init__(self):
self.s = FakeStrategist()
self.w = FakeWorker()
def get_or_create_agent(self, name):
return self.s if name == "strategist" else self.w
hid = await graph.add_hypothesis("h", "d")
orch = Orchestrator(llm, graph, FakeFactory(), config={
"strategist": {"enabled": True, "max_rounds": 99,
"hard_stop_marginal_yield_zero_rounds": 99},
"budgets": {"tool_calls_total": 2},
})
await orch._phase3_strategist_loop()
assert len(graph.investigation_rounds) == 1
@pytest.mark.asyncio
async def test_marginal_yield_after_two_rounds(self):
"""Verify marginal_yield captures phenomena/edge/status deltas."""
from tools import strategy
graph = EvidenceGraph()
hid = await graph.add_hypothesis("h", "d")
rid1 = await graph.start_investigation_round(1)
pid, _ = await graph.add_phenomenon(
"fs", "filesystem", "p1", "interp", source_tool="t",
)
await graph.update_hypothesis_confidence(hid, pid, "direct_evidence", "")
await graph.complete_investigation_round(rid1)
rid2 = await graph.start_investigation_round(2)
await graph.complete_investigation_round(rid2)
out = strategy.marginal_yield(graph, last_n_rounds=2)
assert "R1" in out and "R2" in out
assert "Trend" in out
assert ("collapsed" in out or "Decelerating" in out
or "Diminishing" in out or "diminishing" in out)

View File

@@ -24,6 +24,7 @@ from tools import mobile_ios as ios
from tools import parsers
from tools import registry as reg
from tools import sleuthkit as tsk
from tools import strategy as strat
logger = logging.getLogger(__name__)
@@ -985,6 +986,271 @@ def register_all_tools(graph: Any) -> None:
tags=["media", "ocr", "image"],
)
# ---- Strategist-loop view tools (DESIGN_STRATEGIST.md §2) ----
# Pure read-only renders over graph state. The strategist agent uses
# these to decide whether to keep investigating or to declare complete.
# They go through invocation logging like every other tool (so the
# strategist's reads are auditable) but are NOT cacheable — graph
# state changes between calls and a stale snapshot would mislead.
async def _exec_graph_overview() -> str:
return strat.graph_overview(graph)
TOOL_CATALOG["graph_overview"] = ToolDefinition(
name="graph_overview",
description=(
"Top-level investigation state: hypotheses (with log-odds, "
"confidence, edges_in, distinct_sources contributing, recent "
"status flips), sources (phenomena/identity counts, last-touched "
"round), and pending leads. Always call this first when deciding "
"the next strategist action."
),
input_schema={"type": "object", "properties": {}},
executor=_exec_graph_overview,
module="strategy",
tags=["strategy", "overview", "read-only"],
)
async def _exec_source_coverage(source_id: str) -> str:
return strat.source_coverage(graph, source_id)
TOOL_CATALOG["source_coverage"] = ToolDefinition(
name="source_coverage",
description=(
"Per-source artefact coverage report: which expected categories "
"have been touched (✓) vs not (✗) on the given source. Coverage "
"items are heuristic hints, not requirements — investigate ✗ "
"items only when an active hypothesis depends on them."
),
input_schema={
"type": "object",
"properties": {
"source_id": {"type": "string", "description": "Source id, e.g. 'src-ios-chan'."},
},
"required": ["source_id"],
},
executor=_exec_source_coverage,
module="strategy",
tags=["strategy", "coverage", "read-only"],
)
async def _exec_marginal_yield(last_n_rounds: int = 2) -> str:
return strat.marginal_yield(graph, int(last_n_rounds))
TOOL_CATALOG["marginal_yield"] = ToolDefinition(
name="marginal_yield",
description=(
"How much information the last N investigation rounds added: "
"new phenomena, new edges, and hypothesis status flips per round. "
"Two consecutive zero-yield rounds means diminishing returns are "
"decisive — declare_investigation_complete with reason "
"marginal_yield_zero."
),
input_schema={
"type": "object",
"properties": {
"last_n_rounds": {"type": "integer", "description": "How many recent rounds to summarise (default 2)."},
},
},
executor=_exec_marginal_yield,
module="strategy",
tags=["strategy", "yield", "read-only"],
)
async def _exec_budget_status() -> str:
return strat.budget_status(
graph,
getattr(graph, "budgets", None),
getattr(graph, "run_start_monotonic", None),
)
TOOL_CATALOG["budget_status"] = ToolDefinition(
name="budget_status",
description=(
"Budget vs caps: tool_calls, strategist_rounds, wall_clock_minutes. "
"Includes pacing hints when usage crosses 70% / 90% thresholds. "
"Use this to decide whether to keep proposing leads or to wind down."
),
input_schema={"type": "object", "properties": {}},
executor=_exec_budget_status,
module="strategy",
tags=["strategy", "budget", "read-only"],
)
# ---- Strategist decision actions (DESIGN_STRATEGIST.md §2.5) ----
# propose_lead is the strategist's tool for "go deeper here";
# declare_investigation_complete is its tool for "we're done".
# Both are required to be in BaseAgent.mandatory_record_tools for the
# strategist subclass so the agent can't return without taking a
# documented decision.
_ALLOWED_EVIDENCE_EDGE_TYPES = (
"direct_evidence", "supports", "contradicts",
"weakens", "prerequisite_met", "consequence_observed",
)
async def _exec_propose_lead(
description: str,
target_agent: str,
motivating_hypothesis: str,
expected_evidence_type: str,
rationale: str = "",
source_id: str = "",
) -> str:
"""Propose a new lead from the strategist. Idempotent on the
(motivating_hypothesis, expected_evidence_type, target_agent,
source_id) tuple within a single run.
"""
# Validate refs early so the strategist gets a fast, specific error.
if motivating_hypothesis and motivating_hypothesis not in graph.hypotheses:
return (
f"Error: motivating_hypothesis {motivating_hypothesis!r} is "
f"not in graph.hypotheses. Call graph_overview to see the "
f"current hypothesis ids."
)
if expected_evidence_type not in _ALLOWED_EVIDENCE_EDGE_TYPES:
return (
f"Error: expected_evidence_type {expected_evidence_type!r} is "
f"not one of {list(_ALLOWED_EVIDENCE_EDGE_TYPES)}."
)
if source_id:
src_obj = graph.case.get_source(source_id) if graph.case else None
if src_obj is None:
return (
f"Error: source_id {source_id!r} is not in the case. "
f"Valid ids: {[s.id for s in (graph.case.sources if graph.case else [])]}"
)
lid = await graph.add_lead(
target_agent=target_agent,
description=description,
proposed_by="strategist",
motivating_hypothesis=motivating_hypothesis,
expected_evidence_type=expected_evidence_type,
round_number=graph.current_strategist_round,
hypothesis_id=motivating_hypothesis or None,
context={"source_id": source_id, "rationale": rationale} if source_id or rationale else {},
)
return (
f"Lead {lid} proposed: target_agent={target_agent}, "
f"motivating_hypothesis={motivating_hypothesis}, "
f"expected={expected_evidence_type}, source={source_id or ''}."
)
TOOL_CATALOG["propose_lead"] = ToolDefinition(
name="propose_lead",
description=(
"Propose a specific investigation lead that will be dispatched "
"after this strategist round. Each lead MUST name a motivating "
"hypothesis it expects to move and the kind of edge it expects "
"to produce. Do NOT propose a lead that just adds more same-"
"direction evidence to an already-supported hypothesis — harmonic "
"damping makes repeats cheap. DO propose leads when (a) a "
"hypothesis is supported only by one source — get cross-source "
"corroboration; (b) a hypothesis is in the active band — give it "
"the deciding evidence; (c) a high-value artefact is uncovered on "
"a source where an active hypothesis suggests it matters. "
"Idempotent on (motivating_hypothesis, expected_evidence_type, "
"target_agent, source_id) — re-proposing the same triple while "
"pending is a no-op that returns the existing lead's id."
),
input_schema={
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "1-2 sentence specific investigation request, including target source/artefact.",
},
"target_agent": {
"type": "string",
"enum": [
"filesystem", "registry", "communication", "network",
"ios_artifact", "android_artifact", "media",
"hypothesis", "timeline",
],
"description": "Which worker agent should pick this up.",
},
"source_id": {
"type": "string",
"description": "Which evidence source to investigate (e.g. 'src-ios-chan'). Optional for cross-source leads.",
},
"motivating_hypothesis": {
"type": "string",
"description": "hyp-id this lead is meant to corroborate or refute.",
},
"expected_evidence_type": {
"type": "string",
"enum": list(_ALLOWED_EVIDENCE_EDGE_TYPES),
"description": "What kind of P→H edge you expect this lead to produce.",
},
"rationale": {
"type": "string",
"description": "Why this fills a real gap — referenced in audit + worker prompt.",
},
},
"required": [
"description", "target_agent",
"motivating_hypothesis", "expected_evidence_type",
],
},
executor=_exec_propose_lead,
module="strategy",
tags=["strategy", "lead", "decision"],
)
_COMPLETE_REASONS = (
"marginal_yield_zero", "budget_exhausted",
"all_hypotheses_resolved", "coverage_saturated", "other",
)
async def _exec_declare_investigation_complete(
reason: str, rationale: str = "",
) -> str:
"""Terminal strategist action: signal "we're done" to the orchestrator."""
if reason not in _COMPLETE_REASONS:
return (
f"Error: reason {reason!r} not in "
f"{list(_COMPLETE_REASONS)}."
)
graph.strategist_complete_requested = True
return (
f"Investigation marked complete in round "
f"{graph.current_strategist_round}. reason={reason}. "
f"rationale={rationale or '(none)'}. The orchestrator will exit "
f"the strategist loop after this round."
)
TOOL_CATALOG["declare_investigation_complete"] = ToolDefinition(
name="declare_investigation_complete",
description=(
"Terminal strategist action. Call this when (a) marginal_yield "
"shows zero across 2+ rounds, (b) budget is exhausted, (c) all "
"active hypotheses are resolved, or (d) coverage is saturated "
"with respect to the active hypotheses. After this call, the "
"orchestrator finishes the strategist loop and proceeds to "
"Phase 4 (timeline) and Phase 5 (report). The current round's "
"in-flight work still completes."
),
input_schema={
"type": "object",
"properties": {
"reason": {
"type": "string",
"enum": list(_COMPLETE_REASONS),
"description": "Termination cause — picked from a closed set so the audit log is consistent.",
},
"rationale": {
"type": "string",
"description": "Free-text justification — quoted into the InvestigationRound's decision_rationale.",
},
},
"required": ["reason"],
},
executor=_exec_declare_investigation_complete,
module="strategy",
tags=["strategy", "terminal", "decision"],
)
# ---- Wrap every executor with invocation logging (+ cache + auto-record) ----
# Must run AFTER all tools are registered. Every tool call now produces
# a ToolInvocation entry on the graph (provenance for grounding), and

485
tools/strategy.py Normal file
View File

@@ -0,0 +1,485 @@
"""Strategist-loop tools — read-only views over graph state that let the
InvestigationStrategist agent decide whether to keep investigating or to
declare the investigation complete.
DESIGN_STRATEGIST.md §2. Four read-only views:
graph_overview() → hypotheses + sources + pending leads snapshot
source_coverage(src_id) → which artefact categories on this source have
been touched vs are still ✗
marginal_yield(n_rounds) → how much information the last N rounds added
budget_status() → tool calls / rounds / wall-clock against caps
These are pure render functions over the graph — they MUST NOT mutate state.
The strategist never writes phenomena/edges directly; all graph mutations
happen through worker agents that the strategist dispatches via propose_lead
(which is registered separately in tool_registry).
"""
from __future__ import annotations
import time
from typing import Any
# ---------------------------------------------------------------------------
# Expected artefact catalogue (per source type)
#
# These are SOFT HINTS — items the strategist might want to check on a given
# source type if any active hypothesis depends on them. The catalogue is
# intentionally compact; expand it in-place when a new forensic specialty
# joins the toolset. Each entry:
#
# name human-readable artefact category
# detector how to recognise that this category has been touched — either
# a tool name OR a `<tool>@<path-substring>` pattern, joined with
# `|` for alternatives. The matcher is substring on the tool name
# and on the args' string representation.
# value_for one-line description of why this category might matter
# ---------------------------------------------------------------------------
EXPECTED_ARTEFACTS: dict[str, list[dict[str, str]]] = {
"disk_image+windows": [
{"name": "partition layout", "detector": "partition_info|mmls",
"value_for": "deleted files, hidden partitions"},
{"name": "filesystem walk", "detector": "list_directory|fls",
"value_for": "directory tree, recoverable deleted entries"},
{"name": "registry hives", "detector": "parse_registry_key|list_installed_software|get_user_activity",
"value_for": "installed software, user activity, timezone"},
{"name": "browser history", "detector": "list_directory@AppData|read_text_file@History|read_text_file@Bookmarks",
"value_for": "URL access, downloads, web search terms"},
{"name": "prefetch", "detector": "parse_prefetch|extract_file@Prefetch",
"value_for": "program execution evidence"},
{"name": "email/IM config", "detector": "get_email_config",
"value_for": "user accounts, configured mail/IM clients"},
{"name": "recycle bin", "detector": "list_directory@$Recycle|count_deleted_files",
"value_for": "deleted file metadata and recovery"},
],
"disk_image+android": [
{"name": "partition probe", "detector": "probe_android_partitions",
"value_for": "discover EFS / SYSTEM / USERDATA layout"},
{"name": "system properties", "detector": "read_text_file@build.prop|read_text_file@default.prop",
"value_for": "device model, OS version, CSC region"},
{"name": "app inventory", "detector": "list_directory@data/app|list_directory@data/data",
"value_for": "installed apps, package names"},
{"name": "user data dbs", "detector": "list_directory@data/data|sqlite_query",
"value_for": "messages, contacts, app-specific data"},
{"name": "device identity", "detector": "search_strings@imei|search_strings@serial|search_strings@DRI",
"value_for": "IMEI, serial, device fingerprint"},
],
"mobile_extraction": [
{"name": "device info", "detector": "read_idevice_info|read_text_file@iDevice_info",
"value_for": "model, iOS version, IMEI, ICCID, Bluetooth MAC, UDID"},
{"name": "AddressBook", "detector": "sqlite_query@AddressBook.sqlitedb",
"value_for": "contacts, owner identity"},
{"name": "SMS / iMessage", "detector": "sqlite_query@sms.db",
"value_for": "messaging content, OTP / verification codes"},
{"name": "WhatsApp messages", "detector": "sqlite_query@ChatStorage.sqlite|sqlite_query@WhatsApp",
"value_for": "WhatsApp content, group membership, call records"},
{"name": "WeChat", "detector": "sqlite_query@MM.sqlite|sqlite_query@wcdb|list_directory@WeChat",
"value_for": "WeChat IDs, messages, follow targets"},
{"name": "Call history", "detector": "sqlite_query@CallHistory|sqlite_query@call_history",
"value_for": "incoming/outgoing call log"},
{"name": "Safari history", "detector": "sqlite_query@History.db|read_text_file@Bookmarks.plist|parse_plist@Bookmarks",
"value_for": "URL access, bookmarks, search queries"},
{"name": "Photos library", "detector": "sqlite_query@Photos.sqlite|parse_plist@Photos",
"value_for": "photo metadata, EXIF, geolocation, source app"},
{"name": "iCloud accounts", "detector": "parse_plist@Accounts3|parse_ios_keychain",
"value_for": "Apple ID, registered services, authentication tokens"},
{"name": "app inventory", "detector": "list_directory@Bundle/Application|list_directory@Containers",
"value_for": "installed apps, app-specific containers"},
{"name": "Wi-Fi history", "detector": "parse_plist@com.apple.wifi|read_text_file@known_networks",
"value_for": "connected SSIDs, keys, first/last seen times"},
],
"media_collection": [
{"name": "archive unpack", "detector": "unzip_archive|list_directory",
"value_for": "extract images / docs for downstream analysis"},
{"name": "OCR text", "detector": "ocr_image",
"value_for": "screenshot text content (chat, transaction, IDs)"},
{"name": "metadata", "detector": "read_binary_preview|search_strings",
"value_for": "EXIF, embedded timestamps, device fingerprints"},
],
"archive": [
{"name": "archive unpack", "detector": "unzip_archive",
"value_for": "expose contents for further analysis"},
],
}
def _key_for_source(src) -> str:
"""Return the EXPECTED_ARTEFACTS key for a source: 'disk_image+platform'
when platform is set in meta, otherwise just the source type."""
src_type = getattr(src, "type", "")
if src_type == "disk_image":
platform = (getattr(src, "meta", {}) or {}).get("platform", "").lower()
if platform:
return f"disk_image+{platform}"
return src_type
def _detector_matches(detector: str, tool_name: str, args_str: str) -> bool:
"""Return True if any '|'-separated branch of `detector` matches.
A branch like ``sqlite_query@AddressBook.sqlitedb`` requires both the
tool name (substring) AND the args (substring) to match. A branch like
``parse_prefetch`` is a tool-name-only check.
"""
for branch in detector.split("|"):
branch = branch.strip()
if not branch:
continue
if "@" in branch:
t, sub = branch.split("@", 1)
if t in tool_name and sub.lower() in args_str.lower():
return True
else:
if branch in tool_name:
return True
return False
# ---------------------------------------------------------------------------
# graph_overview()
# ---------------------------------------------------------------------------
def graph_overview(graph) -> str:
"""Render hypotheses + sources + pending leads as the strategist's
primary decision view.
Annotates each hypothesis with the count of distinct sources that
contribute supporting (positive-LR) edges. A hypothesis with many edges
but only one source is a strategist signal to seek cross-source
corroboration.
"""
lines: list[str] = ["# Investigation State", ""]
# Hypotheses table.
if graph.hypotheses:
lines.append(f"## Hypotheses ({len(graph.hypotheses)})")
lines.append("")
lines.append(
"| id | title | L | conf | status | edges_in | distinct_sources | recent_flip |"
)
lines.append("|----|-------|---|------|--------|---------:|-----------------:|--------------|")
# Sort by absolute log-odds magnitude descending so the strategist
# sees the most decided hypotheses first; active ones float to the
# middle of the table where decisions matter most.
for hid, h in sorted(
graph.hypotheses.items(),
key=lambda kv: (kv[1].status != "active", -abs(kv[1].log_odds)),
):
in_edges = graph._adj_rev.get(hid, [])
edges_in = len(in_edges)
# Distinct sources contributing edges (looked up via source
# phenomenon's source_id; entity→entity edges have no source).
distinct_sources: set[str] = set()
for e in in_edges:
src_node = graph.phenomena.get(e.source_id)
if src_node is not None and src_node.source_id:
distinct_sources.add(src_node.source_id)
# Did this hypothesis's status change in the last 2 rounds?
recent = "no"
recent_rounds = graph.investigation_rounds[-2:]
for r in recent_rounds:
before = r.hypothesis_status_snapshot_before.get(hid)
after = r.hypothesis_status_snapshot_after.get(hid)
if before and after and before != after:
recent = f"yes ({before}{after} in R{r.round_number})"
break
title = (h.title or "")[:60].replace("|", "/")
lines.append(
f"| {hid[:14]} | {title} | {h.log_odds:+.2f} | "
f"{h.confidence:.2f} | {h.status} | {edges_in} | "
f"{len(distinct_sources)} | {recent} |"
)
lines.append("")
else:
lines.append("## Hypotheses\n\n_(none yet — Phase 2 has not produced any)_\n")
# Sources table.
if graph.case and graph.case.sources:
lines.append(f"## Sources ({len(graph.case.sources)})")
lines.append("")
lines.append(
"| id | type | phenomena | identities | last_touched_in_round |"
)
lines.append("|----|------|----------:|-----------:|----------------------|")
for src in graph.case.sources:
ph_count = sum(
1 for p in graph.phenomena.values() if p.source_id == src.id
)
id_count = sum(
1 for e in graph.entities.values()
for i in e.identifiers
if any(
p.source_id == src.id
for p in graph.phenomena.values()
if p.id == i.get("phenomenon_id")
)
)
# Latest round in which a tool invocation was made against this src.
last_r = ""
for r in reversed(graph.investigation_rounds):
if r.new_phenomena_count > 0:
# Heuristic: if any phenomenon created during this round
# was on this source, mark this round as the last touch.
in_round = [
p for p in graph.phenomena.values()
if p.source_id == src.id
and r.started_at <= p.created_at
and (not r.completed_at or p.created_at <= r.completed_at)
]
if in_round:
last_r = f"R{r.round_number}"
break
lines.append(
f"| {src.id} | {src.type} | {ph_count} | {id_count} | {last_r} |"
)
lines.append("")
# Pending leads.
pending = [l for l in graph.leads if l.status == "pending"]
if pending:
lines.append(f"## Pending Leads ({len(pending)})")
lines.append("")
lines.append("| id | from | target_agent | for_hypothesis | description |")
lines.append("|----|------|--------------|----------------|-------------|")
for l in pending[:20]:
desc = (l.description or "")[:80].replace("|", "/")
mh = l.motivating_hypothesis or l.hypothesis_id or ""
lines.append(
f"| {l.id} | {l.proposed_by or ''} | {l.target_agent} | "
f"{mh[:14] if mh != '' else ''} | {desc} |"
)
if len(pending) > 20:
lines.append(f"\n_(+{len(pending) - 20} more pending leads not shown)_")
lines.append("")
else:
lines.append("## Pending Leads\n\n_(none — no investigations queued)_\n")
# Interpretation hint at the end, plain English.
lines.append("---")
lines.append(
"**Interpretation hints**: A hypothesis with many edges but only one "
"distinct_source has fragile cross-source independence — a single "
"edge from a *different* source would do more for it than another "
"edge from the same source (harmonic damping makes repeats cheap). "
"Hypotheses in the active band (0.2 < conf < 0.8) are the ones a "
"well-targeted lead can flip. recent_flip = 'yes' means belief is "
"still moving on that hypothesis; 'no' across 2 rounds suggests "
"stability."
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# source_coverage(source_id)
# ---------------------------------------------------------------------------
def source_coverage(graph, source_id: str) -> str:
"""Render which expected artefact categories have been touched on
*source_id*, and which remain ✗.
Output is markdown. The closing paragraph reminds the strategist that
coverage hints are heuristics — investigate ✗ items only when an active
hypothesis depends on them. This is the design's central guardrail
against the system devolving into a fixed forensic checklist.
"""
src = graph.case.get_source(source_id) if graph.case else None
if src is None:
return f"Error: source_id {source_id!r} not found in case."
key = _key_for_source(src)
expected = EXPECTED_ARTEFACTS.get(key, [])
# Collect this source's invocation history.
invs = [
inv for inv in graph.tool_invocations.values()
if inv.source_id == source_id
]
# For each expected category, decide ✓ / ✗ + show example invocation if ✓.
rows: list[tuple[str, str, str, str]] = []
for entry in expected:
name = entry["name"]
detector = entry["detector"]
value_for = entry["value_for"]
matched: str | None = None
for inv in invs:
args_str = ""
try:
args_str = " ".join(f"{k}={v}" for k, v in (inv.args or {}).items())
except Exception:
args_str = str(inv.args)
if _detector_matches(detector, inv.tool, args_str):
matched = f"{inv.tool}({args_str[:60]})"
break
mark = "" if matched else ""
evidence = matched or ""
rows.append((mark, name, evidence, value_for))
lines: list[str] = [
f"# Coverage of source `{source_id}` ({src.label})",
"",
f"Source type: `{src.type}` / access_mode: `{src.access_mode}`",
f"Invocations made against this source: **{len(invs)}**",
"",
]
if not expected:
lines.append(
f"_(no expected-artefact catalogue entry for source type `{key}` — "
"coverage cannot be assessed against a baseline)_"
)
else:
lines.append(
"| ✓/✗ | category | example invocation | what it would tell us |"
)
lines.append("|-----|----------|---------------------|------------------------|")
for mark, name, evidence, value_for in rows:
lines.append(
f"| {mark} | {name} | {evidence[:70].replace('|','/')} | {value_for} |"
)
n_covered = sum(1 for r in rows if r[0] == "")
n_total = len(rows)
lines.append("")
lines.append(f"Coverage: **{n_covered}/{n_total}** ({n_covered*100//max(n_total,1)}%)")
# Other invocations on this source that didn't match any expected entry —
# could be genuine novel exploration; strategist might want to know.
lines.append("")
lines.append("---")
lines.append(
"**Coverage hints are heuristics, not requirements.** Skip an item if "
"the case theory makes it irrelevant — a financial-fraud case has no "
"reason to OCR every photo. Investigate ✗ items only when they could "
"materially affect an active hypothesis. If you propose a lead just "
"because something is ✗, the strategist prompt is being misused."
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# marginal_yield(last_n_rounds)
# ---------------------------------------------------------------------------
def marginal_yield(graph, last_n_rounds: int = 2) -> str:
"""Render the last N investigation rounds' yield deltas.
Yield columns:
- new_phenomena: phenomena created during the round
- new_edges: edges (any direction) added during the round
- status_flips: hypotheses whose status changed during the round
A row of zeros means that round didn't move the graph. Two consecutive
such rows is strong evidence of diminishing returns; the strategist
should consider declare_investigation_complete with reason
marginal_yield_zero.
"""
rounds = [r for r in graph.investigation_rounds if r.completed_at]
if not rounds:
return (
"# Marginal Yield\n\n"
"_(no completed investigation rounds yet — yield not applicable)_"
)
recent = rounds[-max(1, last_n_rounds):]
lines = [f"# Marginal Yield (last {len(recent)} of {len(rounds)} rounds)", ""]
lines.append("| round | new_phenomena | new_edges | status_flips |")
lines.append("|-------|--------------:|----------:|-------------:|")
yields: list[tuple[int, int, int]] = []
for r in recent:
yields.append((r.new_phenomena_count, r.new_edges_count, r.status_flips))
lines.append(
f"| R{r.round_number} | {r.new_phenomena_count} | "
f"{r.new_edges_count} | {r.status_flips} |"
)
# Trend interpretation aid.
lines.append("")
if all(y == (0, 0, 0) for y in yields):
trend = (
"Yield is zero across these rounds — diminishing returns are "
"confirmed. Strongly consider declare_investigation_complete "
"(reason: marginal_yield_zero)."
)
elif len(yields) >= 2:
first = yields[0][0] + yields[0][1] + yields[0][2]
last = yields[-1][0] + yields[-1][1] + yields[-1][2]
if last == 0 and first > 0:
trend = (
"Yield collapsed to zero in the most recent round. One more "
"well-targeted probe is reasonable; another zero-yield round "
"after that means stop."
)
elif last < first / 2 and first > 0:
trend = (
f"Decelerating ({last}/{first}"
f"{int(100*last/first)}% of the earlier round). Diminishing "
"returns are accumulating."
)
else:
trend = "Yield is still active — further investigation is paying off."
else:
trend = (
"Only one completed round — too early to call a trend. Run at "
"least one more before considering completion."
)
lines.append(f"**Trend**: {trend}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# budget_status()
# ---------------------------------------------------------------------------
def budget_status(graph, budgets: dict[str, Any] | None, start_time: float | None) -> str:
"""Render budget usage against config.yaml `budgets` block.
Counters:
- tool_calls: len(graph.tool_invocations)
- strategist_rounds: len(graph.investigation_rounds)
- wall_clock_minutes: now - start_time (when start_time is supplied)
"""
budgets = budgets or {}
tool_calls_used = len(graph.tool_invocations)
rounds_used = len(graph.investigation_rounds)
minutes_used: float | None = None
if start_time is not None:
minutes_used = (time.monotonic() - start_time) / 60.0
def _row(name: str, used: float, cap: Any) -> str:
if cap is None:
return f"| {name} | {used:g} | — | (unbounded) |"
pct = (used / cap) * 100 if cap else 0
return f"| {name} | {used:g} | {cap} | {pct:.0f}% |"
lines = ["# Budget Status", ""]
lines.append("| metric | used | cap | pct |")
lines.append("|--------|-----:|----:|----:|")
lines.append(_row("tool_calls", tool_calls_used, budgets.get("tool_calls_total")))
lines.append(_row("strategist_rounds", rounds_used, budgets.get("strategist_rounds_max")))
if minutes_used is not None:
lines.append(_row(
"wall_clock_minutes", round(minutes_used, 1),
budgets.get("wall_clock_minutes_max"),
))
# Pacing hint.
lines.append("")
flags = []
cap_calls = budgets.get("tool_calls_total")
cap_rounds = budgets.get("strategist_rounds_max")
if cap_calls and tool_calls_used / cap_calls >= 0.9:
flags.append("tool_calls budget ≥ 90% used — favour declare_complete")
if cap_rounds and rounds_used / cap_rounds >= 0.7:
flags.append("strategist rounds ≥ 70% used — only propose leads with high expected yield")
if flags:
lines.append("**Budget warnings**:")
for f in flags:
lines.append(f"- {f}")
else:
lines.append(
"Budget room remains. Standard rule: each propose_lead should "
"name a specific hypothesis it expects to move; otherwise skip it."
)
return "\n".join(lines)