"""Central tool registry — catalogs all available forensic tools. Tools are registered once at startup. Sleuth Kit tools resolve their image path and partition offset from graph.active_source at call time, so a single registered tool follows whichever evidence source is currently active. The AgentFactory uses this catalog to compose agents dynamically. """ from __future__ import annotations import hashlib import json import logging import os import re from dataclasses import dataclass, field from typing import Any from evidence_graph import GroundingError from tools import archive as arc from tools import media as med from tools import mobile_android as android from tools import mobile_ios as ios from tools import parsers from tools import registry as reg from tools import sleuthkit as tsk from tools import strategy as strat logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tool result cache — keyed by (tool_name, args_hash). # Disk image tools are deterministic (image is read-only), so identical # calls always produce the same output. # --------------------------------------------------------------------------- _tool_result_cache: dict[str, str] = {} # Tools safe to cache: deterministic reads with no side effects. CACHEABLE_TOOLS: set[str] = { "partition_info", "filesystem_info", "list_directory", "find_file", "search_strings", "count_deleted_files", "build_filesystem_timeline", "parse_registry_key", "search_registry", "get_user_activity", "read_text_file", "read_binary_preview", "search_text_file", "read_text_file_section", "list_extracted_dir", "parse_pcap_strings", "find_files", # iOS (read-only file parses): "parse_plist", "sqlite_tables", "sqlite_query", "parse_ios_keychain", "read_idevice_info", # Android + media (read-only): "probe_android_partitions", "ocr_image", # NB: unzip_archive and set_active_partition are NOT cached — they have side effects. 
} def _cache_key(tool_name: str, kwargs: dict) -> str: """Build a deterministic cache key from tool name + arguments.""" args_str = json.dumps(kwargs, sort_keys=True, ensure_ascii=False) args_hash = hashlib.md5(args_str.encode()).hexdigest() return f"{tool_name}:{args_hash}" def _looks_like_error(text: str) -> bool: """Heuristic for unsuccessful tool output (mirrors the prior cache filter).""" return text.startswith("Error") or text.startswith("[Command failed") or text.startswith("[icat failed") def _make_cached(tool_name: str, executor: Any) -> Any: """Thin in-memory cache wrapper around a tool executor. Kept as a standalone primitive (no graph dependency) so unit tests can exercise caching in isolation. Production wiring composes this with invocation logging via :func:`_make_invocation_executor`. """ async def wrapper(**kwargs) -> str: key = _cache_key(tool_name, kwargs) hit = _tool_result_cache.get(key) if hit is not None: return hit result = await executor(**kwargs) if not _looks_like_error(result): _tool_result_cache[key] = result return result return wrapper def _make_invocation_executor( tool_name: str, executor: Any, graph: Any, *, cacheable: bool, auto_record_category: str | None = None, ) -> Any: """Single uniform wrapper around a forensic tool executor. Responsibilities (in order): 1. Serve from the result cache when ``cacheable=True`` and the key is hot. Cached hits still produce a fresh ToolInvocation record marked ``cached=True`` so the agent can cite their work. 2. Call the underlying executor on cache miss; store on success. 3. Record a :class:`ToolInvocation` on the graph (this is the provenance unit the grounding gateway looks up). 4. (Optionally) auto-record the raw output as a Phenomenon with a single ``type=raw`` fact citing the invocation just made. This replaces the pre-S2 ``_make_auto_record`` shortcut. 5. Return the result with a ``[invocation: inv-xxx]`` header so the LLM learns the ID to put in ``add_phenomenon`` facts. 
""" async def wrapper(**kwargs) -> str: cached_flag = False cache_hit_key: str | None = None text: str | None = None if cacheable: cache_hit_key = _cache_key(tool_name, kwargs) hit = _tool_result_cache.get(cache_hit_key) if hit is not None: logger.debug("Cache hit: %s(%s)", tool_name, kwargs) text, cached_flag = hit, True if text is None: text = await executor(**kwargs) if cacheable and cache_hit_key and not _looks_like_error(text): _tool_result_cache[cache_hit_key] = text inv_id = await graph.record_tool_invocation( tool=tool_name, args=kwargs, output=text, cached=cached_flag, ) # Auto-record the raw output as a phenomenon (single grounded fact). # Skipped on error outputs and when no graph is present. if auto_record_category and not _looks_like_error(text): agent = getattr(graph, "_current_agent", "") or "unknown" first_line = text.split("\n", 1)[0][:80] try: await graph.add_phenomenon( source_agent=agent, category=auto_record_category, title=f"{tool_name}: {first_line}", interpretation="(auto-recorded raw tool output)", verified_facts=[{ "type": "raw", "value": text[:2000], "invocation_id": inv_id, }], source_tool=tool_name, ) except GroundingError as e: # Should never happen for auto-record (we just wrote the # invocation; value is a literal prefix of output). Log # loudly if it does — that's a bug, not a hallucination. 
logger.error("Auto-record grounding failed for %s: %s", tool_name, e) return f"[invocation: {inv_id}]\n{text}" return wrapper def get_cache_stats() -> dict[str, int]: """Return cache statistics for diagnostics.""" return {"entries": len(_tool_result_cache)} # Category auto-detection patterns (filename → category) _REGISTRY_HIVE_NAMES = {"system", "software", "sam", "ntuser.dat", "security", "default"} ASSET_CATEGORIES = [ "registry_hive", "chat_log", "prefetch", "network_capture", "config_file", "address_book", "recycle_bin", "executable", "text_log", "other", ] def _auto_categorize_windows(filename: str) -> str: """Original Windows-leaning heuristic for disk-image-extracted artifacts.""" name_lower = filename.lower() ext = os.path.splitext(name_lower)[1] if name_lower in _REGISTRY_HIVE_NAMES: return "registry_hive" if ext == ".pf": return "prefetch" if ext in (".pcap", ".cap") or name_lower == "interception": return "network_capture" if ext == ".wab": return "address_book" if name_lower == "info2" or re.match(r"dc\d+\.exe", name_lower): return "recycle_bin" # Extension-based checks before keyword-based (e.g. mirc.ini → config, not chat). if ext in (".ini", ".csv", ".dat", ".cfg"): return "config_file" if ext in (".log", ".lst"): if any(kw in name_lower for kw in ("irc", "mirc", "channel", "chat")): return "chat_log" return "text_log" if any(kw in name_lower for kw in ("irc", "mirc", "channel", "chat")): return "chat_log" if ext in (".exe", ".dll", ".com"): return "executable" return "other" def _auto_categorize_ios(filename: str) -> str: """iOS extraction heuristic — plist / sqlite / keychain land here. Domain-rooted iOS extractions yield specific filenames (sms.db, AddressBook.sqlitedb, keychain-2.db, *.plist) that the Windows categorizer would dump into 'other' — fixing P4. 
""" name_lower = filename.lower() ext = os.path.splitext(name_lower)[1] if name_lower == "keychain-2.db": return "ios_keychain" if name_lower in ("sms.db", "chatstorage.sqlite"): return "messaging_db" if name_lower in ("addressbook.sqlitedb", "addressbookimages.sqlitedb"): return "address_book" if name_lower == "idevice_info.txt": return "device_info" if ext in (".sqlite", ".sqlite3", ".sqlitedb", ".db"): return "sqlite_db" if ext == ".plist": return "plist" if ext in (".log",): return "text_log" return "other" # Per-source-type categorizers — dispatched by _auto_categorize at call time # based on graph.active_source.type. Solves P4 (Windows-only categorization). _CATEGORIZERS = { "disk_image": _auto_categorize_windows, "mobile_extraction": _auto_categorize_ios, "archive": _auto_categorize_windows, "media_collection": lambda fn: "other", } def _auto_categorize(filename: str, source_type: str = "disk_image") -> str: """Dispatch to a source-type-aware categorizer (defaults to Windows).""" fn = _CATEGORIZERS.get(source_type, _auto_categorize_windows) return fn(filename) @dataclass class ToolDefinition: """A registered tool available for agent composition.""" name: str description: str input_schema: dict executor: Any # async callable (or sync for some parsers) module: str # "sleuthkit", "registry", "parsers" tags: list[str] = field(default_factory=list) # Global tool catalog, populated by register_all_tools(). TOOL_CATALOG: dict[str, ToolDefinition] = {} # Set of (tool_name, category) pairs that auto-record a phenomenon when run. # Replaces the pre-S2 ``_make_auto_record`` per-tool wrapping; the central # instrumentation pass at the end of register_all_tools applies these. 
AUTO_RECORD_TOOLS: dict[str, str] = { "list_installed_software": "registry", "get_system_info": "registry", "get_timezone_info": "registry", "get_computer_name": "registry", "get_shutdown_time": "registry", "enumerate_users": "registry", "get_network_interfaces": "registry", "get_email_config": "registry", "parse_prefetch": "filesystem", } def register_all_tools(graph: Any) -> None: """Populate TOOL_CATALOG with all available forensic tools. Tools no longer close over a fixed image path. The Sleuth Kit tools resolve the image path and partition offset from ``graph.active_source`` at call time, so the same registered tool follows whichever evidence source the orchestrator has made active. """ TOOL_CATALOG.clear() def _img() -> str: """Resolve the active source's image path at tool-call time.""" src = getattr(graph, "active_source", None) if src is None or not src.path: raise RuntimeError( "No active evidence source — call graph.set_active_source() first." ) return src.path def _off() -> int: """Resolve the active source's partition offset at tool-call time.""" src = getattr(graph, "active_source", None) return src.partition_offset if src is not None else 0 # ---- Sleuth Kit tools ---- TOOL_CATALOG["partition_info"] = ToolDefinition( name="partition_info", description="Get the partition table layout of the disk image. Run this first to understand disk structure.", input_schema={"type": "object", "properties": {}}, executor=lambda: tsk.partition_info(_img()), module="sleuthkit", tags=["filesystem", "disk", "partition"], ) TOOL_CATALOG["filesystem_info"] = ToolDefinition( name="filesystem_info", description="Get detailed filesystem information (type, block size, volume name, etc.) 
for the selected partition.", input_schema={"type": "object", "properties": {}}, executor=lambda: tsk.filesystem_info(_img(), _off()), module="sleuthkit", tags=["filesystem", "disk"], ) TOOL_CATALOG["list_directory"] = ToolDefinition( name="list_directory", description="List files and directories. Without inode, lists root. Use recursive=true for all files.", input_schema={ "type": "object", "properties": { "inode": {"type": "string", "description": "Inode of directory. Omit for root."}, "recursive": {"type": "boolean", "description": "List all files recursively."}, }, }, executor=lambda inode=None, recursive=False: tsk.list_directory( _img(), _off(), inode, recursive ), module="sleuthkit", tags=["filesystem", "directory", "listing"], ) async def _extract_with_tracking(inode: str) -> str: """Extract a file by inode. Name and category are derived from the real disk path.""" # Dedup if graph is not None: existing = graph.lookup_asset_by_inode(inode) if existing is not None: return ( f"Already extracted: {existing.local_path} " f"({existing.size_bytes} bytes, {existing.category}). " f"Disk path: {existing.original_path}" ) # Resolve real disk path first orig_path = (await tsk.find_file(_img(), inode, _off())).strip() if not orig_path or "not found" in orig_path.lower(): return f"Error: inode {inode} not found on the disk image." 
# Derive local filename from real disk path filename = os.path.basename(orig_path) extracted_dir = graph.extracted_dir local_path = os.path.join(extracted_dir, filename) # Handle name collisions by appending inode if os.path.exists(local_path): base, ext = os.path.splitext(filename) local_path = os.path.join(extracted_dir, f"{base}_{inode.replace('-', '_')}{ext}") filename = os.path.basename(local_path) # Extract result = await tsk.extract_file(_img(), inode, local_path, _off()) if result.startswith("[icat failed"): return result size = os.path.getsize(local_path) if os.path.exists(local_path) else 0 src_type = ( graph.active_source.type if graph.active_source else "disk_image" ) category = _auto_categorize(os.path.basename(orig_path), src_type) # Register if graph is not None: agent_name = getattr(graph, "_current_agent", "") or "unknown" await graph.register_asset( inode=inode, original_path=orig_path, local_path=local_path, category=category, filename=filename, size_bytes=size, extracted_by=agent_name, ) logger.info("Asset registered: %s (%s, %d bytes)", local_path, category, size) return ( f"Extracted to {local_path} ({size} bytes, {category})\n" f"Disk path: {orig_path}" ) TOOL_CATALOG["extract_file"] = ToolDefinition( name="extract_file", description=( "Extract a file from the disk image by inode number. " "The filename is automatically determined from the disk path. " "Checks if already extracted (returns existing path if so). " "Returns the local path and the original disk path." ), input_schema={ "type": "object", "properties": { "inode": {"type": "string", "description": "Inode number of the file (e.g. 
'334-128-4' or '334')."}, }, "required": ["inode"], }, executor=_extract_with_tracking, module="sleuthkit", tags=["filesystem", "extraction"], ) TOOL_CATALOG["find_file"] = ToolDefinition( name="find_file", description="Find the file path for a given inode number.", input_schema={ "type": "object", "properties": { "inode": {"type": "string", "description": "Inode number to look up."}, }, "required": ["inode"], }, executor=lambda inode: tsk.find_file(_img(), inode, _off()), module="sleuthkit", tags=["filesystem"], ) TOOL_CATALOG["search_strings"] = ToolDefinition( name="search_strings", description="Search for a string pattern across the entire disk image (slow on first call, fast after). Prefer search_text_file on already-extracted files when possible.", input_schema={ "type": "object", "properties": { "pattern": {"type": "string", "description": "String pattern (case-insensitive grep)."}, }, "required": ["pattern"], }, executor=lambda pattern: tsk.search_strings(_img(), pattern), module="sleuthkit", tags=["filesystem", "search", "strings"], ) TOOL_CATALOG["count_deleted_files"] = ToolDefinition( name="count_deleted_files", description="List and count all deleted files. 
Shows total count, executables, and extension breakdown.", input_schema={"type": "object", "properties": {}}, executor=lambda: tsk.count_deleted_files(_img(), _off()), module="sleuthkit", tags=["filesystem", "deleted", "recovery"], ) TOOL_CATALOG["build_filesystem_timeline"] = ToolDefinition( name="build_filesystem_timeline", description="Build a MAC timeline from the filesystem (Modified/Accessed/Changed times for all files).", input_schema={"type": "object", "properties": {}}, executor=lambda: tsk.build_timeline(_img(), _off()), module="sleuthkit", tags=["filesystem", "timeline"], ) # ---- Registry tools ---- TOOL_CATALOG["parse_registry_key"] = ToolDefinition( name="parse_registry_key", description="Parse a registry hive file and list subkeys/values at a given path.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to extracted hive file."}, "key_path": {"type": "string", "description": "Registry key path to inspect."}, }, "required": ["hive_path", "key_path"], }, executor=lambda hive_path, key_path: reg.parse_registry_key(hive_path, key_path), module="registry", tags=["registry", "hive"], ) TOOL_CATALOG["list_installed_software"] = ToolDefinition( name="list_installed_software", description="List installed software from a SOFTWARE registry hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SOFTWARE hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.list_installed_software(hive_path), module="registry", tags=["registry", "software", "installed"], ) TOOL_CATALOG["get_user_activity"] = ToolDefinition( name="get_user_activity", description="Extract user activity from NTUSER.DAT (recent docs, typed URLs, run dialog history).", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to NTUSER.DAT."}, }, "required": ["hive_path"], }, executor=lambda hive_path: 
reg.get_user_activity(hive_path), module="registry", tags=["registry", "user", "activity"], ) TOOL_CATALOG["search_registry"] = ToolDefinition( name="search_registry", description="Search for a pattern in registry key names and values.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to hive file."}, "pattern": {"type": "string", "description": "Search pattern."}, }, "required": ["hive_path", "pattern"], }, executor=lambda hive_path, pattern: reg.search_registry(hive_path, pattern), module="registry", tags=["registry", "search"], ) # ---- Registry tools (auto-record: results are forensic facts) ---- TOOL_CATALOG["get_system_info"] = ToolDefinition( name="get_system_info", description="Extract OS version, install date, and registered owner from a SOFTWARE hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SOFTWARE hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.get_system_info(hive_path), module="registry", tags=["registry", "system"], ) TOOL_CATALOG["get_timezone_info"] = ToolDefinition( name="get_timezone_info", description="Extract timezone settings from a SYSTEM hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SYSTEM hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.get_timezone_info(hive_path), module="registry", tags=["registry", "timezone", "system"], ) TOOL_CATALOG["get_computer_name"] = ToolDefinition( name="get_computer_name", description="Extract computer/host name from a SYSTEM hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SYSTEM hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.get_computer_name(hive_path), module="registry", tags=["registry", "system", "hostname"], ) TOOL_CATALOG["get_shutdown_time"] = ToolDefinition( 
name="get_shutdown_time", description="Extract last shutdown time from a SYSTEM hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SYSTEM hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.get_shutdown_time(hive_path), module="registry", tags=["registry", "system", "shutdown"], ) TOOL_CATALOG["enumerate_users"] = ToolDefinition( name="enumerate_users", description="List all user accounts and RIDs from a SAM hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SAM hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.enumerate_users(hive_path), module="registry", tags=["registry", "user", "accounts", "sam"], ) TOOL_CATALOG["get_network_interfaces"] = ToolDefinition( name="get_network_interfaces", description="Extract network adapter and TCP/IP config from a SYSTEM hive.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to SYSTEM hive."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.get_network_interfaces(hive_path), module="registry", tags=["registry", "network", "adapter", "ip"], ) TOOL_CATALOG["get_email_config"] = ToolDefinition( name="get_email_config", description="Extract email account configuration (SMTP, POP3, NNTP) from NTUSER.DAT.", input_schema={ "type": "object", "properties": { "hive_path": {"type": "string", "description": "Path to NTUSER.DAT."}, }, "required": ["hive_path"], }, executor=lambda hive_path: reg.get_email_config(hive_path), module="registry", tags=["registry", "email", "account"], ) # ---- Parser tools ---- TOOL_CATALOG["parse_prefetch"] = ToolDefinition( name="parse_prefetch", description="Parse a Windows Prefetch (.pf) file to extract executable name, last execution time, and run count.", input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to extracted .pf 
file."}, }, "required": ["file_path"], }, executor=lambda file_path: parsers.parse_prefetch(file_path), module="parsers", tags=["filesystem", "prefetch", "execution"], ) TOOL_CATALOG["read_text_file"] = ToolDefinition( name="read_text_file", description="Read an extracted text file (configs, logs, chat logs, etc.).", input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Local path to the file."}, }, "required": ["file_path"], }, executor=lambda file_path: parsers.read_text_file(file_path), module="parsers", tags=["text", "read"], ) TOOL_CATALOG["read_binary_preview"] = ToolDefinition( name="read_binary_preview", description="Preview a binary file in hex+ASCII format.", input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Local path to the file."}, }, "required": ["file_path"], }, executor=lambda file_path: parsers.read_binary_preview(file_path), module="parsers", tags=["binary", "hex", "preview"], ) TOOL_CATALOG["search_text_file"] = ToolDefinition( name="search_text_file", description="Search for a regex pattern in an extracted text file. 
Returns matching lines with line numbers.", input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to extracted file."}, "pattern": {"type": "string", "description": "Regex pattern."}, }, "required": ["file_path", "pattern"], }, executor=lambda file_path, pattern: parsers.search_text_file(file_path, pattern), module="parsers", tags=["text", "search", "regex"], ) TOOL_CATALOG["read_text_file_section"] = ToolDefinition( name="read_text_file_section", description="Read a section of a large text file starting at a byte offset.", input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to file."}, "start": {"type": "integer", "description": "Byte offset to start reading."}, "max_bytes": {"type": "integer", "description": "Maximum bytes to read."}, }, "required": ["file_path"], }, executor=lambda file_path, start=0, max_bytes=8000: parsers.read_text_file_section( file_path, start, max_bytes ), module="parsers", tags=["text", "read", "section"], ) TOOL_CATALOG["list_extracted_dir"] = ToolDefinition( name="list_extracted_dir", description=( "Summarise an extracted directory tree: total counts, " "extension breakdown, top-level layout, largest files. " "Scales to 10k+-file trees without truncating into uselessness. " "For targeted searches (find every *.plist, locate sms.db, ...) " "use find_files instead." ), input_schema={ "type": "object", "properties": { "dir_path": {"type": "string", "description": "Directory path."}, }, "required": ["dir_path"], }, executor=lambda dir_path: parsers.list_extracted_dir(dir_path), module="parsers", tags=["filesystem", "listing", "extracted"], ) TOOL_CATALOG["find_files"] = ToolDefinition( name="find_files", description=( "Recursively find files under a directory by glob pattern. " "Use this on tree-mode sources (iOS extractions, archives, " "Android-mounted partitions) to locate specific artefacts in " "huge trees. 
Patterns are fnmatch-style; '**' means 'any " "depth'. Examples: '**/sms.db', '**/keychain-2.db', " "'**/ChatStorage.sqlite', '**/*.plist', 'HomeDomain/Library/**'. " "Results sort by size descending; capped at max_results." ), input_schema={ "type": "object", "properties": { "root": {"type": "string", "description": "Directory to search under."}, "pattern": {"type": "string", "description": "fnmatch glob pattern (use '**' for any depth)."}, "max_results": {"type": "integer", "description": "Result cap (default 500)."}, }, "required": ["root", "pattern"], }, executor=lambda root, pattern, max_results=500: parsers.find_files(root, pattern, max_results), module="parsers", tags=["filesystem", "search", "extracted", "glob"], ) TOOL_CATALOG["parse_pcap_strings"] = ToolDefinition( name="parse_pcap_strings", description="Extract HTTP headers, hosts, User-Agent, cookies, and URLs from a PCAP/capture file.", input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to PCAP file."}, }, "required": ["file_path"], }, executor=lambda file_path: parsers.parse_pcap_strings(file_path), module="parsers", tags=["network", "pcap", "http", "capture"], ) # ---- Archive tools (tree-mode prep) ---- TOOL_CATALOG["unzip_archive"] = ToolDefinition( name="unzip_archive", description=( "Extract a .zip archive into a target directory. Defensive against " "zip-slip; skips symlinks. Idempotent on rerun. Pass `password` for " "password-protected zips — only the legacy ZipCrypto algorithm is " "supported by stdlib (AES zips need an external `7z x` step)." 
), input_schema={ "type": "object", "properties": { "zip_path": {"type": "string", "description": "Path to the .zip file."}, "dest_dir": {"type": "string", "description": "Directory to extract into (created if missing)."}, "password": {"type": "string", "description": "Password for encrypted zips (omit for plain archives)."}, }, "required": ["zip_path", "dest_dir"], }, executor=lambda zip_path, dest_dir, password=None: arc.unzip_archive(zip_path, dest_dir, password), module="archive", tags=["archive", "zip", "extract", "ingest"], ) # ---- iOS plugin tools (DESIGN.md §4.7) ---- TOOL_CATALOG["parse_plist"] = ToolDefinition( name="parse_plist", description=( "Parse a .plist file (XML or binary) and return its contents as JSON. " "Bytes are rendered as hex; dates as ISO-8601." ), input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to .plist file."}, }, "required": ["file_path"], }, executor=lambda file_path: ios.parse_plist(file_path), module="mobile_ios", tags=["ios", "plist", "parse"], ) TOOL_CATALOG["sqlite_tables"] = ToolDefinition( name="sqlite_tables", description=( "List user tables in a sqlite database with row counts and column " "names. Use this to scout an unfamiliar .sqlite / .db file before " "querying it." ), input_schema={ "type": "object", "properties": { "db_path": {"type": "string", "description": "Path to .sqlite/.db file."}, }, "required": ["db_path"], }, executor=lambda db_path: ios.sqlite_tables(db_path), module="mobile_ios", tags=["sqlite", "schema", "ios", "android"], ) TOOL_CATALOG["sqlite_query"] = ToolDefinition( name="sqlite_query", description=( "Run a single read-only SELECT against a sqlite file. " "Multi-statement queries and non-SELECT statements are rejected. " "Use this for sms.db / ChatStorage.sqlite / AddressBook.sqlitedb / etc." 
), input_schema={ "type": "object", "properties": { "db_path": {"type": "string", "description": "Path to .sqlite/.db file."}, "query": {"type": "string", "description": "A single SELECT statement."}, "max_rows": {"type": "integer", "description": "Row cap (default 100)."}, }, "required": ["db_path", "query"], }, executor=lambda db_path, query, max_rows=100: ios.sqlite_query(db_path, query, max_rows), module="mobile_ios", tags=["sqlite", "query", "ios", "android"], ) TOOL_CATALOG["parse_ios_keychain"] = ToolDefinition( name="parse_ios_keychain", description=( "Locate and summarise iOS keychain entries (keychain-2.db). " "Pass either the db file directly or the containing directory; " "dumps accounting metadata from genp/inet/cert/keys tables." ), input_schema={ "type": "object", "properties": { "keychain_root": { "type": "string", "description": "Path to keychain-2.db or a directory that contains it.", }, }, "required": ["keychain_root"], }, executor=lambda keychain_root: ios.parse_ios_keychain(keychain_root), module="mobile_ios", tags=["ios", "keychain", "credentials"], ) TOOL_CATALOG["read_idevice_info"] = ToolDefinition( name="read_idevice_info", description=( "Read the iDevice_info.txt summary at the root of an iOS extraction. " "Pass the file path or the extraction root directory." ), input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to iDevice_info.txt or extraction root."}, }, "required": ["file_path"], }, executor=lambda file_path: ios.read_idevice_info(file_path), module="mobile_ios", tags=["ios", "device", "metadata"], ) # ---- Android plugin (DESIGN.md §4.7) ---- TOOL_CATALOG["probe_android_partitions"] = ToolDefinition( name="probe_android_partitions", description=( "Survey every partition on an Android disk dump (mmls + per-" "partition fsstat). Returns a markdown table with name, native " "and 512-byte sector offsets, filesystem type, and a strategy " "hint per partition. 
Use this BEFORE deciding which partitions " "to dive into via set_active_partition + list_directory." ), input_schema={"type": "object", "properties": {}}, executor=lambda: android.probe_android_partitions(_img()), module="mobile_android", tags=["android", "partition", "survey"], ) async def _set_active_partition(partition_offset: int) -> str: src = getattr(graph, "active_source", None) if src is None: return "Error: no active evidence source." old = src.partition_offset new = int(partition_offset) src.partition_offset = new # Sync the legacy mirror field so older readers stay consistent. graph.partition_offset = new return ( f"Active partition offset: {old} → {new} (512-byte sectors). " f"Subsequent list_directory / extract_file / search_strings " f"calls now target this partition on {src.id} ({src.label})." ) TOOL_CATALOG["set_active_partition"] = ToolDefinition( name="set_active_partition", description=( "Switch the current partition offset (in 512-byte sectors) on " "the active disk-image source. Use the values from " "probe_android_partitions's '512-sector' column. NOT a " "forensic read — purely repoints the TSK toolset. Mutates " "shared state; call serially within one agent run." ), input_schema={ "type": "object", "properties": { "partition_offset": { "type": "integer", "description": "Partition start in 512-byte sectors.", }, }, "required": ["partition_offset"], }, executor=_set_active_partition, module="android", tags=["android", "partition", "navigation"], ) # ---- Media plugin (DESIGN.md §4.7) ---- TOOL_CATALOG["ocr_image"] = ToolDefinition( name="ocr_image", description=( "Extract text from an image via tesseract. The LLM backend has " "no vision, so this is the only way to read JPEG/PNG evidence " "(screenshots of chats, transactions, IDs). Default lang covers " "English + Simplified & Traditional Chinese; override `lang` " "if you know the artefact's language. 
Returns 'Error: OCR " "runtime not available' with an install hint when tesseract " "isn't on the host — record that absence as a negative " "finding rather than guessing." ), input_schema={ "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to image file."}, "lang": {"type": "string", "description": "Tesseract language code(s), e.g. 'eng' or 'eng+chi_sim'."}, }, "required": ["file_path"], }, executor=lambda file_path, lang="eng+chi_sim+chi_tra": med.ocr_image(file_path, lang), module="media", tags=["media", "ocr", "image"], ) # ---- Strategist-loop view tools (DESIGN_STRATEGIST.md §2) ---- # Pure read-only renders over graph state. The strategist agent uses # these to decide whether to keep investigating or to declare complete. # They go through invocation logging like every other tool (so the # strategist's reads are auditable) but are NOT cacheable — graph # state changes between calls and a stale snapshot would mislead. async def _exec_graph_overview() -> str: return strat.graph_overview(graph) TOOL_CATALOG["graph_overview"] = ToolDefinition( name="graph_overview", description=( "Top-level investigation state: hypotheses (with log-odds, " "confidence, edges_in, distinct_sources contributing, recent " "status flips), sources (phenomena/identity counts, last-touched " "round), and pending leads. Always call this first when deciding " "the next strategist action." ), input_schema={"type": "object", "properties": {}}, executor=_exec_graph_overview, module="strategy", tags=["strategy", "overview", "read-only"], ) async def _exec_source_coverage(source_id: str) -> str: return strat.source_coverage(graph, source_id) TOOL_CATALOG["source_coverage"] = ToolDefinition( name="source_coverage", description=( "Per-source artefact coverage report: which expected categories " "have been touched (✓) vs not (✗) on the given source. 
Coverage " "items are heuristic hints, not requirements — investigate ✗ " "items only when an active hypothesis depends on them." ), input_schema={ "type": "object", "properties": { "source_id": {"type": "string", "description": "Source id, e.g. 'src-ios-chan'."}, }, "required": ["source_id"], }, executor=_exec_source_coverage, module="strategy", tags=["strategy", "coverage", "read-only"], ) async def _exec_marginal_yield(last_n_rounds: int = 2) -> str: return strat.marginal_yield(graph, int(last_n_rounds)) TOOL_CATALOG["marginal_yield"] = ToolDefinition( name="marginal_yield", description=( "How much information the last N investigation rounds added: " "new phenomena, new edges, and hypothesis status flips per round. " "Two consecutive zero-yield rounds means diminishing returns are " "decisive — declare_investigation_complete with reason " "marginal_yield_zero." ), input_schema={ "type": "object", "properties": { "last_n_rounds": {"type": "integer", "description": "How many recent rounds to summarise (default 2)."}, }, }, executor=_exec_marginal_yield, module="strategy", tags=["strategy", "yield", "read-only"], ) async def _exec_budget_status() -> str: return strat.budget_status( graph, getattr(graph, "budgets", None), getattr(graph, "run_start_monotonic", None), ) TOOL_CATALOG["budget_status"] = ToolDefinition( name="budget_status", description=( "Budget vs caps: tool_calls, strategist_rounds, wall_clock_minutes. " "Includes pacing hints when usage crosses 70% / 90% thresholds. " "Use this to decide whether to keep proposing leads or to wind down." ), input_schema={"type": "object", "properties": {}}, executor=_exec_budget_status, module="strategy", tags=["strategy", "budget", "read-only"], ) # ---- Strategist decision actions (DESIGN_STRATEGIST.md §2.5) ---- # propose_lead is the strategist's tool for "go deeper here"; # declare_investigation_complete is its tool for "we're done". 
# Both are required to be in BaseAgent.mandatory_record_tools for the
# strategist subclass so the agent can't return without taking a
# documented decision.

_ALLOWED_EVIDENCE_EDGE_TYPES = (
    "direct_evidence",
    "supports",
    "contradicts",
    "weakens",
    "prerequisite_met",
    "consequence_observed",
)

# Worker agents a strategist lead may be routed to. Shared by the
# propose_lead input schema and the executor's validation so the two
# cannot drift apart.
_LEAD_TARGET_AGENTS = (
    "filesystem",
    "registry",
    "communication",
    "network",
    "ios_artifact",
    "android_artifact",
    "media",
    "hypothesis",
    "timeline",
)


async def _exec_propose_lead(
    description: str,
    target_agent: str,
    motivating_hypothesis: str,
    expected_evidence_type: str,
    rationale: str = "",
    source_id: str = "",
) -> str:
    """Propose a new lead from the strategist.

    Validates the references, then registers the lead via
    ``graph.add_lead`` with ``proposed_by="strategist"`` and the current
    strategist round number.

    Args:
        description: Specific investigation request.
        target_agent: Worker agent that should pick the lead up; must be
            one of ``_LEAD_TARGET_AGENTS``.
        motivating_hypothesis: Hypothesis id the lead is meant to move. A
            non-empty value must exist in ``graph.hypotheses``.
        expected_evidence_type: One of ``_ALLOWED_EVIDENCE_EDGE_TYPES``.
        rationale: Optional justification, stored in the lead context.
        source_id: Optional evidence source id; when given it must resolve
            via ``graph.case.get_source``.

    Returns:
        A confirmation string containing the lead id, or an ``Error: ...``
        string describing the first failed validation.

    NOTE(review): the tool is advertised as idempotent on
    (motivating_hypothesis, expected_evidence_type, target_agent,
    source_id); no deduplication happens in this function, so that is
    presumably implemented by ``graph.add_lead`` — confirm.
    """
    # Validate refs early so the strategist gets a fast, specific error.
    if motivating_hypothesis and motivating_hypothesis not in graph.hypotheses:
        return (
            f"Error: motivating_hypothesis {motivating_hypothesis!r} is "
            f"not in graph.hypotheses. Call graph_overview to see the "
            f"current hypothesis ids."
        )
    if expected_evidence_type not in _ALLOWED_EVIDENCE_EDGE_TYPES:
        return (
            f"Error: expected_evidence_type {expected_evidence_type!r} is "
            f"not one of {list(_ALLOWED_EVIDENCE_EDGE_TYPES)}."
        )
    # The schema declares a closed enum for target_agent; enforce it here
    # too so a lead is never queued for an agent outside that set.
    if target_agent not in _LEAD_TARGET_AGENTS:
        return (
            f"Error: target_agent {target_agent!r} is "
            f"not one of {list(_LEAD_TARGET_AGENTS)}."
        )
    if source_id:
        src_obj = graph.case.get_source(source_id) if graph.case else None
        if src_obj is None:
            return (
                f"Error: source_id {source_id!r} is not in the case. "
                f"Valid ids: {[s.id for s in (graph.case.sources if graph.case else [])]}"
            )

    lid = await graph.add_lead(
        target_agent=target_agent,
        description=description,
        proposed_by="strategist",
        motivating_hypothesis=motivating_hypothesis,
        expected_evidence_type=expected_evidence_type,
        round_number=graph.current_strategist_round,
        # An empty hypothesis string is passed on as "no hypothesis".
        hypothesis_id=motivating_hypothesis or None,
        # Only attach a context payload when there is something to carry.
        context={"source_id": source_id, "rationale": rationale}
        if source_id or rationale else {},
    )
    return (
        f"Lead {lid} proposed: target_agent={target_agent}, "
        f"motivating_hypothesis={motivating_hypothesis}, "
        f"expected={expected_evidence_type}, source={source_id or '—'}."
    )


TOOL_CATALOG["propose_lead"] = ToolDefinition(
    name="propose_lead",
    description=(
        "Propose a specific investigation lead that will be dispatched "
        "after this strategist round. Each lead MUST name a motivating "
        "hypothesis it expects to move and the kind of edge it expects "
        "to produce. Do NOT propose a lead that just adds more same-"
        "direction evidence to an already-supported hypothesis — harmonic "
        "damping makes repeats cheap. DO propose leads when (a) a "
        "hypothesis is supported only by one source — get cross-source "
        "corroboration; (b) a hypothesis is in the active band — give it "
        "the deciding evidence; (c) a high-value artefact is uncovered on "
        "a source where an active hypothesis suggests it matters. "
        "Idempotent on (motivating_hypothesis, expected_evidence_type, "
        "target_agent, source_id) — re-proposing the same tuple while "
        "pending is a no-op that returns the existing lead's id."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "description": {
                "type": "string",
                "description": "1-2 sentence specific investigation request, including target source/artefact.",
            },
            "target_agent": {
                "type": "string",
                "enum": list(_LEAD_TARGET_AGENTS),
                "description": "Which worker agent should pick this up.",
            },
            "source_id": {
                "type": "string",
                "description": "Which evidence source to investigate (e.g. 'src-ios-chan'). Optional for cross-source leads.",
            },
            "motivating_hypothesis": {
                "type": "string",
                "description": "hyp-id this lead is meant to corroborate or refute.",
            },
            "expected_evidence_type": {
                "type": "string",
                "enum": list(_ALLOWED_EVIDENCE_EDGE_TYPES),
                "description": "What kind of P→H edge you expect this lead to produce.",
            },
            "rationale": {
                "type": "string",
                "description": "Why this fills a real gap — referenced in audit + worker prompt.",
            },
        },
        "required": [
            "description",
            "target_agent",
            "motivating_hypothesis",
            "expected_evidence_type",
        ],
    },
    executor=_exec_propose_lead,
    module="strategy",
    tags=["strategy", "lead", "decision"],
)

_COMPLETE_REASONS = (
    "marginal_yield_zero",
    "budget_exhausted",
    "all_hypotheses_resolved",
    "coverage_saturated",
    "other",
)


async def _exec_declare_investigation_complete(
    reason: str,
    rationale: str = "",
) -> str:
    """Terminal strategist action: signal "we're done" to the orchestrator.

    Sets ``graph.strategist_complete_requested = True`` when ``reason`` is
    one of ``_COMPLETE_REASONS``; otherwise returns an ``Error: ...`` string
    and leaves the graph untouched.

    Args:
        reason: Termination cause from the closed ``_COMPLETE_REASONS`` set.
        rationale: Optional free-text justification.

    Returns:
        A confirmation string echoing the round, reason and rationale.

    NOTE(review): ``reason`` and ``rationale`` are only echoed in the
    returned text here; confirm the orchestrator captures them from the
    invocation record if they are needed for the round's audit trail.
    """
    if reason not in _COMPLETE_REASONS:
        return (
            f"Error: reason {reason!r} not in "
            f"{list(_COMPLETE_REASONS)}."
        )
    graph.strategist_complete_requested = True
    return (
        f"Investigation marked complete in round "
        f"{graph.current_strategist_round}. reason={reason}. "
        f"rationale={rationale or '(none)'}. The orchestrator will exit "
        f"the strategist loop after this round."
    )


TOOL_CATALOG["declare_investigation_complete"] = ToolDefinition(
    name="declare_investigation_complete",
    description=(
        "Terminal strategist action. Call this when (a) marginal_yield "
        "shows zero across 2+ rounds, (b) budget is exhausted, (c) all "
        "active hypotheses are resolved, or (d) coverage is saturated "
        "with respect to the active hypotheses. After this call, the "
        "orchestrator finishes the strategist loop and proceeds to "
        "Phase 4 (timeline) and Phase 5 (report). The current round's "
        "in-flight work still completes."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "reason": {
                "type": "string",
                "enum": list(_COMPLETE_REASONS),
                "description": "Termination cause — picked from a closed set so the audit log is consistent.",
            },
            "rationale": {
                "type": "string",
                "description": "Free-text justification — quoted into the InvestigationRound's decision_rationale.",
            },
        },
        "required": ["reason"],
    },
    executor=_exec_declare_investigation_complete,
    module="strategy",
    tags=["strategy", "terminal", "decision"],
)

# ---- Wrap every executor with invocation logging (+ cache + auto-record) ----
# Must run AFTER all tools are registered. Every tool call now produces
# a ToolInvocation entry on the graph (provenance for grounding), and
# returns the result prefixed with ``[invocation: inv-xxx]`` so the LLM
# can cite the call in add_phenomenon facts.
# The module-level result cache is emptied first — presumably so entries
# from an earlier registration are not served against this graph.
_tool_result_cache.clear()
for tool_name, td in TOOL_CATALOG.items():
    # Only values are reassigned, so mutating during .items() iteration is safe.
    td.executor = _make_invocation_executor(
        tool_name,
        td.executor,
        graph,
        cacheable=(tool_name in CACHEABLE_TOOLS),
        auto_record_category=AUTO_RECORD_TOOLS.get(tool_name),
    )