Initial commit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
BattleTag
2026-05-09 17:36:26 +08:00
commit 097d2ce472
25 changed files with 5944 additions and 0 deletions

0
tools/__init__.py Normal file
View File

234
tools/parsers.py Normal file
View File

@@ -0,0 +1,234 @@
"""Parsers for various forensic artifact formats."""
from __future__ import annotations
import asyncio
import logging
import os
import re
import struct
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
async def read_text_file(file_path: str, max_bytes: int = 8000) -> str:
"""Read a text file, with size limit."""
try:
with open(file_path, "r", errors="replace") as f:
content = f.read(max_bytes)
size = os.path.getsize(file_path)
if size > max_bytes:
content += f"\n\n[Truncated: file is {size} bytes, showing first {max_bytes}]"
return content
except Exception as e:
return f"[Error reading {file_path}: {e}]"
async def read_binary_preview(file_path: str, max_bytes: int = 2000) -> str:
"""Read a binary file and show hex + ASCII preview."""
try:
with open(file_path, "rb") as f:
data = f.read(max_bytes)
lines = []
for i in range(0, len(data), 16):
chunk = data[i:i + 16]
hex_part = " ".join(f"{b:02x}" for b in chunk)
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
lines.append(f"{i:08x} {hex_part:<48} {ascii_part}")
size = os.path.getsize(file_path)
header = f"File: {file_path} ({size} bytes)\n"
return header + "\n".join(lines)
except Exception as e:
return f"[Error reading {file_path}: {e}]"
async def read_text_file_section(file_path: str, start: int = 0, max_bytes: int = 8000) -> str:
"""Read a section of a text file starting at byte offset `start`."""
try:
size = os.path.getsize(file_path)
with open(file_path, "r", errors="replace") as f:
if start > 0:
f.seek(start)
content = f.read(max_bytes)
remaining = size - start - len(content.encode("utf-8", errors="replace"))
header = f"[File: {file_path}, {size} bytes, showing offset {start}{start + len(content.encode('utf-8', errors='replace'))}]"
if remaining > 0:
content += f"\n\n[{remaining} bytes remaining after this section]"
return header + "\n" + content
except Exception as e:
return f"[Error reading {file_path}: {e}]"
async def search_text_file(file_path: str, pattern: str, max_matches: int = 50) -> str:
"""Search for a pattern in an extracted text file. Returns matching lines with line numbers."""
try:
size = os.path.getsize(file_path)
matches = []
try:
compiled = re.compile(pattern, re.IGNORECASE)
except re.error:
compiled = re.compile(re.escape(pattern), re.IGNORECASE)
with open(file_path, "r", errors="replace") as f:
for lineno, line in enumerate(f, 1):
if compiled.search(line):
matches.append(f" {lineno}: {line.rstrip()[:200]}")
if len(matches) >= max_matches:
matches.append(f" [Truncated: more than {max_matches} matches]")
break
header = f"Search '{pattern}' in {file_path} ({size} bytes): {len(matches)} matches"
if not matches:
return header + "\n (no matches)"
return header + "\n" + "\n".join(matches)
except Exception as e:
return f"[Error searching {file_path}: {e}]"
async def parse_pcap_strings(file_path: str) -> str:
"""Extract HTTP headers and other readable strings from a PCAP/capture file.
Uses the `strings` command to find printable text, then filters for
forensically relevant patterns (HTTP headers, URLs, credentials).
"""
try:
proc = await asyncio.create_subprocess_exec(
"srch_strings", "-a", "-n", "8", file_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
all_strings = stdout.decode("utf-8", errors="replace").splitlines()
hosts = set()
user_agents = set()
urls = []
cookies = []
http_methods = []
other_interesting = []
for line in all_strings:
stripped = line.strip()
if stripped.startswith("Host: "):
hosts.add(stripped[6:])
elif stripped.startswith("User-Agent: "):
user_agents.add(stripped[12:])
elif stripped.startswith("Cookie: "):
cookies.append(stripped[:200])
elif re.match(r"^(GET|POST|PUT|DELETE|HEAD) /", stripped):
urls.append(stripped[:200])
elif stripped.startswith("HTTP/"):
http_methods.append(stripped[:200])
elif any(kw in stripped.lower() for kw in ("password", "login", "username", "email", "set-cookie")):
other_interesting.append(stripped[:200])
size = os.path.getsize(file_path)
lines = [f"=== PCAP String Analysis: {file_path} ({size} bytes) ==="]
lines.append(f"Total printable strings (>=8 chars): {len(all_strings)}")
lines.append(f"\nUnique Hosts ({len(hosts)}):")
for h in sorted(hosts):
lines.append(f" {h}")
lines.append(f"\nUser-Agent strings ({len(user_agents)}):")
for ua in sorted(user_agents):
lines.append(f" {ua}")
lines.append(f"\nHTTP Requests ({len(urls)}):")
for u in urls[:30]:
lines.append(f" {u}")
if len(urls) > 30:
lines.append(f" ... ({len(urls) - 30} more)")
lines.append(f"\nHTTP Responses ({len(http_methods)}):")
for m in http_methods[:20]:
lines.append(f" {m}")
if cookies:
lines.append(f"\nCookies ({len(cookies)}):")
for c in cookies[:20]:
lines.append(f" {c}")
if other_interesting:
lines.append(f"\nOther interesting strings ({len(other_interesting)}):")
for o in other_interesting[:30]:
lines.append(f" {o}")
return "\n".join(lines)
except Exception as e:
return f"[Error parsing PCAP strings: {e}]"
async def parse_prefetch(file_path: str) -> str:
"""Parse a Windows XP Prefetch (.pf) file to extract execution info.
Returns: executable name, last execution time, and run count.
"""
try:
with open(file_path, "rb") as f:
data = f.read()
if len(data) < 0x94:
return f"[Error: file too small for Prefetch format ({len(data)} bytes)]"
version = struct.unpack_from("<I", data, 0)[0]
sig = data[4:8]
if sig != b"SCCA":
return f"[Error: not a Prefetch file — signature is {sig!r}, expected b'SCCA']"
# Filename: null-terminated UTF-16LE at offset 0x10
raw_name = data[0x10:0x4C]
name_end = raw_name.find(b"\x00\x00")
if name_end > 0:
if name_end % 2 == 1:
name_end += 1
filename = raw_name[:name_end].decode("utf-16-le")
else:
filename = raw_name.decode("utf-16-le", errors="replace").rstrip("\x00")
# Last execution time: FILETIME at offset 0x78 (Windows XP, version 17)
ft = struct.unpack_from("<Q", data, 0x78)[0]
if ft > 0:
epoch = datetime(1601, 1, 1, tzinfo=timezone.utc)
last_run = epoch + timedelta(microseconds=ft // 10)
last_run_str = last_run.strftime("%Y-%m-%d %H:%M:%S UTC")
else:
last_run_str = "(not available)"
# Run count at offset 0x90
run_count = struct.unpack_from("<I", data, 0x90)[0]
lines = [
f"=== Prefetch Analysis: {file_path} ===",
f"Prefetch Version: {version}",
f"Executable: {filename}",
f"Last Execution: {last_run_str}",
f"Run Count: {run_count}",
f"File Size: {len(data)} bytes",
]
return "\n".join(lines)
except Exception as e:
return f"[Error parsing Prefetch: {e}]"
async def list_extracted_dir(dir_path: str) -> str:
"""List files in an extracted directory."""
try:
entries = []
for root, dirs, files in os.walk(dir_path):
for f in files:
full = os.path.join(root, f)
rel = os.path.relpath(full, dir_path)
size = os.path.getsize(full)
entries.append(f" {rel} ({size} bytes)")
if len(entries) > 200:
entries.append(f" ... (truncated)")
break
return f"Directory: {dir_path}\nFiles ({len(entries)}):\n" + "\n".join(entries)
except Exception as e:
return f"[Error listing {dir_path}: {e}]"

449
tools/registry.py Normal file
View File

@@ -0,0 +1,449 @@
"""Windows registry parsing tools."""
from __future__ import annotations
import logging
import struct
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
# Suppress noisy regipy warnings (hive-type identification + binary encoding fallbacks)
logging.getLogger("regipy.registry").setLevel(logging.WARNING)
logging.getLogger("regipy.utils").setLevel(logging.ERROR)
async def parse_registry_key(hive_path: str, key_path: str = "") -> str:
"""Parse a registry hive and list subkeys/values at the given path.
Uses regipy for pure-Python registry parsing.
"""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed. Run: uv add regipy]"
try:
reg = RegistryHive(hive_path)
if key_path:
key = reg.get_key(key_path)
else:
key = reg.root_key()
lines = [f"Key: {key.path}", f"Timestamp: {key.header.last_modified}", ""]
# Subkeys
subkeys = list(key.iter_subkeys())
if subkeys:
lines.append(f"Subkeys ({len(subkeys)}):")
for sk in subkeys[:50]:
lines.append(f" {sk.name}")
if len(subkeys) > 50:
lines.append(f" ... ({len(subkeys) - 50} more)")
lines.append("")
# Values
values = list(key.iter_values())
if values:
lines.append(f"Values ({len(values)}):")
for v in values[:30]:
val_data = str(v.value)
if len(val_data) > 200:
val_data = val_data[:200] + "..."
lines.append(f" {v.name} ({v.value_type}) = {val_data}")
return "\n".join(lines)
except Exception as e:
return f"[Error parsing registry: {e}]"
async def list_installed_software(hive_path: str) -> str:
"""List installed software from a SOFTWARE registry hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(hive_path)
uninstall_path = "\\Microsoft\\Windows\\CurrentVersion\\Uninstall"
key = reg.get_key(uninstall_path)
programs = []
for sk in key.iter_subkeys():
name = sk.name
display_name = None
for v in sk.iter_values():
if v.name == "DisplayName":
display_name = v.value
break
programs.append(display_name or name)
lines = [f"Installed Software ({len(programs)} entries):", ""]
for p in sorted(programs):
lines.append(f" - {p}")
return "\n".join(lines)
except Exception as e:
return f"[Error listing software: {e}]"
async def get_user_activity(hive_path: str) -> str:
"""Extract user activity indicators from NTUSER.DAT."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(hive_path)
lines = ["=== User Activity from NTUSER.DAT ===", ""]
# Recent documents
try:
key = reg.get_key("\\Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\RecentDocs")
lines.append("Recent Documents:")
for v in key.iter_values():
if v.name != "MRUListEx":
lines.append(f" {v.name}")
lines.append("")
except Exception:
lines.append("Recent Documents: [not found]")
# Run MRU (commands typed in Run dialog)
try:
key = reg.get_key("\\Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\RunMRU")
lines.append("Run Dialog MRU:")
for v in key.iter_values():
if v.name not in ("MRUList",):
lines.append(f" {v.name}: {v.value}")
lines.append("")
except Exception:
lines.append("Run MRU: [not found]")
# Typed URLs
try:
key = reg.get_key("\\Software\\Microsoft\\Internet Explorer\\TypedURLs")
lines.append("Typed URLs:")
for v in key.iter_values():
lines.append(f" {v.value}")
lines.append("")
except Exception:
lines.append("Typed URLs: [not found]")
return "\n".join(lines)
except Exception as e:
return f"[Error analyzing user activity: {e}]"
def _filetime_to_datetime(ft: int) -> str:
"""Convert a Windows FILETIME (100-nanosecond intervals since 1601-01-01) to ISO string."""
if ft <= 0:
return "(not set)"
try:
epoch = datetime(1601, 1, 1, tzinfo=timezone.utc)
dt = epoch + timedelta(microseconds=ft // 10)
return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
except (ValueError, OverflowError):
return f"(invalid FILETIME: {ft})"
async def get_system_info(software_hive_path: str) -> str:
"""Extract OS version, install date, registered owner from SOFTWARE hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(software_hive_path)
key = reg.get_key("\\Microsoft\\Windows NT\\CurrentVersion")
data = {}
for v in key.iter_values():
data[v.name] = v.value
lines = ["=== System Information (SOFTWARE hive) ==="]
lines.append(f"Product Name: {data.get('ProductName', 'N/A')}")
lines.append(f"Current Version: {data.get('CurrentVersion', 'N/A')}")
lines.append(f"Build Number: {data.get('CurrentBuildNumber', 'N/A')}")
lines.append(f"CSD Version (Service Pack): {data.get('CSDVersion', 'None')}")
lines.append(f"Registered Owner: {data.get('RegisteredOwner', 'N/A')}")
lines.append(f"Registered Organization: {data.get('RegisteredOrganization', 'N/A')}")
lines.append(f"Product ID: {data.get('ProductId', 'N/A')}")
lines.append(f"System Root: {data.get('SystemRoot', 'N/A')}")
install_epoch = data.get("InstallDate")
if install_epoch and isinstance(install_epoch, int):
install_dt = datetime.fromtimestamp(install_epoch, tz=timezone.utc)
lines.append(f"Install Date: {install_dt.strftime('%Y-%m-%d %H:%M:%S UTC')} (epoch: {install_epoch})")
else:
lines.append(f"Install Date: {install_epoch}")
return "\n".join(lines)
except Exception as e:
return f"[Error: {e}]"
async def get_timezone_info(system_hive_path: str) -> str:
"""Extract timezone settings from SYSTEM hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(system_hive_path)
key = reg.get_key("\\ControlSet001\\Control\\TimeZoneInformation")
data = {}
for v in key.iter_values():
data[v.name] = v.value
lines = ["=== Timezone Information (SYSTEM hive) ==="]
lines.append(f"Standard Name: {data.get('StandardName', 'N/A')}")
lines.append(f"Daylight Name: {data.get('DaylightName', 'N/A')}")
bias = data.get("Bias", "N/A")
if isinstance(bias, int):
hours = bias // 60
lines.append(f"Bias: {bias} minutes (UTC{-hours:+d}:00)")
else:
lines.append(f"Bias: {bias}")
lines.append(f"Active Time Bias: {data.get('ActiveTimeBias', 'N/A')}")
return "\n".join(lines)
except Exception as e:
return f"[Error: {e}]"
async def get_computer_name(system_hive_path: str) -> str:
"""Extract computer name from SYSTEM hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(system_hive_path)
lines = ["=== Computer Name (SYSTEM hive) ==="]
for path_label, path in [
("ComputerName", "\\ControlSet001\\Control\\ComputerName\\ComputerName"),
("ActiveComputerName", "\\ControlSet001\\Control\\ComputerName\\ActiveComputerName"),
]:
try:
key = reg.get_key(path)
for v in key.iter_values():
if v.name == "ComputerName":
lines.append(f"{path_label}: {v.value}")
except Exception:
pass
# Also try Tcpip hostname
try:
key = reg.get_key("\\ControlSet001\\Services\\Tcpip\\Parameters")
for v in key.iter_values():
if v.name in ("Hostname", "Domain", "NV Hostname"):
lines.append(f"TCP/IP {v.name}: {v.value}")
except Exception:
pass
return "\n".join(lines) if len(lines) > 1 else "Computer name not found in SYSTEM hive."
except Exception as e:
return f"[Error: {e}]"
async def get_shutdown_time(system_hive_path: str) -> str:
"""Extract last shutdown time from SYSTEM hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(system_hive_path)
lines = ["=== Shutdown Time (SYSTEM hive) ==="]
try:
key = reg.get_key("\\ControlSet001\\Control\\Windows")
for v in key.iter_values():
if v.name == "ShutdownTime":
raw = v.value
if isinstance(raw, bytes) and len(raw) >= 8:
ft = struct.unpack("<Q", raw[:8])[0]
lines.append(f"Last Shutdown: {_filetime_to_datetime(ft)}")
elif isinstance(raw, int):
lines.append(f"Last Shutdown: {_filetime_to_datetime(raw)}")
elif isinstance(raw, str):
# regipy may return hex-encoded string for REG_BINARY
try:
raw_bytes = bytes.fromhex(raw)
ft = struct.unpack("<Q", raw_bytes[:8])[0]
lines.append(f"Last Shutdown: {_filetime_to_datetime(ft)}")
except (ValueError, struct.error):
lines.append(f"ShutdownTime (raw): {raw!r}")
else:
lines.append(f"ShutdownTime (raw): {raw!r}")
except Exception:
lines.append("ShutdownTime value not found at ControlSet001\\Control\\Windows")
# Also show all values from the Windows key for context
try:
key = reg.get_key("\\ControlSet001\\Control\\Windows")
lines.append("\nAll values at ControlSet001\\Control\\Windows:")
for v in key.iter_values():
lines.append(f" {v.name} = {v.value}")
except Exception:
pass
return "\n".join(lines)
except Exception as e:
return f"[Error: {e}]"
async def enumerate_users(sam_hive_path: str) -> str:
"""Enumerate all user accounts from SAM hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(sam_hive_path)
key = reg.get_key("\\SAM\\Domains\\Account\\Users\\Names")
accounts = []
for sk in key.iter_subkeys():
accounts.append(sk.name)
lines = [f"=== User Accounts (SAM hive) — {len(accounts)} total ==="]
for acct in accounts:
lines.append(f" - {acct}")
# Try to get RIDs from the Users key
try:
users_key = reg.get_key("\\SAM\\Domains\\Account\\Users")
rid_entries = []
for sk in users_key.iter_subkeys():
if sk.name != "Names" and sk.name.startswith("0"):
rid = int(sk.name, 16)
rid_entries.append(f" RID {rid} (0x{sk.name})")
if rid_entries:
lines.append("\nUser RIDs:")
lines.extend(rid_entries)
except Exception:
pass
return "\n".join(lines)
except Exception as e:
return f"[Error: {e}]"
async def get_network_interfaces(system_hive_path: str) -> str:
"""Extract network adapter and TCP/IP configuration from SYSTEM hive."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(system_hive_path)
lines = ["=== Network Interfaces (SYSTEM hive) ==="]
# Try TCP/IP interfaces
try:
key = reg.get_key("\\ControlSet001\\Services\\Tcpip\\Parameters\\Interfaces")
for sk in key.iter_subkeys():
lines.append(f"\nInterface: {sk.name}")
for v in sk.iter_values():
if v.name in (
"IPAddress", "SubnetMask", "DefaultGateway",
"DhcpIPAddress", "DhcpSubnetMask", "DhcpDefaultGateway",
"DhcpServer", "NameServer", "Domain", "EnableDHCP",
):
lines.append(f" {v.name} = {v.value}")
except Exception as e:
lines.append(f"TCP/IP Interfaces: {e}")
# Try network adapter class
adapter_class = "\\ControlSet001\\Control\\Class\\{4D36E972-E325-11CE-BFC1-08002bE10318}"
try:
key = reg.get_key(adapter_class)
lines.append("\nNetwork Adapters:")
for sk in key.iter_subkeys():
if sk.name == "Properties":
continue
desc = None
for v in sk.iter_values():
if v.name == "DriverDesc":
desc = v.value
if desc:
lines.append(f" [{sk.name}] {desc}")
except Exception as e:
lines.append(f"Network Adapters: {e}")
# Try NetworkCards
try:
key = reg.get_key("\\ControlSet001\\Control\\NetworkCards")
for sk in key.iter_subkeys():
for v in sk.iter_values():
if v.name == "Description":
lines.append(f" NetworkCard {sk.name}: {v.value}")
except Exception:
pass
return "\n".join(lines) if len(lines) > 1 else "No network interface data found in SYSTEM hive."
except Exception as e:
return f"[Error: {e}]"
async def get_email_config(ntuser_hive_path: str) -> str:
"""Extract email account configuration (SMTP, POP3, NNTP) from NTUSER.DAT."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(ntuser_hive_path)
lines = ["=== Email Account Configuration (NTUSER.DAT) ==="]
try:
key = reg.get_key("\\Software\\Microsoft\\Internet Account Manager\\Accounts")
for sk in key.iter_subkeys():
lines.append(f"\n--- Account: {sk.name} ---")
for v in sk.iter_values():
# Skip binary password hash fields (but keep "Prompt for Password" flags)
if "Password" in v.name and "Prompt" not in v.name:
lines.append(f" {v.name} = [present, redacted]")
else:
lines.append(f" {v.name} = {v.value}")
except Exception as e:
lines.append(f"Internet Account Manager: {e}")
return "\n".join(lines)
except Exception as e:
return f"[Error: {e}]"
async def search_registry(hive_path: str, pattern: str) -> str:
"""Search for a pattern in registry key names and values."""
try:
from regipy.registry import RegistryHive
except ImportError:
return "[Error: regipy not installed]"
try:
reg = RegistryHive(hive_path)
pattern_lower = pattern.lower()
matches = []
for entry in reg.recurse_subkeys(as_json=True):
path = entry.path or ""
if pattern_lower in path.lower():
matches.append(f"KEY: {path}")
if hasattr(entry, "values") and entry.values:
for v in entry.values:
name = v.get("name", "")
value = str(v.get("value", ""))
if pattern_lower in name.lower() or pattern_lower in value.lower():
matches.append(f" {path}\\{name} = {value[:200]}")
if len(matches) >= 50:
matches.append(f"[Truncated: more than 50 matches for '{pattern}']")
break
if not matches:
return f"No registry entries matching '{pattern}' found."
return "\n".join(matches)
except Exception as e:
return f"[Error searching registry: {e}]"

229
tools/sleuthkit.py Normal file
View File

@@ -0,0 +1,229 @@
"""Wrappers around The Sleuth Kit CLI tools for forensic disk image analysis."""
from __future__ import annotations
import asyncio
import logging
import os
import tempfile
logger = logging.getLogger(__name__)
# Cache for srch_strings dump: keyed by image_path -> dump file path.
# srch_strings scans the entire image regardless of partition, so offset is irrelevant.
_strings_cache: dict[str, str] = {}
# Max output bytes to return to the LLM to avoid context overflow
MAX_OUTPUT = 8000
async def _run(cmd: list[str], max_output: int = MAX_OUTPUT) -> str:
"""Run a command asynchronously and return stdout."""
logger.debug("Running: %s", " ".join(cmd))
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
output = stdout.decode("utf-8", errors="replace")
if proc.returncode != 0:
err = stderr.decode("utf-8", errors="replace")
return f"[Command failed (rc={proc.returncode})]\n{err}\n{output}"
if len(output) > max_output:
truncated = output[:max_output]
return truncated + f"\n\n[Output truncated: {len(output)} bytes total, showing first {max_output}]"
return output
async def partition_info(image_path: str) -> str:
"""Get partition table layout using mmls."""
return await _run(["mmls", image_path])
async def filesystem_info(image_path: str, offset: int = 0) -> str:
"""Get filesystem details using fsstat."""
cmd = ["fsstat", "-o", str(offset), image_path]
return await _run(cmd)
async def list_directory(
image_path: str,
offset: int = 0,
inode: str | None = None,
recursive: bool = False,
) -> str:
"""List directory contents using fls."""
cmd = ["fls", "-o", str(offset)]
if recursive:
cmd.append("-r")
cmd.append(image_path)
if inode:
cmd.append(inode)
return await _run(cmd, max_output=16000)
async def extract_file(
image_path: str,
inode: str,
output_path: str,
offset: int = 0,
) -> str:
"""Extract a file from the image using icat.
Streams icat stdout directly to the output file to avoid loading
large files entirely into memory.
"""
import os
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
cmd = ["icat", "-o", str(offset), image_path, inode]
with open(output_path, "wb") as out_f:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=out_f,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await proc.communicate()
if proc.returncode != 0:
err = stderr.decode("utf-8", errors="replace")
# Clean up empty/partial file on failure
if os.path.exists(output_path):
os.unlink(output_path)
return f"[icat failed (rc={proc.returncode})]: {err}"
size = os.path.getsize(output_path)
return f"Extracted {size} bytes to {output_path}"
async def find_file(image_path: str, inode: str, offset: int = 0) -> str:
"""Find the filename for an inode using ffind."""
cmd = ["ffind", "-o", str(offset), image_path, inode]
return await _run(cmd)
async def _ensure_strings_dump(image_path: str) -> str:
"""Run srch_strings once and cache the output to a temp file.
Returns the path to the cached dump file. Subsequent calls with the
same image_path reuse the existing file. srch_strings scans the entire
raw image — partition offset is irrelevant.
"""
cached = _strings_cache.get(image_path)
if cached and os.path.exists(cached):
return cached
logger.info("Building strings dump for %s — this is a one-time cost", image_path)
import shlex
# Write srch_strings output directly to a temp file to avoid holding
# the entire dump in memory.
fd, dump_path = tempfile.mkstemp(prefix="strings_dump_", suffix=".txt")
os.close(fd)
# -a = scan entire file, -t d = print decimal byte offset of each string
cmd_str = (
f"srch_strings -a -t d {shlex.quote(image_path)} "
f"> {shlex.quote(dump_path)}"
)
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await proc.communicate()
if proc.returncode != 0:
err = stderr.decode("utf-8", errors="replace")
logger.error("srch_strings failed (rc=%d): %s", proc.returncode, err)
# Fall back: don't cache, let search_strings do a direct pipe
os.unlink(dump_path)
return ""
size_mb = os.path.getsize(dump_path) / (1024 * 1024)
logger.info("Strings dump ready: %s (%.1f MB)", dump_path, size_mb)
_strings_cache[image_path] = dump_path
return dump_path
async def search_strings(
image_path: str,
pattern: str,
) -> str:
"""Search for string patterns in the image.
On first call, builds a strings dump (one-time full scan).
Subsequent calls grep the cached dump — orders of magnitude faster.
"""
import shlex
dump_path = await _ensure_strings_dump(image_path)
if dump_path:
# Fast path: grep the cached dump file
cmd_str = (
f"grep -i {shlex.quote(pattern)} {shlex.quote(dump_path)} | head -100"
)
else:
# Fallback: direct pipe (cache build failed)
cmd_str = (
f"srch_strings -a {shlex.quote(image_path)} "
f"| grep -i {shlex.quote(pattern)} | head -100"
)
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
output = stdout.decode("utf-8", errors="replace")
if not output.strip():
return f"No strings matching '{pattern}' found."
return output[:16000]
async def count_deleted_files(image_path: str, offset: int = 0) -> str:
"""List and count deleted files using fls -rd. Returns total count and extension breakdown."""
cmd = ["fls", "-rd", "-o", str(offset), image_path]
output = await _run(cmd, max_output=64000)
lines = output.strip().splitlines()
ext_counts: dict[str, int] = {}
exe_files = []
total = 0
for line in lines:
if not line.strip():
continue
total += 1
# Extract filename from fls output like "r/r * 1234: filename.ext"
parts = line.split(":", 1)
if len(parts) > 1:
fname = parts[1].strip()
ext = fname.rsplit(".", 1)[-1].lower() if "." in fname else "(no ext)"
ext_counts[ext] = ext_counts.get(ext, 0) + 1
if ext in ("exe", "dll", "com", "bat", "cmd", "scr", "pif"):
exe_files.append(fname)
result = [f"=== Deleted Files Summary ===", f"Total deleted entries: {total}"]
result.append(f"\nExecutable files ({len(exe_files)}):")
for e in exe_files[:50]:
result.append(f" {e}")
if len(exe_files) > 50:
result.append(f" ... ({len(exe_files) - 50} more)")
result.append(f"\nExtension breakdown:")
for ext, count in sorted(ext_counts.items(), key=lambda x: -x[1])[:30]:
result.append(f" .{ext}: {count}")
return "\n".join(result)
async def build_timeline(image_path: str, offset: int = 0) -> str:
"""Build a MAC timeline using fls -m."""
cmd = ["fls", "-m", "/", "-o", str(offset), "-r", image_path]
return await _run(cmd, max_output=32000)