Files
MASForensic/tools/parsers.py
BattleTag 097d2ce472 Initial commit
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 17:36:26 +08:00

235 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Parsers for various forensic artifact formats."""
from __future__ import annotations
import asyncio
import logging
import os
import re
import struct
from datetime import datetime, timedelta, timezone
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
async def read_text_file(file_path: str, max_bytes: int = 8000) -> str:
    """Read the beginning of a text file, capped at ``max_bytes`` bytes.

    The file is opened in binary mode so the limit is enforced in bytes, the
    same unit ``os.path.getsize`` reports. The bytes are then decoded as UTF-8
    with undecodable sequences replaced, and CRLF / CR line endings are
    normalised to LF (what text-mode reading would have produced).

    Args:
        file_path: Path of the file to read.
        max_bytes: Maximum number of bytes to read from the start of the file.
            A negative value reads the whole file.

    Returns:
        The decoded text, followed by a truncation notice when the file holds
        more bytes than were read. On any failure an ``[Error reading ...]``
        string is returned instead of raising.
    """
    try:
        with open(file_path, "rb") as f:
            raw = f.read(max_bytes)
        size = os.path.getsize(file_path)
        content = raw.decode("utf-8", errors="replace")
        content = content.replace("\r\n", "\n").replace("\r", "\n")
        # Compare bytes with bytes: only claim truncation when data was left unread.
        if size > len(raw):
            content += f"\n\n[Truncated: file is {size} bytes, showing first {len(raw)}]"
        return content
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"
async def read_binary_preview(file_path: str, max_bytes: int = 2000) -> str:
    """Render the leading bytes of a file as a hex dump with an ASCII column.

    Up to ``max_bytes`` bytes are read and laid out 16 per row: an 8-digit hex
    offset, the bytes as two-digit hex values, and the same bytes as text with
    anything outside printable ASCII (32-126) shown as ``.``. A header line with
    the path and the on-disk size precedes the rows.

    Returns the dump, or an ``[Error reading ...]`` string on any failure.
    """
    row_width = 16

    def render_row(offset: int, row: bytes) -> str:
        # One dump row: offset, hex cells padded to a fixed width, ASCII view.
        hex_cells = " ".join(format(byte, "02x") for byte in row)
        printable = "".join(chr(byte) if 32 <= byte < 127 else "." for byte in row)
        return f"{offset:08x} {hex_cells:<48} {printable}"

    try:
        with open(file_path, "rb") as handle:
            payload = handle.read(max_bytes)
        rows = [
            render_row(offset, payload[offset:offset + row_width])
            for offset in range(0, len(payload), row_width)
        ]
        total_size = os.path.getsize(file_path)
        return f"File: {file_path} ({total_size} bytes)\n" + "\n".join(rows)
    except Exception as exc:
        return f"[Error reading {file_path}: {exc}]"
async def read_text_file_section(file_path: str, start: int = 0, max_bytes: int = 8000) -> str:
    """Read a section of a text file starting at byte offset ``start``.

    The file is read in binary mode so that ``start`` and ``max_bytes`` are
    real byte quantities; seeking to an arbitrary offset and counting bytes
    are not reliable on a text-mode handle. The section is decoded as UTF-8
    with undecodable sequences replaced, and CRLF / CR line endings are
    normalised to LF.

    Args:
        file_path: Path of the file to read.
        start: Byte offset at which to begin; negative values are treated as 0.
        max_bytes: Maximum number of bytes to read from ``start``.

    Returns:
        A header line giving the file size and the byte range shown
        (``start-end``, end exclusive), then the decoded text, then a note with
        the number of unread bytes when any remain. On any failure an
        ``[Error reading ...]`` string is returned instead of raising.
    """
    try:
        size = os.path.getsize(file_path)
        offset = max(start, 0)
        with open(file_path, "rb") as f:
            f.seek(offset)
            raw = f.read(max_bytes)
        end = offset + len(raw)
        content = raw.decode("utf-8", errors="replace")
        content = content.replace("\r\n", "\n").replace("\r", "\n")
        # Separate the two endpoints; printed back to back they are unreadable.
        header = f"[File: {file_path}, {size} bytes, showing offset {offset}-{end}]"
        remaining = size - end
        if remaining > 0:
            content += f"\n\n[{remaining} bytes remaining after this section]"
        return header + "\n" + content
    except Exception as e:
        return f"[Error reading {file_path}: {e}]"
async def search_text_file(file_path: str, pattern: str, max_matches: int = 50) -> str:
    """Search a text file line by line and report the matching lines.

    ``pattern`` is compiled as a case-insensitive regular expression; if it is
    not valid regex syntax it is searched for as a literal string instead.

    Args:
        file_path: Path of the text file to scan (decoded as UTF-8, with
            undecodable bytes replaced).
        pattern: Regular expression, or literal text when it does not compile.
        max_matches: Maximum number of matching lines to include.

    Returns:
        A header with the pattern, path, file size and number of matches
        listed, followed by one ``lineno: text`` row per match (text capped at
        200 characters). A truncation notice is added only when more than
        ``max_matches`` lines actually matched. On any failure an
        ``[Error searching ...]`` string is returned instead of raising.
    """
    try:
        size = os.path.getsize(file_path)
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error:
            # Not valid regex syntax: fall back to a literal search.
            compiled = re.compile(re.escape(pattern), re.IGNORECASE)

        matches = []
        truncated = False
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            for lineno, line in enumerate(f, 1):
                if not compiled.search(line):
                    continue
                if len(matches) >= max_matches:
                    # A further match exists beyond the cap, so truncation is real.
                    truncated = True
                    break
                matches.append(f" {lineno}: {line.rstrip()[:200]}")

        # Build the header before the notice so the notice is not counted as a match.
        header = f"Search '{pattern}' in {file_path} ({size} bytes): {len(matches)} matches"
        if truncated:
            matches.append(f" [Truncated: more than {max_matches} matches]")
        if not matches:
            return header + "\n (no matches)"
        return header + "\n" + "\n".join(matches)
    except Exception as e:
        return f"[Error searching {file_path}: {e}]"
def _summarize_pcap_strings(file_path: str, size: int, all_strings: list) -> str:
    """Classify extracted strings by HTTP role and render the text report."""
    hosts = set()
    user_agents = set()
    requests = []
    responses = []
    cookies = []
    other_interesting = []
    request_line = re.compile(r"^(GET|POST|PUT|DELETE|HEAD) /")
    keywords = ("password", "login", "username", "email", "set-cookie")

    for line in all_strings:
        stripped = line.strip()
        if stripped.startswith("Host: "):
            hosts.add(stripped[6:])
        elif stripped.startswith("User-Agent: "):
            user_agents.add(stripped[12:])
        elif stripped.startswith("Cookie: "):
            cookies.append(stripped[:200])
        elif request_line.match(stripped):
            requests.append(stripped[:200])
        elif stripped.startswith("HTTP/"):
            # Lines beginning with "HTTP/" are treated as response status lines.
            responses.append(stripped[:200])
        else:
            lowered = stripped.lower()
            if any(kw in lowered for kw in keywords):
                other_interesting.append(stripped[:200])

    lines = [f"=== PCAP String Analysis: {file_path} ({size} bytes) ==="]
    lines.append(f"Total printable strings (>=8 chars): {len(all_strings)}")

    lines.append(f"\nUnique Hosts ({len(hosts)}):")
    lines.extend(f" {h}" for h in sorted(hosts))
    lines.append(f"\nUser-Agent strings ({len(user_agents)}):")
    lines.extend(f" {ua}" for ua in sorted(user_agents))

    # Capped sections: (title, items, cap, emit even when empty).
    capped_sections = (
        ("HTTP Requests", requests, 30, True),
        ("HTTP Responses", responses, 20, True),
        ("Cookies", cookies, 20, False),
        ("Other interesting strings", other_interesting, 30, False),
    )
    for title, items, cap, always in capped_sections:
        if not items and not always:
            continue
        lines.append(f"\n{title} ({len(items)}):")
        lines.extend(f" {item}" for item in items[:cap])
        if len(items) > cap:
            # Say how many were omitted for every capped list, not only requests.
            lines.append(f" ... ({len(items) - cap} more)")
    return "\n".join(lines)


async def parse_pcap_strings(file_path: str) -> str:
    """Extract HTTP headers and other readable strings from a PCAP/capture file.

    Runs the external ``srch_strings`` tool with ``-a -n 8`` to pull printable
    strings of at least 8 characters out of the file, then groups them into
    hosts, user agents, request lines, response lines, cookies and lines that
    mention credential-related keywords.
    NOTE(review): ``srch_strings`` is presumably The Sleuth Kit's ``strings``
    equivalent - confirm it is installed wherever this runs.

    Args:
        file_path: Path of the capture file to scan.

    Returns:
        A multi-section plain-text report. If the tool cannot be started, or
        exits non-zero without producing output, or anything else fails, an
        ``[Error parsing PCAP strings: ...]`` string is returned instead of
        raising.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "srch_strings", "-a", "-n", "8", file_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0 and not stdout:
            # A failed run must not be reported as a capture with zero strings.
            detail = stderr.decode("utf-8", errors="replace").strip()
            if not detail:
                detail = f"srch_strings exited with code {proc.returncode}"
            return f"[Error parsing PCAP strings: {detail}]"
        all_strings = stdout.decode("utf-8", errors="replace").splitlines()
        size = os.path.getsize(file_path)
        return _summarize_pcap_strings(file_path, size, all_strings)
    except Exception as e:
        return f"[Error parsing PCAP strings: {e}]"
async def parse_prefetch(file_path: str) -> str:
    """Parse a Windows Prefetch (.pf) file to extract execution info.

    Reads the ``SCCA`` header and reports the executable name, the last
    execution time and the run count. The last-run and run-count fields sit at
    different offsets depending on the format version, so the offsets are
    chosen from the version field: 17 (Windows XP), 23 (Vista/7) and
    26 (Windows 8). Any other version is decoded with the version-17 offsets
    and the report carries a note that the values may be wrong.

    Args:
        file_path: Path of the ``.pf`` file.

    Returns:
        A plain-text report, or an ``[Error ...]`` string when the file is too
        small, is MAM-compressed, lacks the ``SCCA`` signature, or cannot be
        read. The function does not raise.
    """
    # Format version -> (last-run FILETIME offset, run-count offset).
    layouts = {
        17: (0x78, 0x90),
        23: (0x80, 0x98),
        26: (0x80, 0xD0),
    }
    try:
        with open(file_path, "rb") as f:
            data = f.read()
        if len(data) < 0x94:
            return f"[Error: file too small for Prefetch format ({len(data)} bytes)]"
        if data[:3] == b"MAM":
            # Compressed container: the SCCA header is not readable until decompressed.
            return "[Error: MAM-compressed Prefetch file; decompress it before parsing]"
        version = struct.unpack_from("<I", data, 0)[0]
        sig = data[4:8]
        if sig != b"SCCA":
            return f"[Error: not a Prefetch file — signature is {sig!r}, expected b'SCCA']"

        last_run_off, run_count_off = layouts.get(version, layouts[17])
        if len(data) < run_count_off + 4:
            return (
                f"[Error: file too small for Prefetch version {version} "
                f"({len(data)} bytes)]"
            )

        # Filename: null-terminated UTF-16LE at offset 0x10. Decode whole code
        # units first and cut at the first NUL character; a raw search for two
        # zero bytes can land inside a character whose low byte is 0x00.
        decoded_name = data[0x10:0x4C].decode("utf-16-le", errors="replace")
        filename = decoded_name.split("\x00", 1)[0]

        # Last execution time: FILETIME (100 ns ticks since 1601-01-01 UTC).
        ft = struct.unpack_from("<Q", data, last_run_off)[0]
        if ft > 0:
            try:
                epoch = datetime(1601, 1, 1, tzinfo=timezone.utc)
                last_run = epoch + timedelta(microseconds=ft // 10)
                last_run_str = last_run.strftime("%Y-%m-%d %H:%M:%S UTC")
            except OverflowError:
                # A corrupt timestamp should not discard the rest of the report.
                last_run_str = f"(invalid FILETIME 0x{ft:016x})"
        else:
            last_run_str = "(not available)"

        run_count = struct.unpack_from("<I", data, run_count_off)[0]

        lines = [
            f"=== Prefetch Analysis: {file_path} ===",
            f"Prefetch Version: {version}",
            f"Executable: {filename}",
            f"Last Execution: {last_run_str}",
            f"Run Count: {run_count}",
            f"File Size: {len(data)} bytes",
        ]
        if version not in layouts:
            lines.append(
                f"Note: unrecognised Prefetch version {version}; fields were decoded "
                "with the version 17 (Windows XP) layout and may be wrong"
            )
        return "\n".join(lines)
    except Exception as e:
        return f"[Error parsing Prefetch: {e}]"
async def list_extracted_dir(dir_path: str) -> str:
    """List the files under an extracted directory, recursively.

    Directories and files are visited in sorted order so the output is stable.
    At most 200 files are listed; if more exist, the walk stops and a single
    ``... (truncated)`` line is appended.

    Args:
        dir_path: Root directory to walk.

    Returns:
        A header with the directory and the number of files listed, then one
        ``relative/path (N bytes)`` row per file. A file whose size cannot be
        read is listed with ``size unavailable``. If ``dir_path`` is not a
        directory, or the walk fails, an ``[Error listing ...]`` string is
        returned instead of raising.
    """
    max_entries = 200
    try:
        if not os.path.isdir(dir_path):
            return f"[Error listing {dir_path}: not a directory]"

        entries = []
        truncated = False
        for root, dirs, files in os.walk(dir_path):
            dirs.sort()  # in-place sort controls the order os.walk descends
            for name in sorted(files):
                if len(entries) >= max_entries:
                    truncated = True
                    break
                full = os.path.join(root, name)
                rel = os.path.relpath(full, dir_path)
                try:
                    size_note = f"{os.path.getsize(full)} bytes"
                except OSError:
                    # e.g. a broken symlink: keep listing the remaining files.
                    size_note = "size unavailable"
                entries.append(f" {rel} ({size_note})")
            if truncated:
                # Leave the directory walk too, not just the per-file loop.
                break

        shown = len(entries)
        if truncated:
            entries.append(" ... (truncated)")
        return f"Directory: {dir_path}\nFiles ({shown}):\n" + "\n".join(entries)
    except Exception as e:
        return f"[Error listing {dir_path}: {e}]"