"""Wrappers around The Sleuth Kit CLI tools for forensic disk image analysis."""

from __future__ import annotations

import asyncio
import contextlib
import logging
import os
import shlex
import tempfile
from collections import Counter

logger = logging.getLogger(__name__)

# Cache for srch_strings dump: keyed by image_path -> dump file path.
# srch_strings scans the entire image regardless of partition, so offset is irrelevant.
_strings_cache: dict[str, str] = {}

# Max output bytes to return to the LLM to avoid context overflow
MAX_OUTPUT = 8000

# Extensions that count_deleted_files reports individually as executables.
_EXECUTABLE_EXTENSIONS = frozenset({"exe", "dll", "com", "bat", "cmd", "scr", "pif"})


def _decode(raw: bytes) -> str:
    """Decode subprocess output as UTF-8, replacing undecodable bytes."""
    return raw.decode("utf-8", errors="replace")


def _truncate(text: str, limit: int) -> str:
    """Cut *text* to *limit* characters and append a truncation notice."""
    if len(text) <= limit:
        return text
    return (
        text[:limit]
        + f"\n\n[Output truncated: {len(text)} bytes total, showing first {limit}]"
    )


def _unlink_quietly(path: str) -> None:
    """Remove *path*, ignoring the case where it is already gone."""
    with contextlib.suppress(FileNotFoundError):
        os.unlink(path)


async def _capture(cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* without a shell and return (returncode, stdout, stderr).

    Both streams are captured in full and decoded; no truncation is applied.
    """
    logger.debug("Running: %s", " ".join(cmd))
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, _decode(stdout), _decode(stderr)


async def _run(cmd: list[str], max_output: int = MAX_OUTPUT) -> str:
    """Run a command asynchronously and return stdout.

    Args:
        cmd: Program and arguments, executed without a shell.
        max_output: Maximum number of characters to return.

    Returns:
        The command's stdout. On a non-zero exit status the result is a
        "[Command failed (rc=N)]" header followed by stderr and stdout.
        Either form is cut to *max_output* characters with a trailing
        truncation notice, so a failing command cannot overflow the caller.
    """
    returncode, output, err = await _capture(cmd)
    if returncode != 0:
        failure = f"[Command failed (rc={returncode})]\n{err}\n{output}"
        return _truncate(failure, max_output)
    return _truncate(output, max_output)


async def partition_info(image_path: str) -> str:
    """Get partition table layout using mmls."""
    return await _run(["mmls", image_path])


async def filesystem_info(image_path: str, offset: int = 0) -> str:
    """Get filesystem details using fsstat.

    *offset* is passed to fsstat's ``-o`` option.
    """
    cmd = ["fsstat", "-o", str(offset), image_path]
    return await _run(cmd)


async def list_directory(
    image_path: str,
    offset: int = 0,
    inode: str | None = None,
    recursive: bool = False,
) -> str:
    """List directory contents using fls.

    Args:
        image_path: Path to the disk image.
        offset: Value passed to fls ``-o``.
        inode: Optional inode argument appended after the image path.
        recursive: When true, pass ``-r`` to fls.

    Returns:
        fls output, limited to 16000 characters.
    """
    cmd = ["fls", "-o", str(offset)]
    if recursive:
        cmd.append("-r")
    cmd.append(image_path)
    if inode:
        cmd.append(inode)
    return await _run(cmd, max_output=16000)


async def extract_file(
    image_path: str,
    inode: str,
    output_path: str,
    offset: int = 0,
) -> str:
    """Extract a file from the image using icat.

    Streams icat stdout directly to the output file to avoid loading
    large files entirely into memory.

    Returns:
        A summary with the number of bytes written, or an
        "[icat failed (rc=N)]" message when icat exits non-zero. On failure
        the partially written output file is removed.
    """
    # dirname is "" for a bare filename; fall back to the current directory.
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    cmd = ["icat", "-o", str(offset), image_path, inode]
    with open(output_path, "wb") as out_f:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=out_f,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
    if proc.returncode != 0:
        err = _decode(stderr)
        # Clean up empty/partial file on failure
        _unlink_quietly(output_path)
        return f"[icat failed (rc={proc.returncode})]: {err}"
    size = os.path.getsize(output_path)
    return f"Extracted {size} bytes to {output_path}"


async def find_file(image_path: str, inode: str, offset: int = 0) -> str:
    """Find the filename for an inode using ffind."""
    cmd = ["ffind", "-o", str(offset), image_path, inode]
    return await _run(cmd)


async def _ensure_strings_dump(image_path: str) -> str:
    """Run srch_strings once and cache the output to a temp file.

    Returns the path to the cached dump file, or "" when the dump could not
    be built (the caller then falls back to a direct pipe). Subsequent calls
    with the same image_path reuse the existing file. srch_strings scans the
    entire raw image — partition offset is irrelevant.
    """
    cached = _strings_cache.get(image_path)
    if cached and os.path.exists(cached):
        return cached

    logger.info("Building strings dump for %s — this is a one-time cost", image_path)

    # Write srch_strings output directly to a temp file to avoid holding
    # the entire dump in memory.
    fd, dump_path = tempfile.mkstemp(prefix="strings_dump_", suffix=".txt")
    # -a = scan entire file, -t d = print decimal byte offset of each string
    cmd = ["srch_strings", "-a", "-t", "d", image_path]
    try:
        with os.fdopen(fd, "wb") as dump_f:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=dump_f,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
    except OSError as exc:
        # srch_strings could not be started (e.g. not installed): treat it
        # like any other build failure instead of raising.
        logger.error("srch_strings could not be started: %s", exc)
        _unlink_quietly(dump_path)
        return ""
    except BaseException:
        # Cancellation or an unexpected error: do not leave the temp file behind.
        _unlink_quietly(dump_path)
        raise

    if proc.returncode != 0:
        err = _decode(stderr)
        logger.error("srch_strings failed (rc=%d): %s", proc.returncode, err)
        # Fall back: don't cache, let search_strings do a direct pipe
        _unlink_quietly(dump_path)
        return ""

    size_mb = os.path.getsize(dump_path) / (1024 * 1024)
    logger.info("Strings dump ready: %s (%.1f MB)", dump_path, size_mb)
    _strings_cache[image_path] = dump_path
    return dump_path


async def search_strings(
    image_path: str,
    pattern: str,
) -> str:
    """Search for string patterns in the image.

    On first call, builds a strings dump (one-time full scan).
    Subsequent calls grep the cached dump — orders of magnitude faster.

    Args:
        image_path: Path to the disk image.
        pattern: grep pattern, matched case-insensitively.

    Returns:
        Up to 100 matching lines (at most 16000 characters), or a
        "No strings matching ..." message. When nothing matched and the
        pipeline wrote to stderr, that text is appended so that a failing
        grep (e.g. an invalid pattern) is not mistaken for "no matches".
    """
    dump_path = await _ensure_strings_dump(image_path)

    # "-e" keeps a pattern that starts with "-" from being parsed as an option.
    grep_args = f"-i -e {shlex.quote(pattern)}"
    if dump_path:
        # Fast path: grep the cached dump file
        cmd_str = f"grep {grep_args} -- {shlex.quote(dump_path)} | head -100"
    else:
        # Fallback: direct pipe (cache build failed). Uses the same
        # srch_strings flags as the cached dump so both paths print offsets.
        cmd_str = (
            f"srch_strings -a -t d {shlex.quote(image_path)} "
            f"| grep {grep_args} | head -100"
        )

    proc = await asyncio.create_subprocess_shell(
        cmd_str,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    output = _decode(stdout)
    if not output.strip():
        message = f"No strings matching '{pattern}' found."
        err = _decode(stderr).strip()
        if err:
            message += f" [stderr: {err[:MAX_OUTPUT]}]"
        return message
    return output[:16000]


async def count_deleted_files(image_path: str, offset: int = 0) -> str:
    """List and count deleted files using fls -rd.

    Returns total count and extension breakdown. The complete fls output is
    parsed, so the totals are not affected by output-size limits. When fls
    exits non-zero a "[Command failed (rc=N)]" message is returned instead
    of a summary.
    """
    cmd = ["fls", "-rd", "-o", str(offset), image_path]
    returncode, output, err = await _capture(cmd)
    if returncode != 0:
        return _truncate(f"[Command failed (rc={returncode})]\n{err}", MAX_OUTPUT)

    ext_counts: Counter[str] = Counter()
    exe_files: list[str] = []
    total = 0
    for line in output.splitlines():
        if not line.strip():
            continue
        total += 1
        # Extract filename from fls output like "r/r * 1234: filename.ext"
        _, sep, rest = line.partition(":")
        if not sep:
            continue
        fname = rest.strip()
        ext = fname.rsplit(".", 1)[-1].lower() if "." in fname else "(no ext)"
        ext_counts[ext] += 1
        if ext in _EXECUTABLE_EXTENSIONS:
            exe_files.append(fname)

    result = ["=== Deleted Files Summary ===", f"Total deleted entries: {total}"]
    result.append(f"\nExecutable files ({len(exe_files)}):")
    result.extend(f"  {name}" for name in exe_files[:50])
    if len(exe_files) > 50:
        result.append(f"  ... ({len(exe_files) - 50} more)")
    result.append("\nExtension breakdown:")
    # Stable sort: extensions with equal counts keep first-seen order.
    top_extensions = sorted(ext_counts.items(), key=lambda item: -item[1])[:30]
    result.extend(f"  .{ext}: {count}" for ext, count in top_extensions)
    return "\n".join(result)


async def build_timeline(image_path: str, offset: int = 0) -> str:
    """Build a MAC timeline using fls -m.

    Returns the fls output, limited to 32000 characters.
    """
    cmd = ["fls", "-m", "/", "-o", str(offset), "-r", image_path]
    return await _run(cmd, max_output=32000)