Files
ER-TP-DGP/tests/test_landmark.py
BattleTag b86ae87b75 Initial commit: ER-TP-DGP research prototype
Event-Reified Temporal Provenance Dual-Granularity Prompting for
LLM-based APT detection on DARPA provenance datasets.

Includes phase 0-14 method spec, IR/graph/metapath/trimming/prompt
modules, scripts for THEIA candidate universe, landmark CSG construction,
hybrid prompting, and LLM inference. Excludes data/, reports/, and
local LLM config from version control.
2026-05-15 16:53:57 +08:00

325 lines
12 KiB
Python

"""Tests for the Landmark-Bridged Causal Story Graph."""
from __future__ import annotations

import contextlib
import json
import unittest
from io import StringIO
from pathlib import Path
from tempfile import TemporaryDirectory

from er_tp_dgp.landmark import (
    StreamingLandmarkGraphBuilder,
    build_landmark_graph,
    compute_landmark_communities,
    read_communities_jsonl,
    read_edges_jsonl,
    read_landmarks_jsonl,
    write_communities_jsonl,
    write_edges_jsonl,
    write_landmarks_jsonl,
)
from er_tp_dgp.landmark_prompt import (
    CommunityPromptSwitches,
    LandmarkCommunityPromptBuilder,
)
from er_tp_dgp.theia import iter_theia_records
PREFIX = "com.bbn.tc.schema.avro.cdm18."
def _wrap(record_type, payload):
return {"datum": {PREFIX + record_type: payload}}
def _make_synthetic_jsonl(path: Path) -> None:
    """Write a small synthetic CDM18 record stream to *path* as JSON lines.

    Attack story: the attacker process (/tmp/dropper) receives from an inbound
    flow, writes /tmp/payload, and forks a child; the child executes
    /tmp/payload and connects to an external address. Alongside it, a benign
    sshd reads its config file, which should not produce a meaningful
    community.

    Records are written one per line with sorted keys: entities first, then
    events in timestamp order, followed by a trailing newline.
    """

    def ref(uuid):
        # Events point at entities through a typed UUID wrapper.
        return {PREFIX + "UUID": uuid}

    def process(uuid, exe_path, cmdline):
        return _wrap(
            "Subject",
            {
                "uuid": uuid,
                "type": "SUBJECT_PROCESS",
                "hostId": "host-1",
                "properties": {"map": {"path": exe_path}},
                "cmdLine": {"string": cmdline},
            },
        )

    def netflow(uuid, remote_address, remote_port, local_port):
        return _wrap(
            "NetFlowObject",
            {
                "uuid": uuid,
                "remoteAddress": remote_address,
                "remotePort": remote_port,
                "localAddress": "10.0.0.10",
                "localPort": local_port,
            },
        )

    def file_object(uuid, file_path):
        return _wrap(
            "FileObject",
            {"uuid": uuid, "baseObject": {"properties": {"map": {"path": file_path}}}},
        )

    def event(uuid, event_type, timestamp_nanos, actor_uuid, object_uuid):
        return _wrap(
            "Event",
            {
                "uuid": uuid,
                "type": event_type,
                "timestampNanos": timestamp_nanos,
                "subject": ref(actor_uuid),
                "predicateObject": ref(object_uuid),
            },
        )

    entities = [
        process("subj-attacker", "/tmp/dropper", "/tmp/dropper --foo"),
        process("subj-child", "/tmp/payload", "/tmp/payload --beacon"),
        process("subj-sshd", "/usr/sbin/sshd", "/usr/sbin/sshd -D"),
        netflow("flow-incoming", "192.168.1.5", 4444, 5555),
        netflow("flow-c2", "8.8.4.4", 443, 50001),
        file_object("file-payload", "/tmp/payload"),
        file_object("file-sshd-cfg", "/etc/ssh/sshd_config"),
    ]
    events = [
        # 1) attacker receives from the inbound flow
        event("evt-recv", "EVENT_RECVFROM", 1_000_000_000, "subj-attacker", "flow-incoming"),
        # 2) attacker writes /tmp/payload
        event("evt-write", "EVENT_WRITE", 2_000_000_000, "subj-attacker", "file-payload"),
        # 3) attacker forks the child
        event("evt-fork", "EVENT_FORK", 3_000_000_000, "subj-attacker", "subj-child"),
        # 4) child executes the payload
        event("evt-exec", "EVENT_EXECUTE", 4_000_000_000, "subj-child", "file-payload"),
        # 5) child connects to the external C2 endpoint
        event("evt-c2", "EVENT_CONNECT", 5_000_000_000, "subj-child", "flow-c2"),
        # 6) benign sshd config read -- expected NOT to be a landmark
        event("evt-sshd-read", "EVENT_READ", 6_000_000_000, "subj-sshd", "file-sshd-cfg"),
    ]
    lines = [json.dumps(record, sort_keys=True) for record in entities + events]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def _reachable_event_ids(edges, start):
    """Return the set of event ids reachable from *start* along directed edges.

    ``start`` is always included. Each edge contributes one hop from its
    ``src_event_id`` to its ``dst_event_id``.
    """
    adjacency = {}
    for edge in edges:
        adjacency.setdefault(edge.src_event_id, set()).add(edge.dst_event_id)
    seen = {start}
    pending = [start]
    while pending:
        node = pending.pop()
        for nxt in adjacency.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                pending.append(nxt)
    return seen


class LandmarkGraphTests(unittest.TestCase):
    """End-to-end checks over the synthetic attack fixture.

    Covers landmark/edge construction, community detection, JSONL
    round-tripping, prompt hygiene, and the streaming progress hook.
    """

    def setUp(self):
        # Fresh temp dir per test; the fixture file is regenerated each time
        # and the directory is removed after the test via addCleanup.
        tmp = TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        self.tmp_path = Path(tmp.name)
        self.theia = self.tmp_path / "synthetic.json"
        _make_synthetic_jsonl(self.theia)

    def _build_graph(self):
        """Build the landmark graph from the fixture: (landmarks, edges, stats)."""
        return build_landmark_graph([self.theia])

    def _build_communities(self):
        """Build the graph plus its communities: (landmarks, edges, communities)."""
        landmarks, edges, _ = self._build_graph()
        return landmarks, edges, compute_landmark_communities(landmarks, edges)

    def test_streaming_builds_attack_story(self):
        landmarks, edges, stats = self._build_graph()
        # Every attack-story event must be a landmark. The values record the
        # landmark class(es) each event is expected to trigger (not asserted).
        expected_landmarks = {
            "evt-recv": "suspicious_actor_path",
            "evt-write": "recv_then_write + suspicious_object_path",
            "evt-fork": "process_creation",
            "evt-exec": "write_then_execute + process_creation",
            "evt-c2": "external_flow",
        }
        ids = {lm.event_id for lm in landmarks}
        for event_id, reason in expected_landmarks.items():
            with self.subTest(event_id=event_id):
                self.assertIn(event_id, ids, f"expected landmark ({reason})")
        # The benign sshd config read must NOT be a landmark.
        self.assertNotIn("evt-sshd-read", ids)
        # Every landmark must carry at least one class label.
        for lm in landmarks:
            with self.subTest(landmark=lm.event_id):
                self.assertTrue(lm.landmark_classes, f"missing classes on {lm.event_id}")
        # Edges must exist, have positive delta_nanos (chronological order),
        # and stay on the fixture's single host.
        self.assertGreater(len(edges), 0)
        for edge in edges:
            with self.subTest(edge=edge.edge_id):
                self.assertGreater(edge.delta_nanos, 0)
                self.assertEqual(edge.host_id, "host-1")
        # In a healthy CSG over this fixture, evt-c2 must be reachable from
        # evt-recv (the attack timeline is connected).
        reached = _reachable_event_ids(edges, "evt-recv")
        self.assertIn(
            "evt-c2",
            reached,
            f"attack story should propagate to evt-c2 via causal bridges, reached={sorted(reached)}",
        )
        # Stats sanity.
        self.assertEqual(stats.landmarks, len(landmarks))
        self.assertEqual(stats.edges, len(edges))
        self.assertGreater(stats.events_seen, 0)

    def test_communities_yield_one_attack_subgraph(self):
        _, _, communities = self._build_communities()
        self.assertEqual(len(communities), 1)
        community = communities[0]
        self.assertGreaterEqual(len(community.landmark_event_ids), 4)
        self.assertIn("subj-attacker", community.subjects)
        self.assertIn("subj-child", community.subjects)
        self.assertNotIn("subj-sshd", community.subjects)
        self.assertGreater(community.span_seconds, 0)
        self.assertIn("write_then_execute", community.landmark_class_counts)

    def test_jsonl_roundtrip(self):
        landmarks, edges, communities = self._build_communities()
        # Guard before indexing so an empty result fails clearly, not with IndexError.
        self.assertTrue(communities, "fixture should yield at least one community")
        lm_path = self.tmp_path / "landmarks.jsonl"
        edge_path = self.tmp_path / "edges.jsonl"
        com_path = self.tmp_path / "communities.jsonl"
        write_landmarks_jsonl(landmarks, lm_path)
        write_edges_jsonl(edges, edge_path)
        write_communities_jsonl(communities, com_path)
        roundtrip_landmarks = read_landmarks_jsonl(lm_path)
        roundtrip_edges = read_edges_jsonl(edge_path)
        roundtrip_communities = read_communities_jsonl(com_path)
        self.assertEqual(len(roundtrip_landmarks), len(landmarks))
        self.assertEqual(len(roundtrip_edges), len(edges))
        self.assertEqual(len(roundtrip_communities), len(communities))
        # Identities must survive the roundtrip, not just the counts.
        self.assertEqual(
            sorted(lm.event_id for lm in roundtrip_landmarks),
            sorted(lm.event_id for lm in landmarks),
        )
        self.assertEqual(
            sorted(edge.edge_id for edge in roundtrip_edges),
            sorted(edge.edge_id for edge in edges),
        )
        self.assertEqual(
            sorted(c.community_id for c in roundtrip_communities),
            sorted(c.community_id for c in communities),
        )
        self.assertEqual(
            roundtrip_communities[0].community_id, communities[0].community_id
        )

    def test_no_ground_truth_in_construction(self):
        """The community prompt must not leak ground-truth or label metadata.

        CSG construction is fed only the raw record file (no labels or
        atom_ids), so the rendered prompt for the resulting community must not
        mention label-related tokens. It must still reference the attack's
        landmark events and ask for a yes/no verdict.

        NOTE(review): a malicious-vs-benign contrast stream (same structure,
        different path heuristics) is not exercised here.
        """
        landmarks, edges, communities = self._build_communities()
        self.assertTrue(communities, "fixture should yield at least one community")
        builder = LandmarkCommunityPromptBuilder(
            landmarks_by_id={lm.event_id: lm for lm in landmarks},
            edges_by_id={edge.edge_id: edge for edge in edges},
            switches=CommunityPromptSwitches(max_landmarks_in_prompt=20),
        )
        bundle = builder.build(communities[0])
        prompt_lower = bundle.prompt_text.lower()
        for forbidden in ("atom_id", "ground_truth", "ground truth", "label_source", "label="):
            with self.subTest(forbidden=forbidden):
                self.assertNotIn(forbidden, prompt_lower)
        self.assertIn("evt-c2", bundle.prompt_text)
        self.assertIn("yes or no", prompt_lower)

    def test_streaming_progress_callback(self):
        builder = StreamingLandmarkGraphBuilder()
        buf = StringIO()
        # progress_every=1 makes the progress print path fire on the smallest
        # possible stream; the point is that printing must not raise.
        with contextlib.redirect_stdout(buf):
            builder.feed_iterable(iter_theia_records([self.theia]), progress_every=1)
        text = buf.getvalue()
        # The exact output format is not pinned here: if a progress line was
        # emitted, it must carry a record counter.
        if "[progress]" in text:
            self.assertIn("records=", text)


if __name__ == "__main__":
    unittest.main()