Event-Reified Temporal Provenance Dual-Granularity Prompting for LLM-based APT detection on DARPA provenance datasets. Includes phase 0-14 method spec, IR/graph/metapath/trimming/prompt modules, scripts for THEIA candidate universe, landmark CSG construction, hybrid prompting, and LLM inference. Excludes data/, reports/, and local LLM config from version control.
131 lines
4.3 KiB
Python
131 lines
4.3 KiB
Python
"""Debugging-only synthetic graph fixture.
|
|
|
|
This fixture is not DARPA data and must not be used as an experimental result.
|
|
It only validates that the ER-TP-DGP pipeline preserves required structures.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from er_tp_dgp.constants import EntityType, NormalizedAction
|
|
from er_tp_dgp.graph import ProvenanceGraph
|
|
from er_tp_dgp.ir import EntityNode, EventNode
|
|
|
|
|
|
def build_synthetic_graph() -> ProvenanceGraph:
    """Return the hand-written provenance graph used for pipeline debugging.

    The graph holds five entities (two processes, two files, one IP endpoint)
    and five events with timestamps 1.0 through 5.0, in this order:
    ``proc-parent`` writes ``file-payload``, ``proc-parent`` creates
    ``proc-child``, ``proc-child`` execs ``file-payload``, reads
    ``file-secret`` and sends to ``ip-c2``.

    Everything here is synthetic; none of it is DARPA data.
    """
    # One row per entity:
    # (node_id, entity type, stable_name, host, text_fields, extra kwargs).
    # ``extra kwargs`` is empty when the node carries no optional arguments,
    # so those arguments are simply not passed to the constructor.
    entity_rows = [
        (
            "proc-parent",
            EntityType.PROCESS,
            "/usr/bin/python",
            "h1",
            {"path": "/usr/bin/python", "command_line": "python updater.py"},
            {},
        ),
        (
            "proc-child",
            EntityType.PROCESS,
            "/tmp/payload",
            "h1",
            {"path": "/tmp/payload", "command_line": "/tmp/payload --sync"},
            {"optional_properties": {"first_seen": True}},
        ),
        (
            "file-payload",
            EntityType.FILE,
            "/tmp/payload",
            "h1",
            {"path": "/tmp/payload"},
            {"optional_properties": {"first_seen": True}},
        ),
        (
            "file-secret",
            EntityType.FILE,
            "/home/user/secret.txt",
            "h1",
            {"path": "/home/user/secret.txt"},
            {},
        ),
        (
            "ip-c2",
            EntityType.IP,
            "8.8.8.8:443",
            "internet",
            {"ip": "8.8.8.8", "port": "443"},
            {},
        ),
    ]
    entities = [
        EntityNode(
            node_id=node_id,
            node_type=kind.value,
            stable_name=stable_name,
            dataset="synthetic",
            host=host,
            text_fields=text_fields,
            **extra,
        )
        for node_id, kind, stable_name, host, text_fields, extra in entity_rows
    ]

    # One row per event, listed in timestamp order:
    # (event_id, raw_event_id, timestamp, action, normalized action,
    #  actor id, object id, raw_event_type, extra kwargs).
    event_rows = [
        (
            "event-write", "raw-1", 1.0, "write", NormalizedAction.WRITE,
            "proc-parent", "file-payload", "EVENT_WRITE", {},
        ),
        (
            "event-create", "raw-2", 2.0, "create", NormalizedAction.CREATE,
            "proc-parent", "proc-child", "EVENT_CREATE", {},
        ),
        (
            "event-exec-file", "raw-3", 3.0, "exec", NormalizedAction.EXEC,
            "proc-child", "file-payload", "EVENT_EXEC", {},
        ),
        (
            "event-read", "raw-4", 4.0, "read", NormalizedAction.READ,
            "proc-child", "file-secret", "EVENT_READ", {},
        ),
        (
            "event-send", "raw-5", 5.0, "send", NormalizedAction.SEND,
            "proc-child", "ip-c2", "EVENT_SEND",
            {"raw_properties": {"remote_ip": "8.8.8.8", "remote_port": 443}},
        ),
    ]
    events = [
        EventNode(
            event_id=event_id,
            raw_event_id=raw_id,
            timestamp=moment,
            action=verb,
            normalized_action=normalized.value,
            actor_entity_id=actor_id,
            object_entity_id=object_id,
            host="h1",  # every event in this fixture is recorded on host "h1"
            raw_event_type=raw_type,
            **extra,
        )
        for (
            event_id,
            raw_id,
            moment,
            verb,
            normalized,
            actor_id,
            object_id,
            raw_type,
            extra,
        ) in event_rows
    ]

    return ProvenanceGraph(entities=entities, events=events)
|
|
|
|
|
|
if __name__ == "__main__":
    # The pipeline stages are imported only when this file runs as a script.
    from er_tp_dgp.metapaths import APTMetapathExtractor
    from er_tp_dgp.prompt import PromptBuilder
    from er_tp_dgp.trimming import TemporalSecurityAwareTrimmer

    # Entity the whole demo run is centred on.
    target_id = "proc-child"
    graph = build_synthetic_graph()

    # Step 1: extract metapaths for the target entity.
    extractor = APTMetapathExtractor(graph)
    paths = extractor.extract_for_target(target_id)

    # Step 2: trim the extracted paths (top_m_per_metapath=3).
    trimmer = TemporalSecurityAwareTrimmer(graph, top_m_per_metapath=3)
    selected = trimmer.trim(target_id, paths)

    # Step 3: build the prompt bundle and print its text.
    bundle = PromptBuilder(graph).build(target_id, selected)
    print(bundle.prompt_text)
|
|
|