Files
MASForensic/log_config.py
BattleTag 097d2ce472 Initial commit
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 17:36:26 +08:00

244 lines
9.6 KiB
Python

"""Logging configuration — colored terminal output + detailed log file.
Terminal: compact, colored, hierarchical display with tool-call folding.
File: full-detail timestamped log for post-run analysis.
"""
from __future__ import annotations
import logging
import sys
import time
from pathlib import Path
# ---------------------------------------------------------------------------
# ANSI color codes
# ---------------------------------------------------------------------------
class _C:
    """Named ANSI escape strings used to style terminal output.

    Plain class attributes only; the class is used as a namespace and is
    never instantiated in this module.
    """

    # Text attributes
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"

    # Foreground colors, each followed by its bright ("B_") variant if any
    RED = "\033[31m"
    B_RED = "\033[91m"
    GREEN = "\033[32m"
    B_GREEN = "\033[92m"
    YELLOW = "\033[33m"
    B_YELLOW = "\033[93m"
    BLUE = "\033[34m"
    B_BLUE = "\033[94m"
    MAGENTA = "\033[35m"
    B_MAGENTA = "\033[95m"
    CYAN = "\033[36m"
    B_CYAN = "\033[96m"

    # Neutrals
    WHITE = "\033[37m"
    GREY = "\033[90m"
# Terminal color for each known agent, keyed by agent name.
# Names missing from this table fall back to white in _agent_color().
_AGENT_COLORS: dict[str, str] = dict(
    filesystem=_C.B_CYAN,
    registry=_C.B_MAGENTA,
    communication=_C.B_YELLOW,
    network=_C.B_GREEN,
    timeline=_C.B_BLUE,
    hypothesis=_C.MAGENTA,
    report=_C.CYAN,
)
def _agent_color(name: str) -> str:
    """Return the ANSI color string for agent *name*, or white if unknown."""
    if name in _AGENT_COLORS:
        return _AGENT_COLORS[name]
    return _C.WHITE
def _format_elapsed(seconds: float) -> str:
    """Format elapsed seconds as a short human-readable string.

    The unit is chosen from the *rounded* value, so a duration just under a
    unit boundary is promoted to the next unit instead of printing an
    out-of-range figure such as ``1000ms`` or ``60.0s``.

    Args:
        seconds: Elapsed time in seconds.

    Returns:
        ``"<n>ms"`` below one second, ``"<n.n>s"`` below one minute,
        ``"<m>m <ss>s"`` below one hour, otherwise ``"<h>h <mm>m <ss>s"``.
    """
    # round() and the ".0f"/".1f" format specs round the same way, so these
    # guards agree exactly with the text produced below.
    if round(seconds * 1000) < 1000:
        return f"{seconds * 1000:.0f}ms"
    if round(seconds, 1) < 60:
        return f"{seconds:.1f}s"

    # A value such as 59.96 reaches this point because it rounds to 60.0;
    # show it as a full minute rather than truncating back to 59 seconds.
    whole_seconds = max(int(seconds), 60)
    minutes, secs = divmod(whole_seconds, 60)
    if minutes < 60:
        return f"{minutes}m {secs:02d}s"
    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes:02d}m {secs:02d}s"
# ---------------------------------------------------------------------------
# Terminal formatter — compact, colored, hierarchical
# ---------------------------------------------------------------------------
class TerminalFormatter(logging.Formatter):
    """Colored, compact formatter for terminal output.

    Structured records are recognized through attributes attached with
    ``extra=``:
    - extra['event']   : event type selecting the layout ("phase",
      "agent_start", "agent_done", "tool_calls", "dispatch", "progress",
      "hypothesis")
    - extra['agent']   : agent name, used for coloring
    - extra['elapsed'] : elapsed seconds, shown as a grey "(…)" tag

    Records without a recognized event are rendered by severity: ERROR and
    above, then WARNING, then a dimmed default line.
    """

    @staticmethod
    def _elapsed_tag(record: logging.LogRecord) -> str:
        """Return the grey "(duration)" tag for *record*, or "" if it has none."""
        elapsed = getattr(record, "elapsed", 0)
        if not elapsed:
            return ""
        return f"{_C.GREY}({_format_elapsed(elapsed)}){_C.RESET}"

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as one terminal line with ANSI styling.

        Args:
            record: The log record to render.

        Returns:
            The styled line. Phase banners start with a newline so that they
            are visually separated from the preceding output.
        """
        ts = time.strftime("%H:%M:%S", time.localtime(record.created))
        ts_str = f"{_C.GREY}[{ts}]{_C.RESET}"
        event = getattr(record, "event", None)
        msg = record.getMessage()

        # ── Phase banner ──────────────────────────────────────────
        if event == "phase":
            # Pad with a rule so banners have a roughly constant width; the
            # fill character must be non-empty or the padding is a no-op.
            rule = "═" * max(1, 52 - len(msg))
            return f"\n{ts_str} {_C.BOLD}{_C.WHITE}══ {msg} {rule}{_C.RESET}"

        # ── Agent start ───────────────────────────────────────────
        if event == "agent_start":
            agent = getattr(record, "agent", "?")
            color = _agent_color(agent)
            return f"{ts_str} {color}{agent:<14}{_C.RESET} {msg}"

        # ── Agent done ────────────────────────────────────────────
        if event == "agent_done":
            color = _agent_color(getattr(record, "agent", "?"))
            return f"{ts_str} {color}└─{_C.RESET} {msg} {self._elapsed_tag(record)}"

        # ── Tool calls (folded) ───────────────────────────────────
        if event == "tool_calls":
            return f"{ts_str} {_C.DIM}├─ {msg}{_C.RESET} {self._elapsed_tag(record)}"

        # ── Lead dispatch ─────────────────────────────────────────
        if event == "dispatch":
            return f"{ts_str} {_C.BLUE}{msg}{_C.RESET}"

        # ── Evidence progress ─────────────────────────────────────
        if event == "progress":
            tag = self._elapsed_tag(record)
            suffix = f" {tag}" if tag else ""
            return f"{ts_str} {_C.GREEN}{msg}{_C.RESET}{suffix}"

        # ── Hypothesis update ─────────────────────────────────────
        if event == "hypothesis":
            return f"{ts_str} {_C.MAGENTA}{msg}{_C.RESET}"

        # ── Errors ────────────────────────────────────────────────
        # Must be tested before WARNING: ERROR records also satisfy
        # ``levelno >= WARNING`` and would otherwise be labelled WARN.
        if record.levelno >= logging.ERROR:
            return f"{ts_str} {_C.B_RED}ERROR{_C.RESET} {msg}"

        # ── Warnings ──────────────────────────────────────────────
        if record.levelno >= logging.WARNING:
            return f"{ts_str} {_C.B_YELLOW}WARN{_C.RESET} {msg}"

        # ── Default ───────────────────────────────────────────────
        # Anything else that reaches the terminal is shown dimmed.
        return f"{ts_str} {_C.DIM}{msg}{_C.RESET}"
class FileFormatter(logging.Formatter):
    """Detailed formatter for log files — full timestamps, all fields.

    Line layout::

        YYYY-MM-DD HH:MM:SS.mmm L logger.name @event [agent] (elapsed): message

    ``L`` is the first letter of the level name. The ``@event``, ``[agent]``
    and ``(elapsed)`` tags appear only when the record carries a truthy
    attribute of that name. Exception tracebacks and stack info attached to
    the record are appended on the following lines.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as a log-file entry.

        Args:
            record: The log record to render.

        Returns:
            The formatted entry; multi-line when a traceback or stack info
            is attached to the record.
        """
        ts = self.formatTime(record, "%Y-%m-%d %H:%M:%S")
        # Clamp to 999: a fraction such as 0.9996 would otherwise round up
        # and print ".000" against a seconds field that has not advanced.
        millis = min(round((record.created % 1) * 1000), 999)
        level = record.levelname[0]  # I/W/E/D

        event = getattr(record, "event", "")
        agent = getattr(record, "agent", "")
        elapsed = getattr(record, "elapsed", "")

        # Build context tags
        tags = ""
        if event:
            tags += f" @{event}"
        if agent:
            tags += f" [{agent}]"
        if elapsed:
            tags += f" ({_format_elapsed(elapsed)})"

        line = f"{ts}.{millis:03d} {level} {record.name}{tags}: {record.getMessage()}"

        # Keep tracebacks and stack info in the file, the way the stock
        # logging.Formatter does; the traceback text is cached on the record.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            line = f"{line}\n{record.exc_text}"
        if record.stack_info:
            line = f"{line}\n{self.formatStack(record.stack_info)}"
        return line
# ---------------------------------------------------------------------------
# Filters
# ---------------------------------------------------------------------------
class TerminalFilter(logging.Filter):
    """Filter for terminal handler — suppress noisy loggers and low-value messages."""

    # Logger names to suppress from the terminal entirely. Child loggers
    # ("httpcore.http11", ...) are suppressed along with their parent.
    _SUPPRESSED = {"httpx", "httpcore"}

    def filter(self, record: logging.LogRecord) -> bool:
        """Decide whether *record* should be shown in the terminal.

        Args:
            record: The log record under consideration.

        Returns:
            False to drop the record, True to let it through.
        """
        name = record.name

        # Suppress HTTP client noise, including loggers below those names.
        if any(name == base or name.startswith(base + ".") for base in self._SUPPRESSED):
            return False

        # Suppress DEBUG from all loggers in terminal
        if record.levelno < logging.INFO:
            return False

        if name == "llm_client":
            # Build the message once; it is only needed for this logger.
            message = record.getMessage()
            # Raw tool-call lines are replaced by folded summaries.
            if "Calling tool" in message or "(parallel)" in message:
                return False
            # Raw LLM request/response lines.
            if message.startswith("LLM"):
                return False

        return True
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
def setup_logging(run_dir: Path, verbose: bool = False) -> None:
    """Configure logging with colored terminal + detailed file output.

    Replaces every handler on the root logger with two new ones: a stderr
    handler using TerminalFormatter and a file handler writing
    ``masforensics.log`` inside *run_dir* using FileFormatter.

    Args:
        run_dir: Directory for the log file. Created if it does not exist.
        verbose: If True, show all messages in terminal (DEBUG level, no
            TerminalFilter).
    """
    log_dir = Path(run_dir)
    # Create the directory up front so FileHandler cannot fail on a fresh run.
    log_dir.mkdir(parents=True, exist_ok=True)

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    # Remove any existing handlers (e.g., from basicConfig) and close them,
    # so that calling this function again does not leak open log files.
    for stale in list(root.handlers):
        root.removeHandler(stale)
        stale.close()

    # ── Terminal handler ──────────────────────────────────────
    term_handler = logging.StreamHandler(sys.stderr)
    term_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
    term_handler.setFormatter(TerminalFormatter())
    if not verbose:
        term_handler.addFilter(TerminalFilter())
    root.addHandler(term_handler)

    # ── File handler (full detail) ────────────────────────────
    # Explicit UTF-8 so non-ASCII messages are written the same way on
    # every platform instead of depending on the locale's default encoding.
    file_handler = logging.FileHandler(log_dir / "masforensics.log", encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(FileFormatter())
    root.addHandler(file_handler)

    # Suppress overly chatty third-party loggers even in the file
    logging.getLogger("httpcore").setLevel(logging.WARNING)