"""Parsers for various forensic artifact formats."""

from __future__ import annotations

import asyncio
import fnmatch
import heapq
import logging
import os
import re
import struct
from datetime import datetime, timedelta, timezone

logger = logging.getLogger(__name__)

# Request lines recognised by the PCAP string filter.
_HTTP_REQUEST_RE = re.compile(r"^(GET|POST|PUT|DELETE|HEAD) /")
# Case-insensitive substrings that make an otherwise unclassified string interesting.
_PCAP_KEYWORDS = ("password", "login", "username", "email", "set-cookie")
# FILETIME values count 100 ns ticks from this instant.
_FILETIME_EPOCH = datetime(1601, 1, 1, tzinfo=timezone.utc)
# Innermost ``{a,b}`` alternative group of a glob pattern.
_BRACE_GROUP_RE = re.compile(r"\{([^{}]*,[^{}]*)\}")
# A ``**/`` path segment, which must also be allowed to match zero directories.
_GLOBSTAR_SEGMENT_RE = re.compile(r"(^|/)\*\*/")
# Number of entries kept in the "Largest files" section of a directory summary.
_LARGEST_FILES_SHOWN = 10


async def read_text_file(file_path: str, max_bytes: int = 8000) -> str:
    """Read the beginning of a text file, with a size limit.

    The file is decoded as UTF-8 with undecodable bytes replaced. Because it
    is opened in text mode, ``max_bytes`` bounds the number of *characters*
    read; the truncation note is added whenever the on-disk size exceeds
    ``max_bytes``.

    Returns the content, or a bracketed ``[Error ...]`` string on failure.
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read(max_bytes)
        size = os.path.getsize(file_path)
        if size > max_bytes:
            content += f"\n\n[Truncated: file is {size} bytes, showing first {max_bytes}]"
        return content
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"


async def read_binary_preview(file_path: str, max_bytes: int = 2000) -> str:
    """Read a binary file and show a hex + ASCII preview.

    Produces one row per 16 bytes: the offset, the bytes in hex, and the
    printable-ASCII rendering (non-printable bytes shown as ``.``).

    Returns the preview, or a bracketed ``[Error ...]`` string on failure.
    """
    try:
        with open(file_path, "rb") as f:
            data = f.read(max_bytes)
        rows = []
        for offset in range(0, len(data), 16):
            chunk = data[offset:offset + 16]
            hex_part = " ".join(f"{b:02x}" for b in chunk)
            ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
            rows.append(f"{offset:08x} {hex_part:<48} {ascii_part}")
        size = os.path.getsize(file_path)
        header = f"File: {file_path} ({size} bytes)\n"
        return header + "\n".join(rows)
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"


async def read_text_file_section(file_path: str, start: int = 0, max_bytes: int = 8000) -> str:
    """Read a section of a text file starting at byte offset `start`.

    The file is read in binary mode and decoded afterwards (UTF-8, invalid
    bytes replaced) so that ``start``, ``max_bytes`` and the reported offsets
    are all real byte positions: the end offset in the header can be passed
    back as ``start`` to continue reading. Newlines are not translated. A
    negative ``start`` is treated as 0.

    Returns a header line followed by the content, or a bracketed
    ``[Error ...]`` string on failure.
    """
    try:
        size = os.path.getsize(file_path)
        offset = max(start, 0)
        with open(file_path, "rb") as f:
            f.seek(offset)
            raw = f.read(max_bytes)
        content = raw.decode("utf-8", errors="replace")
        end = offset + len(raw)
        remaining = size - end
        header = f"[File: {file_path}, {size} bytes, showing offset {offset}–{end}]"
        if remaining > 0:
            content += f"\n\n[{remaining} bytes remaining after this section]"
        return header + "\n" + content
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"


async def search_text_file(file_path: str, pattern: str, max_matches: int = 50) -> str:
    """Search for a pattern in an extracted text file.

    ``pattern`` is a case-insensitive regular expression; if it does not
    compile it is searched for as a literal string instead. Matching lines are
    returned with their 1-based line numbers, each clipped to 200 characters.
    At most ``max_matches`` lines are reported; the truncation marker is only
    added when a further match actually exists, and it is not included in the
    match count.

    Returns the report, or a bracketed ``[Error ...]`` string on failure.
    """
    try:
        size = os.path.getsize(file_path)
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error:
            compiled = re.compile(re.escape(pattern), re.IGNORECASE)

        matches = []
        truncated = False
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            for lineno, line in enumerate(f, 1):
                if not compiled.search(line):
                    continue
                if len(matches) >= max_matches:
                    # One more match than we are allowed to show: stop here.
                    truncated = True
                    break
                matches.append(f" {lineno}: {line.rstrip()[:200]}")

        report = [f"Search '{pattern}' in {file_path} ({size} bytes): {len(matches)} matches"]
        if not matches and not truncated:
            report.append(" (no matches)")
        report.extend(matches)
        if truncated:
            report.append(f" [Truncated: more than {max_matches} matches]")
        return "\n".join(report)
    except Exception as e:
        return f"[Error searching {file_path}: {e}]"


def _append_capped(lines: list[str], items: list[str], limit: int) -> None:
    """Append up to *limit* indented items, plus a "... (N more)" marker if cut."""
    lines.extend(f" {item}" for item in items[:limit])
    if len(items) > limit:
        lines.append(f" ... ({len(items) - limit} more)")


async def parse_pcap_strings(file_path: str) -> str:
    """Extract HTTP headers and other readable strings from a PCAP/capture file.

    Runs the external ``srch_strings`` command (``-a -n 8``) to find printable
    text, then filters for forensically relevant patterns: Host and User-Agent
    headers, cookies, HTTP request and status lines, and strings mentioning
    credentials-related keywords.

    Returns the report, or a bracketed ``[Error ...]`` string on failure
    (including when ``srch_strings`` is not installed).
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "srch_strings", "-a", "-n", "8", file_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0 and not stdout:
            # Nothing usable was produced; surface the tool's own complaint.
            detail = stderr.decode("utf-8", errors="replace").strip()
            return (
                f"[Error parsing PCAP strings: srch_strings exited with "
                f"code {proc.returncode}: {detail}]"
            )
        all_strings = stdout.decode("utf-8", errors="replace").splitlines()

        hosts = set()
        user_agents = set()
        requests = []
        cookies = []
        responses = []
        other_interesting = []
        for line in all_strings:
            stripped = line.strip()
            if stripped.startswith("Host: "):
                hosts.add(stripped[6:])
            elif stripped.startswith("User-Agent: "):
                user_agents.add(stripped[12:])
            elif stripped.startswith("Cookie: "):
                cookies.append(stripped[:200])
            elif _HTTP_REQUEST_RE.match(stripped):
                requests.append(stripped[:200])
            elif stripped.startswith("HTTP/"):
                responses.append(stripped[:200])
            else:
                lowered = stripped.lower()
                if any(kw in lowered for kw in _PCAP_KEYWORDS):
                    other_interesting.append(stripped[:200])

        size = os.path.getsize(file_path)
        lines = [f"=== PCAP String Analysis: {file_path} ({size} bytes) ==="]
        lines.append(f"Total printable strings (>=8 chars): {len(all_strings)}")
        lines.append(f"\nUnique Hosts ({len(hosts)}):")
        lines.extend(f" {h}" for h in sorted(hosts))
        lines.append(f"\nUser-Agent strings ({len(user_agents)}):")
        lines.extend(f" {ua}" for ua in sorted(user_agents))
        lines.append(f"\nHTTP Requests ({len(requests)}):")
        _append_capped(lines, requests, 30)
        lines.append(f"\nHTTP Responses ({len(responses)}):")
        _append_capped(lines, responses, 20)
        if cookies:
            lines.append(f"\nCookies ({len(cookies)}):")
            _append_capped(lines, cookies, 20)
        if other_interesting:
            lines.append(f"\nOther interesting strings ({len(other_interesting)}):")
            _append_capped(lines, other_interesting, 30)
        return "\n".join(lines)
    except Exception as e:
        return f"[Error parsing PCAP strings: {e}]"


async def parse_prefetch(file_path: str) -> str:
    """Parse a Windows XP Prefetch (.pf) file to extract execution info.

    Returns: executable name, last execution time, and run count.

    The offsets used (name at 0x10, FILETIME at 0x78, run count at 0x90) are
    those of format version 17 (Windows XP); other versions are still reported
    but flagged as unreliable.
    """
    # NOTE(review): the signature check, the report layout and the error
    # wording below were reconstructed from the header offsets and comments
    # that survived in this file; confirm them against the original source
    # and against whatever consumes this output.
    try:
        with open(file_path, "rb") as f:
            data = f.read()
        if len(data) < 0x94:
            return f"[Error: file too small for Prefetch format ({len(data)} bytes)]"

        version = struct.unpack_from("<I", data, 0)[0]
        signature = data[4:8]
        if signature != b"SCCA":
            return f"[Error: {file_path} is not a Prefetch file (signature {signature!r})]"

        # Executable name: 60 bytes of NUL-terminated UTF-16LE at offset 0x10.
        # Decode first and cut at the first NUL character so a code unit is
        # never split in half.
        raw_name = data[0x10:0x4C]
        filename = raw_name.decode("utf-16-le", errors="replace").split("\x00", 1)[0]

        # Last execution time: FILETIME at offset 0x78 (Windows XP, version 17)
        ft = struct.unpack_from("<Q", data, 0x78)[0]
        if ft > 0:
            try:
                last_run = _FILETIME_EPOCH + timedelta(microseconds=ft // 10)
                last_run_str = last_run.strftime("%Y-%m-%d %H:%M:%S UTC")
            except OverflowError:
                # Garbage timestamps can exceed datetime's year range.
                last_run_str = f"(invalid FILETIME 0x{ft:016x})"
        else:
            last_run_str = "(not available)"

        # Run count at offset 0x90
        run_count = struct.unpack_from("<I", data, 0x90)[0]

        lines = [
            f"=== Prefetch: {file_path} ===",
            f" Executable: {filename}",
            f" Format version: {version}",
            f" Last run: {last_run_str}",
            f" Run count: {run_count}",
        ]
        if version != 17:
            lines.append(
                " Note: offsets are for format version 17 (Windows XP); "
                "last run and run count may be wrong for this file."
            )
        return "\n".join(lines)
    except Exception as e:
        return f"[Error parsing Prefetch file {file_path}: {e}]"


def _human_size(n: float) -> str:
    """Format a byte count with a binary unit suffix (B, KB, MB, GB, TB)."""
    for unit in ("B", "KB", "MB", "GB"):
        if n < 1024:
            return f"{n:.1f}{unit}" if unit != "B" else f"{n}B"
        n /= 1024
    return f"{n:.1f}TB"


# NOTE(review): the ``def`` line of this function was unreadable in the
# reviewed copy; the parameter name is taken from the body, but the function
# name is a best guess and must be checked against its callers.
async def list_directory(dir_path: str) -> str:
    """Smart summary of a (potentially huge) extracted tree.

    Earlier versions dumped up to 200 random entries then truncated — that
    leaves the agent blind on 10k+-file iOS extractions. The new layout
    returns a compact summary that scales: total counts, extension
    breakdown, top-level directories with their sizes, and the largest files.

    For targeted lookups (e.g. find every ``*.sqlite`` under the tree) the
    agent should use ``find_files`` instead.
    """
    if not os.path.isdir(dir_path):
        return f"[Error: {dir_path} is not a directory]"
    try:
        total_files = 0
        total_bytes = 0
        ext_counts: dict[str, int] = {}
        ext_bytes: dict[str, int] = {}
        top_level_dirs: dict[str, dict] = {}
        largest: list[tuple[int, str]] = []  # min-heap of (size, relpath)
        dir_path_abs = os.path.abspath(dir_path)

        for root, dirs, files in os.walk(dir_path_abs):
            # Track top-level directory aggregates (cheap; no per-entry cost
            # beyond the walk we're already doing).
            rel_root = os.path.relpath(root, dir_path_abs)
            if rel_root == ".":
                for d in dirs:
                    top_level_dirs[d] = {"files": 0, "bytes": 0}
                top_stats = None  # files directly at the root belong to no subdir
            else:
                top_key = rel_root.split(os.sep, 1)[0]
                top_stats = top_level_dirs.setdefault(top_key, {"files": 0, "bytes": 0})

            for name in files:
                full = os.path.join(root, name)
                try:
                    size = os.path.getsize(full)
                except OSError:
                    # Unreadable entry (e.g. dangling symlink): leave it out.
                    continue
                total_files += 1
                total_bytes += size
                ext = os.path.splitext(name)[1].lower() or "(no ext)"
                ext_counts[ext] = ext_counts.get(ext, 0) + 1
                ext_bytes[ext] = ext_bytes.get(ext, 0) + size
                if top_stats is not None:
                    top_stats["files"] += 1
                    top_stats["bytes"] += size

                # Keep only the N largest files: the heap root is always the
                # smallest survivor, so it is the one to evict.
                entry = (size, os.path.relpath(full, dir_path_abs))
                if len(largest) < _LARGEST_FILES_SHOWN:
                    heapq.heappush(largest, entry)
                elif entry > largest[0]:
                    heapq.heapreplace(largest, entry)

        lines = [
            f"Directory: {dir_path}",
            f" Total: {total_files} file(s), {_human_size(total_bytes)}",
        ]

        # Top-level directory layout (immediate children, sorted by file count).
        if top_level_dirs:
            lines.append(f"\nTop-level layout ({len(top_level_dirs)} dirs at root):")
            busiest = sorted(
                top_level_dirs.items(),
                key=lambda kv: -kv[1]["files"],
            )[:15]
            for d, stats in busiest:
                lines.append(
                    f" {d}/ ({stats['files']} files, {_human_size(stats['bytes'])})"
                )
            if len(top_level_dirs) > 15:
                lines.append(f" ... ({len(top_level_dirs) - 15} more top-level dirs)")

        # Extension breakdown.
        if ext_counts:
            lines.append("\nExtension breakdown (top 15):")
            for ext, count in sorted(ext_counts.items(), key=lambda kv: -kv[1])[:15]:
                lines.append(
                    f" {ext}: {count} files, {_human_size(ext_bytes.get(ext, 0))}"
                )

        # Largest files (often the highest-value forensic targets).
        if largest:
            lines.append("\nLargest files:")
            for size, rel in sorted(largest, reverse=True):
                lines.append(f" {rel} ({_human_size(size)})")

        lines.append(
            "\nNext step: call find_files with a pattern like "
            "'**/*.plist' or '**/keychain-2.db' to locate specific artefacts."
        )
        return "\n".join(lines)
    except Exception as e:
        return f"[Error listing {dir_path}: {e}]"


def _expand_braces(pattern: str) -> list[str]:
    """Expand ``{a,b}`` alternative groups in *pattern* into plain patterns."""
    group = _BRACE_GROUP_RE.search(pattern)
    if group is None:
        return [pattern]
    head, tail = pattern[:group.start()], pattern[group.end():]
    expanded: list[str] = []
    for option in group.group(1).split(","):
        expanded.extend(_expand_braces(head + option + tail))
    return expanded


def _glob_variants(pattern: str) -> list[str]:
    """Translate a ``**``/brace glob into the fnmatch patterns that cover it."""
    variants: list[str] = []
    for expanded in _expand_braces(pattern):
        candidates = (
            # ``**`` spanning one or more directories: fnmatch's ``*`` already
            # matches across "/".
            expanded.replace("**", "*"),
            # ``**/`` spanning zero directories (file sits at the anchor).
            _GLOBSTAR_SEGMENT_RE.sub(r"\1", expanded).replace("**", "*"),
        )
        for candidate in candidates:
            if candidate and candidate not in variants:
                variants.append(candidate)
    return variants


async def find_files(
    root: str,
    pattern: str,
    max_results: int = 500,
) -> str:
    """Recursively find files under *root* whose path matches *pattern*.

    Uses fnmatch-style globs against the *full relative path* and against the
    bare file name; ``**`` is treated as "any number of path segments,
    including none" (so ``**/*.plist`` finds every plist no matter how deep,
    including directly under *root*). ``{a,b}`` groups are expanded into
    alternatives.

    Examples:
    - ``**/sms.db`` — iOS SMS database
    - ``**/keychain-2.db`` — iOS keychain
    - ``**/ChatStorage.sqlite`` — WhatsApp app store
    - ``HomeDomain/Library/**`` — anchor at a known iOS domain root
    - ``**/*.{plist,sqlite,db}`` — multi-extension

    Results are sorted by size descending — the biggest hits usually matter
    most. Capped at *max_results* to keep the LLM context bounded; the walk
    itself stops after ``4 * max_results`` hits, so on a truncated result the
    ordering covers only the files seen before the cut-off.
    """
    if not os.path.isdir(root):
        return f"[Error: {root} is not a directory]"
    root_abs = os.path.abspath(root)
    max_results = max(max_results, 0)
    variants = _glob_variants(pattern)
    # Hard upper bound to keep the walk cheap on huge trees.
    walk_cap = max_results * 4

    hits: list[tuple[int, str]] = []
    truncated = False
    try:
        for dirpath, _dirs, files in os.walk(root_abs):
            for name in files:
                full = os.path.join(dirpath, name)
                rel = os.path.relpath(full, root_abs)
                if not any(
                    fnmatch.fnmatch(rel, variant) or fnmatch.fnmatch(name, variant)
                    for variant in variants
                ):
                    continue
                try:
                    size = os.path.getsize(full)
                except OSError:
                    size = 0
                hits.append((size, rel))
                if len(hits) >= walk_cap:
                    truncated = True
                    break
            if truncated:
                break
    except Exception as e:
        return f"[Error searching {root}: {e}]"

    hits.sort(reverse=True)
    if len(hits) > max_results:
        truncated = True
        hits = hits[:max_results]

    lines = [
        f"find_files: pattern={pattern!r} under {root}",
        f" matches: {len(hits)}" + (" (truncated)" if truncated else ""),
    ]
    if not hits:
        lines.append(" (no matches)")
    else:
        for size, rel in hits:
            lines.append(f" {rel} ({size} bytes)")
    return "\n".join(lines)