Files
MASForensic/evidence_graph.py
BattleTag 444d58726a refactor: native tool calling + generic forced-retry + terminal exit
- llm_client: switch tool_call_loop from text-based <tool_call> regex
  to OpenAI-native tools=[...] / structured tool_calls field; accumulate
  delta.reasoning_content for DeepSeek thinking-mode echo-back; fold
  preserves system msg and aligns boundary to never orphan role:tool
- base_agent: generic forced-retry via mandatory_record_tools class attr
  (filesystem -> add_phenomenon, timeline -> add_temporal_edge,
  hypothesis -> add_hypothesis, report -> save_report); count via
  executor wrapper
- terminal_tools class attr + loop short-circuit: when a terminal tool
  is called, loop exits with its raw return as final_text. ReportAgent
  declares save_report as terminal - replaces the <answer>-tag stop
  signal that native tool calling broke
- _execute_*: return (raw, formatted) - terminal exit uses untruncated
  raw, conversation history uses 3000-char-capped formatted
- evidence_graph + orchestrator: LLM-derived InvestigationArea support
  (hypothesis-driven coverage check, replaces hardcoded _AREA_KEYWORDS /
  _AREA_TOOLS); manual yaml block kept as optional seed
- strip <answer> references from agent prompts (no longer load-bearing)

Verified on CFReDS image across 4 smoke runs: 0 JSON parse failures
(was 3); 22 temporal edges from Phase 4 (was 0); ReportAgent exits via
save_report (was max_iterations regression). 78/78 unit tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 13:51:19 +08:00

914 lines
33 KiB
Python

"""Evidence Knowledge Graph for multi-agent forensic analysis.
Replaces the flat Blackboard with a graph-based evidence store.
Nodes: Phenomenon (observable artifacts), Hypothesis (interpretive claims), Entity (recurring objects).
Edges: typed relationships with predefined weights for hypothesis confidence computation.
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Confidence weights for Phenomenon → Hypothesis edges.
# The LLM chooses only the categorical edge type; the numeric weight always
# comes from this table. A graph can replace the table through
# EvidenceGraph(edge_weights=...) or the `hypothesis_edge_weights` section of
# config.yaml.
# ---------------------------------------------------------------------------
_DEFAULT_EDGE_WEIGHTS: dict[str, float] = {
    # Positive weights raise hypothesis confidence.
    "direct_evidence": 0.25,
    "supports": 0.15,
    "prerequisite_met": 0.10,
    "consequence_observed": 0.15,
    # Negative weights lower it.
    "contradicts": -0.20,
    "weakens": -0.10,
}

# Every edge type accepted anywhere in the graph, grouped by endpoint kinds.
VALID_EDGE_TYPES: set[str] = {
    # Phenomenon → Hypothesis
    "direct_evidence",
    "supports",
    "prerequisite_met",
    "consequence_observed",
    "contradicts",
    "weakens",
    # Phenomenon → Phenomenon
    "temporal",
    "causal",
    "input_to",
    "modifies",
    "co_located",
    "corroborates",
    # Phenomenon → Entity
    "created_by",
    "executed_by",
    "owned_by",
    "targets",
    "associated_with",
    "found_on",
    "used_by",
    # Hypothesis → Hypothesis
    "refines",
    "conflicts",
    "depends_on",
}
# ---------------------------------------------------------------------------
# Graph node types
# ---------------------------------------------------------------------------
def _compute_quality_score(
    source_tool: str,
    timestamp: str | None,
    raw_data: dict,
    description: str,
    related_ids: list[str],
) -> float:
    """Score evidence completeness on a 0.0-1.0 scale.

    Each satisfied criterion contributes a fixed number of points:
    a non-empty source tool (0.25), a timestamp that is not None (0.20),
    non-empty raw data (0.25), a description of at least 50 characters
    (0.15) and at least one related ID (0.15).
    """
    # Evaluated top to bottom; the order fixes the float summation order.
    criteria = (
        (bool(source_tool), 0.25),
        (timestamp is not None, 0.20),
        (bool(raw_data), 0.25),
        (len(description) >= 50, 0.15),
        (bool(related_ids), 0.15),
    )
    total = 0.0
    for satisfied, points in criteria:
        if satisfied:
            total += points
    return total
def _jaccard_similarity(a: str, b: str) -> float:
    """Return the Jaccard index of the lower-cased, whitespace-split tokens.

    Yields 0.0 when either string contributes no tokens.
    """
    left = set(a.lower().split())
    right = set(b.lower().split())
    if not (left and right):
        return 0.0
    shared = left.intersection(right)
    combined = left.union(right)
    return len(shared) / len(combined)
@dataclass
class Phenomenon:
    """Raw observable artifact found on disk (a graph node)."""

    id: str  # "ph-{uuid8}"
    source_agent: str
    category: str  # filesystem, registry, email, network, timeline
    title: str
    description: str
    raw_data: dict = field(default_factory=dict)
    timestamp: str | None = None
    confidence: float = 1.0
    source_tool: str = ""
    corroborating_agents: list[str] = field(default_factory=list)
    from_lead_id: str | None = None
    created_at: str = ""

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> Phenomenon:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)

    def summary(self) -> str:
        """One-line description: id, category, title, optional time, confidence."""
        when = "" if not self.timestamp else f" @ {self.timestamp}"
        head = f"[{self.id}] [{self.category}] {self.title}"
        return f"{head}{when} (conf={self.confidence:.2f})"
@dataclass
class Hypothesis:
    """Interpretive claim about what happened on the system (a graph node)."""

    id: str  # "hyp-{uuid8}"
    title: str
    description: str
    confidence: float = 0.5
    status: str = "active"  # active, supported, refuted, inconclusive
    parent_id: str | None = None
    created_by: str = ""  # "manual", "hypothesis_agent", agent name
    created_at: str = ""
    confidence_log: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> Hypothesis:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)

    def summary(self) -> str:
        """One-line description: id, title, confidence and status."""
        state = f"conf={self.confidence:.2f}, {self.status}"
        return f"[{self.id}] {self.title} ({state})"
@dataclass
class Entity:
    """Recurring actor or object across phenomena (a graph node)."""

    id: str  # "ent-{uuid8}"
    name: str
    entity_type: str  # person, program, file, host, ip_address
    description: str = ""
    created_at: str = ""

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> Entity:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)

    def summary(self) -> str:
        """One-line description: id, entity type and name."""
        label = f"{self.entity_type}: {self.name}"
        return f"[{self.id}] {label}"
@dataclass
class Edge:
    """Directed, typed edge from one graph node to another."""

    id: str  # "edge-{uuid8}"
    source_id: str
    target_id: str
    edge_type: str
    metadata: dict = field(default_factory=dict)
    created_by: str = ""
    created_at: str = ""

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> Edge:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)
@dataclass
class Lead:
    """Investigative lead that an agent should follow up."""

    id: str
    target_agent: str
    description: str
    priority: int = 5  # 1 (highest) - 10 (lowest)
    context: dict = field(default_factory=dict)
    status: str = "pending"  # pending, assigned, completed, failed
    hypothesis_id: str | None = None

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> Lead:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)
@dataclass
class InvestigationArea:
    """Area to investigate in order to confirm or refute hypotheses.

    Areas are derived by the orchestrator from active hypotheses after
    Phase 2, and may also be seeded from config.yaml:investigation_areas as
    an optional manual override. Every area carries its own keywords and
    expected tools, which keeps the gap-analysis coverage check generic
    rather than tied to hard-coded constants.
    """

    id: str  # "area-{slug}"
    area: str  # snake_case slug (dedupe key)
    description: str
    suggested_agent: str  # filesystem / registry / communication / network / timeline
    expected_keywords: list[str] = field(default_factory=list)
    expected_tools: list[str] = field(default_factory=list)
    priority: int = 5  # 1 (highest) - 10 (lowest)
    motivating_hypothesis_ids: list[str] = field(default_factory=list)
    created_by: str = ""  # "manual" | "llm_derive" | "fallback"
    created_at: str = ""

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> InvestigationArea:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)

    def summary(self) -> str:
        """One-line description: slug, priority, agent, motivating count."""
        motivating_count = len(self.motivating_hypothesis_ids)
        head = f"[{self.area}] P{self.priority} agent={self.suggested_agent} "
        return head + f"(motivating: {motivating_count})"
@dataclass
class ExtractedAsset:
    """File extracted from the disk image and tracked in the asset library."""

    id: str  # "asset-{uuid8}"
    inode: str  # e.g. "334-128-4"
    original_path: str  # disk image path from ffind
    local_path: str  # "extracted/SYSTEM"
    category: str  # registry_hive, chat_log, prefetch, ...
    filename: str  # "SYSTEM"
    size_bytes: int
    extracted_by: str  # agent name
    extracted_at: str  # ISO timestamp

    def to_dict(self) -> dict:
        """Serialize every field into a plain dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, d: dict) -> ExtractedAsset:
        """Build an instance from a dict whose keys are field names."""
        return cls(**d)

    def summary(self) -> str:
        """One-line description with the size in KB (bytes / 1024)."""
        kilobytes = self.size_bytes / 1024
        identity = f"[{self.id}] {self.filename} ({self.category}) "
        location = f"{kilobytes:.1f}KB @ {self.local_path} [inode:{self.inode}]"
        return identity + location
# ---------------------------------------------------------------------------
# Evidence Graph
# ---------------------------------------------------------------------------
class EvidenceGraph:
    """Graph-based evidence store for multi-agent forensic analysis.

    Holds phenomena, hypotheses and entities as nodes and typed directed
    edges between them, together with leads, per-agent status, the
    extracted-asset library and investigation areas.

    Agents interact with the graph via query tools (list_phenomena,
    get_phenomenon, search_graph, get_related) rather than reading a full
    dump in the system prompt.

    The mutating coroutines run under an internal asyncio.Lock and persist
    the full state after each change when a persist path is configured.
    """

    def __init__(
        self,
        case_info: dict | None = None,
        persist_path: Path | None = None,
        edge_weights: dict[str, float] | None = None,
    ) -> None:
        """Create an empty graph.

        Args:
            case_info: Free-form case metadata; defaults to an empty dict.
            persist_path: JSON file rewritten after every mutation. When
                None, nothing is written automatically.
            edge_weights: Phenomenon → Hypothesis weight table. A copy is
                stored; when omitted or empty the module defaults are used.
        """
        self.case_info: dict = case_info or {}
        self.edge_weights: dict[str, float] = (
            dict(edge_weights) if edge_weights else dict(_DEFAULT_EDGE_WEIGHTS)
        )
        self.image_path: str = ""
        self.partition_offset: int = 0
        self.extracted_dir: str = "extracted"

        # Graph storage
        self.phenomena: dict[str, Phenomenon] = {}
        self.hypotheses: dict[str, Hypothesis] = {}
        self.entities: dict[str, Entity] = {}
        self.edges: list[Edge] = []

        # Adjacency index for fast traversal
        self._adj: dict[str, list[Edge]] = {}  # node_id → outgoing edges
        self._adj_rev: dict[str, list[Edge]] = {}  # node_id → incoming edges

        # Lead / status management (carried over from Blackboard)
        self.leads: list[Lead] = []
        self.agent_status: dict[str, str] = {}

        # Asset library — tracks all files extracted from the disk image
        self.asset_library: dict[str, ExtractedAsset] = {}
        self._inode_index: dict[str, str] = {}  # inode → asset_id

        # Investigation areas — derived from hypotheses (LLM) and/or seeded
        # from config.yaml:investigation_areas (manual override). Drives the
        # gap-analysis coverage check.
        self.investigation_areas: dict[str, InvestigationArea] = {}

        # Set by BaseAgent.run() before each agent execution
        self._current_agent: str = ""

        self._lock = asyncio.Lock()
        self._persist_path: Path | None = persist_path

    # ---- Persistence -------------------------------------------------------

    def _auto_save(self) -> None:
        """Persist full state to disk. Must be called inside _lock.

        Does nothing when no persist path is configured. The state is
        written to a sibling ``.tmp`` file and then moved over the target,
        so a failed write does not truncate the previous file. Failures are
        logged and swallowed on purpose: persistence is best-effort and must
        not abort the mutation that triggered it.
        """
        if self._persist_path is None:
            return
        try:
            state = {
                "case_info": self.case_info,
                "image_path": self.image_path,
                "partition_offset": self.partition_offset,
                "extracted_dir": self.extracted_dir,
                "phenomena": {pid: p.to_dict() for pid, p in self.phenomena.items()},
                "hypotheses": {hid: h.to_dict() for hid, h in self.hypotheses.items()},
                "entities": {eid: e.to_dict() for eid, e in self.entities.items()},
                "edges": [e.to_dict() for e in self.edges],
                "leads": [lead.to_dict() for lead in self.leads],
                "agent_status": dict(self.agent_status),
                "asset_library": {
                    aid: a.to_dict() for aid, a in self.asset_library.items()
                },
                "investigation_areas": {
                    aid: a.to_dict() for aid, a in self.investigation_areas.items()
                },
                "saved_at": datetime.now().isoformat(),
            }
            tmp = self._persist_path.with_suffix(".tmp")
            # ensure_ascii=False emits raw non-ASCII characters, so the file
            # encoding must be pinned instead of depending on the locale.
            tmp.write_text(
                json.dumps(state, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
            tmp.replace(self._persist_path)
        except Exception as e:
            logger.error("EvidenceGraph auto-save failed: %s", e)

    def save_state(self, path: Path) -> None:
        """Explicitly save state to the given path.

        The configured persist path is swapped only for the duration of the
        write and is restored afterwards, even if the write is interrupted.
        """
        previous = self._persist_path
        self._persist_path = path
        try:
            self._auto_save()
        finally:
            self._persist_path = previous

    @classmethod
    def load_state(
        cls,
        path: Path,
        edge_weights: dict[str, float] | None = None,
    ) -> EvidenceGraph:
        """Restore an EvidenceGraph from a saved JSON state file.

        Args:
            path: State file to read (UTF-8 JSON); it also becomes the
                persist path of the returned graph.
            edge_weights: Optional weight table passed to the constructor.

        Returns:
            A graph with nodes, edges, leads, assets and investigation areas
            restored, and the adjacency and inode indexes rebuilt.
        """
        data = json.loads(path.read_text(encoding="utf-8"))
        graph = cls(
            case_info=data.get("case_info", {}),
            persist_path=path,
            edge_weights=edge_weights,
        )
        graph.image_path = data.get("image_path", "")
        graph.partition_offset = data.get("partition_offset", 0)
        graph.extracted_dir = data.get("extracted_dir", "extracted")
        graph.phenomena = {
            pid: Phenomenon.from_dict(p)
            for pid, p in data.get("phenomena", {}).items()
        }
        graph.hypotheses = {
            hid: Hypothesis.from_dict(h)
            for hid, h in data.get("hypotheses", {}).items()
        }
        graph.entities = {
            eid: Entity.from_dict(e)
            for eid, e in data.get("entities", {}).items()
        }
        graph.edges = [Edge.from_dict(e) for e in data.get("edges", [])]
        graph.leads = [Lead.from_dict(item) for item in data.get("leads", [])]
        graph.agent_status = data.get("agent_status", {})
        for aid, a_data in data.get("asset_library", {}).items():
            asset = ExtractedAsset.from_dict(a_data)
            graph.asset_library[aid] = asset
            graph._inode_index[asset.inode] = aid
        graph.investigation_areas = {
            aid: InvestigationArea.from_dict(a)
            for aid, a in data.get("investigation_areas", {}).items()
        }
        graph._rebuild_adjacency()
        logger.info(
            "EvidenceGraph restored: %d phenomena, %d hypotheses, %d entities, "
            "%d edges, %d assets",
            len(graph.phenomena), len(graph.hypotheses),
            len(graph.entities), len(graph.edges), len(graph.asset_library),
        )
        return graph

    def _index_edge(self, edge: Edge) -> None:
        """Record one edge in both the outgoing and incoming adjacency maps."""
        self._adj.setdefault(edge.source_id, []).append(edge)
        self._adj_rev.setdefault(edge.target_id, []).append(edge)

    def _rebuild_adjacency(self) -> None:
        """Rebuild adjacency index from edges list."""
        self._adj.clear()
        self._adj_rev.clear()
        for edge in self.edges:
            self._index_edge(edge)

    # ---- Node helpers -------------------------------------------------------

    def _node_exists(self, node_id: str) -> bool:
        """Return True when node_id names a stored node of its prefixed kind."""
        if node_id.startswith("ph-"):
            return node_id in self.phenomena
        if node_id.startswith("hyp-"):
            return node_id in self.hypotheses
        if node_id.startswith("ent-"):
            return node_id in self.entities
        return False

    def get_node(self, node_id: str) -> Phenomenon | Hypothesis | Entity | None:
        """Return the node for node_id, picking the store by ID prefix.

        Returns None for an unknown ID or an unrecognized prefix.
        """
        if node_id.startswith("ph-"):
            return self.phenomena.get(node_id)
        if node_id.startswith("hyp-"):
            return self.hypotheses.get(node_id)
        if node_id.startswith("ent-"):
            return self.entities.get(node_id)
        return None

    # ---- Similarity merging (Phenomenon only) --------------------------------

    def _find_similar_phenomenon(
        self, title: str, description: str, category: str,
    ) -> Phenomenon | None:
        """Return the best near-duplicate phenomenon in the same category.

        A candidate must exceed 0.6 title similarity and 0.4 similarity on
        the first 200 description characters; among those, the highest
        0.6 * title + 0.4 * description score wins. Returns None when no
        candidate qualifies.
        """
        best_match: Phenomenon | None = None
        best_score = 0.0
        for ph in self.phenomena.values():
            if ph.category != category:
                continue
            title_sim = _jaccard_similarity(ph.title, title)
            if title_sim <= 0.6:
                continue
            desc_sim = _jaccard_similarity(ph.description[:200], description[:200])
            if desc_sim <= 0.4:
                continue
            combined = title_sim * 0.6 + desc_sim * 0.4
            if combined > best_score:
                best_score = combined
                best_match = ph
        return best_match

    # ---- Mutation methods (async, under lock) --------------------------------

    async def add_phenomenon(
        self,
        source_agent: str,
        category: str,
        title: str,
        description: str,
        raw_data: dict | None = None,
        timestamp: str | None = None,
        source_tool: str = "",
        from_lead_id: str | None = None,
    ) -> tuple[str, bool]:
        """Add a phenomenon. Returns (id, was_merged).

        When a similar phenomenon already exists it is updated instead:
        its confidence rises by 0.15 (capped at 1.0), the reporting agent is
        recorded as corroborating, raw_data keys it does not have yet are
        copied in, and from_lead_id is filled if it was unset.

        Otherwise a new phenomenon is created whose confidence is
        auto-computed from evidence completeness (source_tool, timestamp,
        raw_data, description length).
        """
        async with self._lock:
            similar = self._find_similar_phenomenon(title, description, category)
            if similar is not None:
                similar.confidence = min(1.0, similar.confidence + 0.15)
                if source_agent not in similar.corroborating_agents:
                    similar.corroborating_agents.append(source_agent)
                if raw_data:
                    # Existing keys win; only missing keys are added.
                    for key, value in raw_data.items():
                        if key not in similar.raw_data:
                            similar.raw_data[key] = value
                if from_lead_id and similar.from_lead_id is None:
                    similar.from_lead_id = from_lead_id
                self._auto_save()
                return similar.id, True

            pid = f"ph-{uuid.uuid4().hex[:8]}"
            payload = raw_data or {}
            # No related IDs are known at creation time, hence the empty list.
            confidence = _compute_quality_score(
                source_tool, timestamp, payload, description, [],
            )
            self.phenomena[pid] = Phenomenon(
                id=pid,
                source_agent=source_agent,
                category=category,
                title=title,
                description=description,
                raw_data=payload,
                timestamp=timestamp,
                confidence=confidence,
                source_tool=source_tool,
                from_lead_id=from_lead_id,
                created_at=datetime.now().isoformat(),
            )
            self._auto_save()
            return pid, False

    async def add_hypothesis(
        self,
        title: str,
        description: str,
        created_by: str = "",
        parent_id: str | None = None,
    ) -> str:
        """Add a hypothesis. Returns the hypothesis ID.

        New hypotheses start at confidence 0.5 with status "active".
        """
        async with self._lock:
            hid = f"hyp-{uuid.uuid4().hex[:8]}"
            self.hypotheses[hid] = Hypothesis(
                id=hid,
                title=title,
                description=description,
                confidence=0.5,
                status="active",
                parent_id=parent_id,
                created_by=created_by,
                created_at=datetime.now().isoformat(),
            )
            self._auto_save()
            return hid

    async def add_entity(
        self,
        name: str,
        entity_type: str,
        description: str = "",
    ) -> tuple[str, bool]:
        """Add an entity. Deduplicates on (name, entity_type). Returns (id, was_existing)."""
        async with self._lock:
            for ent in self.entities.values():
                if ent.name == name and ent.entity_type == entity_type:
                    return ent.id, True
            eid = f"ent-{uuid.uuid4().hex[:8]}"
            self.entities[eid] = Entity(
                id=eid,
                name=name,
                entity_type=entity_type,
                description=description,
                created_at=datetime.now().isoformat(),
            )
            self._auto_save()
            return eid, False

    async def add_edge(
        self,
        source_id: str,
        target_id: str,
        edge_type: str,
        metadata: dict | None = None,
        created_by: str = "",
    ) -> str:
        """Add a directed edge. Validates nodes exist and edge type is valid.

        Returns:
            The new edge ID.

        Raises:
            ValueError: If either node is unknown or edge_type is not in
                VALID_EDGE_TYPES.
        """
        async with self._lock:
            if not self._node_exists(source_id):
                raise ValueError(f"Source node not found: {source_id}")
            if not self._node_exists(target_id):
                raise ValueError(f"Target node not found: {target_id}")
            if edge_type not in VALID_EDGE_TYPES:
                raise ValueError(f"Invalid edge type: {edge_type}")
            eid = f"edge-{uuid.uuid4().hex[:8]}"
            edge = Edge(
                id=eid,
                source_id=source_id,
                target_id=target_id,
                edge_type=edge_type,
                metadata=metadata or {},
                created_by=created_by,
                created_at=datetime.now().isoformat(),
            )
            self.edges.append(edge)
            self._index_edge(edge)
            self._auto_save()
            return eid

    async def update_hypothesis_confidence(
        self,
        hyp_id: str,
        phenomenon_id: str,
        edge_type: str,
        reason: str = "",
    ) -> float:
        """Update hypothesis confidence based on a phenomenon linkage.

        The edge_type must be one of self.edge_weights keys.
        Weight is looked up from the configured table, NOT judged by LLM.
        A positive weight moves confidence toward 1.0 by weight * (1 - old);
        a non-positive weight changes it by weight * old. The change is
        appended to the hypothesis' confidence_log and a matching
        phenomenon → hypothesis edge is added to the graph.

        Returns:
            The new confidence value.

        Raises:
            ValueError: If edge_type is not in the weight table, or either
                ID does not name a stored phenomenon / hypothesis.
        """
        if edge_type not in self.edge_weights:
            raise ValueError(
                f"Invalid hypothesis edge type: {edge_type}. "
                f"Must be one of: {list(self.edge_weights.keys())}"
            )
        async with self._lock:
            # Only a phenomenon may be the evidence end of this edge; a
            # hypothesis or entity ID must not pass as one.
            if phenomenon_id not in self.phenomena:
                raise ValueError(f"Phenomenon not found: {phenomenon_id}")
            hyp = self.hypotheses.get(hyp_id)
            if hyp is None:
                raise ValueError(f"Hypothesis not found: {hyp_id}")

            weight = self.edge_weights[edge_type]
            old_conf = hyp.confidence
            if weight > 0:
                delta = weight * (1 - old_conf)
            else:
                delta = weight * old_conf
            new_conf = max(0.0, min(1.0, old_conf + delta))
            hyp.confidence = new_conf

            # NOTE(review): status is only ever moved to supported/refuted
            # here; a later swing back between the thresholds keeps the
            # earlier status. Confirm this is intended.
            if new_conf >= 0.8:
                hyp.status = "supported"
            elif new_conf <= 0.2:
                hyp.status = "refuted"

            hyp.confidence_log.append({
                "timestamp": datetime.now().isoformat(),
                "phenomenon_id": phenomenon_id,
                "edge_type": edge_type,
                "weight": weight,
                "old_confidence": round(old_conf, 4),
                "new_confidence": round(new_conf, 4),
                "reason": reason,
            })

            # Also create the edge in the graph
            edge = Edge(
                id=f"edge-{uuid.uuid4().hex[:8]}",
                source_id=phenomenon_id,
                target_id=hyp_id,
                edge_type=edge_type,
                metadata={"reason": reason},
                created_by="hypothesis_engine",
                created_at=datetime.now().isoformat(),
            )
            self.edges.append(edge)
            self._index_edge(edge)
            self._auto_save()
            return new_conf

    # ---- Lead management (same as old Blackboard) ----------------------------

    async def add_lead(
        self,
        target_agent: str,
        description: str,
        priority: int = 5,
        context: dict | None = None,
        hypothesis_id: str | None = None,
    ) -> str:
        """Queue a new lead for target_agent and return its ID."""
        async with self._lock:
            lid = f"lead-{uuid.uuid4().hex[:8]}"
            self.leads.append(Lead(
                id=lid,
                target_agent=target_agent,
                description=description,
                priority=priority,
                context=context or {},
                hypothesis_id=hypothesis_id,
            ))
            self._auto_save()
            return lid

    async def get_pending_leads(self, agent_type: str | None = None) -> list[Lead]:
        """Return pending leads sorted by ascending priority number.

        When agent_type is given, only leads targeting that agent are kept.
        """
        async with self._lock:
            pending = [lead for lead in self.leads if lead.status == "pending"]
            if agent_type:
                pending = [lead for lead in pending if lead.target_agent == agent_type]
            return sorted(pending, key=lambda lead: lead.priority)

    async def mark_lead_completed(self, lead_id: str) -> None:
        """Set the first lead with this ID to "completed" and persist."""
        async with self._lock:
            for lead in self.leads:
                if lead.id == lead_id:
                    lead.status = "completed"
                    break
            self._auto_save()

    async def mark_lead_failed(self, lead_id: str, error: str = "") -> None:
        """Set the first lead with this ID to "failed", recording the reason."""
        async with self._lock:
            for lead in self.leads:
                if lead.id == lead_id:
                    lead.status = "failed"
                    lead.context["failure_reason"] = error
                    break
            self._auto_save()

    # ---- Investigation areas -------------------------------------------------

    async def add_investigation_area(
        self,
        area: str,
        description: str,
        suggested_agent: str,
        expected_keywords: list[str] | None = None,
        expected_tools: list[str] | None = None,
        priority: int = 5,
        motivating_hypothesis_ids: list[str] | None = None,
        created_by: str = "",
    ) -> tuple[str, bool]:
        """Add or merge an investigation area. Dedupe key is the `area` slug.

        On collision, union the three list fields (keywords / tools /
        motivating_hypothesis_ids); description / suggested_agent / priority
        are preserved from the first writer (manual seed wins over LLM derive).

        Returns (id, was_existing).
        """
        async with self._lock:
            for existing in self.investigation_areas.values():
                if existing.area != area:
                    continue
                # Order-preserving union of each list field.
                merges = (
                    (existing.expected_keywords, expected_keywords),
                    (existing.expected_tools, expected_tools),
                    (existing.motivating_hypothesis_ids, motivating_hypothesis_ids),
                )
                for current, incoming in merges:
                    for item in (incoming or []):
                        if item not in current:
                            current.append(item)
                self._auto_save()
                return existing.id, True

            aid = f"area-{area}"
            self.investigation_areas[aid] = InvestigationArea(
                id=aid,
                area=area,
                description=description,
                suggested_agent=suggested_agent,
                expected_keywords=list(expected_keywords or []),
                expected_tools=list(expected_tools or []),
                priority=priority,
                motivating_hypothesis_ids=list(motivating_hypothesis_ids or []),
                created_by=created_by,
                created_at=datetime.now().isoformat(),
            )
            self._auto_save()
            return aid, False

    # ---- Asset library -------------------------------------------------------

    async def register_asset(
        self,
        inode: str,
        original_path: str,
        local_path: str,
        category: str,
        filename: str,
        size_bytes: int,
        extracted_by: str,
    ) -> tuple[str, bool]:
        """Register an extracted file. Deduplicates by inode. Returns (id, already_existed)."""
        async with self._lock:
            if inode in self._inode_index:
                return self._inode_index[inode], True
            aid = f"asset-{uuid.uuid4().hex[:8]}"
            self.asset_library[aid] = ExtractedAsset(
                id=aid,
                inode=inode,
                original_path=original_path,
                local_path=local_path,
                category=category,
                filename=filename,
                size_bytes=size_bytes,
                extracted_by=extracted_by,
                extracted_at=datetime.now().isoformat(),
            )
            self._inode_index[inode] = aid
            self._auto_save()
            return aid, False

    def lookup_asset_by_inode(self, inode: str) -> ExtractedAsset | None:
        """Look up an extracted asset by inode (synchronous, no lock needed for reads)."""
        aid = self._inode_index.get(inode)
        return self.asset_library.get(aid) if aid else None

    def list_assets(self, category: str | None = None) -> list[str]:
        """Return one-line summaries of all assets, optionally filtered."""
        return [
            asset.summary()
            for asset in self.asset_library.values()
            if not category or asset.category == category
        ]

    def query_assets(
        self,
        category: str | None = None,
        filename_pattern: str | None = None,
    ) -> list[ExtractedAsset]:
        """Query the asset library with optional filters.

        category must match exactly; filename_pattern is a case-insensitive
        substring of the asset filename.
        """
        matches = []
        for asset in self.asset_library.values():
            if category and asset.category != category:
                continue
            if filename_pattern and filename_pattern.lower() not in asset.filename.lower():
                continue
            matches.append(asset)
        return matches

    # ---- Query methods (for agent tools) ------------------------------------

    def list_phenomena(self, category: str | None = None) -> list[str]:
        """Return one-line summaries of all phenomena, optionally filtered."""
        return [
            ph.summary()
            for ph in self.phenomena.values()
            if not category or ph.category == category
        ]

    def get_phenomenon(self, ph_id: str) -> dict | None:
        """Return full phenomenon details as dict, or None."""
        ph = self.phenomena.get(ph_id)
        return ph.to_dict() if ph else None

    def search_graph(self, keyword: str) -> list[str]:
        """Search across all node types by keyword. Returns summaries.

        The match is a case-insensitive substring test on title/description
        for phenomena and hypotheses, and on name/description for entities.
        Results are ordered phenomena, then hypotheses, then entities.
        """
        kw = keyword.lower()
        results = []
        for ph in self.phenomena.values():
            if kw in ph.title.lower() or kw in ph.description.lower():
                results.append(ph.summary())
        for hyp in self.hypotheses.values():
            if kw in hyp.title.lower() or kw in hyp.description.lower():
                results.append(hyp.summary())
        for ent in self.entities.values():
            if kw in ent.name.lower() or kw in ent.description.lower():
                results.append(ent.summary())
        return results

    def get_related(
        self,
        node_id: str,
        edge_type: str | None = None,
        direction: str = "both",
    ) -> list[dict]:
        """Get nodes connected to the given node.

        Args:
            node_id: Node whose neighbours are wanted.
            edge_type: When given, only edges of this type are followed.
            direction: "out", "in" or "both"; any other value yields [].

        Returns:
            A list of dicts with keys node (summary), edge_type, direction
            ("outgoing" / "incoming") and metadata. Outgoing edges come
            first. Edges whose far end cannot be resolved are skipped.
        """
        # (direction values enabling the pass, index, far-end attribute, label)
        passes = (
            (("out", "both"), self._adj, "target_id", "outgoing"),
            (("in", "both"), self._adj_rev, "source_id", "incoming"),
        )
        results = []
        for enabled_for, index, far_end, label in passes:
            if direction not in enabled_for:
                continue
            for edge in index.get(node_id, []):
                if edge_type and edge.edge_type != edge_type:
                    continue
                node = self.get_node(getattr(edge, far_end))
                if node:
                    results.append({
                        "node": node.summary(),
                        "edge_type": edge.edge_type,
                        "direction": label,
                        "metadata": edge.metadata,
                    })
        return results

    def get_hypothesis_status(self) -> list[str]:
        """Return summaries of all hypotheses."""
        return [h.summary() for h in self.hypotheses.values()]

    def get_phenomena_by_category(self, category: str) -> list[Phenomenon]:
        """Return the phenomena whose category equals the given one."""
        return [p for p in self.phenomena.values() if p.category == category]

    def hypotheses_converged(self) -> bool:
        """True if no hypotheses are still active (also True when there are none)."""
        return all(h.status != "active" for h in self.hypotheses.values())

    def mark_remaining_inconclusive(self) -> None:
        """Mark all still-active hypotheses as inconclusive.

        This only updates the in-memory objects; it does not take the lock
        or write the state file.
        """
        for h in self.hypotheses.values():
            if h.status == "active":
                h.status = "inconclusive"

    # ---- Summary (lightweight, for system prompt) ----------------------------

    def stats_summary(self) -> str:
        """Ultra-compact stats for inclusion in system prompt."""
        active_hyp = sum(1 for h in self.hypotheses.values() if h.status == "active")
        pending_leads = sum(1 for lead in self.leads if lead.status == "pending")
        return (
            f"Graph: {len(self.phenomena)} phenomena, "
            f"{len(self.hypotheses)} hypotheses ({active_hyp} active), "
            f"{len(self.entities)} entities, {len(self.edges)} edges. "
            f"Asset library: {len(self.asset_library)} extracted files. "
            f"Pending leads: {pending_leads}."
        )