Compare commits

2 Commits

Author SHA1 Message Date
BattleTag
6b485b98f7 fix(grounding): auto-rescue hallucinated invocation_id + list real ids in error
First full-case run (runs/2026-05-20T20-15-04/) produced 83 GroundingError
rejections, almost all from a single failure mode: LLM cites a plausible-
looking inv-XXXXXXXX that doesn't exist, while the fact's value is in fact
present verbatim in one of its real tool outputs. The agent knew which
tool it read from, it just mis-typed the citation id.

Two-layer fix in evidence_graph.validate_fact_grounding:

  Layer A (silent heal): when the cited inv-id misses, search the same
  agent / task's invocations for one whose output contains the value
  (strict or normalised substring). If exactly one matches, rewrite
  fact.invocation_id in place and accept. Multi-match is NOT auto-
  rescued — the candidate ids go back to the LLM so it picks deliberately.

  Layer B (informative retry): GroundingError now appends the agent's
  recent invocation ids and a brief tool-call summary, so the LLM has
  the real ids in front of it for the next attempt rather than
  fabricating again from memory.

Both layers preserve the design invariant: the fact's value must still
be present in a real tool output — nothing new can land grounded that
wasn't already verifiable. Cross-agent / cross-task isolation is also
preserved (rescue candidates filtered on agent + task_id).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:14:20 -10:00
BattleTag
81ade8f7ac feat(refit): complete S1-S6 — case abstraction, grounding, log-odds, plugins, coref, multi-source
Consolidates the long-running refit work (DESIGN.md as authoritative spec)
into a single baseline commit. Six stages landed together:

  S1  Case + EvidenceSource abstraction; tools parameterised by source_id
      (case.py, main.py multi-source bootstrap, .bin extension support)
  S2  Grounding gateway in add_phenomenon: verified_facts cite real
      ToolInvocation ids; substring / normalised match enforced; agent +
      task scope checked. Phenomenon.description split into verified_facts
      (grounded) + interpretation (free text). [invocation: inv-xxx]
      prefix on every wrapped tool result so the LLM can cite.
  S3  Confidence as additive log-odds: edge_type → log10(LR) calibration
      table; commutative updates; supported / refuted thresholds derived
      from log_odds; hypothesis × evidence matrix view.
  S4  iOS plugin: unzip_archive + parse_plist / sqlite_tables /
      sqlite_query / parse_ios_keychain / read_idevice_info;
      IOSArtifactAgent; SOURCE_TYPE_AGENTS routing.
  S5  Cross-source entity resolution: typed identifiers on Entity,
      observe_identity gateway, auto coref hypothesis with shared /
      conflicting strong/weak LR edges, reversible same_as edges,
      actor_clusters() view.
  S6  Android partition probe + AndroidArtifactAgent; MediaAgent with
      OCR fallback; orchestrator Phase 1 iterates every analysable
      source; platform-aware get_triage_agent_type; ReportAgent renders
      actor clusters + per-source breakdown.

142 unit tests / 1 skipped — full coverage of the new gateway, log-odds
math, coref hypothesis fall-out, and orchestrator multi-source dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 02:12:10 -10:00
24 changed files with 5310 additions and 244 deletions

305
DESIGN.md Normal file
View File

@@ -0,0 +1,305 @@
# MASForensics 系统改造设计
> 目标:把当前「单台 Windows 磁盘取证」系统改造为能处理**多设备、多行为人、
> 异构证据、需跨源关联**的复杂取证系统。本文是唯一的权威设计文档
> (已合并早先的 `REFIT_PLAN.md` / `RESEARCH_DESIGN.md` 两份草稿)。
>
> 触发本次改造的实际案件2025 美亚杯资格赛 Individual —— 5 份证据
> 1 USB E01、1 安卓整盘 `blk0_sda.bin`、3 份 iOS 提取、1 组交易截图),
> 跨 LEUNG YL / CHAN MH / FUNG CC 至少 3 人。
---
## 1. 设计原则(贯穿全文的不变式)
1. **LLM 提议,代码裁决**。LLM 负责语言/分类/感知;它**不持有案件状态、
不产出数值、不写入未经核验的事实**。所有「真相」在符号层。
2. **每条记录的事实都可从一次工具调用重新推导**。结论可被独立复核。
3. **推理核心与设备类型无关**。设备特定逻辑全部位于「能力插件」中;
支持一种新设备 = 写插件,绝不改核心。
4. **看似不可逆的操作(如实体归并)实为可逆、带证据的论断**,可被推翻。
这四条不是口号——下文每个设计决策都对应其中一条。
---
## 2. 现状问题诊断
| # | 问题 | 位置 | 后果 |
|---|---|---|---|
| P1 | **单镜像假设深植**:工具是闭包绑死 `image_path`,图是单源,主程序只选一个镜像 | `tool_registry.py:148` `register_all_tools``main.py:91-153` | 无法摄取多份证据,无法跨设备关联 |
| P2 | **反幻觉只写在提示词里** | `base_agent.py` system prompt | LLM 一旦不听话,错误事实进入案件记录且**事后无法识别** |
| P3 | **置信度公式无统计含义且有序依赖缺陷**`delta=weight*(1-conf)`(正)/`weight*conf`(负),正负边混合时更新结果与边的到达顺序有关 | `evidence_graph.py:26-33` | 置信度不可校准、不可辩护 |
| P4 | **工件分类是 Windows 专属**:靠 hive 名 / `.pf` / `mirc` 关键词 | `tool_registry.py:80-107` `_auto_categorize` | iOS/安卓工件全部落入 `other` |
| P5 | **案件信息硬编码** `cfreds_hacking_case` | `config.yaml:35-50` | 换案即需改代码 |
| P6 | **镜像发现靠扩展名 glob**`.bin` 不在列表 | `main.py:28` `_IMAGE_GLOBS` | `blk0_sda.bin` 不被发现 |
| P7 | **Phenomenon 无来源标注** | `evidence_graph.py:85` `Phenomenon` | 不知道某发现出自哪台设备,跨源关联无锚点 |
改造同时解决「接入新证据」与「修掉 P1-P7 这些固有缺陷」。
---
## 3. 目标架构
```
case.yaml ──► Case ──► N × EvidenceSource
├ id / type / owner / path
└ access_mode: image | tree
┌──────────────┴───────────────┐
image-backed tree-backed
(TSK, inode 寻址) (路径寻址:已挂载/已解包)
│ │
└────────────┬─────────────────┘
SourceRegistry ── source_id → SourceHandle解析 path/offset/mode
ToolRegistry ── 工具按 access_mode 注册,调用时绑定 source_id
┌──────────────────────┼───────────────────────┐
▼ ▼ ▼
Knowledge-Source Graph Write Gateway ToolInvocationLog
Agents (LLM) ──► (唯一写入口,强制 (每次工具调用留痕:
只能经网关写图 前置条件 = grounding args / 输出 / sha256
│ │
└──────────────────────┴──► Grounded Evidence Graph (GEG)
Phenomenon / Hypothesis / Entity
置信度 = 对数几率累加
```
**保留**现有的五阶段流水线、断连恢复、运行归档、工具结果缓存、
`AgentFactory` 动态组合——这些设计是好的,不重写,只适配。
---
## 4. 核心设计
### 4.1 证据源抽象(解决 P1/P5/P6/P7地基
新增 `case.py`
- **`EvidenceSource`** 数据类:`id``label``type``owner`(关联人)、
`path``access_mode``meta`(类型特定,如分区 offset / 解包后根目录)。
- **`Case`**:持有 `list[EvidenceSource]` + 案件元数据,从 `case.yaml` 加载。
- **`access_mode` 是关键设计区分**
- `image`:块设备/磁盘镜像,用 TSK 按 inode 寻址USB E01、安卓 `blk0_sda` 各分区)。
- `tree`已挂载文件系统或已解包目录按路径寻址iOS 提取解压后、归档展开后)。
- 工具按 access_mode 分族注册(见 4.2)。一份证据可经「准备」从 image 变为 tree
(如分区 mount、zip 解包)。
`main.py``select_image_interactive`:91-153改为加载/构造 `Case`
`_IMAGE_GLOBS` 改为类型探测(`mmls` 试探 + 文件头嗅探),不再靠扩展名。
`config.yaml` 删除 `cfreds_hacking_case`,案件信息移入 `case.yaml`
### 4.2 工具注册按源参数化(解决 P1
现状:`register_all_tools(image_path, offset, ...)` 把单一镜像闭包进每个工具
`tool_registry.py:159+`)。改造:
- 工具执行器签名增加 `source_id`;执行时经 `SourceRegistry` 解析出真实 path/offset/mode。
- `TOOL_CATALOG``access_mode` 标注工具适用性agent 拿到的工具集由其
负责的源类型决定。
- **「当前源」上下文**:编排器为 agent 设置 current source类比现有
`graph._current_agent`工具默认作用于它——LLM 不必每次传 `source_id`
(减少出错)。跨源工具(时间线合并、实体查询)显式跨源。
- 缓存键 `_cache_key``tool_registry.py:41`)纳入 `source_id`,防止跨源串味。
### 4.3 图写入网关(解决 P2落实原则 1
现状agent 通过 `add_phenomenon` 等工具直接写图,约束只在 prompt。改造
- 所有图变更(`add_phenomenon` / `add_hypothesis` / `link` / `observe_identity` …)
收敛到**一个写入网关**。网关在代码层强制前置条件。
- 现有 prompt 里的「反幻觉规则」下沉为网关的硬校验。LLM agent 的四阶段工作流
INVESTIGATE→RECORD→LINK→ANSWER不变——变的是 RECORD 这一步底下的网关变严。
- `base_agent.py``mandatory_record_tools` 机制保留(它保证 agent 真的记录了东西)。
### 4.4 证据落地约束 Grounding解决 P2落实原则 2
这是系统可靠性的核心机制。
**ToolInvocationLog**:每次工具调用留痕一条记录
`{invocation_id, source_id, tool, args, output, output_sha256, agent, ts}`
现有结果缓存(`tool_registry.py:29`)已存确定性输出,扩展为完整留痕即可。
**Phenomenon 一分为二**——把「事实」和「解读」分开:
- `verified_facts`: `list[{type, value, invocation_id}]`
`type ∈ {path, timestamp, inode, hash, identifier, count, ...}`
- `interpretation`: 自由文本agent 的分析叙述。
**`add_phenomenon` 网关前置条件**
1. 每个 fact 必须引用一次**本 agent 本任务内真实发生过的** `invocation_id`
2. 代码校验 `fact.value` 命中该次调用的输出:
- 文本输出 → 逐字 substring 匹配;
- 结构化/二进制工具输出 → 与解析后的字段匹配。
3. 任一 fact 不通过 → **整条拒绝写入**,返回失败的 factagent 须修正重试。
4. 通过 → 写入;`verified_facts` 每条带 `invocation_id`(可重跑复核),
`interpretation` 标记为「未核验分析」。
**效果**:在系统里「记录一条工具输出未支撑的路径/时间戳/哈希/标识符」
**结构性地不可能**。LLM 仍可能写错 `interpretation`,但报告会把
verified facts带重跑指令的引证与 interpretation明确标注的分析
**分开渲染**,人类调查员一眼可辨。这是诚实划定边界的可靠性保证。
> 现有 `_make_auto_record``tool_registry.py:126`)把工具输出直接转 phenomenon——
> 那是「平凡落地」的特例(描述即输出),新设计是它的一般化与形式化。
### 4.5 假设置信度:似然比 / 对数几率(解决 P3
`evidence_graph.py:26``_DEFAULT_EDGE_WEIGHTS` 从「拍脑袋的 delta」
换成基于**似然比LR**的对数几率累加:
- 每条 `Phenomenon → Hypothesis` 边代表一个似然比。LLM 仍只做**离散分类**
(这条证据对这条假设是 direct_evidence / supports / weakens / contradicts …),
数值 `log₁₀(LR)` 由标定表查得——**LLM 绝不吐数字**延续现有「LLM 选类型、
代码算数值」哲学并赋予统计基础)。
- 置信度更新:
```
L_post = L_prior + Σ log₁₀(LR_i) # 对数几率,可交换 → 无序依赖
confidence = 1 / (1 + 10^(L_post))
```
- 边类型 → `log₁₀(LR)` 标定表(初值,后续可由标注案例校准):
| 边类型 | log₁₀LR |
|---|---:|
| `direct_evidence` | +2.0 |
| `supports` / `consequence_observed` | +1.0 |
| `prerequisite_met` | +0.5 |
| `weakens` | 0.5 |
| `contradicts` | 2.0 |
- 阈值不变≥0.8 supported / ≤0.2 refuted只是改由 `L_post` 推出。
- `prior_prob` 成为可配置量(默认 0.5 → `L_prior=0`)。
- **简化假设说明**:多条边按独立处理(朴素贝叶斯)。同类证据反复出现并非
完全独立——加一个旋钮:同 `(hypothesis, edge_type)` 的边数封顶或衰减,避免
「同一发现被多 agent 重复入图」虚高置信度(现有 Jaccard 去重已部分缓解)。
附带产出一个 **假设 × 证据矩阵**视图,供报告与线索选择使用。
### 4.6 跨源实体解析(解决「复杂场景」的关联难题,落实原则 4
复杂取证的核心难题iPhone keychain 里的 Apple ID、安卓短信库里的号码、
USB 文件作者、交易截图里的钱包地址——**哪些指向同一行为人?**
**关键设计:「身份共指」本身就是一条假设**——于是实体解析不是独立子系统,
而是 4.5 假设机制的复用:
- agent 观察到标识符即经网关 `observe_identity`,记一条**类型化**的标识符
强标识符IMEI / 钱包地址 / email / 电话号;弱标识符:昵称 / 显示名),
挂到暂定 `Entity`。
- 「Entity A ≡ Entity B」登记为一条 `Hypothesis`;共享强标识符 = 强 +LR 边,
共享弱标识符 = 弱 +LR 边,冲突的强标识符 = 强 LR 边——用 4.5 同一套计算打分。
- **不做破坏性归并**:跨阈值时在两个 Entity 间加一条 `same_as` 边(由该 coref
假设背书)。查询时把 `same_as` 连通分量视作同一行为人。**完全可逆、可审计、
可被后续 contradicts 证据推翻**(落实原则 4
- **Blocking**:只在「至少共享一个标识符或名称高相似」的实体对间建 coref 假设,
避免 O(n²)。
跨设备时间线、「谁在何时做了什么」由 `same_as` 连通后的实体图自然涌现。
### 4.7 能力插件层(接入 5 类证据)
每类证据 = 一个 `(摄取 handler, 工具集, 知识源 agent)` 三元组。推理核心不动。
| 插件 | 摄取 | 新工具 | 知识源 agent |
|---|---|---|---|
| **iOS 提取** | `unzip` 解包为 `tree` 源 | `parse_plist`(含二进制 plist)、`sqlite_tables`/`sqlite_query`(sms.db、WhatsApp `ChatStorage.sqlite`、通讯录)、`parse_ios_keychain`、`read_idevice_info` | `iOSArtifactAgent` |
| **安卓整盘** | `mmls` 分区→各分区 `image` 源;可 mount 为 `tree` | 复用 TSKext4/F2FS 读取;`fsstat` 探明加密 | 复用 filesystem + `AndroidArtifactAgent` |
| **磁盘镜像(E01)** | 已支持TSK 含 ewf | 现有 TSK 工具链 | 现有 filesystem/registry |
| **归档** | `unzip_archive` 通用解包 | —— | —— |
| **媒体/截图** | —— | `ocr_image`tesseract注意 DeepSeek 无视觉能力,必须走 OCR | `MediaAgent` |
**安卓风险**`blk0_sda` 的 `userdata` 分区大概率 FBE 加密。先 `fsstat` 各分区
探明未加密→TSK 直接用;加密且无密钥→只能分析 `EFS`/`PARAM`/`system` 等非加密区。
`tool_registry.py:80` 的 `_auto_categorize` 改为可扩展:分类由源插件提供自己的
工件分类表,而非全局 Windows 关键词表(解决 P4
### 4.8 Agent 体系重组
现有 7 个 agent 按 Windows 工件命名registry、communication=邮件/IRC、
network=浏览器/PCAP。改为按**调查职能**组织,并增加平台特定 agent
- `agent_factory.py` 的 `_AGENT_CLASSES`:34-40扩充新增 `ios_artifact`、
`android_artifact`、`financial`(钱包/交易)、`media`。
- `communication` 泛化:邮件 + IM + 短信,跨平台。
- 新增 **源类型 → 适任 agent** 映射,供 Phase 1 逐源派 triage agent。
- `create_specialized_agent`:69的动态组合机制保留——它本就是应对能力缺口的
正确手段,只是工具目录变大后选择空间更丰富。
### 4.9 编排器多源流水线
| 阶段 | 改造 |
|---|---|
| Phase 1 | 「单镜像初勘」→ **逐源并行 triage**,每源派类型适配的 agent |
| Phase 2 | 假设跨源生成;身份共指假设在此首次登记 |
| Phase 3 | leads 派发到源感知 agent假设×证据矩阵实时更新 |
| Phase 4 | 跨源时间线合并,**按源做时区归一**iOS UTC vs 安卓本地时间) |
| Phase 5 | 一案一份综合报告:含假设结论、实体关联图、每条结论的 provenance 引证 |
断连恢复、运行归档逻辑保留,`graph_state.json` 增量纳入新字段。
---
## 5. 数据模型变更汇总
| 节点/结构 | 变更 |
|---|---|
| `EvidenceSource` | **新增**一等节点(`src-*` |
| `ToolInvocation` | **新增**留痕记录(`inv-*`),随 graph 持久化 |
| `Phenomenon` | + `source_id`description 拆为 `verified_facts[]` + `interpretation`;澄清/移除语义含混的 `confidence`(默认 1.0),观测的可靠性由 grounding 表达 |
| `Hypothesis` | + `prior_prob`、`log_odds`(累加量);`confidence` 改为派生值 |
| `Entity` | + 类型化标识符集合;通过 `same_as` 边跨源连通 |
| Phenomenon→Hypothesis 边 | 携带 `edge_type`,映射到 `log₁₀(LR)`(替换 `_DEFAULT_EDGE_WEIGHTS` |
| Entity→Entity 边 | **新增** `same_as`(由 coref 假设背书,可逆) |
`evidence_graph.py` 的 `VALID_EDGE_TYPES`、序列化/反序列化、Jaccard 去重相应适配。
---
## 6. 组件改动清单
| 文件 | 改动 |
|---|---|
| `case.py` | **新建**`Case` / `EvidenceSource` / `SourceRegistry` |
| `main.py` | 选源逻辑改为加载 `Case`;类型探测替代扩展名 glob |
| `tool_registry.py` | 工具按 `source_id` 参数化;缓存键含 source`_auto_categorize` 改可扩展;`ToolInvocationLog` |
| `evidence_graph.py` | 数据模型变更(第 5 节LR/对数几率置信度;写入网关 + grounding 校验 |
| `base_agent.py` | RECORD 走网关;`add_phenomenon` 改为 `verified_facts`+`interpretation` 接口 |
| `agent_factory.py` | `_AGENT_CLASSES` 扩充源类型→agent 映射 |
| `orchestrator.py` | Phase 1 逐源Phase 4 跨源时区归一Phase 5 综合报告 |
| `agents/` | 新增 `ios_artifact.py` / `android_artifact.py` / `financial.py` / `media.py``communication.py` 泛化 |
| `tools/` | 新增 `mobile_ios.py`plist/sqlite/keychain、`media.py`OCR、`archive.py`(解包) |
| `config.yaml` / `case.yaml` | 删除 `cfreds_hacking_case`;新建 `case.yaml` 证据清单 |
---
## 7. 构建顺序(按依赖排序)
| 阶段 | 内容 | 依赖 | 价值 |
|---|---|---|---|
| **S1** | 4.1 证据源抽象 + 4.2 工具参数化 + 修 P6 | —— | 地基;先只在 USB E01 上跑通验证不破坏现有逻辑 |
| **S2** | 4.3 写入网关 + 4.4 grounding + ToolInvocationLog | S1 | 可靠性核心;可量化「零幻觉录入」 |
| **S3** | 4.5 LR/对数几率置信度 | 独立(可与 S2 并行) | 修 P3置信度可辩护 |
| **S4** | 4.7 iOS 插件 + 4.8 agent 重组 | S1 | 覆盖率 1/5 → 4/5 |
| **S5** | 4.6 跨源实体解析 | S1+S3 | 跨设备关联,复杂场景能力成型 |
| **S6** | 4.7 安卓 + 媒体插件 + 4.9 编排器适配 | S1+S4 | 全 5 份证据接入 |
S1+S2+S3 是「把系统改对」S4-S6 是「把能力铺全」。建议严格按序——
S1 不稳,后面全是空中楼阁。
---
## 8. 设计取舍与未决问题
1. **grounding 对自由文本的边界**:只硬核验 `verified_facts` 里的结构化原子,
`interpretation` 不做逐字核验(诚实划界)。可加一个二级 lint扫描
interpretation 中形似路径/时间戳/哈希但未被任何引用调用覆盖的串并告警。
2. **LR 标定表初值人定**:先用第 4.5 节的初值跑通;「从标注案例学习 LR」是后续工作。
3. **安卓 userdata 加密**:能否取得解密密钥决定 4.7 安卓插件的证据深度——需尽早探明。
4. **实体解析的破坏性 vs 可逆**:本设计选**可逆的 `same_as` 边**而非破坏性归并——
牺牲一点查询效率换取完全可审计可回滚,符合原则 4。
5. **报告粒度**:定为「一案一份综合报告」,内嵌每证据小节 + 跨源关联,
而非每证据独立成篇。

View File

@@ -24,9 +24,12 @@ def _load_agent_classes() -> None:
"""Lazy-import agent classes to avoid circular imports."""
if _AGENT_CLASSES:
return
from agents.android_artifact import AndroidArtifactAgent
from agents.communication import CommunicationAgent
from agents.filesystem import FileSystemAgent
from agents.hypothesis import HypothesisAgent
from agents.ios_artifact import IOSArtifactAgent
from agents.media import MediaAgent
from agents.network import NetworkAgent
from agents.registry import RegistryAgent
from agents.report import ReportAgent
@@ -38,6 +41,50 @@ def _load_agent_classes() -> None:
_AGENT_CLASSES["timeline"] = TimelineAgent
_AGENT_CLASSES["hypothesis"] = HypothesisAgent
_AGENT_CLASSES["report"] = ReportAgent
_AGENT_CLASSES["ios_artifact"] = IOSArtifactAgent
_AGENT_CLASSES["android_artifact"] = AndroidArtifactAgent
_AGENT_CLASSES["media"] = MediaAgent
# Triage agent per (source.type, platform). disk_image is ambiguous on its
# own — both a Windows USB image and an Android raw dump are disk_image —
# so the routing helper also looks at source.meta.platform when present.
SOURCE_TYPE_AGENTS: dict[str, str] = {
"disk_image": "filesystem", # default for unknown platform
"mobile_extraction": "ios_artifact",
"archive": "filesystem",
"media_collection": "media",
}
# Per-platform overrides for disk_image sources. Keys come from
# source.meta.platform in case.yaml (lowercased).
_DISK_IMAGE_PLATFORM_AGENTS: dict[str, str] = {
"windows": "filesystem",
"linux": "filesystem",
"android": "android_artifact",
"ios": "ios_artifact",
}
def get_triage_agent_type(source) -> str:
"""Pick the right Phase-1 agent for *source*.
Accepts either an :class:`EvidenceSource` or a raw source.type string
(for back-compat with the S5 signature). Disk-image sources additionally
consult ``source.meta.platform`` so Windows USBs and Android raw dumps —
both type=disk_image — get different agents.
"""
# Back-compat: accept a plain type string.
if isinstance(source, str):
return SOURCE_TYPE_AGENTS.get(source, "filesystem")
src_type = getattr(source, "type", "disk_image")
if src_type == "disk_image":
meta = getattr(source, "meta", {}) or {}
platform = str(meta.get("platform", "")).lower()
if platform in _DISK_IMAGE_PLATFORM_AGENTS:
return _DISK_IMAGE_PLATFORM_AGENTS[platform]
return SOURCE_TYPE_AGENTS.get(src_type, "filesystem")
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,58 @@
"""Android Artifact Agent — multi-partition analysis of raw Android dumps.
DESIGN.md §4.7 安卓: ``mmls`` slices the dump into partitions; each one is
its own analysable surface. Ext4-backed partitions (typically SYSTEM,
USERDATA when not FBE-encrypted, EFS in some variants) yield to TSK; raw
partitions (BOOT, RECOVERY, RADIO, MODEM blobs) are best mined with
``search_strings``. Userdata is the prize and is often FBE-encrypted on
modern devices — the agent must check fsstat before assuming readability
(see ``probe_android_partitions`` for the survey).
"""
from __future__ import annotations
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG
class AndroidArtifactAgent(BaseAgent):
name = "android_artifact"
role = (
"Android forensic analyst. You navigate raw Android disk dumps "
"(blk0_sda-style images) partition by partition. Workflow: call "
"probe_android_partitions ONCE to map the disk; pick the partitions "
"with fs_type=Ext4 or fs_type=F2FS (SYSTEM, USERDATA if readable, "
"EFS); for each, call set_active_partition(offset_from_512_sector_column) "
"and then list_directory / extract_file / search_strings as usual. "
"For raw partitions (BOOT, RECOVERY, RADIO, TOMBSTONES) skip directly "
"to search_strings — they have no filesystem. If USERDATA shows "
"fs_type=unknown it is almost certainly FBE-encrypted: record that "
"as a negative finding (the absence IS evidence) and move on to "
"what's reachable."
)
def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
super().__init__(llm, graph)
self._register_tools()
def _register_tools(self) -> None:
tool_names = [
# Android-specific
"probe_android_partitions",
"set_active_partition",
# Reused TSK toolset — partition_offset comes from active_source
"partition_info", "filesystem_info", "list_directory",
"extract_file", "find_file", "search_strings",
"count_deleted_files", "build_filesystem_timeline",
# Generic parsers
"read_text_file", "read_binary_preview", "search_text_file",
"read_text_file_section", "list_extracted_dir", "find_files",
# SQLite — Android apps store data in sqlite too (WhatsApp, etc.)
"sqlite_tables", "sqlite_query",
]
for name in tool_names:
td = TOOL_CATALOG.get(name)
if td:
self.register_tool(td.name, td.description, td.input_schema, td.executor)

49
agents/ios_artifact.py Normal file
View File

@@ -0,0 +1,49 @@
"""iOS Artifact Agent — analyses unpacked iOS extractions.
DESIGN.md §4.7/§4.8: tree-mode iOS sources are the third evidence family
the system handles (alongside disk images and pcaps). This agent owns the
iOS-specific toolset; the grounded ``add_phenomenon`` contract from
BaseAgent applies unchanged — every fact must cite a tool invocation.
"""
from __future__ import annotations
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG
class IOSArtifactAgent(BaseAgent):
name = "ios_artifact"
role = (
"iOS forensic analyst. You analyse unpacked iOS extractions — "
"binary/XML plists, SQLite databases (sms.db, ChatStorage.sqlite, "
"AddressBook.sqlitedb), the keychain (keychain-2.db), and the "
"iDevice_info.txt summary — to extract device identity, accounts, "
"messaging, contacts, and credential metadata. Domain-rooted iOS "
"trees (HomeDomain, AppDomain*, ProtectedDomain, NetworkDomain) "
"are your map; navigate by path, not by inode."
)
def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
super().__init__(llm, graph)
self._register_tools()
def _register_tools(self) -> None:
tool_names = [
# navigation — find_files is the workhorse on 10k+-file iOS trees;
# list_extracted_dir is for initial layout summary only.
"list_extracted_dir", "find_files",
"read_text_file", "read_text_file_section", "read_binary_preview",
"search_text_file",
# iOS-specific parsers
"parse_plist",
"sqlite_tables", "sqlite_query",
"parse_ios_keychain",
"read_idevice_info",
]
for name in tool_names:
td = TOOL_CATALOG.get(name)
if td:
self.register_tool(td.name, td.description, td.input_schema, td.executor)

52
agents/media.py Normal file
View File

@@ -0,0 +1,52 @@
"""Media Agent — OCR-based analysis of screenshot/photo evidence.
DESIGN.md §4.7: the LLM backend has no vision capability, so JPEG/PNG
evidence must go through tesseract first. The agent runs OCR, then
records extracted strings — especially identifiers (wallet addresses,
phone numbers, usernames) — via the grounded observe_identity gateway so
they participate in cross-source coref the same way iOS keychain entries
or Windows account names do.
If the OCR runtime is missing on the host, ocr_image returns an explicit
install hint; the agent should record that as a negative finding ("no
text extracted — tesseract not installed") rather than guessing.
"""
from __future__ import annotations
from base_agent import BaseAgent
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from tool_registry import TOOL_CATALOG
class MediaAgent(BaseAgent):
name = "media"
role = (
"Media / OCR forensic analyst. You analyse screenshots, photos, and "
"scanned documents — any pixel-based evidence the LLM cannot read "
"directly. Workflow: list_extracted_dir to enumerate images, "
"ocr_image on each promising one, then add_phenomenon (with the "
"OCR'd text as the verified_fact value) and observe_identity for "
"any wallet addresses, phone numbers, email addresses, or "
"usernames the text contains. If OCR fails because tesseract is "
"missing, RECORD that as a negative finding instead of fabricating "
"image content — the absence is a real fact about this run."
)
def __init__(self, llm: LLMClient, graph: EvidenceGraph) -> None:
super().__init__(llm, graph)
self._register_tools()
def _register_tools(self) -> None:
tool_names = [
"ocr_image",
"list_extracted_dir", "find_files",
"read_binary_preview",
"read_text_file",
"search_text_file",
]
for name in tool_names:
td = TOOL_CATALOG.get(name)
if td:
self.register_tool(td.name, td.description, td.input_schema, td.executor)

View File

@@ -12,9 +12,20 @@ class ReportAgent(BaseAgent):
role = (
"Forensic report writer. You synthesize all findings from the investigation "
"into a structured, professional forensic analysis report organized by hypotheses.\n\n"
"Only include findings that have a source_tool attribution (marked VERIFIED). "
"If evidence lacks source attribution, mark it as UNVERIFIED. "
"Do NOT invent or fabricate any data, timestamps, or findings not present in the evidence."
"Phenomena are marked GROUNDED (verified_facts cite a real tool invocation), "
"TOOL-ONLY (source_tool set but no facts), or UNVERIFIED (neither). When "
"writing the report, render verified_facts as primary evidence with their "
"invocation citations, and render interpretation as 'agent analysis' so the "
"reader can tell ground truth from inference. Do NOT invent or fabricate any "
"data, timestamps, or findings not present in the evidence.\n\n"
"This is a cross-source case: phenomena come from multiple evidence "
"sources, and entities discovered on different sources may refer to the "
"same real-world actor. ALWAYS include:\n"
" - 'Findings by Source' section sourced from get_phenomena_by_source\n"
" - 'Actor Clusters' section sourced from get_actor_clusters (the "
"cross-source attribution view — multi-source clusters answer "
"'which findings on different devices belong to the same person')\n"
" - 'Hypothesis × Evidence Matrix' from get_hypothesis_evidence_matrix"
)
# Calling save_report is BOTH the recording action and the completion
# signal. tool_call_loop returns the moment save_report executes; the
@@ -38,9 +49,12 @@ class ReportAgent(BaseAgent):
f"Investigation state:\n{self.graph.stats_summary()}\n\n"
f"Your task: {task}\n\n"
f"WORKFLOW:\n"
f"1. Call get_hypotheses_with_evidence, get_all_phenomena, get_entities, get_case_info "
f" to gather all the data needed for the report. Make these calls in parallel.\n"
f"2. Assemble the complete markdown forensic report.\n"
f"1. Call get_hypotheses_with_evidence, get_all_phenomena, get_entities,\n"
f" get_case_info, get_hypothesis_evidence_matrix, get_actor_clusters,\n"
f" and get_phenomena_by_source in parallel — these are the eight data\n"
f" sources you assemble the report from.\n"
f"2. Assemble the complete markdown forensic report. Cross-source\n"
f" actor clusters and per-source breakdown are MANDATORY sections.\n"
f"3. Call save_report(content=<full markdown>, output_path=\"report.md\").\n"
f" This single call is the completion signal — the run ENDS the moment it executes.\n"
f" Do NOT call any read tools after this point; they will not run.\n"
@@ -83,6 +97,45 @@ class ReportAgent(BaseAgent):
executor=self._get_entities,
)
self.register_tool(
name="get_hypothesis_evidence_matrix",
description=(
"Render the hypothesis × evidence pivot as a markdown table. "
"Columns: per edge_type counts, log_odds, confidence, status. "
"Embed this directly in the report to show how each hypothesis "
"stands relative to the others on a single screen."
),
input_schema={"type": "object", "properties": {}},
executor=self._get_hypothesis_evidence_matrix,
)
self.register_tool(
name="get_actor_clusters",
description=(
"Render the cross-source actor clusters: each cluster is the "
"set of Entity nodes the system currently treats as the same "
"actor (via active same_as edges backed by coref hypotheses "
"≥ 0.8). Includes the aggregated identifier evidence per "
"cluster. Use this in the report's 'Entities / Actors' "
"section so readers see who-is-who across devices, not just "
"raw entity rows."
),
input_schema={"type": "object", "properties": {}},
executor=self._get_actor_clusters,
)
self.register_tool(
name="get_phenomena_by_source",
description=(
"Group every phenomenon by its originating evidence source "
"(source_id). Use this to drive the report's 'Findings by "
"Source' section so each evidence item's per-device "
"contribution is auditable."
),
input_schema={"type": "object", "properties": {}},
executor=self._get_phenomena_by_source,
)
self.register_tool(
name="save_report",
description="Save the final report to a file.",
@@ -115,12 +168,24 @@ class ReportAgent(BaseAgent):
items = [ph for ph in phenomena.values() if ph.category == cat]
lines.append(f"\n--- {cat.upper()} ({len(items)} entries) ---")
for ph in items:
verified = "VERIFIED" if ph.source_tool else "UNVERIFIED"
lines.append(f"\n[{verified}] {ph.title} ({ph.id})")
# Grounded = at least one verified fact AND a source_tool.
grounded = bool(ph.verified_facts) and bool(ph.source_tool)
marker = "GROUNDED" if grounded else (
"TOOL-ONLY" if ph.source_tool else "UNVERIFIED"
)
lines.append(f"\n[{marker}] {ph.title} ({ph.id})")
lines.append(f" Source: {ph.source_agent} | Tool: {ph.source_tool or 'N/A'}")
if ph.timestamp:
lines.append(f" Timestamp: {ph.timestamp}")
lines.append(f" {ph.description[:500]}")
if ph.verified_facts:
lines.append(f" Verified facts ({len(ph.verified_facts)}):")
for f in ph.verified_facts:
lines.append(
f" - [{f.get('type','?')}] {str(f.get('value',''))[:200]} "
f"(cite: {f.get('invocation_id','?')})"
)
if ph.interpretation:
lines.append(f" Analysis: {ph.interpretation[:500]}")
return "\n".join(lines)
async def _get_hypotheses_with_evidence(self) -> str:
@@ -150,12 +215,87 @@ class ReportAgent(BaseAgent):
return "\n".join(lines)
async def _get_case_info(self) -> str:
info = self.graph.case_info
lines = ["=== Case Information ==="]
for k, v in info.items():
lines.append(f" {k}: {v}")
lines.append(f" Image path: {self.graph.image_path}")
lines.append(f" Partition offset: {self.graph.partition_offset}")
case = self.graph.case
if case is not None:
lines.append(f" case_id: {case.case_id}")
lines.append(f" name: {case.name}")
for k, v in (case.meta or {}).items():
lines.append(f" {k}: {v}")
lines.append(f" sources: {len(case.sources)}")
for s in case.sources:
owner = f", owner={s.owner}" if s.owner else ""
platform = s.meta.get("platform") if s.meta else None
plat = f", platform={platform}" if platform else ""
lines.append(
f" - {s.id}: {s.label} "
f"(type={s.type}, mode={s.access_mode}{plat}{owner})"
)
else:
# Legacy single-image fallback — surface whatever case_info dict
# was passed in (e.g. the old CFReDS MD5 block).
for k, v in (self.graph.case_info or {}).items():
lines.append(f" {k}: {v}")
lines.append(f" Image path: {self.graph.image_path}")
lines.append(f" Partition offset: {self.graph.partition_offset}")
return "\n".join(lines)
async def _get_hypothesis_evidence_matrix(self) -> str:
return self.graph.hypothesis_evidence_matrix_markdown()
async def _get_actor_clusters(self) -> str:
clusters = self.graph.actor_clusters()
if not clusters:
return "(no entities recorded)"
# Show multi-member clusters first — they're the cross-source links
# the human reader most needs to see.
clusters.sort(key=lambda c: (-len(c["members"]), c["members"]))
lines = [f"=== Actor Clusters ({len(clusters)}) ==="]
for i, c in enumerate(clusters, 1):
members = c["members"]
label = "MULTI-SOURCE CLUSTER" if len(members) > 1 else "Single entity"
lines.append(f"\n[{label} #{i}] {len(members)} member(s):")
for eid in members:
ent = self.graph.entities.get(eid)
if ent:
lines.append(f" - {ent.summary()}")
if c["identifiers"]:
lines.append(" Aggregated identifiers:")
for ident in c["identifiers"]:
strong_tag = "strong" if ident.get("strong") else "weak"
lines.append(
f" [{strong_tag}] {ident.get('type')}={ident.get('value')} "
f"(on {ident.get('on_entity')})"
)
if c["coref_hypotheses"]:
lines.append(" Backing coref hypotheses (≥0.8 active):")
for hid in c["coref_hypotheses"]:
hyp = self.graph.hypotheses.get(hid)
if hyp:
lines.append(f" - {hid}: conf={hyp.confidence:.2f}, L={hyp.log_odds:+.2f}")
return "\n".join(lines)
async def _get_phenomena_by_source(self) -> str:
by_src: dict[str, list] = {}
for ph in self.graph.phenomena.values():
by_src.setdefault(ph.source_id or "(unbound)", []).append(ph)
if not by_src:
return "(no phenomena recorded)"
# Resolve source labels via graph.case when possible.
def _label(src_id: str) -> str:
if self.graph.case:
src = self.graph.case.get_source(src_id)
if src:
return f"{src_id}{src.label} ({src.type})"
return src_id
lines = [f"=== Phenomena by Source ({len(by_src)} source(s)) ==="]
for src_id in sorted(by_src):
phs = by_src[src_id]
lines.append(f"\n--- {_label(src_id)} ({len(phs)} phenomena) ---")
for ph in phs:
grounded = "G" if ph.verified_facts and ph.source_tool else "·"
lines.append(f" [{grounded}] {ph.summary()}")
return "\n".join(lines)
async def _get_entities(self) -> str:
@@ -174,18 +314,27 @@ class ReportAgent(BaseAgent):
return "\n".join(lines)
async def _verify_phenomena(self) -> str:
verified = []
unverified = []
grounded: list[str] = []
tool_only: list[str] = []
unverified: list[str] = []
for ph in self.graph.phenomena.values():
entry = f" [{ph.category}] {ph.title} (agent: {ph.source_agent}, tool: {ph.source_tool or 'N/A'})"
if ph.source_tool:
verified.append(entry)
nf = len(ph.verified_facts)
entry = (
f" [{ph.category}] {ph.title} "
f"(agent: {ph.source_agent}, tool: {ph.source_tool or 'N/A'}, facts: {nf})"
)
if ph.verified_facts and ph.source_tool:
grounded.append(entry)
elif ph.source_tool:
tool_only.append(entry)
else:
unverified.append(entry)
lines = ["=== Phenomena Verification Report ==="]
lines.append(f"\nVERIFIED ({len(verified)}have source_tool):")
lines.extend(verified)
lines.append(f"\nGROUNDED ({len(grounded)}facts + source_tool):")
lines.extend(grounded)
lines.append(f"\nTOOL-ONLY ({len(tool_only)} — source_tool, no facts):")
lines.extend(tool_only)
lines.append(f"\nUNVERIFIED ({len(unverified)} — no source_tool):")
lines.extend(unverified)
return "\n".join(lines)

View File

@@ -122,7 +122,15 @@ class TimelineAgent(BaseAgent):
lines = []
for ph in items:
lines.append(f"{ph.timestamp} | [{ph.category}] {ph.title} ({ph.id})")
lines.append(f" {ph.description[:150]}")
preview = ph.interpretation[:150] if ph.interpretation else ""
if ph.verified_facts:
fact_preview = ", ".join(
f"{f.get('type','?')}={str(f.get('value',''))[:40]}"
for f in ph.verified_facts[:3]
)
preview = f"{preview} [facts: {fact_preview}]" if preview else f"[facts: {fact_preview}]"
if preview:
lines.append(f" {preview}")
return "\n".join(lines)
async def _add_temporal_edge(

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import json
import logging
import time
import uuid
from typing import Any
from evidence_graph import EvidenceGraph
@@ -36,7 +37,9 @@ class BaseAgent:
# forced retry with an explicit "you forgot to record" instruction.
# Subclasses override to declare their own recording responsibility
# (timeline → add_temporal_edge, hypothesis → add_hypothesis, report → save_report).
mandatory_record_tools: tuple[str, ...] = ("add_phenomenon",)
# observe_identity (S5) counts as a recording too — it writes through the
# same grounding gateway and produces an identity_observation phenomenon.
mandatory_record_tools: tuple[str, ...] = ("add_phenomenon", "observe_identity")
# Tools whose invocation ends the run immediately. After any terminal tool
# is called, tool_call_loop returns with that tool's result text as
@@ -110,8 +113,23 @@ class BaseAgent:
f" Call investigation tools (list_directory, parse_registry_key, etc.) to gather data.\n"
f" Only extract_file for forensically relevant files (user data, logs, configs, hives) — NOT system DLLs or OS files.\n"
f" Create add_lead for anything outside your expertise.\n\n"
f"Phase B — RECORD PHENOMENA:\n"
f" For EACH significant finding from Phase A, call add_phenomenon.\n"
f"Phase B — RECORD PHENOMENA (GROUNDED):\n"
f" For EACH significant finding from Phase A, call add_phenomenon with:\n"
f" * interpretation: your analysis — free text, NOT verified.\n"
f" * verified_facts: one entry per concrete atom (path, timestamp,\n"
f" inode, hash, identifier, count) you want recorded as truth.\n"
f" Each entry MUST have:\n"
f" - type: e.g. 'path', 'timestamp', 'inode', 'hash', 'identifier', 'count'\n"
f" - value: a VERBATIM substring from the tool output\n"
f" - invocation_id: the inv-xxx ID from the '[invocation: inv-xxx]'\n"
f" header at the top of the tool result that produced this value\n"
f" IDENTIFIERS — call observe_identity (in ADDITION to add_phenomenon)\n"
f" whenever you see an email, phone number, Apple ID, IMEI, wallet\n"
f" address, MAC, UDID, persistent nickname, or display name. Same\n"
f" grounding contract: value must be verbatim in the cited tool\n"
f" output. This is HOW cross-source attribution gets built — without\n"
f" it, we can't tell whether the Apple ID in keychain belongs to the\n"
f" same person as the Windows account on the USB.\n"
f" Do NOT call link_to_entity yet — just record all phenomena first.\n\n"
f"Phase C — LINK ENTITIES:\n"
f" FIRST call list_phenomena to get the current IDs — do NOT rely on memory.\n"
@@ -125,20 +143,22 @@ class BaseAgent:
f"- You MUST call add_phenomenon for EVERY significant finding BEFORE you stop.\n"
f"- NEGATIVE findings count too. If you searched X (a directory, a pattern, "
f"a registry key) and found NOTHING, that absence IS evidence — call "
f"add_phenomenon with a 'No matches for X' title and the search scope in "
f"raw_data. Negative findings constrain the hypothesis space and prevent "
f"the next agent from wasting time re-searching.\n"
f"add_phenomenon with a 'No matches for X' title, the search scope in "
f"raw_data, and cite the search tool's invocation_id (verified_facts may "
f"be empty for a true negative; the cited invocation in source_tool still "
f"anchors it). Negative findings constrain the hypothesis space.\n"
f"- If you stop without having called add_phenomenon at least once, the task "
f"is FAILED and a forced retry will fire.\n"
f"- Include exact file paths, inode numbers, timestamps, and the source_tool "
f"that produced each finding.\n\n"
f"ANTI-HALLUCINATION RULES — STRICTLY ENFORCED:\n"
f"- ONLY record findings that appear VERBATIM in tool results you received\n"
f"- NEVER invent or guess timestamps, file paths, inode numbers, or program names\n"
f"- If tool output was truncated, state '[truncated]' — do NOT fill in the missing data\n"
f"- If you are unsure whether something exists, call a tool to verify or create a lead — do NOT assume\n"
f"- Quote exact strings from tool output when recording evidence descriptions\n"
f"- Do NOT fabricate execution timestamps — only report timestamps returned by tools"
f"is FAILED and a forced retry will fire.\n\n"
f"GROUNDING GATEWAY — STRUCTURALLY ENFORCED:\n"
f"- Every tool result begins with '[invocation: inv-xxxxxxxx]' — that ID\n"
f" is what you cite in each fact's invocation_id.\n"
f"- fact.value must be a substring of the cited invocation's output.\n"
f" Case, whitespace, and path-separator (/ ↔ \\) variants are tolerated;\n"
f" anything else fabricated is REJECTED with a per-fact reason.\n"
f"- On REJECTED: quote the literal text from the output (or drop the\n"
f" fact), and put guesses / inferred paths / model names in\n"
f" `interpretation` instead. Then call add_phenomenon again.\n"
f"- You may cite ONLY invocations made within THIS task."
)
async def run(self, task: str, lead_id: str | None = None) -> str:
@@ -146,6 +166,11 @@ class BaseAgent:
_log(task, event="agent_start", agent=self.name)
self.graph.agent_status[self.name] = "running"
self.graph._current_agent = self.name
# Fresh task scope per agent run. Used by the grounding gateway to
# check that facts in add_phenomenon cite invocations made *within
# this run* — preventing the agent from forwarding stale IDs from
# earlier work or another agent.
self.graph._current_task_id = f"task-{uuid.uuid4().hex[:8]}"
self._current_lead_id = lead_id
self._register_graph_tools()
@@ -350,20 +375,67 @@ class BaseAgent:
self.register_tool(
name="add_phenomenon",
description=(
"Record a forensic finding (phenomenon) on the evidence graph. "
"You MUST specify source_tool: the name of the tool call that produced this finding."
"Record a forensic finding on the evidence graph. The finding is "
"split into provenance-bound atoms (verified_facts) and free-form "
"analysis (interpretation). Each fact MUST cite the invocation_id "
"of a tool call you made in THIS task — the gateway checks every "
"fact's value against that call's real output, byte-for-byte. "
"Any fact that fails grounding causes the whole record to be "
"rejected with a list of failures; fix the facts and call again."
),
input_schema={
"type": "object",
"properties": {
"category": {"type": "string", "description": "Category of the finding."},
"title": {"type": "string", "description": "Short title."},
"description": {"type": "string", "description": "Detailed description. Quote exact data from tool output."},
"interpretation": {
"type": "string",
"description": (
"Free-form analysis text — your reasoning, why this "
"matters, what it implies. NOT verified by the gateway. "
"Rendered in reports as 'agent analysis', not truth."
),
},
"verified_facts": {
"type": "array",
"description": (
"Atoms you want preserved as ground truth. Each must "
"appear verbatim in the cited tool output."
),
"items": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": (
"Kind of fact: path, timestamp, inode, "
"hash, identifier, count, raw, ..."
),
},
"value": {
"type": "string",
"description": (
"Verbatim substring from the cited tool "
"output. The gateway does a literal "
"string-in-string check — no paraphrasing."
),
},
"invocation_id": {
"type": "string",
"description": (
"ID from the '[invocation: inv-xxx]' header "
"of the tool call that produced this value."
),
},
},
"required": ["type", "value", "invocation_id"],
},
},
"raw_data": {"type": "object", "description": "Structured raw data supporting this finding."},
"timestamp": {"type": "string", "description": "Timestamp if any. ONLY use timestamps from tool output."},
"source_tool": {"type": "string", "description": "Name of the tool that produced this (e.g. 'list_directory')."},
},
"required": ["category", "title", "description", "source_tool"],
"required": ["category", "title", "source_tool"],
},
executor=self._add_phenomenon,
)
@@ -414,6 +486,67 @@ class BaseAgent:
executor=self._link_to_entity,
)
self.register_tool(
name="observe_identity",
description=(
"Record a typed identifier (email / phone / Apple ID / IMEI / "
"wallet address / nickname / display name / …) for an entity. "
"Goes through the same grounding gateway as add_phenomenon — "
"value MUST be a verbatim substring of the cited tool output. "
"After attachment, the engine automatically proposes / "
"strengthens / weakens cross-source coreference hypotheses "
"between this entity and any others carrying the same or "
"conflicting identifiers. This is how 'is the Apple ID in iOS "
"keychain the same person as the Windows login name?' gets "
"answered. Call this in ADDITION to add_phenomenon for "
"identifier-bearing findings."
),
input_schema={
"type": "object",
"properties": {
"entity_name": {"type": "string", "description": "Human-readable entity name (e.g. 'LEUNG YL', 'alice@example.com')."},
"entity_type": {
"type": "string",
"enum": ["person", "program", "file", "host", "ip_address"],
"description": "Kind of entity this identifier belongs to (usually 'person').",
},
"identifier_type": {
"type": "string",
"description": (
"Strong (near-unique): email, phone_number, imei, "
"imsi, apple_id, icloud_id, google_account, "
"wallet_address, udid, mac_address, device_serial. "
"Weak (free-form, may collide): nickname, "
"display_name, username, screen_name."
),
},
"value": {
"type": "string",
"description": (
"The identifier value, quoted VERBATIM from the "
"tool output you cite in invocation_id."
),
},
"invocation_id": {
"type": "string",
"description": (
"ID from the '[invocation: inv-xxx]' header of "
"the tool call that surfaced this identifier."
),
},
"source_tool": {
"type": "string",
"description": "Name of the tool that produced the identifier.",
},
},
"required": [
"entity_name", "entity_type", "identifier_type",
"value", "invocation_id",
],
},
executor=self._observe_identity,
)
# ---- Tool executors -----------------------------------------------------
async def _list_phenomena(self, category: str | None = None) -> str:
@@ -453,16 +586,29 @@ class BaseAgent:
self,
category: str,
title: str,
description: str,
interpretation: str = "",
verified_facts: list[dict] | None = None,
raw_data: dict | None = None,
timestamp: str | None = None,
source_tool: str = "",
# Back-compat: older prompts (and accidental LLM emissions) may pass
# ``description``; treat it as ``interpretation`` rather than failing.
description: str | None = None,
) -> str:
if description and not interpretation:
interpretation = description
# GroundingError propagates: llm_client._execute_single_tool turns
# raised exceptions into "Error executing add_phenomenon: <msg>" tool
# results the LLM sees, and _wrap_record_executor does NOT increment
# the mandatory-record counter (the increment only runs after a
# successful return), so the forced-retry mechanism still fires if
# the agent never lands a grounded phenomenon.
pid, merged = await self.graph.add_phenomenon(
source_agent=self.name,
category=category,
title=title,
description=description,
interpretation=interpretation,
verified_facts=verified_facts,
raw_data=raw_data,
timestamp=timestamp,
source_tool=source_tool,
@@ -508,6 +654,51 @@ class BaseAgent:
status = "linked to existing" if existing else "created and linked"
return f"Entity {status}: {entity_name} ({entity_type}) ←[{edge_type}]— {phenomenon_id}"
async def _observe_identity(
self,
entity_name: str,
entity_type: str,
identifier_type: str,
value: str,
invocation_id: str,
source_tool: str = "",
) -> str:
# GroundingError / ValueError propagate to llm_client's per-tool
# exception handler, which formats them back to the LLM. That keeps
# the mandatory-record counter honest — only a successful return
# triggers the increment in _wrap_record_executor.
result = await self.graph.observe_identity(
entity_name=entity_name,
entity_type=entity_type,
identifier_type=identifier_type,
value=value,
source_agent=self.name,
source_tool=source_tool,
invocation_id=invocation_id,
)
lines = [
f"Identity observed: {identifier_type}={value} "
f"on entity {result['entity_id']} ({entity_name})."
]
if result.get("new_identifier"):
lines.append(
f" Observation phenomenon: {result['phenomenon_id']}"
)
else:
lines.append(" (identifier already recorded on this entity — idempotent)")
for prop in result.get("coref_proposals", []):
lines.append(
f" → Coref candidate: {prop['other_entity_id']} via "
f"{prop['match']['edge_type']} (conf={prop['confidence']:.2f}, "
f"hypothesis={prop['hypothesis_id']})"
)
for c in prop.get("conflicts", []):
lines.append(
f" ⚠ conflict on {c['type']}: "
f"{c['new_value']} vs {c['other_value']}"
)
return "\n".join(lines)
async def _list_assets(self, category: str | None = None) -> str:
results = self.graph.list_assets(category)
if not results:

41
case.example.yaml Normal file
View File

@@ -0,0 +1,41 @@
# MASForensics case definition — template
#
# Copy this file to `case.yaml` and edit it for your case. If `case.yaml`
# exists in the working directory, `python main.py` loads it automatically;
# otherwise main.py falls back to interactive single-image selection.
#
# A case is a set of evidence sources. Each source has:
# id optional — auto-derived from label if omitted ("src-<slug>")
# label human-readable name
# type disk_image | mobile_extraction | archive | media_collection
# access_mode image | tree (optional — defaults by type)
# image = block device / disk image, navigated by Sleuth Kit
# tree = mounted filesystem / unpacked extraction, path-based
# owner optional — the person the source is associated with
# path filesystem path (relative paths resolve against this file)
# partition_offset image-mode only — sector offset of the partition to analyze
# meta optional free-form notes
#
# NOTE: at the current refit stage only image-mode (disk) sources are
# analysable; tree-mode sources are accepted but skipped.
case_id: example-case
name: "Example forensic case"
meta:
notes: "free-form case-level metadata"
sources:
- id: src-suspect-laptop
label: "Suspect laptop disk image"
type: disk_image
access_mode: image
owner: "John Doe"
path: image/suspect_laptop.E01
partition_offset: 0 # run `mmls <image>` to find the right offset
- id: src-suspect-phone
label: "Suspect phone extraction"
type: mobile_extraction
access_mode: tree
owner: "John Doe"
path: image/suspect_phone.zip

226
case.py Normal file
View File

@@ -0,0 +1,226 @@
"""Case and evidence-source model — the foundation for multi-evidence analysis.
A :class:`Case` is a collection of :class:`EvidenceSource` entries. Each source
has a *type* (disk image, mobile extraction, archive, ...) and an *access mode*
that determines how forensic tools reach its contents:
- ``"image"`` — a block device / disk image, navigated by The Sleuth Kit via
inode addressing (raw, E01, dd, ...).
- ``"tree"`` — an already-mounted filesystem or unpacked extraction,
navigated by ordinary filesystem paths.
This module is pure data model + loading. Partition probing and interactive
selection live in ``main.py``.
"""
from __future__ import annotations
import logging
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
logger = logging.getLogger(__name__)
# Recognised source types and access modes.
SOURCE_TYPES = {"disk_image", "mobile_extraction", "archive", "media_collection"}
ACCESS_MODES = {"image", "tree"}
# Disk-image file extensions for interactive discovery.
# P6 fix: ``.bin`` (and vmdk/vhd) added — extension globbing previously missed
# raw block-device dumps such as ``blk0_sda.bin``.
DISK_IMAGE_EXTS = {
".001", ".dd", ".raw", ".img", ".bin", ".e01", ".iso", ".vmdk", ".vhd",
}
# Default access mode per source type.
_DEFAULT_ACCESS_MODE = {
"disk_image": "image",
"mobile_extraction": "tree",
"archive": "tree",
"media_collection": "tree",
}
def slugify(text: str) -> str:
"""Reduce *text* to a lowercase, hyphen-separated slug for use in IDs."""
slug = re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")
return slug or "src"
@dataclass
class EvidenceSource:
"""One piece of evidence within a :class:`Case`."""
id: str # "src-<slug>"
label: str # human-readable name
type: str # one of SOURCE_TYPES
path: str # filesystem path to the evidence
access_mode: str # "image" | "tree"
owner: str = "" # associated person, if known
partition_offset: int = 0 # sector offset (image-mode sources only)
meta: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, d: dict) -> EvidenceSource:
"""Reconstruct from a dict, ignoring unknown keys (forward-compatible)."""
known = set(cls.__dataclass_fields__)
return cls(**{k: v for k, v in d.items() if k in known})
def summary(self) -> str:
loc = (
f"@{self.partition_offset}"
if self.access_mode == "image" and self.partition_offset
else ""
)
owner = f" owner={self.owner}" if self.owner else ""
return f"[{self.id}] {self.label} ({self.type}/{self.access_mode}{loc}){owner}"
@dataclass
class Case:
"""A forensic case: a set of evidence sources plus metadata."""
case_id: str
name: str
sources: list[EvidenceSource] = field(default_factory=list)
meta: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"case_id": self.case_id,
"name": self.name,
"sources": [s.to_dict() for s in self.sources],
"meta": dict(self.meta),
}
@classmethod
def from_dict(cls, d: dict) -> Case:
return cls(
case_id=d.get("case_id", ""),
name=d.get("name", ""),
sources=[EvidenceSource.from_dict(s) for s in d.get("sources", [])],
meta=d.get("meta", {}),
)
def get_source(self, source_id: str) -> EvidenceSource | None:
for s in self.sources:
if s.id == source_id:
return s
return None
# ---------------------------------------------------------------------------
# case.yaml loading
# ---------------------------------------------------------------------------
def _build_source(raw: dict, base_dir: Path, index: int) -> EvidenceSource:
"""Validate and normalise one source entry from case.yaml.
Missing ``id`` is derived from the label; missing ``access_mode`` defaults
by type; relative paths are resolved against *base_dir* (the case file's
directory).
"""
label = str(raw.get("label") or raw.get("id") or f"source-{index}")
src_type = str(raw.get("type", "disk_image"))
if src_type not in SOURCE_TYPES:
logger.warning("Unknown source type %r for %r — treating as disk_image",
src_type, label)
src_type = "disk_image"
access_mode = str(raw.get("access_mode") or _DEFAULT_ACCESS_MODE.get(src_type, "tree"))
if access_mode not in ACCESS_MODES:
logger.warning("Unknown access_mode %r for %r — defaulting", access_mode, label)
access_mode = _DEFAULT_ACCESS_MODE.get(src_type, "tree")
src_id = str(raw.get("id") or f"src-{slugify(label)}")
if not src_id.startswith("src-"):
src_id = f"src-{slugify(src_id)}"
raw_path = str(raw.get("path", "")).strip()
path = raw_path
if raw_path:
p = Path(raw_path).expanduser()
if not p.is_absolute():
p = (base_dir / p)
path = str(p)
return EvidenceSource(
id=src_id,
label=label,
type=src_type,
path=path,
access_mode=access_mode,
owner=str(raw.get("owner", "")),
partition_offset=int(raw.get("partition_offset", 0) or 0),
meta=dict(raw.get("meta", {})),
)
def build_case(data: dict, base_dir: Path | None = None) -> Case:
"""Build a validated :class:`Case` from a loosely-typed case.yaml dict."""
base_dir = base_dir or Path.cwd()
sources: list[EvidenceSource] = []
seen_ids: set[str] = set()
for i, raw in enumerate(data.get("sources", []) or []):
if not isinstance(raw, dict):
logger.warning("Skipping malformed source entry #%d", i)
continue
src = _build_source(raw, base_dir, i)
if src.id in seen_ids:
src.id = f"{src.id}-{i}"
seen_ids.add(src.id)
if not src.path:
logger.warning("Source %r has no path — keeping but it is not analysable",
src.label)
sources.append(src)
return Case(
case_id=str(data.get("case_id", "case")),
name=str(data.get("name", "Untitled case")),
sources=sources,
meta=dict(data.get("meta", {})),
)
def load_case(path: str | Path = "case.yaml") -> Case | None:
"""Load a :class:`Case` from a case.yaml file. Returns None if absent."""
case_path = Path(path)
if not case_path.exists():
return None
import yaml
try:
data = yaml.safe_load(case_path.read_text()) or {}
except Exception as e:
logger.error("Failed to parse %s: %s", case_path, e)
return None
if not isinstance(data, dict):
logger.error("%s is not a YAML mapping", case_path)
return None
case = build_case(data, base_dir=case_path.resolve().parent)
logger.info("Loaded case %r with %d source(s) from %s",
case.name, len(case.sources), case_path)
return case
def single_source_case(
image_path: str,
partition_offset: int = 0,
label: str | None = None,
) -> Case:
"""Wrap a single disk image as a one-source Case (interactive fallback)."""
name = label or Path(image_path).name
src = EvidenceSource(
id=f"src-{slugify(Path(image_path).stem)}",
label=name,
type="disk_image",
path=image_path,
access_mode="image",
partition_offset=partition_offset,
)
return Case(case_id="adhoc", name=name, sources=[src])

File diff suppressed because it is too large Load Diff

View File

@@ -142,6 +142,12 @@ READ_ONLY_TOOLS: set[str] = {
# Parser reads
"read_text_file", "read_binary_preview", "search_text_file",
"read_text_file_section", "list_extracted_dir", "parse_pcap_strings",
"find_files",
# iOS plugin reads (S4)
"parse_plist", "sqlite_tables", "sqlite_query",
"parse_ios_keychain", "read_idevice_info",
# Android + media reads (S6) — set_active_partition is NOT read-only.
"probe_android_partitions", "ocr_image",
}
@@ -503,7 +509,7 @@ class LLMClient:
tools: list[dict],
tool_executor: dict[str, Any],
system: str | None = None,
max_iterations: int = 40,
max_iterations: int = 60,
terminal_tools: tuple[str, ...] = (),
) -> tuple[str, list[dict]]:
"""Run a tool-calling loop using OpenAI-native tool calls.

162
main.py
View File

@@ -15,17 +15,21 @@ from pathlib import Path
import yaml
from agent_factory import AgentFactory
from case import (
DISK_IMAGE_EXTS, Case, EvidenceSource, load_case, single_source_case,
)
from evidence_graph import EvidenceGraph
from llm_client import LLMClient
from log_config import setup_logging
from orchestrator import AnalysisAborted, Orchestrator
from tool_registry import register_all_tools
from tools.archive import unzip_archive_sync
RUNS_DIR = Path("runs")
IMAGE_DIR = Path("image")
# Common forensic image extensions (only first segment / single-file formats)
_IMAGE_GLOBS = ["*.001", "*.dd", "*.raw", "*.img", "*.E01", "*.iso"]
# Persistent unpack cache for tree-mode sources (zip extractions). Lives
# at project root so multiple runs can reuse the same unpacked tree.
SOURCE_CACHE_DIR = Path(".cache/sources")
def load_config(path: str = "config.yaml") -> dict:
@@ -38,11 +42,13 @@ def load_config(path: str = "config.yaml") -> dict:
# ---------------------------------------------------------------------------
def _discover_images(search_dir: Path = IMAGE_DIR) -> list[Path]:
"""Find forensic disk image files under *search_dir*."""
images: set[Path] = set()
for glob in _IMAGE_GLOBS:
images.update(search_dir.glob(glob))
return sorted(images)
"""Find forensic disk image files under *search_dir* (case-insensitive ext)."""
if not search_dir.is_dir():
return []
return sorted(
p for p in search_dir.iterdir()
if p.is_file() and p.suffix.lower() in DISK_IMAGE_EXTS
)
def _parse_mmls(output: str) -> list[dict]:
@@ -110,7 +116,7 @@ def select_image_interactive(image_dir: Path | None = None) -> tuple[str, int]:
images = _discover_images(image_dir)
if not images:
print(f"No disk images found in {image_dir}/")
print("Supported formats: " + ", ".join(_IMAGE_GLOBS))
print("Supported extensions: " + ", ".join(sorted(DISK_IMAGE_EXTS)))
sys.exit(1)
if len(images) == 1:
@@ -153,6 +159,118 @@ def select_image_interactive(image_dir: Path | None = None) -> tuple[str, int]:
print("Invalid choice.")
def resolve_case() -> Case:
"""Resolve the Case to analyze.
Priority: an explicit case file given as a CLI argument, then ./case.yaml
in the working directory, then legacy interactive single-image selection.
"""
# 1. Explicit case file passed on the command line
if len(sys.argv) > 1 and sys.argv[1].lower().endswith((".yaml", ".yml")):
case = load_case(sys.argv[1])
if case is None:
print(f"Error: could not load case file {sys.argv[1]}")
sys.exit(1)
print(f"Loaded case: {case.name} ({len(case.sources)} sources)")
return case
# 2. ./case.yaml in the working directory
case = load_case()
if case is not None:
print(f"Loaded case: {case.name} ({len(case.sources)} sources)")
return case
# 3. Legacy interactive single-image selection
cli_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else None
image_path, partition_offset = select_image_interactive(cli_dir)
return single_source_case(image_path, partition_offset)
def _is_analysable(src: EvidenceSource) -> bool:
"""A source is analysable when it has a path AND its mode has tooling.
S4 lights up tree-mode iOS extractions; image-mode disks were already
supported. Media-collection (screenshots) remain skipped until S6.
"""
if not src.path:
return False
if src.access_mode == "image":
return True
if src.access_mode == "tree" and src.type in ("mobile_extraction", "archive"):
return True
return False
def list_analysable_sources(case: Case) -> list[EvidenceSource]:
"""Return every analysable source in the case (orchestrator iterates them).
Pre-S6 main.py used to force-choose one source here; the multi-source
orchestrator (Phase 1 per-source triage) now consumes the full list.
Skipped sources are still reported for visibility.
"""
analysable = [s for s in case.sources if _is_analysable(s)]
skipped = [s for s in case.sources if not _is_analysable(s)]
if skipped:
print(
f"Note: {len(skipped)} source(s) not analysable in this build: "
+ ", ".join(f"{s.label} ({s.type})" for s in skipped)
)
if not analysable:
print("No analysable sources in this case.")
sys.exit(1)
print(f"Analysing {len(analysable)} source(s) — orchestrator will triage each in Phase 1:")
for s in analysable:
print(f" - {s.summary()}")
return analysable
def prepare_source(src: EvidenceSource) -> EvidenceSource:
"""Materialise a tree-mode source for analysis.
Mobile / archive sources arrive as .zip files. We unpack once into a
project-level cache (``.cache/sources/<src.id>/``) and rewrite
``src.path`` to point at the unpacked directory. Idempotent — a
second run with the cache present is a no-op (unzip_archive_sync
skips files that already exist with the matching size).
Disk-image and already-tree sources pass through unchanged.
"""
if src.access_mode != "tree":
return src
p = Path(src.path)
if p.is_dir():
return src # already a directory, nothing to do
if not p.is_file():
print(f"Warning: source path {src.path} does not exist; leaving as-is.")
return src
if p.suffix.lower() != ".zip":
# Other archive types (tar, 7z, ...) — not handled yet.
print(f"Warning: tree-mode source {src.id} is not a .zip "
f"({p.suffix}); leaving as-is.")
return src
dest = SOURCE_CACHE_DIR / src.id
dest.mkdir(parents=True, exist_ok=True)
# Password-protected zips (e.g. CTF artefacts) carry their key in
# case.yaml's meta.password — never logged, never persisted.
password = (src.meta or {}).get("password")
pw_note = " (password from meta)" if password else ""
print(f"Unpacking {p.name}{dest}{pw_note} (idempotent) ...")
result = unzip_archive_sync(str(p), str(dest), password=password)
first_line = result.split("\n", 1)[0]
print(" " + first_line)
if first_line.startswith("Error:"):
# Surface the multi-line guidance from _do_extract verbatim.
for extra in result.split("\n")[1:]:
print(" " + extra)
print(f" Source {src.id} stays unanalysable until this is resolved.")
# Leave src.path unchanged so the source remains marked unanalysable.
return src
src.path = str(dest)
src.access_mode = "tree"
return src
def find_resumable_run() -> Path | None:
"""Find the most recent incomplete run with a saved graph state."""
if not RUNS_DIR.exists():
@@ -225,22 +343,30 @@ async def async_main() -> None:
# Initialize evidence graph
if graph is None:
# CLI arg takes priority, otherwise interactive prompt
cli_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else None
image_path, partition_offset = select_image_interactive(cli_dir)
case = resolve_case()
# case_info derived from THIS case's meta (case.yaml), not from
# config.yaml's legacy `cfreds_hacking_case` block. Without this,
# the old CFReDS evidence MD5s would be embedded in reports for
# every subsequent unrelated case.
graph = EvidenceGraph(
case_info=config.get("cfreds_hacking_case", {}),
case_info=dict(case.meta or {}),
persist_path=run_dir / "graph_state.json",
edge_weights=config.get("hypothesis_edge_weights"),
edge_log_lr=config.get("hypothesis_log_lr"),
)
graph.image_path = image_path
graph.partition_offset = partition_offset
graph.case = case
graph.extracted_dir = str(run_dir / "extracted")
analysable = list_analysable_sources(case)
# Prepare every analysable source up front (unzip tree-mode zips,
# etc.). Idempotent on cache hits — second run is a no-op.
prepared = [prepare_source(s) for s in analysable]
# Seed the active source so tools that resolve lazily have a target
# before Phase 1 begins; the orchestrator resets it per source.
graph.set_active_source(prepared[0])
else:
graph._persist_path = run_dir / "graph_state.json"
# Register all tools with bound image path
register_all_tools(graph.image_path, graph.partition_offset, graph, graph.extracted_dir)
# Register all tools — they resolve the active evidence source at call time
register_all_tools(graph)
# Create agent factory
factory = AgentFactory(llm, graph)

View File

@@ -10,7 +10,7 @@ import time
from datetime import datetime
from pathlib import Path
from agent_factory import AgentFactory
from agent_factory import AgentFactory, get_triage_agent_type
from evidence_graph import EvidenceGraph
from llm_client import LLMClient, _extract_first_balanced, _safe_json_loads
from tool_registry import TOOL_CATALOG
@@ -518,7 +518,7 @@ class Orchestrator:
if not unlinked:
return
valid_types = list(self.graph.edge_weights.keys())
valid_types = list(self.graph.edge_log_lr.keys())
hyp_section = "\n".join(
f" [{h.id}] {h.title}: {h.description}" for h in active
@@ -551,7 +551,7 @@ class Orchestrator:
if (
hyp_id in self.graph.hypotheses
and ph_id in self.graph.phenomena
and edge_type in self.graph.edge_weights
and edge_type in self.graph.edge_log_lr
):
await self.graph.update_hypothesis_confidence(
hyp_id=hyp_id,
@@ -593,7 +593,7 @@ class Orchestrator:
ph_id = j.get("phenomenon_id", "")
edge_type = j.get("edge_type", "")
reason = j.get("reason", "")
if ph_id in self.graph.phenomena and edge_type in self.graph.edge_weights:
if ph_id in self.graph.phenomena and edge_type in self.graph.edge_log_lr:
await self.graph.update_hypothesis_confidence(
hyp_id=hyp.id,
phenomenon_id=ph_id,
@@ -618,7 +618,10 @@ class Orchestrator:
phenomena (deterministic — the canonical tool was actually called).
"""
evidence_text = " ".join(
f"{ph.category} {ph.title} {ph.description}".lower()
(
f"{ph.category} {ph.title} {ph.interpretation} "
+ " ".join(str(f.get("value", "")) for f in ph.verified_facts)
).lower()
for ph in self.graph.phenomena.values()
)
used_tools: set[str] = {
@@ -747,28 +750,103 @@ class Orchestrator:
# ---- Main pipeline -------------------------------------------------------
# ---- Phase 1 helpers (multi-source triage) -------------------------------
@staticmethod
def _is_analysable(src) -> bool:
"""Mirror of main._is_analysable so the orchestrator doesn't depend
on main.py's import. Disk-image sources need a path; tree-mode
sources are analysable when they're mobile_extraction or archive.
"""
if not getattr(src, "path", ""):
return False
if src.access_mode == "image":
return True
if src.access_mode == "tree" and src.type in ("mobile_extraction", "archive"):
return True
# media_collection is analysable too once a MediaAgent is registered.
if src.type == "media_collection":
return True
return False
def _sources_to_triage(self) -> list:
"""Pick every analysable source in the case (or fall back to the
single active_source for the legacy single-image path).
"""
case = self.graph.case
if case is None or not case.sources:
return [self.graph.active_source] if self.graph.active_source else []
return [s for s in case.sources if self._is_analysable(s)]
async def _phase1_triage_source(self, src) -> tuple[int, int]:
"""Run the right triage agent on one source. Returns (Δphenomena, Δleads)."""
ph_before = len(self.graph.phenomena)
leads_before = sum(1 for l in self.graph.leads if l.status == "pending")
self.graph.set_active_source(src)
agent_type = get_triage_agent_type(src)
agent = self.factory.get_or_create_agent(agent_type)
if agent is None:
logger.warning(
"No agent registered for type %s — skipping source %s",
agent_type, src.id,
)
return 0, 0
_log(
f"Phase 1 triage: {src.id} ({src.label}) → {agent_type}",
event="dispatch", agent=agent_type, source=src.id,
)
try:
await agent.run(
f"Perform an initial Phase-1 triage of source {src.id} "
f"({src.label}, type={src.type}). Survey the source's "
f"structure, identify the most interesting artefacts, and "
f"record significant findings via add_phenomenon. Call "
f"observe_identity for any concrete identifiers (email, "
f"phone, Apple ID, IMEI, wallet address, persistent "
f"username) you encounter — that's how this finding will "
f"link across the other sources in the case. Create "
f"add_lead for follow-up that's outside your scope."
)
except Exception as e:
logger.error("Phase 1 agent [%s] failed on %s: %s", agent_type, src.id, e)
return (
len(self.graph.phenomena) - ph_before,
sum(1 for l in self.graph.leads if l.status == "pending") - leads_before,
)
async def run(self, resume_phase: int = 1) -> str:
"""Run the 5-phase hypothesis-driven forensic analysis pipeline."""
_log(f"Phase 1: Filesystem Survey (image: {Path(self.graph.image_path).name})", event="phase")
sources = self._sources_to_triage()
_log(
f"Phase 1: per-source triage ({len(sources)} source(s))",
event="phase",
)
report = ""
try:
# Phase 1: Initial filesystem survey
# Phase 1: Initial per-source triage (S6 multi-source).
# Runs sequentially so each agent gets its own task_id scope —
# the grounding gateway requires that, and shared graph state
# (active_source, partition_offset) would race under parallel
# dispatch anyway.
if resume_phase <= 1:
t0 = time.monotonic()
ph_before = len(self.graph.phenomena)
fs_agent = self.factory.get_or_create_agent("filesystem")
if fs_agent:
await fs_agent.run(
"Perform an initial survey of this disk image. "
"Examine the partition table, filesystem type, and root directory structure. "
"List key user directories and identify interesting files (documents, emails, "
"chat logs, installed programs, registry hives). "
"Create leads for other agents based on what you find."
for src in sources:
new_ph, new_leads = await self._phase1_triage_source(src)
_log(
f" {src.id}: +{new_ph} phenomena, +{new_leads} leads",
event="progress", source=src.id,
)
new_ph = len(self.graph.phenomena) - ph_before
new_leads = sum(1 for l in self.graph.leads if l.status == "pending")
_log(f"+{new_ph} phenomena, +{new_leads} leads", event="progress", elapsed=time.monotonic() - t0)
total_ph = len(self.graph.phenomena) - ph_before
total_leads = sum(1 for l in self.graph.leads if l.status == "pending")
_log(
f"Phase 1 total: +{total_ph} phenomena, {total_leads} pending leads",
event="progress", elapsed=time.monotonic() - t0,
)
# Phase 2: Hypothesis generation
if resume_phase <= 2:
@@ -865,8 +943,15 @@ class Orchestrator:
"6. Conclusions and Recommendations"
)
image_stem = Path(self.graph.image_path).stem
report_name = f"{image_stem}_forensic_report.md"
# Multi-source case → name by case_id (stable across sources).
# Legacy single-image runs without a Case → fall back to the
# last active image's stem so old workflows still produce a
# plausible filename.
if self.graph.case and self.graph.case.case_id:
stem = self.graph.case.case_id
else:
stem = Path(self.graph.image_path).stem or "case"
report_name = f"{stem}_forensic_report.md"
report_path = (self.run_dir / report_name) if self.run_dir else Path(report_name)
try:
report_path.write_text(report)

View File

@@ -6,6 +6,8 @@ requires-python = ">=3.14"
dependencies = [
"httpx[socks]>=0.28.1",
"openai>=2.36.0",
"pillow>=12.2.0",
"pytesseract>=0.3.13",
"pyyaml",
"regipy>=6.2.1",
]

View File

@@ -32,10 +32,10 @@ async def main() -> None:
config = yaml.safe_load(open("config.yaml"))
agent_cfg = config["agent"]
# Load graph (edge_weights from config — applied to the loaded graph)
# Load graph (edge_log_lr from config — applied to the loaded graph)
graph = EvidenceGraph.load_state(
state_path,
edge_weights=config.get("hypothesis_edge_weights"),
edge_log_lr=config.get("hypothesis_log_lr"),
)
print(f"Loaded: {graph.stats_summary()}")
@@ -49,7 +49,7 @@ async def main() -> None:
thinking_enabled=agent_cfg.get("thinking_enabled", False),
)
register_all_tools(graph.image_path, graph.partition_offset, graph)
register_all_tools(graph)
factory = AgentFactory(llm, graph)
# Run only the report agent

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,8 @@
"""Central tool registry — catalogs all available forensic tools.
Tools are registered once at startup with bound image_path and offset.
Tools are registered once at startup. Sleuth Kit tools resolve their image
path and partition offset from graph.active_source at call time, so a single
registered tool follows whichever evidence source is currently active.
The AgentFactory uses this catalog to compose agents dynamically.
"""
@@ -14,6 +16,11 @@ import re
from dataclasses import dataclass, field
from typing import Any
from evidence_graph import GroundingError
from tools import archive as arc
from tools import media as med
from tools import mobile_android as android
from tools import mobile_ios as ios
from tools import parsers
from tools import registry as reg
from tools import sleuthkit as tsk
@@ -35,6 +42,13 @@ CACHEABLE_TOOLS: set[str] = {
"parse_registry_key", "search_registry", "get_user_activity",
"read_text_file", "read_binary_preview", "search_text_file",
"read_text_file_section", "list_extracted_dir", "parse_pcap_strings",
"find_files",
# iOS (read-only file parses):
"parse_plist", "sqlite_tables", "sqlite_query",
"parse_ios_keychain", "read_idevice_info",
# Android + media (read-only):
"probe_android_partitions", "ocr_image",
# NB: unzip_archive and set_active_partition are NOT cached — they have side effects.
}
@@ -45,24 +59,106 @@ def _cache_key(tool_name: str, kwargs: dict) -> str:
return f"{tool_name}:{args_hash}"
def _looks_like_error(text: str) -> bool:
"""Heuristic for unsuccessful tool output (mirrors the prior cache filter)."""
return text.startswith("Error") or text.startswith("[Command failed") or text.startswith("[icat failed")
def _make_cached(tool_name: str, executor: Any) -> Any:
"""Wrap an executor with an in-memory result cache."""
"""Thin in-memory cache wrapper around a tool executor.
Kept as a standalone primitive (no graph dependency) so unit tests can
exercise caching in isolation. Production wiring composes this with
invocation logging via :func:`_make_invocation_executor`.
"""
async def wrapper(**kwargs) -> str:
key = _cache_key(tool_name, kwargs)
cached = _tool_result_cache.get(key)
if cached is not None:
logger.debug("Cache hit: %s(%s)", tool_name, kwargs)
return cached
hit = _tool_result_cache.get(key)
if hit is not None:
return hit
result = await executor(**kwargs)
# Only cache successful results (not errors)
if not result.startswith("Error") and not result.startswith("[Command failed"):
if not _looks_like_error(result):
_tool_result_cache[key] = result
return result
return wrapper
def _make_invocation_executor(
tool_name: str,
executor: Any,
graph: Any,
*,
cacheable: bool,
auto_record_category: str | None = None,
) -> Any:
"""Single uniform wrapper around a forensic tool executor.
Responsibilities (in order):
1. Serve from the result cache when ``cacheable=True`` and the key
is hot. Cached hits still produce a fresh ToolInvocation record
marked ``cached=True`` so the agent can cite their work.
2. Call the underlying executor on cache miss; store on success.
3. Record a :class:`ToolInvocation` on the graph (this is the
provenance unit the grounding gateway looks up).
4. (Optionally) auto-record the raw output as a Phenomenon with a
single ``type=raw`` fact citing the invocation just made. This
replaces the pre-S2 ``_make_auto_record`` shortcut.
5. Return the result with a ``[invocation: inv-xxx]`` header so
the LLM learns the ID to put in ``add_phenomenon`` facts.
"""
async def wrapper(**kwargs) -> str:
cached_flag = False
cache_hit_key: str | None = None
text: str | None = None
if cacheable:
cache_hit_key = _cache_key(tool_name, kwargs)
hit = _tool_result_cache.get(cache_hit_key)
if hit is not None:
logger.debug("Cache hit: %s(%s)", tool_name, kwargs)
text, cached_flag = hit, True
if text is None:
text = await executor(**kwargs)
if cacheable and cache_hit_key and not _looks_like_error(text):
_tool_result_cache[cache_hit_key] = text
inv_id = await graph.record_tool_invocation(
tool=tool_name, args=kwargs, output=text, cached=cached_flag,
)
# Auto-record the raw output as a phenomenon (single grounded fact).
# Skipped on error outputs and when no graph is present.
if auto_record_category and not _looks_like_error(text):
agent = getattr(graph, "_current_agent", "") or "unknown"
first_line = text.split("\n", 1)[0][:80]
try:
await graph.add_phenomenon(
source_agent=agent,
category=auto_record_category,
title=f"{tool_name}: {first_line}",
interpretation="(auto-recorded raw tool output)",
verified_facts=[{
"type": "raw",
"value": text[:2000],
"invocation_id": inv_id,
}],
source_tool=tool_name,
)
except GroundingError as e:
# Should never happen for auto-record (we just wrote the
# invocation; value is a literal prefix of output). Log
# loudly if it does — that's a bug, not a hallucination.
logger.error("Auto-record grounding failed for %s: %s", tool_name, e)
return f"[invocation: {inv_id}]\n{text}"
return wrapper
def get_cache_stats() -> dict[str, int]:
"""Return cache statistics for diagnostics."""
return {"entries": len(_tool_result_cache)}
@@ -77,12 +173,11 @@ ASSET_CATEGORIES = [
]
def _auto_categorize(filename: str) -> str:
"""Infer asset category from filename."""
def _auto_categorize_windows(filename: str) -> str:
"""Original Windows-leaning heuristic for disk-image-extracted artifacts."""
name_lower = filename.lower()
ext = os.path.splitext(name_lower)[1]
# Check full name (with extension) and base name against known hive names
if name_lower in _REGISTRY_HIVE_NAMES:
return "registry_hive"
if ext == ".pf":
@@ -93,7 +188,7 @@ def _auto_categorize(filename: str) -> str:
return "address_book"
if name_lower == "info2" or re.match(r"dc\d+\.exe", name_lower):
return "recycle_bin"
# Extension-based checks before keyword-based (e.g. mirc.ini → config, not chat)
# Extension-based checks before keyword-based (e.g. mirc.ini → config, not chat).
if ext in (".ini", ".csv", ".dat", ".cfg"):
return "config_file"
if ext in (".log", ".lst"):
@@ -107,6 +202,49 @@ def _auto_categorize(filename: str) -> str:
return "other"
def _auto_categorize_ios(filename: str) -> str:
"""iOS extraction heuristic — plist / sqlite / keychain land here.
Domain-rooted iOS extractions yield specific filenames (sms.db,
AddressBook.sqlitedb, keychain-2.db, *.plist) that the Windows
categorizer would dump into 'other' — fixing P4.
"""
name_lower = filename.lower()
ext = os.path.splitext(name_lower)[1]
if name_lower == "keychain-2.db":
return "ios_keychain"
if name_lower in ("sms.db", "chatstorage.sqlite"):
return "messaging_db"
if name_lower in ("addressbook.sqlitedb", "addressbookimages.sqlitedb"):
return "address_book"
if name_lower == "idevice_info.txt":
return "device_info"
if ext in (".sqlite", ".sqlite3", ".sqlitedb", ".db"):
return "sqlite_db"
if ext == ".plist":
return "plist"
if ext in (".log",):
return "text_log"
return "other"
# Per-source-type categorizers — dispatched by _auto_categorize at call time
# based on graph.active_source.type. Solves P4 (Windows-only categorization).
_CATEGORIZERS = {
"disk_image": _auto_categorize_windows,
"mobile_extraction": _auto_categorize_ios,
"archive": _auto_categorize_windows,
"media_collection": lambda fn: "other",
}
def _auto_categorize(filename: str, source_type: str = "disk_image") -> str:
"""Dispatch to a source-type-aware categorizer (defaults to Windows)."""
fn = _CATEGORIZERS.get(source_type, _auto_categorize_windows)
return fn(filename)
@dataclass
class ToolDefinition:
"""A registered tool available for agent composition."""
@@ -123,44 +261,53 @@ class ToolDefinition:
TOOL_CATALOG: dict[str, ToolDefinition] = {}
def _make_auto_record(tool_name: str, category: str, executor: Any, graph: Any) -> Any:
"""Wrap a forensic tool to auto-record its result as a phenomenon."""
async def wrapper(**kwargs) -> str:
result = await executor(**kwargs)
if graph is None or not result or result.startswith("Error") or result.startswith("["):
return result
# Auto-record: the tool produced a forensic fact
agent = getattr(graph, "_current_agent", "") or "unknown"
title = f"{tool_name}: {result.split(chr(10))[0][:80]}"
await graph.add_phenomenon(
source_agent=agent,
category=category,
title=title,
description=result[:2000],
source_tool=tool_name,
)
return result
return wrapper
# Set of (tool_name, category) pairs that auto-record a phenomenon when run.
# Replaces the pre-S2 ``_make_auto_record`` per-tool wrapping; the central
# instrumentation pass at the end of register_all_tools applies these.
AUTO_RECORD_TOOLS: dict[str, str] = {
"list_installed_software": "registry",
"get_system_info": "registry",
"get_timezone_info": "registry",
"get_computer_name": "registry",
"get_shutdown_time": "registry",
"enumerate_users": "registry",
"get_network_interfaces": "registry",
"get_email_config": "registry",
"parse_prefetch": "filesystem",
}
def register_all_tools(
image_path: str,
partition_offset: int,
graph: Any = None,
extracted_dir: str = "extracted",
) -> None:
"""Populate TOOL_CATALOG with all available tools, pre-bound to image/offset."""
def register_all_tools(graph: Any) -> None:
"""Populate TOOL_CATALOG with all available forensic tools.
Tools no longer close over a fixed image path. The Sleuth Kit tools
resolve the image path and partition offset from ``graph.active_source``
at call time, so the same registered tool follows whichever evidence
source the orchestrator has made active.
"""
TOOL_CATALOG.clear()
def _img() -> str:
"""Resolve the active source's image path at tool-call time."""
src = getattr(graph, "active_source", None)
if src is None or not src.path:
raise RuntimeError(
"No active evidence source — call graph.set_active_source() first."
)
return src.path
def _off() -> int:
"""Resolve the active source's partition offset at tool-call time."""
src = getattr(graph, "active_source", None)
return src.partition_offset if src is not None else 0
# ---- Sleuth Kit tools ----
TOOL_CATALOG["partition_info"] = ToolDefinition(
name="partition_info",
description="Get the partition table layout of the disk image. Run this first to understand disk structure.",
input_schema={"type": "object", "properties": {}},
executor=lambda: tsk.partition_info(image_path),
executor=lambda: tsk.partition_info(_img()),
module="sleuthkit",
tags=["filesystem", "disk", "partition"],
)
@@ -169,7 +316,7 @@ def register_all_tools(
name="filesystem_info",
description="Get detailed filesystem information (type, block size, volume name, etc.) for the selected partition.",
input_schema={"type": "object", "properties": {}},
executor=lambda: tsk.filesystem_info(image_path, partition_offset),
executor=lambda: tsk.filesystem_info(_img(), _off()),
module="sleuthkit",
tags=["filesystem", "disk"],
)
@@ -185,7 +332,7 @@ def register_all_tools(
},
},
executor=lambda inode=None, recursive=False: tsk.list_directory(
image_path, partition_offset, inode, recursive
_img(), _off(), inode, recursive
),
module="sleuthkit",
tags=["filesystem", "directory", "listing"],
@@ -204,12 +351,13 @@ def register_all_tools(
)
# Resolve real disk path first
orig_path = (await tsk.find_file(image_path, inode, partition_offset)).strip()
orig_path = (await tsk.find_file(_img(), inode, _off())).strip()
if not orig_path or "not found" in orig_path.lower():
return f"Error: inode {inode} not found on the disk image."
# Derive local filename from real disk path
filename = os.path.basename(orig_path)
extracted_dir = graph.extracted_dir
local_path = os.path.join(extracted_dir, filename)
# Handle name collisions by appending inode
@@ -219,12 +367,15 @@ def register_all_tools(
filename = os.path.basename(local_path)
# Extract
result = await tsk.extract_file(image_path, inode, local_path, partition_offset)
result = await tsk.extract_file(_img(), inode, local_path, _off())
if result.startswith("[icat failed"):
return result
size = os.path.getsize(local_path) if os.path.exists(local_path) else 0
category = _auto_categorize(os.path.basename(orig_path))
src_type = (
graph.active_source.type if graph.active_source else "disk_image"
)
category = _auto_categorize(os.path.basename(orig_path), src_type)
# Register
if graph is not None:
@@ -275,7 +426,7 @@ def register_all_tools(
},
"required": ["inode"],
},
executor=lambda inode: tsk.find_file(image_path, inode, partition_offset),
executor=lambda inode: tsk.find_file(_img(), inode, _off()),
module="sleuthkit",
tags=["filesystem"],
)
@@ -290,7 +441,7 @@ def register_all_tools(
},
"required": ["pattern"],
},
executor=lambda pattern: tsk.search_strings(image_path, pattern),
executor=lambda pattern: tsk.search_strings(_img(), pattern),
module="sleuthkit",
tags=["filesystem", "search", "strings"],
)
@@ -299,7 +450,7 @@ def register_all_tools(
name="count_deleted_files",
description="List and count all deleted files. Shows total count, executables, and extension breakdown.",
input_schema={"type": "object", "properties": {}},
executor=lambda: tsk.count_deleted_files(image_path, partition_offset),
executor=lambda: tsk.count_deleted_files(_img(), _off()),
module="sleuthkit",
tags=["filesystem", "deleted", "recovery"],
)
@@ -308,7 +459,7 @@ def register_all_tools(
name="build_filesystem_timeline",
description="Build a MAC timeline from the filesystem (Modified/Accessed/Changed times for all files).",
input_schema={"type": "object", "properties": {}},
executor=lambda: tsk.build_timeline(image_path, partition_offset),
executor=lambda: tsk.build_timeline(_img(), _off()),
module="sleuthkit",
tags=["filesystem", "timeline"],
)
@@ -341,8 +492,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("list_installed_software", "registry",
lambda hive_path: reg.list_installed_software(hive_path), graph),
executor=lambda hive_path: reg.list_installed_software(hive_path),
module="registry",
tags=["registry", "software", "installed"],
)
@@ -390,8 +540,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("get_system_info", "registry",
lambda hive_path: reg.get_system_info(hive_path), graph),
executor=lambda hive_path: reg.get_system_info(hive_path),
module="registry",
tags=["registry", "system"],
)
@@ -406,8 +555,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("get_timezone_info", "registry",
lambda hive_path: reg.get_timezone_info(hive_path), graph),
executor=lambda hive_path: reg.get_timezone_info(hive_path),
module="registry",
tags=["registry", "timezone", "system"],
)
@@ -422,8 +570,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("get_computer_name", "registry",
lambda hive_path: reg.get_computer_name(hive_path), graph),
executor=lambda hive_path: reg.get_computer_name(hive_path),
module="registry",
tags=["registry", "system", "hostname"],
)
@@ -438,8 +585,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("get_shutdown_time", "registry",
lambda hive_path: reg.get_shutdown_time(hive_path), graph),
executor=lambda hive_path: reg.get_shutdown_time(hive_path),
module="registry",
tags=["registry", "system", "shutdown"],
)
@@ -454,8 +600,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("enumerate_users", "registry",
lambda hive_path: reg.enumerate_users(hive_path), graph),
executor=lambda hive_path: reg.enumerate_users(hive_path),
module="registry",
tags=["registry", "user", "accounts", "sam"],
)
@@ -470,8 +615,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("get_network_interfaces", "registry",
lambda hive_path: reg.get_network_interfaces(hive_path), graph),
executor=lambda hive_path: reg.get_network_interfaces(hive_path),
module="registry",
tags=["registry", "network", "adapter", "ip"],
)
@@ -486,8 +630,7 @@ def register_all_tools(
},
"required": ["hive_path"],
},
executor=_make_auto_record("get_email_config", "registry",
lambda hive_path: reg.get_email_config(hive_path), graph),
executor=lambda hive_path: reg.get_email_config(hive_path),
module="registry",
tags=["registry", "email", "account"],
)
@@ -504,8 +647,7 @@ def register_all_tools(
},
"required": ["file_path"],
},
executor=_make_auto_record("parse_prefetch", "filesystem",
lambda file_path: parsers.parse_prefetch(file_path), graph),
executor=lambda file_path: parsers.parse_prefetch(file_path),
module="parsers",
tags=["filesystem", "prefetch", "execution"],
)
@@ -577,7 +719,13 @@ def register_all_tools(
TOOL_CATALOG["list_extracted_dir"] = ToolDefinition(
name="list_extracted_dir",
description="List files in an extracted directory with sizes.",
description=(
"Summarise an extracted directory tree: total counts, "
"extension breakdown, top-level layout, largest files. "
"Scales to 10k+-file trees without truncating into uselessness. "
"For targeted searches (find every *.plist, locate sms.db, ...) "
"use find_files instead."
),
input_schema={
"type": "object",
"properties": {
@@ -590,6 +738,31 @@ def register_all_tools(
tags=["filesystem", "listing", "extracted"],
)
TOOL_CATALOG["find_files"] = ToolDefinition(
name="find_files",
description=(
"Recursively find files under a directory by glob pattern. "
"Use this on tree-mode sources (iOS extractions, archives, "
"Android-mounted partitions) to locate specific artefacts in "
"huge trees. Patterns are fnmatch-style; '**' means 'any "
"depth'. Examples: '**/sms.db', '**/keychain-2.db', "
"'**/ChatStorage.sqlite', '**/*.plist', 'HomeDomain/Library/**'. "
"Results sort by size descending; capped at max_results."
),
input_schema={
"type": "object",
"properties": {
"root": {"type": "string", "description": "Directory to search under."},
"pattern": {"type": "string", "description": "fnmatch glob pattern (use '**' for any depth)."},
"max_results": {"type": "integer", "description": "Result cap (default 500)."},
},
"required": ["root", "pattern"],
},
executor=lambda root, pattern, max_results=500: parsers.find_files(root, pattern, max_results),
module="parsers",
tags=["filesystem", "search", "extracted", "glob"],
)
TOOL_CATALOG["parse_pcap_strings"] = ToolDefinition(
name="parse_pcap_strings",
description="Extract HTTP headers, hosts, User-Agent, cookies, and URLs from a PCAP/capture file.",
@@ -605,11 +778,224 @@ def register_all_tools(
tags=["network", "pcap", "http", "capture"],
)
# ---- Apply result caching to deterministic read-only tools ----
# Must come AFTER all tools are registered. Auto-record wrapped tools
# (e.g. get_system_info) are NOT in CACHEABLE_TOOLS since they write
# to the evidence graph as a side effect.
# ---- Archive tools (tree-mode prep) ----
TOOL_CATALOG["unzip_archive"] = ToolDefinition(
name="unzip_archive",
description=(
"Extract a .zip archive into a target directory. Defensive against "
"zip-slip; skips symlinks. Idempotent on rerun. Pass `password` for "
"password-protected zips — only the legacy ZipCrypto algorithm is "
"supported by stdlib (AES zips need an external `7z x` step)."
),
input_schema={
"type": "object",
"properties": {
"zip_path": {"type": "string", "description": "Path to the .zip file."},
"dest_dir": {"type": "string", "description": "Directory to extract into (created if missing)."},
"password": {"type": "string", "description": "Password for encrypted zips (omit for plain archives)."},
},
"required": ["zip_path", "dest_dir"],
},
executor=lambda zip_path, dest_dir, password=None: arc.unzip_archive(zip_path, dest_dir, password),
module="archive",
tags=["archive", "zip", "extract", "ingest"],
)
# ---- iOS plugin tools (DESIGN.md §4.7) ----
TOOL_CATALOG["parse_plist"] = ToolDefinition(
name="parse_plist",
description=(
"Parse a .plist file (XML or binary) and return its contents as JSON. "
"Bytes are rendered as hex; dates as ISO-8601."
),
input_schema={
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "Path to .plist file."},
},
"required": ["file_path"],
},
executor=lambda file_path: ios.parse_plist(file_path),
module="mobile_ios",
tags=["ios", "plist", "parse"],
)
TOOL_CATALOG["sqlite_tables"] = ToolDefinition(
name="sqlite_tables",
description=(
"List user tables in a sqlite database with row counts and column "
"names. Use this to scout an unfamiliar .sqlite / .db file before "
"querying it."
),
input_schema={
"type": "object",
"properties": {
"db_path": {"type": "string", "description": "Path to .sqlite/.db file."},
},
"required": ["db_path"],
},
executor=lambda db_path: ios.sqlite_tables(db_path),
module="mobile_ios",
tags=["sqlite", "schema", "ios", "android"],
)
TOOL_CATALOG["sqlite_query"] = ToolDefinition(
name="sqlite_query",
description=(
"Run a single read-only SELECT against a sqlite file. "
"Multi-statement queries and non-SELECT statements are rejected. "
"Use this for sms.db / ChatStorage.sqlite / AddressBook.sqlitedb / etc."
),
input_schema={
"type": "object",
"properties": {
"db_path": {"type": "string", "description": "Path to .sqlite/.db file."},
"query": {"type": "string", "description": "A single SELECT statement."},
"max_rows": {"type": "integer", "description": "Row cap (default 100)."},
},
"required": ["db_path", "query"],
},
executor=lambda db_path, query, max_rows=100: ios.sqlite_query(db_path, query, max_rows),
module="mobile_ios",
tags=["sqlite", "query", "ios", "android"],
)
TOOL_CATALOG["parse_ios_keychain"] = ToolDefinition(
name="parse_ios_keychain",
description=(
"Locate and summarise iOS keychain entries (keychain-2.db). "
"Pass either the db file directly or the containing directory; "
"dumps accounting metadata from genp/inet/cert/keys tables."
),
input_schema={
"type": "object",
"properties": {
"keychain_root": {
"type": "string",
"description": "Path to keychain-2.db or a directory that contains it.",
},
},
"required": ["keychain_root"],
},
executor=lambda keychain_root: ios.parse_ios_keychain(keychain_root),
module="mobile_ios",
tags=["ios", "keychain", "credentials"],
)
TOOL_CATALOG["read_idevice_info"] = ToolDefinition(
name="read_idevice_info",
description=(
"Read the iDevice_info.txt summary at the root of an iOS extraction. "
"Pass the file path or the extraction root directory."
),
input_schema={
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "Path to iDevice_info.txt or extraction root."},
},
"required": ["file_path"],
},
executor=lambda file_path: ios.read_idevice_info(file_path),
module="mobile_ios",
tags=["ios", "device", "metadata"],
)
# ---- Android plugin (DESIGN.md §4.7) ----
TOOL_CATALOG["probe_android_partitions"] = ToolDefinition(
name="probe_android_partitions",
description=(
"Survey every partition on an Android disk dump (mmls + per-"
"partition fsstat). Returns a markdown table with name, native "
"and 512-byte sector offsets, filesystem type, and a strategy "
"hint per partition. Use this BEFORE deciding which partitions "
"to dive into via set_active_partition + list_directory."
),
input_schema={"type": "object", "properties": {}},
executor=lambda: android.probe_android_partitions(_img()),
module="mobile_android",
tags=["android", "partition", "survey"],
)
async def _set_active_partition(partition_offset: int) -> str:
src = getattr(graph, "active_source", None)
if src is None:
return "Error: no active evidence source."
old = src.partition_offset
new = int(partition_offset)
src.partition_offset = new
# Sync the legacy mirror field so older readers stay consistent.
graph.partition_offset = new
return (
f"Active partition offset: {old}{new} (512-byte sectors). "
f"Subsequent list_directory / extract_file / search_strings "
f"calls now target this partition on {src.id} ({src.label})."
)
TOOL_CATALOG["set_active_partition"] = ToolDefinition(
name="set_active_partition",
description=(
"Switch the current partition offset (in 512-byte sectors) on "
"the active disk-image source. Use the values from "
"probe_android_partitions's '512-sector' column. NOT a "
"forensic read — purely repoints the TSK toolset. Mutates "
"shared state; call serially within one agent run."
),
input_schema={
"type": "object",
"properties": {
"partition_offset": {
"type": "integer",
"description": "Partition start in 512-byte sectors.",
},
},
"required": ["partition_offset"],
},
executor=_set_active_partition,
module="android",
tags=["android", "partition", "navigation"],
)
# ---- Media plugin (DESIGN.md §4.7) ----
TOOL_CATALOG["ocr_image"] = ToolDefinition(
name="ocr_image",
description=(
"Extract text from an image via tesseract. The LLM backend has "
"no vision, so this is the only way to read JPEG/PNG evidence "
"(screenshots of chats, transactions, IDs). Default lang covers "
"English + Simplified & Traditional Chinese; override `lang` "
"if you know the artefact's language. Returns 'Error: OCR "
"runtime not available' with an install hint when tesseract "
"isn't on the host — record that absence as a negative "
"finding rather than guessing."
),
input_schema={
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "Path to image file."},
"lang": {"type": "string", "description": "Tesseract language code(s), e.g. 'eng' or 'eng+chi_sim'."},
},
"required": ["file_path"],
},
executor=lambda file_path, lang="eng+chi_sim+chi_tra": med.ocr_image(file_path, lang),
module="media",
tags=["media", "ocr", "image"],
)
# ---- Wrap every executor with invocation logging (+ cache + auto-record) ----
# Must run AFTER all tools are registered. Every tool call now produces
# a ToolInvocation entry on the graph (provenance for grounding), and
# returns the result prefixed with ``[invocation: inv-xxx]`` so the LLM
# can cite the call in add_phenomenon facts.
_tool_result_cache.clear()
for tool_name, td in TOOL_CATALOG.items():
if tool_name in CACHEABLE_TOOLS:
td.executor = _make_cached(tool_name, td.executor)
td.executor = _make_invocation_executor(
tool_name,
td.executor,
graph,
cacheable=(tool_name in CACHEABLE_TOOLS),
auto_record_category=AUTO_RECORD_TOOLS.get(tool_name),
)

156
tools/archive.py Normal file
View File

@@ -0,0 +1,156 @@
"""Archive extraction tools — generic unzip for tree-mode evidence sources.
Mobile extractions (iOS / Android backups), archive sources, and shared
work products all arrive as .zip files. The forensic agents work on the
unpacked tree; this module is the single entry point for safely turning
an archive into a directory.
Stdlib-only. No graph dependency.
"""
from __future__ import annotations
import logging
import os
import zipfile
from pathlib import Path
logger = logging.getLogger(__name__)
def _is_within(base: Path, target: Path) -> bool:
"""True when *target* resolves to a path inside *base* — symlink-safe."""
try:
base_r = base.resolve()
target_r = target.resolve()
except OSError:
return False
try:
target_r.relative_to(base_r)
except ValueError:
return False
return True
def _is_zip_encrypted(zf: zipfile.ZipFile) -> bool:
"""True when any entry has the zip 'encrypted' flag bit set."""
return any(info.flag_bits & 0x1 for info in zf.infolist())
def _do_extract(
zip_path: str,
dest_dir: str,
password: str | None = None,
) -> str:
"""Shared core for unzip_archive (async) and unzip_archive_sync.
Pure stdlib + filesystem I/O — no asyncio. Idempotent on rerun (files
whose target already exists at the matching size are skipped). Returns
a multi-line summary the agent can read directly.
"""
zp = Path(zip_path)
if not zp.is_file():
return f"Error: {zip_path} is not a file."
dest = Path(dest_dir)
dest.mkdir(parents=True, exist_ok=True)
extracted = 0
skipped: list[str] = []
total_bytes = 0
pwd_bytes = password.encode("utf-8") if password else None
try:
with zipfile.ZipFile(zp, "r") as zf:
encrypted = _is_zip_encrypted(zf)
if encrypted and pwd_bytes is None:
return (
f"Error: {zip_path} is password-protected. "
f"Provide the password via case.yaml's "
f"meta.password on this source, or pass `password=` "
f"explicitly. Stdlib zipfile only supports the legacy "
f"ZipCrypto algorithm — AES-encrypted zips (created by "
f"7-Zip / WinZip) need an external tool like 7z."
)
for info in zf.infolist():
name = info.filename
# Block absolute paths and parent-escape attempts up front.
if name.startswith(("/", "\\")) or ".." in Path(name).parts:
skipped.append(f"escape: {name}")
continue
target = dest / name
if not _is_within(dest, target):
skipped.append(f"escape: {name}")
continue
# Symlink entries — skip rather than risk traversing out.
if info.external_attr >> 16 & 0o120000 == 0o120000:
skipped.append(f"symlink: {name}")
continue
if info.is_dir():
target.mkdir(parents=True, exist_ok=True)
continue
# Skip if already extracted with matching size (idempotent rerun).
if target.exists() and target.stat().st_size == info.file_size:
continue
target.parent.mkdir(parents=True, exist_ok=True)
try:
with zf.open(info, "r", pwd=pwd_bytes) as src, open(target, "wb") as out:
while True:
chunk = src.read(65536)
if not chunk:
break
out.write(chunk)
except RuntimeError as e:
# zipfile raises RuntimeError for bad-password / AES-encrypted.
msg = str(e)
if "Bad password" in msg or "password required" in msg:
return (
f"Error: bad or missing password for {zip_path}. "
f"If the zip is AES-encrypted (7-Zip/WinZip), stdlib "
f"cannot decrypt it — use `7z x -p<pwd> ...` "
f"externally and point the source path at the result."
)
raise
extracted += 1
total_bytes += info.file_size
except zipfile.BadZipFile as e:
return f"Error: {zip_path} is not a valid zip archive: {e}"
except Exception as e:
return f"Error extracting {zip_path}: {e}"
parts = [
f"Extracted {extracted} file(s), {total_bytes} bytes, into {dest}",
]
if skipped:
parts.append(f"Skipped {len(skipped)} unsafe entries:")
for s in skipped[:10]:
parts.append(f" - {s}")
if len(skipped) > 10:
parts.append(f" ... ({len(skipped) - 10} more)")
return "\n".join(parts)
async def unzip_archive(
zip_path: str, dest_dir: str, password: str | None = None,
) -> str:
"""Extract *zip_path* into *dest_dir*. Idempotent on rerun.
Defensive: rejects entries with absolute paths, leading '..', or that
would resolve outside *dest_dir* (the classic zip-slip vector). Symlink
entries are skipped (we never follow symlinks into the host filesystem).
Password-protected zips need the password argument (or
``meta.password`` on the source in case.yaml) — stdlib ``zipfile``
only handles the legacy ZipCrypto algorithm.
"""
return _do_extract(zip_path, dest_dir, password)
def unzip_archive_sync(
zip_path: str, dest_dir: str, password: str | None = None,
) -> str:
"""Synchronous variant of :func:`unzip_archive` for startup-time prepare_source.
Same behaviour, just no async wrapping — used before the event loop
starts so we don't have to spin one up just to unpack a zip.
"""
return _do_extract(zip_path, dest_dir, password)

87
tools/media.py Normal file
View File

@@ -0,0 +1,87 @@
"""Media plugin — OCR for image evidence.
DESIGN.md §4.7: the model backend (DeepSeek) has no vision, so we MUST run
OCR locally for any image-bearing evidence. Tesseract via pytesseract is
the default; if the runtime is missing those packages, the tool returns a
clear install hint rather than failing silently.
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
logger = logging.getLogger(__name__)
MAX_OUTPUT = 8000
_INSTALL_HINT = (
"Error: OCR runtime not available. Install with:\n"
" pip install pytesseract pillow\n"
" sudo apt install tesseract-ocr tesseract-ocr-chi-sim tesseract-ocr-chi-tra\n"
"(or the equivalent for your distribution). Then retry."
)
def _has_ocr_runtime() -> tuple[bool, str]:
"""Return (available, reason). reason is empty when available."""
try:
import pytesseract # noqa: F401
from PIL import Image # noqa: F401
except ImportError as e:
return False, f"missing python package: {e.name}"
# Check the tesseract binary too.
import shutil
if shutil.which("tesseract") is None:
return False, "tesseract binary not on PATH"
return True, ""
async def ocr_image(file_path: str, lang: str = "eng+chi_sim+chi_tra") -> str:
"""Extract text from an image via tesseract.
*lang* defaults to English + Simplified + Traditional Chinese, matching
the multi-language artefacts the current case involves. Pass a single
language code (e.g. ``"eng"``) to skip language packs that aren't
installed.
"""
p = Path(file_path)
if not p.is_file():
return f"Error: {file_path} is not a file."
available, reason = _has_ocr_runtime()
if not available:
return f"{_INSTALL_HINT}\n[detail: {reason}]"
import pytesseract
from PIL import Image
try:
img = Image.open(p)
except Exception as e:
return f"Error: could not open image {file_path}: {e}"
try:
text = pytesseract.image_to_string(img, lang=lang)
except pytesseract.TesseractError as e:
msg = str(e)
if "Failed loading language" in msg or "Error opening data file" in msg:
return (
f"Error: tesseract is installed but missing language pack(s) for {lang!r}. "
f"Install the language data (e.g. tesseract-ocr-chi-sim) or pass a "
f"different `lang`. Detail: {msg}"
)
return f"Error running tesseract: {msg}"
except Exception as e:
return f"Error during OCR: {e}"
size = p.stat().st_size
header = (
f"ocr: {file_path} ({size} bytes, lang={lang}, "
f"{len(text.splitlines())} line(s))\n"
)
if len(text) > MAX_OUTPUT - len(header):
body = text[:MAX_OUTPUT - len(header)] + "\n[truncated]"
else:
body = text
return header + body

160
tools/mobile_android.py Normal file
View File

@@ -0,0 +1,160 @@
"""Android plugin tools — partition survey + sector translation.
DESIGN.md §4.7 安卓: ``mmls`` partitions → per-partition image-mode source;
``fsstat`` per partition to classify ext4/F2FS/raw/encrypted. The shared TSK
toolchain already handles ext4/F2FS reads, so once the agent picks a partition
offset the standard list_directory / extract_file / search_strings tools work.
Quirk: Samsung dumps (e.g. ``blk0_sda.bin``) use 4096-byte image sectors but
TSK tool flags accept 512-byte sectors by default. ``probe_android_partitions``
emits BOTH unit systems so the agent can plug the right ``partition_offset``
value into ``set_active_partition``.
"""
from __future__ import annotations
import asyncio
import logging
import re
from pathlib import Path
logger = logging.getLogger(__name__)
MAX_OUTPUT = 8000
# Partitions worth flagging when we encounter them — informs the agent's
# strategy. Not exhaustive; just opinionated hints.
_PARTITION_HINTS: dict[str, str] = {
"EFS": "modem firmware area; often contains IMEI / MAC / serial",
"PARAM": "boot parameters; cmdline + flags",
"BOOT": "kernel + initramfs (raw image)",
"RECOVERY": "recovery image (raw)",
"SYSTEM": "Android /system — read-only OS partition (ext4)",
"CACHE": "downloaded OTA payloads; usually transient",
"USERDATA": "/data — user apps, dbs, accounts; FBE-encrypted on modern devices",
"PERSISTENT": "Samsung persistent partition; carrier/device flags",
"STEADY": "Samsung steady-state config",
"HIDDEN": "Samsung hidden partition; check before assuming empty",
"CP_DEBUG": "modem debug logs",
"TOMBSTONES": "userland crash dumps",
}
def _parse_mmls_with_unit(output: str) -> tuple[int, list[dict]]:
"""Parse mmls output, returning (sector_size_bytes, partitions).
mmls states ``Units are in N-byte sectors`` near the top; we extract N
to translate between image-native units and the 512-byte units TSK
tools accept via ``-o``.
"""
sector_size = 512
m = re.search(r"Units are in (\d+)-byte sectors", output)
if m:
sector_size = int(m.group(1))
parts: list[dict] = []
for line in output.splitlines():
m = re.match(
r"\s*(\d{3}):\s+(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.*)",
line,
)
if not m:
continue
_row, slot, start, end, length, desc = m.groups()
if slot == "Meta" or slot.startswith("---"):
continue
parts.append({
"slot": slot,
"start_native": int(start),
"end_native": int(end),
"length_native": int(length),
"description": desc.strip(),
})
return sector_size, parts
async def _run(cmd: list[str], timeout: int = 30) -> tuple[int, str, str]:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
return 124, "", f"timeout after {timeout}s"
return proc.returncode or 0, stdout.decode("utf-8", "replace"), stderr.decode("utf-8", "replace")
_FS_TYPE_RE = re.compile(r"File System Type:\s*(\S+)", re.IGNORECASE)
async def _classify_partition(image_path: str, sector_offset_512: int) -> str:
"""Run fsstat on a partition; return 'Ext4'/'Yaffs2'/'FAT'/'unknown'/'inaccessible'.
fsstat's "Cannot determine file system type" is treated as 'unknown'
typically means raw image (BOOT/RECOVERY/RADIO/…) or encrypted data
(modern userdata under FBE).
"""
rc, out, _err = await _run(["fsstat", "-o", str(sector_offset_512), image_path], timeout=15)
if rc != 0:
return "unknown"
m = _FS_TYPE_RE.search(out)
if m:
return m.group(1)
return "unknown"
async def probe_android_partitions(image_path: str) -> str:
"""Survey every partition on an Android disk dump and return a table.
The agent reads this once to plan its work: which partitions are
Ext4/F2FS (use TSK), which are raw (extract image / strings only),
which are encrypted (skip until decrypted).
"""
p = Path(image_path)
if not p.is_file():
return f"Error: {image_path} is not a file."
rc, out, err = await _run(["mmls", str(p)], timeout=30)
if rc != 0:
return f"Error: mmls failed (rc={rc}): {err.strip() or out.strip()}"
sector_size, parts = _parse_mmls_with_unit(out)
if not parts:
return f"No partitions detected in {image_path}."
lines = [
f"Android partition survey: {image_path}",
f" mmls reports {sector_size}-byte sectors (TSK -o expects 512-byte sectors)",
f" {len(parts)} data partitions",
"",
"| slot | name | start (native) | start (512-sector) | size | fs_type | hint |",
"|---|---|---:|---:|---|---|---|",
]
for prt in parts:
sector_512 = prt["start_native"] * sector_size // 512
bytes_size = prt["length_native"] * sector_size
# human-readable size
if bytes_size >= 1 << 30:
size_h = f"{bytes_size / (1 << 30):.1f} GB"
elif bytes_size >= 1 << 20:
size_h = f"{bytes_size / (1 << 20):.1f} MB"
else:
size_h = f"{bytes_size // 1024} KB"
fs_type = await _classify_partition(str(p), sector_512)
# Try to extract a friendly partition name from the description
# (mmls description often includes the partition name uppercase).
name_match = re.search(r"[A-Z][A-Z0-9_]{2,}", prt["description"])
pname = name_match.group(0) if name_match else prt["description"][:20]
hint = _PARTITION_HINTS.get(pname, "")
lines.append(
f"| {prt['slot']} | {pname} | {prt['start_native']} | "
f"{sector_512} | {size_h} | {fs_type} | {hint} |"
)
body = "\n".join(lines)
if len(body) > MAX_OUTPUT:
body = body[:MAX_OUTPUT] + "\n\n[truncated]"
return body

274
tools/mobile_ios.py Normal file
View File

@@ -0,0 +1,274 @@
"""iOS extraction parsers — plist / sqlite / keychain / iDevice info.
DESIGN.md §4.7 iOS plugin tools. All tree-mode, path-based — no Sleuth
Kit, no graph dependency. Stdlib + sqlite3 only.
iOS extractions typically arrive as a zip containing domain-rooted trees
(HomeDomain, AppDomain, etc.) with a flat ``iDevice_info.txt`` summary,
binary/XML plists, and several SQLite databases (sms.db, AddressBook,
keychain-2.db, app-specific stores like WhatsApp's ChatStorage.sqlite).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import plistlib
import re
import sqlite3
from pathlib import Path
logger = logging.getLogger(__name__)
# Output cap (chars) — keeps a single tool result under the LLM context budget.
MAX_OUTPUT = 8000
def _trunc(text: str, limit: int = MAX_OUTPUT) -> str:
if len(text) <= limit:
return text
return text[:limit] + f"\n\n[Output truncated: {len(text)} chars total]"
# ---------------------------------------------------------------------------
# plist
# ---------------------------------------------------------------------------
def _to_jsonable(obj):
"""Make plist values JSON-serializable: bytes → hex preview, dates → iso."""
import datetime
if isinstance(obj, bytes):
if len(obj) <= 64:
return {"_bytes_hex": obj.hex()}
return {"_bytes_hex_preview": obj[:64].hex(), "_total_bytes": len(obj)}
if isinstance(obj, datetime.datetime):
return obj.isoformat()
if isinstance(obj, dict):
return {str(k): _to_jsonable(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_to_jsonable(v) for v in obj]
return obj
async def parse_plist(file_path: str) -> str:
"""Parse a .plist file (XML or binary) and return its contents as JSON.
Both formats are handled transparently by ``plistlib.load``.
"""
p = Path(file_path)
if not p.is_file():
return f"Error: {file_path} is not a file."
try:
with open(p, "rb") as f:
data = plistlib.load(f)
except plistlib.InvalidFileException as e:
return f"Error: {file_path} is not a valid plist ({e})"
except Exception as e:
return f"Error parsing plist {file_path}: {e}"
serial = _to_jsonable(data)
rendered = json.dumps(serial, ensure_ascii=False, indent=2, default=str)
header = f"plist: {file_path} ({p.stat().st_size} bytes)\n"
return header + _trunc(rendered)
# ---------------------------------------------------------------------------
# sqlite
# ---------------------------------------------------------------------------
_SELECT_RE = re.compile(r"^\s*SELECT\b", re.IGNORECASE)
async def sqlite_tables(db_path: str) -> str:
"""List user tables in a sqlite file with row counts and column names."""
p = Path(db_path)
if not p.is_file():
return f"Error: {db_path} is not a file."
try:
conn = sqlite3.connect(f"file:{p}?mode=ro", uri=True)
except sqlite3.OperationalError as e:
return f"Error opening {db_path} (read-only): {e}"
try:
cur = conn.cursor()
cur.execute(
"SELECT name FROM sqlite_master "
"WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
)
tables = [r[0] for r in cur.fetchall()]
if not tables:
return f"No user tables in {db_path}."
lines = [f"sqlite: {db_path} ({len(tables)} tables)"]
for name in tables:
try:
cur.execute(f"SELECT COUNT(*) FROM \"{name}\"")
count = cur.fetchone()[0]
except sqlite3.DatabaseError as e:
count = f"(count failed: {e})"
try:
cur.execute(f"PRAGMA table_info(\"{name}\")")
cols = [r[1] for r in cur.fetchall()]
except sqlite3.DatabaseError:
cols = []
lines.append(f" {name}: {count} row(s); cols: {', '.join(cols)}")
return _trunc("\n".join(lines))
finally:
conn.close()
async def sqlite_query(
db_path: str,
query: str,
max_rows: int = 100,
) -> str:
"""Run a single read-only SELECT against a sqlite file.
Multi-statement queries and anything other than a SELECT are rejected
(we open the database in read-only mode anyway, so writes would fail
too — but the explicit check keeps the agent honest).
"""
if not _SELECT_RE.match(query):
return "Error: only single SELECT statements are allowed."
if ";" in query.rstrip(";"):
return "Error: multi-statement queries are not allowed."
p = Path(db_path)
if not p.is_file():
return f"Error: {db_path} is not a file."
try:
conn = sqlite3.connect(f"file:{p}?mode=ro", uri=True)
except sqlite3.OperationalError as e:
return f"Error opening {db_path} (read-only): {e}"
try:
cur = conn.cursor()
try:
cur.execute(query)
except sqlite3.DatabaseError as e:
return f"Error executing query: {e}"
cols = [d[0] for d in cur.description] if cur.description else []
rows = cur.fetchmany(max(1, int(max_rows)))
lines = [
f"sqlite query: {db_path}",
f"columns: {cols}",
f"rows ({len(rows)}, capped at {max_rows}):",
]
for row in rows:
rendered = [
(v.hex() if isinstance(v, bytes) else str(v))
for v in row
]
lines.append(" " + " | ".join(rendered))
return _trunc("\n".join(lines))
finally:
conn.close()
# ---------------------------------------------------------------------------
# iOS keychain (keychain-2.db)
# ---------------------------------------------------------------------------
# Standard iOS keychain tables. genp = generic passwords, inet = internet
# passwords, cert = certificates, keys = key material. Forensic extractions
# of locked keychains have ``data`` columns NULL but accounting metadata
# (agrp, acct, svce) intact — already useful for attribution work.
_KEYCHAIN_TABLES = ("genp", "inet", "cert", "keys")
async def parse_ios_keychain(keychain_root: str) -> str:
"""Locate and summarize iOS keychain entries under *keychain_root*.
*keychain_root* may be a path to ``keychain-2.db`` directly or to a
directory that contains it (e.g. ``.../var/keychains``).
"""
root = Path(keychain_root)
db: Path | None = None
if root.is_file() and root.name == "keychain-2.db":
db = root
elif root.is_dir():
candidate = root / "keychain-2.db"
if candidate.is_file():
db = candidate
else:
# Fall back to a shallow recursive search.
for found in root.rglob("keychain-2.db"):
db = found
break
if db is None:
return f"No keychain-2.db found under {keychain_root}."
try:
conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True)
except sqlite3.OperationalError as e:
return f"Error opening {db}: {e}"
try:
cur = conn.cursor()
cur.execute(
"SELECT name FROM sqlite_master "
"WHERE type='table' AND name IN ({})".format(
",".join("?" * len(_KEYCHAIN_TABLES))
),
_KEYCHAIN_TABLES,
)
present = [r[0] for r in cur.fetchall()]
if not present:
return f"keychain-2.db at {db} has no recognised tables."
lines = [f"keychain: {db}"]
for name in present:
cur.execute(f"SELECT COUNT(*) FROM \"{name}\"")
count = cur.fetchone()[0]
lines.append(f"\n[{name}] {count} row(s)")
cur.execute(f"PRAGMA table_info(\"{name}\")")
cols = [r[1] for r in cur.fetchall()]
# Pick a useful subset of accounting columns when present.
preferred = [
c for c in ("agrp", "acct", "svce", "labl", "desc", "atyp", "srvr")
if c in cols
]
if not preferred:
preferred = cols[:5]
sel = ", ".join(f'"{c}"' for c in preferred)
cur.execute(f"SELECT {sel} FROM \"{name}\" LIMIT 30")
for row in cur.fetchall():
lines.append(" " + " | ".join(
(v.hex() if isinstance(v, bytes) else str(v))
for v in row
))
return _trunc("\n".join(lines))
finally:
conn.close()
# ---------------------------------------------------------------------------
# iDevice_info.txt
# ---------------------------------------------------------------------------
async def read_idevice_info(file_path: str, max_chars: int = 6000) -> str:
"""Read the standard iDevice_info.txt summary at the root of an iOS extraction.
The file is a flat ``Key: value`` dump from libimobiledevice / native
extraction tools. We surface the first *max_chars* of content verbatim
— the agent can search/extract specific keys via search_text_file if
the head isn't enough.
"""
p = Path(file_path)
if p.is_dir():
# Be helpful: if the agent passed the extraction root, find the file.
candidate = p / "iDevice_info.txt"
if candidate.is_file():
p = candidate
if not p.is_file():
return f"Error: {file_path} is not a file."
try:
with open(p, "r", encoding="utf-8", errors="replace") as f:
content = f.read(max_chars)
size = p.stat().st_size
header = f"iDevice_info: {p} ({size} bytes)\n"
if size > max_chars:
content += f"\n\n[Truncated: file is {size} bytes, showing first {max_chars}]"
return header + content
except Exception as e:
return f"Error reading {file_path}: {e}"

View File

@@ -215,20 +215,178 @@ async def parse_prefetch(file_path: str) -> str:
return f"[Error parsing Prefetch: {e}]"
async def list_extracted_dir(dir_path: str) -> str:
"""List files in an extracted directory."""
async def list_extracted_dir(dir_path: str, max_entries: int = 200) -> str:
"""Smart summary of a (potentially huge) extracted tree.
Earlier versions dumped up to 200 random entries then truncated — that
leaves the agent blind on 10k+-file iOS extractions. The new layout
returns a compact summary that scales: total counts, extension
breakdown, top-level directories with their sizes, and the largest
files. For targeted lookups (e.g. find every ``*.sqlite`` under the
tree) the agent should use ``find_files`` instead.
"""
if not os.path.isdir(dir_path):
return f"[Error: {dir_path} is not a directory]"
try:
entries = []
for root, dirs, files in os.walk(dir_path):
total_files = 0
total_bytes = 0
ext_counts: dict[str, int] = {}
ext_bytes: dict[str, int] = {}
top_level_dirs: dict[str, dict] = {}
biggest: list[tuple[int, str]] = [] # (size, relpath)
dir_path_abs = os.path.abspath(dir_path)
for root, dirs, files in os.walk(dir_path_abs):
# Track top-level directory aggregates (cheap; no per-entry cost
# beyond the walk we're already doing).
rel_root = os.path.relpath(root, dir_path_abs)
if rel_root == ".":
top_dirs = {d: {"files": 0, "bytes": 0} for d in dirs}
top_level_dirs.update(top_dirs)
top_key = None
else:
top_key = rel_root.split(os.sep, 1)[0]
if top_key not in top_level_dirs:
top_level_dirs[top_key] = {"files": 0, "bytes": 0}
for f in files:
full = os.path.join(root, f)
rel = os.path.relpath(full, dir_path)
size = os.path.getsize(full)
entries.append(f" {rel} ({size} bytes)")
if len(entries) > 200:
entries.append(f" ... (truncated)")
break
try:
size = os.path.getsize(full)
except OSError:
continue
total_files += 1
total_bytes += size
ext = os.path.splitext(f)[1].lower() or "(no ext)"
ext_counts[ext] = ext_counts.get(ext, 0) + 1
ext_bytes[ext] = ext_bytes.get(ext, 0) + size
if top_key is not None:
top_level_dirs[top_key]["files"] += 1
top_level_dirs[top_key]["bytes"] += size
# Maintain a top-10 largest list cheaply (bounded insertion).
if len(biggest) < 10:
biggest.append((size, os.path.relpath(full, dir_path_abs)))
biggest.sort(reverse=True)
elif size > biggest[-1][0]:
biggest[-1] = (size, os.path.relpath(full, dir_path_abs))
biggest.sort(reverse=True)
return f"Directory: {dir_path}\nFiles ({len(entries)}):\n" + "\n".join(entries)
def _human(n: int) -> str:
for unit in ("B", "KB", "MB", "GB"):
if n < 1024:
return f"{n:.1f}{unit}" if unit != "B" else f"{n}B"
n /= 1024
return f"{n:.1f}TB"
lines = [
f"Directory: {dir_path}",
f" Total: {total_files} file(s), {_human(total_bytes)}",
]
# Top-level directory layout (immediate children, sorted by file count).
if top_level_dirs:
lines.append(f"\nTop-level layout ({len(top_level_dirs)} dirs at root):")
sorted_tlds = sorted(
top_level_dirs.items(), key=lambda kv: -kv[1]["files"],
)[:15]
for d, stats in sorted_tlds:
lines.append(
f" {d}/ ({stats['files']} files, {_human(stats['bytes'])})"
)
if len(top_level_dirs) > 15:
lines.append(f" ... ({len(top_level_dirs) - 15} more top-level dirs)")
# Extension breakdown.
if ext_counts:
lines.append(f"\nExtension breakdown (top 15):")
for ext, count in sorted(ext_counts.items(), key=lambda kv: -kv[1])[:15]:
lines.append(
f" {ext}: {count} files, {_human(ext_bytes.get(ext, 0))}"
)
# Largest files (often the highest-value forensic targets).
if biggest:
lines.append("\nLargest files:")
for size, rel in biggest:
lines.append(f" {rel} ({_human(size)})")
lines.append(
f"\nNext step: call find_files with a pattern like "
f"'**/*.plist' or '**/keychain-2.db' to locate specific artefacts."
)
return "\n".join(lines)
except Exception as e:
return f"[Error listing {dir_path}: {e}]"
async def find_files(
root: str,
pattern: str,
max_results: int = 500,
) -> str:
"""Recursively find files under *root* whose path matches *pattern*.
Uses fnmatch-style globs against the *full relative path*; ``**`` is
treated as "any number of path segments" (so ``**/*.plist`` finds
every plist no matter how deep). Examples:
- ``**/sms.db`` — iOS SMS database
- ``**/keychain-2.db`` — iOS keychain
- ``**/ChatStorage.sqlite`` — WhatsApp app store
- ``HomeDomain/Library/**`` — anchor at a known iOS domain root
- ``**/*.{plist,sqlite,db}`` — multi-extension (use 2+ calls or a regex if needed)
Results are sorted by size descending — the biggest hits usually
matter most. Capped at *max_results* to keep the LLM context bounded.
"""
import fnmatch
if not os.path.isdir(root):
return f"[Error: {root} is not a directory]"
root_abs = os.path.abspath(root)
# Convert ``**`` (any-depth) to fnmatch's ``*`` (any chars including /).
# fnmatch doesn't natively distinguish segment vs path; expanding ``**``
# to ``*`` and letting fnmatch match the full relpath is good enough for
# forensic lookups.
fn_pattern = pattern.replace("**", "*")
hits: list[tuple[int, str]] = []
truncated = False
try:
for dirpath, _dirs, files in os.walk(root_abs):
for f in files:
full = os.path.join(dirpath, f)
rel = os.path.relpath(full, root_abs)
if fnmatch.fnmatch(rel, fn_pattern) or fnmatch.fnmatch(f, fn_pattern):
try:
size = os.path.getsize(full)
except OSError:
size = 0
hits.append((size, rel))
if len(hits) >= max_results * 4:
# Hard upper bound to keep the walk cheap on huge trees.
truncated = True
break
if truncated:
break
except Exception as e:
return f"[Error searching {root}: {e}]"
hits.sort(reverse=True)
if len(hits) > max_results:
truncated = True
hits = hits[:max_results]
lines = [
f"find_files: pattern={pattern!r} under {root}",
f" matches: {len(hits)}" + (" (truncated)" if truncated else ""),
]
if not hits:
lines.append(" (no matches)")
else:
for size, rel in hits:
lines.append(f" {rel} ({size} bytes)")
return "\n".join(lines)

50
uv.lock generated
View File

@@ -170,6 +170,8 @@ source = { virtual = "." }
dependencies = [
{ name = "httpx", extra = ["socks"] },
{ name = "openai" },
{ name = "pillow" },
{ name = "pytesseract" },
{ name = "pyyaml" },
{ name = "regipy" },
]
@@ -184,6 +186,8 @@ dev = [
requires-dist = [
{ name = "httpx", extras = ["socks"], specifier = ">=0.28.1" },
{ name = "openai", specifier = ">=2.36.0" },
{ name = "pillow", specifier = ">=12.2.0" },
{ name = "pytesseract", specifier = ">=0.3.13" },
{ name = "pyyaml" },
{ name = "regipy", specifier = ">=6.2.1" },
]
@@ -222,6 +226,39 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529", size = 74366, upload-time = "2026-01-21T20:50:37.788Z" },
]
[[package]]
name = "pillow"
version = "12.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8c/21/c2bcdd5906101a30244eaffc1b6e6ce71a31bd0742a01eb89e660ebfac2d/pillow-12.2.0.tar.gz", hash = "sha256:a830b1a40919539d07806aa58e1b114df53ddd43213d9c8b75847eee6c0182b5", size = 46987819, upload-time = "2026-04-01T14:46:17.687Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bf/98/4595daa2365416a86cb0d495248a393dfc84e96d62ad080c8546256cb9c0/pillow-12.2.0-cp314-cp314-ios_13_0_arm64_iphoneos.whl", hash = "sha256:3adc9215e8be0448ed6e814966ecf3d9952f0ea40eb14e89a102b87f450660d8", size = 4100848, upload-time = "2026-04-01T14:44:48.48Z" },
{ url = "https://files.pythonhosted.org/packages/0b/79/40184d464cf89f6663e18dfcf7ca21aae2491fff1a16127681bf1fa9b8cf/pillow-12.2.0-cp314-cp314-ios_13_0_arm64_iphonesimulator.whl", hash = "sha256:6a9adfc6d24b10f89588096364cc726174118c62130c817c2837c60cf08a392b", size = 4176515, upload-time = "2026-04-01T14:44:51.353Z" },
{ url = "https://files.pythonhosted.org/packages/b0/63/703f86fd4c422a9cf722833670f4f71418fb116b2853ff7da722ea43f184/pillow-12.2.0-cp314-cp314-ios_13_0_x86_64_iphonesimulator.whl", hash = "sha256:6a6e67ea2e6feda684ed370f9a1c52e7a243631c025ba42149a2cc5934dec295", size = 3640159, upload-time = "2026-04-01T14:44:53.588Z" },
{ url = "https://files.pythonhosted.org/packages/71/e0/fb22f797187d0be2270f83500aab851536101b254bfa1eae10795709d283/pillow-12.2.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:2bb4a8d594eacdfc59d9e5ad972aa8afdd48d584ffd5f13a937a664c3e7db0ed", size = 5312185, upload-time = "2026-04-01T14:44:56.039Z" },
{ url = "https://files.pythonhosted.org/packages/ba/8c/1a9e46228571de18f8e28f16fabdfc20212a5d019f3e3303452b3f0a580d/pillow-12.2.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:80b2da48193b2f33ed0c32c38140f9d3186583ce7d516526d462645fd98660ae", size = 4695386, upload-time = "2026-04-01T14:44:58.663Z" },
{ url = "https://files.pythonhosted.org/packages/70/62/98f6b7f0c88b9addd0e87c217ded307b36be024d4ff8869a812b241d1345/pillow-12.2.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:22db17c68434de69d8ecfc2fe821569195c0c373b25cccb9cbdacf2c6e53c601", size = 6280384, upload-time = "2026-04-01T14:45:01.5Z" },
{ url = "https://files.pythonhosted.org/packages/5e/03/688747d2e91cfbe0e64f316cd2e8005698f76ada3130d0194664174fa5de/pillow-12.2.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7b14cc0106cd9aecda615dd6903840a058b4700fcb817687d0ee4fc8b6e389be", size = 8091599, upload-time = "2026-04-01T14:45:04.5Z" },
{ url = "https://files.pythonhosted.org/packages/f6/35/577e22b936fcdd66537329b33af0b4ccfefaeabd8aec04b266528cddb33c/pillow-12.2.0-cp314-cp314-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8cbeb542b2ebc6fcdacabf8aca8c1a97c9b3ad3927d46b8723f9d4f033288a0f", size = 6396021, upload-time = "2026-04-01T14:45:07.117Z" },
{ url = "https://files.pythonhosted.org/packages/11/8d/d2532ad2a603ca2b93ad9f5135732124e57811d0168155852f37fbce2458/pillow-12.2.0-cp314-cp314-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4bfd07bc812fbd20395212969e41931001fd59eb55a60658b0e5710872e95286", size = 7083360, upload-time = "2026-04-01T14:45:09.763Z" },
{ url = "https://files.pythonhosted.org/packages/5e/26/d325f9f56c7e039034897e7380e9cc202b1e368bfd04d4cbe6a441f02885/pillow-12.2.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:9aba9a17b623ef750a4d11b742cbafffeb48a869821252b30ee21b5e91392c50", size = 6507628, upload-time = "2026-04-01T14:45:12.378Z" },
{ url = "https://files.pythonhosted.org/packages/5f/f7/769d5632ffb0988f1c5e7660b3e731e30f7f8ec4318e94d0a5d674eb65a4/pillow-12.2.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:deede7c263feb25dba4e82ea23058a235dcc2fe1f6021025dc71f2b618e26104", size = 7209321, upload-time = "2026-04-01T14:45:15.122Z" },
{ url = "https://files.pythonhosted.org/packages/6a/7a/c253e3c645cd47f1aceea6a8bacdba9991bf45bb7dfe927f7c893e89c93c/pillow-12.2.0-cp314-cp314-win32.whl", hash = "sha256:632ff19b2778e43162304d50da0181ce24ac5bb8180122cbe1bf4673428328c7", size = 6479723, upload-time = "2026-04-01T14:45:17.797Z" },
{ url = "https://files.pythonhosted.org/packages/cd/8b/601e6566b957ca50e28725cb6c355c59c2c8609751efbecd980db44e0349/pillow-12.2.0-cp314-cp314-win_amd64.whl", hash = "sha256:4e6c62e9d237e9b65fac06857d511e90d8461a32adcc1b9065ea0c0fa3a28150", size = 7217400, upload-time = "2026-04-01T14:45:20.529Z" },
{ url = "https://files.pythonhosted.org/packages/d6/94/220e46c73065c3e2951bb91c11a1fb636c8c9ad427ac3ce7d7f3359b9b2f/pillow-12.2.0-cp314-cp314-win_arm64.whl", hash = "sha256:b1c1fbd8a5a1af3412a0810d060a78b5136ec0836c8a4ef9aa11807f2a22f4e1", size = 2554835, upload-time = "2026-04-01T14:45:23.162Z" },
{ url = "https://files.pythonhosted.org/packages/b6/ab/1b426a3974cb0e7da5c29ccff4807871d48110933a57207b5a676cccc155/pillow-12.2.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:57850958fe9c751670e49b2cecf6294acc99e562531f4bd317fa5ddee2068463", size = 5314225, upload-time = "2026-04-01T14:45:25.637Z" },
{ url = "https://files.pythonhosted.org/packages/19/1e/dce46f371be2438eecfee2a1960ee2a243bbe5e961890146d2dee1ff0f12/pillow-12.2.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:d5d38f1411c0ed9f97bcb49b7bd59b6b7c314e0e27420e34d99d844b9ce3b6f3", size = 4698541, upload-time = "2026-04-01T14:45:28.355Z" },
{ url = "https://files.pythonhosted.org/packages/55/c3/7fbecf70adb3a0c33b77a300dc52e424dc22ad8cdc06557a2e49523b703d/pillow-12.2.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:5c0a9f29ca8e79f09de89293f82fc9b0270bb4af1d58bc98f540cc4aedf03166", size = 6322251, upload-time = "2026-04-01T14:45:30.924Z" },
{ url = "https://files.pythonhosted.org/packages/1c/3c/7fbc17cfb7e4fe0ef1642e0abc17fc6c94c9f7a16be41498e12e2ba60408/pillow-12.2.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:1610dd6c61621ae1cf811bef44d77e149ce3f7b95afe66a4512f8c59f25d9ebe", size = 8127807, upload-time = "2026-04-01T14:45:33.908Z" },
{ url = "https://files.pythonhosted.org/packages/ff/c3/a8ae14d6defd2e448493ff512fae903b1e9bd40b72efb6ec55ce0048c8ce/pillow-12.2.0-cp314-cp314t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a34329707af4f73cf1782a36cd2289c0368880654a2c11f027bcee9052d35dd", size = 6433935, upload-time = "2026-04-01T14:45:36.623Z" },
{ url = "https://files.pythonhosted.org/packages/6e/32/2880fb3a074847ac159d8f902cb43278a61e85f681661e7419e6596803ed/pillow-12.2.0-cp314-cp314t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8e9c4f5b3c546fa3458a29ab22646c1c6c787ea8f5ef51300e5a60300736905e", size = 7116720, upload-time = "2026-04-01T14:45:39.258Z" },
{ url = "https://files.pythonhosted.org/packages/46/87/495cc9c30e0129501643f24d320076f4cc54f718341df18cc70ec94c44e1/pillow-12.2.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:fb043ee2f06b41473269765c2feae53fc2e2fbf96e5e22ca94fb5ad677856f06", size = 6540498, upload-time = "2026-04-01T14:45:41.879Z" },
{ url = "https://files.pythonhosted.org/packages/18/53/773f5edca692009d883a72211b60fdaf8871cbef075eaa9d577f0a2f989e/pillow-12.2.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:f278f034eb75b4e8a13a54a876cc4a5ab39173d2cdd93a638e1b467fc545ac43", size = 7239413, upload-time = "2026-04-01T14:45:44.705Z" },
{ url = "https://files.pythonhosted.org/packages/c9/e4/4b64a97d71b2a83158134abbb2f5bd3f8a2ea691361282f010998f339ec7/pillow-12.2.0-cp314-cp314t-win32.whl", hash = "sha256:6bb77b2dcb06b20f9f4b4a8454caa581cd4dd0643a08bacf821216a16d9c8354", size = 6482084, upload-time = "2026-04-01T14:45:47.568Z" },
{ url = "https://files.pythonhosted.org/packages/ba/13/306d275efd3a3453f72114b7431c877d10b1154014c1ebbedd067770d629/pillow-12.2.0-cp314-cp314t-win_amd64.whl", hash = "sha256:6562ace0d3fb5f20ed7290f1f929cae41b25ae29528f2af1722966a0a02e2aa1", size = 7225152, upload-time = "2026-04-01T14:45:50.032Z" },
{ url = "https://files.pythonhosted.org/packages/ff/6e/cf826fae916b8658848d7b9f38d88da6396895c676e8086fc0988073aaf8/pillow-12.2.0-cp314-cp314t-win_arm64.whl", hash = "sha256:aa88ccfe4e32d362816319ed727a004423aab09c5cea43c01a4b435643fa34eb", size = 2556579, upload-time = "2026-04-01T14:45:52.529Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
@@ -296,6 +333,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" },
]
[[package]]
name = "pytesseract"
version = "0.3.13"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "packaging" },
{ name = "pillow" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9f/a6/7d679b83c285974a7cb94d739b461fa7e7a9b17a3abfd7bf6cbc5c2394b0/pytesseract-0.3.13.tar.gz", hash = "sha256:4bf5f880c99406f52a3cfc2633e42d9dc67615e69d8a509d74867d3baddb5db9", size = 17689, upload-time = "2024-08-16T02:33:56.762Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7a/33/8312d7ce74670c9d39a532b2c246a853861120486be9443eebf048043637/pytesseract-0.3.13-py3-none-any.whl", hash = "sha256:7a99c6c2ac598360693d83a416e36e0b33a67638bb9d77fdcac094a3589d4b34", size = 14705, upload-time = "2024-08-16T02:36:10.09Z" },
]
[[package]]
name = "pytest"
version = "9.0.2"