230 lines
7.4 KiB
Python
230 lines
7.4 KiB
Python
"""Wrappers around The Sleuth Kit CLI tools for forensic disk image analysis."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Cache for srch_strings dump: keyed by image_path -> dump file path.
|
|
# srch_strings scans the entire image regardless of partition, so offset is irrelevant.
|
|
_strings_cache: dict[str, str] = {}
|
|
|
|
# Max number of characters of decoded command output to return to the LLM, to avoid context overflow
|
|
MAX_OUTPUT = 8000
|
|
|
|
|
|
async def _run(cmd: list[str], max_output: int = MAX_OUTPUT) -> str:
    """Run a command asynchronously and return its decoded stdout.

    Args:
        cmd: Program name followed by its arguments; executed directly,
            without a shell.
        max_output: Maximum number of characters of text to return before
            the result is cut and a truncation notice is appended.

    Returns:
        stdout decoded as UTF-8 (undecodable bytes are replaced). When the
        exit status is non-zero the text is instead a
        "[Command failed (rc=N)]" header followed by stderr and stdout.
        Either form is truncated to ``max_output`` characters.
    """
    logger.debug("Running: %s", " ".join(cmd))
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    output = stdout.decode("utf-8", errors="replace")

    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace")
        # Fold the failure report into `output` so it goes through the same
        # size cap below; a failing tool can emit as much text as a
        # successful one.
        output = f"[Command failed (rc={proc.returncode})]\n{err}\n{output}"

    if len(output) > max_output:
        truncated = output[:max_output]
        return truncated + f"\n\n[Output truncated: {len(output)} bytes total, showing first {max_output}]"
    return output
|
|
|
|
|
|
async def partition_info(image_path: str) -> str:
    """Return the output of ``mmls`` (partition table layout) for the image."""
    mmls_cmd = ["mmls", image_path]
    return await _run(mmls_cmd)
|
|
|
|
|
|
async def filesystem_info(image_path: str, offset: int = 0) -> str:
    """Return the output of ``fsstat`` (filesystem details) for the image.

    ``offset`` is forwarded to fsstat's ``-o`` option.
    """
    return await _run(["fsstat", "-o", str(offset), image_path])
|
|
|
|
|
|
async def list_directory(
    image_path: str,
    offset: int = 0,
    inode: str | None = None,
    recursive: bool = False,
) -> str:
    """Return a directory listing produced by ``fls``.

    Args:
        image_path: Path to the disk image.
        offset: Value forwarded to the ``-o`` option.
        inode: Optional inode appended after the image path; omitted when
            empty or None.
        recursive: When true, ``-r`` is added to the command.

    Returns:
        The listing text, limited to 16000 characters by ``_run``.
    """
    options = ["-o", str(offset)]
    if recursive:
        options.append("-r")
    # The inode, when given, is a positional argument following the image.
    positional = [image_path, inode] if inode else [image_path]
    return await _run(["fls", *options, *positional], max_output=16000)
|
|
|
|
|
|
def _discard_partial(path: str) -> None:
    """Delete ``path``, ignoring the case where it is already gone."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


async def extract_file(
    image_path: str,
    inode: str,
    output_path: str,
    offset: int = 0,
) -> str:
    """Extract a file from the image using icat.

    icat's stdout is streamed directly into the output file to avoid
    loading large files entirely into memory.

    Args:
        image_path: Path to the disk image.
        inode: Inode identifier passed to icat.
        output_path: Destination file; parent directories are created.
        offset: Value forwarded to icat's ``-o`` option.

    Returns:
        "Extracted N bytes to PATH" on success, or an
        "[icat failed (rc=N)]: ..." message carrying stderr when icat exits
        non-zero. In the failure case the output file is removed.

    Raises:
        OSError: If the output file cannot be opened or icat cannot be
            started. Any file created for this call is removed first.
    """
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)

    cmd = ["icat", "-o", str(offset), image_path, inode]
    # Open outside the try: if open() itself fails we created nothing and
    # must not delete whatever already lives at output_path.
    out_f = open(output_path, "wb")
    try:
        with out_f:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=out_f,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
    except BaseException:
        # Launch failure or cancellation: the file is closed by now, so
        # drop the empty/partial result before propagating.
        _discard_partial(output_path)
        raise

    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace")
        # Clean up empty/partial file on failure
        _discard_partial(output_path)
        return f"[icat failed (rc={proc.returncode})]: {err}"

    size = os.path.getsize(output_path)
    return f"Extracted {size} bytes to {output_path}"
|
|
|
|
|
|
async def find_file(image_path: str, inode: str, offset: int = 0) -> str:
    """Return the output of ``ffind`` (filename lookup) for an inode.

    ``offset`` is forwarded to ffind's ``-o`` option.
    """
    return await _run(["ffind", "-o", str(offset), image_path, inode])
|
|
|
|
|
|
async def _ensure_strings_dump(image_path: str) -> str:
    """Run srch_strings once and cache the output in a temp file.

    Subsequent calls with the same ``image_path`` reuse the existing dump
    as long as the file still exists. srch_strings scans the entire raw
    image, so partition offset is irrelevant.

    Args:
        image_path: Path to the disk image; also the cache key.

    Returns:
        Path to the dump file, or "" when srch_strings could not be started
        or exited non-zero (nothing is cached in that case, so the caller
        can fall back to a direct scan).
    """
    cached = _strings_cache.get(image_path)
    if cached and os.path.exists(cached):
        return cached

    logger.info("Building strings dump for %s — this is a one-time cost", image_path)

    # Write srch_strings output directly to a temp file to avoid holding
    # the entire dump in memory.
    fd, dump_path = tempfile.mkstemp(prefix="strings_dump_", suffix=".txt")

    # -a = scan entire file, -t d = print decimal byte offset of each string
    cmd = ["srch_strings", "-a", "-t", "d", image_path]
    try:
        # Hand the temp file to the child as its stdout; no shell or
        # redirection syntax is involved, so no quoting is needed.
        with os.fdopen(fd, "wb") as dump_f:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=dump_f,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
    except OSError as exc:
        # e.g. srch_strings is not installed: same outcome as a non-zero exit.
        logger.error("srch_strings failed to start: %s", exc)
        os.unlink(dump_path)
        return ""
    except BaseException:
        # Cancellation or unexpected error: do not leak the temp file.
        os.unlink(dump_path)
        raise

    if proc.returncode != 0:
        err = stderr.decode("utf-8", errors="replace")
        logger.error("srch_strings failed (rc=%d): %s", proc.returncode, err)
        # Fall back: don't cache, let search_strings do a direct pipe
        os.unlink(dump_path)
        return ""

    size_mb = os.path.getsize(dump_path) / (1024 * 1024)
    logger.info("Strings dump ready: %s (%.1f MB)", dump_path, size_mb)
    _strings_cache[image_path] = dump_path
    return dump_path
|
|
|
|
|
|
async def search_strings(
    image_path: str,
    pattern: str,
) -> str:
    """Search the image's printable strings for a pattern, ignoring case.

    On first call for an image, a strings dump is built via
    ``_ensure_strings_dump`` (one-time full scan); later calls grep that
    cached dump. If no dump is available, the image is scanned directly
    with ``srch_strings`` piped through grep.

    Args:
        image_path: Path to the disk image.
        pattern: grep pattern. It is passed with ``-e`` so that a pattern
            beginning with "-" is not parsed as a grep option.

    Returns:
        Up to 100 matching lines (at most 16000 characters); or a
        "No strings matching ..." message; or a "[search failed ...]"
        message when nothing was matched and the command wrote an error
        to stderr (for example an invalid pattern).
    """
    import shlex

    dump_path = await _ensure_strings_dump(image_path)
    pipe = asyncio.subprocess.PIPE

    if dump_path:
        # Fast path: grep the cached dump file. No shell is needed here;
        # -m 100 stops after 100 matching lines.
        proc = await asyncio.create_subprocess_exec(
            "grep", "-i", "-m", "100", "-e", pattern, dump_path,
            stdout=pipe,
            stderr=pipe,
        )
    else:
        # Fallback: direct pipe (cache build failed). A shell is required
        # for the pipeline, so every interpolated value is quoted.
        cmd_str = (
            f"srch_strings -a {shlex.quote(image_path)} "
            f"| grep -i -e {shlex.quote(pattern)} | head -100"
        )
        proc = await asyncio.create_subprocess_shell(
            cmd_str,
            stdout=pipe,
            stderr=pipe,
        )

    stdout, stderr = await proc.communicate()
    output = stdout.decode("utf-8", errors="replace")
    if not output.strip():
        err = stderr.decode("utf-8", errors="replace").strip()
        if err:
            # Report the tool's error rather than claiming "no matches".
            return f"[search failed (rc={proc.returncode})]: {err}"
        return f"No strings matching '{pattern}' found."
    return output[:16000]
|
|
|
|
|
|
def _summarize_deleted_listing(lines: list[str]) -> str:
    """Format a total / executables / extension summary from fls output lines."""
    from collections import Counter

    executable_exts = {"exe", "dll", "com", "bat", "cmd", "scr", "pif"}
    ext_counts: Counter[str] = Counter()
    exe_files: list[str] = []
    total = 0

    for line in lines:
        if not line.strip():
            continue
        total += 1
        # Extract filename from fls output like "r/r * 1234: filename.ext";
        # lines without a ":" still count toward the total.
        _, sep, name = line.partition(":")
        if not sep:
            continue
        fname = name.strip()
        ext = fname.rsplit(".", 1)[-1].lower() if "." in fname else "(no ext)"
        ext_counts[ext] += 1
        if ext in executable_exts:
            exe_files.append(fname)

    result = ["=== Deleted Files Summary ===", f"Total deleted entries: {total}"]
    result.append(f"\nExecutable files ({len(exe_files)}):")
    result.extend(f"  {name}" for name in exe_files[:50])
    if len(exe_files) > 50:
        result.append(f"  ... ({len(exe_files) - 50} more)")

    result.append("\nExtension breakdown:")
    # most_common keeps first-seen order among equal counts.
    result.extend(f"  .{ext}: {count}" for ext, count in ext_counts.most_common(30))

    return "\n".join(result)


async def count_deleted_files(image_path: str, offset: int = 0) -> str:
    """List and count deleted files using ``fls -rd``.

    Args:
        image_path: Path to the disk image.
        offset: Value forwarded to fls's ``-o`` option.

    Returns:
        A summary with the total number of listed entries, up to 50
        executable file names (exe/dll/com/bat/cmd/scr/pif), and the 30
        most frequent extensions. If fls fails, the "[Command failed ...]"
        text from ``_run`` is returned instead (at most 64000 characters).
    """
    import sys

    cmd = ["fls", "-rd", "-o", str(offset), image_path]
    # Only the compact summary is returned to the caller, so request the
    # whole listing: a truncated listing undercounts, and the truncation
    # notice itself would be tallied as a deleted entry.
    output = await _run(cmd, max_output=sys.maxsize)

    if output.startswith("[Command failed"):
        # Surface the failure instead of counting its lines as files.
        return output[:64000]

    return _summarize_deleted_listing(output.strip().splitlines())
|
|
|
|
|
|
async def build_timeline(image_path: str, offset: int = 0) -> str:
    """Build a MAC timeline by running ``fls -m / -r`` on the image.

    ``offset`` is forwarded to the ``-o`` option; the result is limited to
    32000 characters by ``_run``.
    """
    fls_args = ["-m", "/", "-o", str(offset), "-r", image_path]
    return await _run(["fls", *fls_args], max_output=32000)
|