Files
MASForensic/tools/sleuthkit.py
BattleTag 097d2ce472 Initial commit
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 17:36:26 +08:00

230 lines
7.4 KiB
Python

"""Wrappers around The Sleuth Kit CLI tools for forensic disk image analysis."""
from __future__ import annotations
import asyncio
import logging
import os
import tempfile
logger = logging.getLogger(__name__)

# Cache for the srch_strings dump: maps image_path -> path of the dump file.
# srch_strings scans the entire image regardless of partition, so the
# partition offset is not part of the key.
_strings_cache: dict[str, str] = {}

# Default cap on the output returned to the LLM, to avoid context overflow.
# _run compares this against the length of the decoded text.
MAX_OUTPUT = 8000
async def _run(cmd: list[str], max_output: int = MAX_OUTPUT) -> str:
    """Run a command asynchronously and return its (possibly truncated) output.

    Args:
        cmd: Program followed by its arguments; executed directly, without
            a shell.
        max_output: Maximum length of the returned text before it is cut
            and a truncation notice is appended.

    Returns:
        The decoded stdout when the command exits with status 0. On any
        other exit status, a "[Command failed (rc=N)]" header followed by
        the decoded stderr and stdout. Both forms are subject to the
        ``max_output`` cap.
    """
    logger.debug("Running: %s", " ".join(cmd))
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    output = stdout.decode("utf-8", errors="replace")
    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace")
        # Fold the failure report into `output` instead of returning early,
        # so a failing command that printed a lot is capped like any other.
        # The header and stderr come first and therefore survive truncation.
        output = f"[Command failed (rc={proc.returncode})]\n{err}\n{output}"
    if len(output) > max_output:
        truncated = output[:max_output]
        return truncated + f"\n\n[Output truncated: {len(output)} bytes total, showing first {max_output}]"
    return output
async def partition_info(image_path: str) -> str:
    """Report the partition table layout of a disk image.

    Runs ``mmls`` on ``image_path`` and returns the text produced by
    ``_run`` for that command.
    """
    mmls_cmd = ["mmls", image_path]
    layout = await _run(mmls_cmd)
    return layout
async def filesystem_info(image_path: str, offset: int = 0) -> str:
    """Describe the filesystem inside the image using ``fsstat``.

    ``offset`` is handed to ``fsstat`` through its ``-o`` option; the text
    produced by ``_run`` is returned unchanged.
    """
    return await _run(["fsstat", "-o", str(offset), image_path])
async def list_directory(
    image_path: str,
    offset: int = 0,
    inode: str | None = None,
    recursive: bool = False,
) -> str:
    """Enumerate directory entries of the image with ``fls``.

    Args:
        image_path: Disk image to inspect.
        offset: Value passed to ``fls`` via ``-o``.
        inode: When given (and non-empty), appended after the image path so
            that ``fls`` lists that inode instead of its default.
        recursive: Adds ``-r`` to the command line when true.

    Returns:
        The text produced by ``_run``, capped at 16000 characters.
    """
    recurse_flag = ["-r"] if recursive else []
    start_at = [inode] if inode else []
    fls_cmd = ["fls", "-o", str(offset), *recurse_flag, image_path, *start_at]
    return await _run(fls_cmd, max_output=16000)
async def extract_file(
    image_path: str,
    inode: str,
    output_path: str,
    offset: int = 0,
) -> str:
    """Extract a file from the image using icat.

    icat's stdout is connected directly to the output file, so the file
    content is never loaded into this process's memory.

    Args:
        image_path: Disk image to read from.
        inode: Inode (metadata address) handed to icat.
        output_path: Destination file; missing parent directories are
            created. An existing file at this path is overwritten.
        offset: Value passed to icat via ``-o``.

    Returns:
        "Extracted N bytes to PATH" on success, or an
        "[icat failed (rc=N)]: ..." message carrying icat's stderr when it
        exits with a non-zero status.

    Raises:
        OSError: If the output file cannot be created or icat cannot be
            started. The output file created by this call is removed before
            the exception propagates.
    """
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    cmd = ["icat", "-o", str(offset), image_path, inode]
    proc = None
    # Opened outside the try block: if open() itself fails, this call has
    # created nothing, so there is nothing to clean up.
    out_f = open(output_path, "wb")
    try:
        with out_f:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=out_f,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
    except BaseException:
        # icat could not be started (e.g. not installed) or the await was
        # interrupted (e.g. task cancellation). Stop a still-running icat and
        # remove the empty/partial file rather than leaving it on disk.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
        if os.path.exists(output_path):
            os.unlink(output_path)
        raise
    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace")
        # Clean up empty/partial file on failure
        if os.path.exists(output_path):
            os.unlink(output_path)
        return f"[icat failed (rc={proc.returncode})]: {err}"
    size = os.path.getsize(output_path)
    return f"Extracted {size} bytes to {output_path}"
async def find_file(image_path: str, inode: str, offset: int = 0) -> str:
    """Look up the file name that refers to an inode using ``ffind``.

    ``offset`` is passed to ``ffind`` via ``-o``; the text produced by
    ``_run`` is returned unchanged.
    """
    ffind_args = ("-o", str(offset), image_path, inode)
    return await _run(["ffind", *ffind_args])
async def _ensure_strings_dump(image_path: str) -> str:
    """Run srch_strings once and cache the output in a temp file.

    Subsequent calls with the same ``image_path`` reuse the existing dump
    as long as the file is still present. srch_strings scans the entire
    raw image, so the partition offset is irrelevant.

    Args:
        image_path: Disk image to scan; also the cache key.

    Returns:
        Path of the dump file, or "" when srch_strings could not be run or
        exited with a non-zero status. In that case nothing is cached and
        the temp file is removed, so the caller can fall back to a direct
        pipe.
    """
    cached = _strings_cache.get(image_path)
    if cached and os.path.exists(cached):
        return cached
    logger.info("Building strings dump for %s — this is a one-time cost", image_path)
    fd, dump_path = tempfile.mkstemp(prefix="strings_dump_", suffix=".txt")
    # -a = scan entire file, -t d = print decimal byte offset of each string
    cmd = ["srch_strings", "-a", "-t", "d", image_path]
    proc = None

    def _discard_dump() -> None:
        # Stop a scanner that is still running, then drop the unusable file.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
        if os.path.exists(dump_path):
            os.unlink(dump_path)

    try:
        # Hand the temp file to srch_strings as its stdout: no shell is
        # needed for the redirect and the dump is never held in memory.
        with os.fdopen(fd, "wb") as dump_f:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=dump_f,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
    except OSError as exc:
        # srch_strings could not be started (e.g. not installed). Treat it
        # like a failed scan so the caller uses its fallback path.
        logger.error("srch_strings could not be run: %s", exc)
        _discard_dump()
        return ""
    except BaseException:
        # Interrupted (e.g. task cancellation): do not leak the temp file.
        _discard_dump()
        raise
    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace")
        logger.error("srch_strings failed (rc=%d): %s", proc.returncode, err)
        # Fall back: don't cache, let search_strings do a direct pipe
        _discard_dump()
        return ""
    size_mb = os.path.getsize(dump_path) / (1024 * 1024)
    logger.info("Strings dump ready: %s (%.1f MB)", dump_path, size_mb)
    _strings_cache[image_path] = dump_path
    return dump_path
async def search_strings(
    image_path: str,
    pattern: str,
) -> str:
    """Search for string patterns in the image (case-insensitive grep).

    On first call, builds a strings dump (one-time full scan). Subsequent
    calls grep the cached dump, which avoids rescanning the image. If the
    dump could not be built, srch_strings is piped straight into grep
    instead; that fallback runs srch_strings without ``-t d``, so its lines
    carry no byte offsets.

    Args:
        image_path: Disk image to search.
        pattern: grep pattern; matched case-insensitively.

    Returns:
        Up to 100 matching lines, cut at 16000 characters. When there is no
        output, either a "[String search failed]: ..." message carrying
        whatever the pipeline wrote to stderr, or a "No strings matching"
        message when stderr is empty too.
    """
    import shlex
    dump_path = await _ensure_strings_dump(image_path)
    # -e marks the next argument as the pattern even when it begins with
    # "-"; without it such a pattern would be parsed as a grep option.
    grep_cmd = f"grep -i -e {shlex.quote(pattern)}"
    if dump_path:
        # Fast path: grep the cached dump file
        cmd_str = f"{grep_cmd} {shlex.quote(dump_path)} | head -100"
    else:
        # Fallback: direct pipe (cache build failed)
        cmd_str = (
            f"srch_strings -a {shlex.quote(image_path)} "
            f"| {grep_cmd} | head -100"
        )
    proc = await asyncio.create_subprocess_shell(
        cmd_str,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    output = stdout.decode("utf-8", errors="replace")
    if not output.strip():
        # The pipeline's exit status is head's, so stderr is the only sign
        # that grep or srch_strings failed (bad regex, unreadable file, ...).
        # Report that instead of claiming there were no matches.
        err = stderr.decode("utf-8", errors="replace").strip()
        if err:
            return f"[String search failed]: {err}"
        return f"No strings matching '{pattern}' found."
    return output[:16000]
async def count_deleted_files(image_path: str, offset: int = 0) -> str:
    """Summarize deleted entries reported by ``fls -rd``.

    Args:
        image_path: Disk image to inspect.
        offset: Value passed to ``fls`` via ``-o``.

    Returns:
        A text report with the total number of listed entries, up to 50
        names with an executable-style extension, and the 30 most frequent
        extensions. If ``fls`` fails, the "[Command failed ...]" text from
        ``_run`` is returned instead (cut at 64000 characters).
    """
    import sys
    from collections import Counter

    executable_exts = {"exe", "dll", "com", "bat", "cmd", "scr", "pif"}
    cmd = ["fls", "-rd", "-o", str(offset), image_path]
    # Request the complete listing. Counting a truncated listing would
    # under-report the totals and would count the truncation notice itself
    # as an entry. Only the short summary below is returned to the caller.
    output = await _run(cmd, max_output=sys.maxsize)
    if output.startswith("[Command failed (rc="):
        # Error text is not a file listing; pass it through rather than
        # tallying its lines as deleted files.
        return output[:64000]
    ext_counts: Counter[str] = Counter()
    exe_files = []
    total = 0
    for line in output.strip().splitlines():
        if not line.strip():
            continue
        total += 1
        # Extract filename from fls output like "r/r * 1234: filename.ext"
        _, sep, rest = line.partition(":")
        if not sep:
            continue
        fname = rest.strip()
        ext = fname.rsplit(".", 1)[-1].lower() if "." in fname else "(no ext)"
        ext_counts[ext] += 1
        if ext in executable_exts:
            exe_files.append(fname)
    result = ["=== Deleted Files Summary ===", f"Total deleted entries: {total}"]
    result.append(f"\nExecutable files ({len(exe_files)}):")
    result.extend(f"  {name}" for name in exe_files[:50])
    if len(exe_files) > 50:
        result.append(f"  ... ({len(exe_files) - 50} more)")
    result.append("\nExtension breakdown:")
    # most_common keeps first-seen order among equal counts, like a stable
    # sort by descending count.
    result.extend(f"  .{ext}: {count}" for ext, count in ext_counts.most_common(30))
    return "\n".join(result)
async def build_timeline(image_path: str, offset: int = 0) -> str:
    """Produce the input for a MAC timeline by running ``fls -m``.

    ``fls`` is invoked recursively (``-r``) with ``-m /`` and the given
    ``offset`` (``-o``); the text produced by ``_run`` is returned, capped
    at 32000 characters.
    """
    fls_cmd = ["fls"]
    fls_cmd += ["-m", "/"]
    fls_cmd += ["-o", str(offset)]
    fls_cmd += ["-r", image_path]
    return await _run(fls_cmd, max_output=32000)