"""Debugging-only synthetic graph fixture. This fixture is not DARPA data and must not be used as an experimental result. It only validates that the ER-TP-DGP pipeline preserves required structures. """ from __future__ import annotations from er_tp_dgp.constants import EntityType, NormalizedAction from er_tp_dgp.graph import ProvenanceGraph from er_tp_dgp.ir import EntityNode, EventNode def build_synthetic_graph() -> ProvenanceGraph: entities = [ EntityNode( node_id="proc-parent", node_type=EntityType.PROCESS.value, stable_name="/usr/bin/python", dataset="synthetic", host="h1", text_fields={"path": "/usr/bin/python", "command_line": "python updater.py"}, ), EntityNode( node_id="proc-child", node_type=EntityType.PROCESS.value, stable_name="/tmp/payload", dataset="synthetic", host="h1", text_fields={"path": "/tmp/payload", "command_line": "/tmp/payload --sync"}, optional_properties={"first_seen": True}, ), EntityNode( node_id="file-payload", node_type=EntityType.FILE.value, stable_name="/tmp/payload", dataset="synthetic", host="h1", text_fields={"path": "/tmp/payload"}, optional_properties={"first_seen": True}, ), EntityNode( node_id="file-secret", node_type=EntityType.FILE.value, stable_name="/home/user/secret.txt", dataset="synthetic", host="h1", text_fields={"path": "/home/user/secret.txt"}, ), EntityNode( node_id="ip-c2", node_type=EntityType.IP.value, stable_name="8.8.8.8:443", dataset="synthetic", host="internet", text_fields={"ip": "8.8.8.8", "port": "443"}, ), ] events = [ EventNode( event_id="event-write", raw_event_id="raw-1", timestamp=1.0, action="write", normalized_action=NormalizedAction.WRITE.value, actor_entity_id="proc-parent", object_entity_id="file-payload", host="h1", raw_event_type="EVENT_WRITE", ), EventNode( event_id="event-create", raw_event_id="raw-2", timestamp=2.0, action="create", normalized_action=NormalizedAction.CREATE.value, actor_entity_id="proc-parent", object_entity_id="proc-child", host="h1", raw_event_type="EVENT_CREATE", ), EventNode( event_id="event-exec-file", raw_event_id="raw-3", timestamp=3.0, action="exec", normalized_action=NormalizedAction.EXEC.value, actor_entity_id="proc-child", object_entity_id="file-payload", host="h1", raw_event_type="EVENT_EXEC", ), EventNode( event_id="event-read", raw_event_id="raw-4", timestamp=4.0, action="read", normalized_action=NormalizedAction.READ.value, actor_entity_id="proc-child", object_entity_id="file-secret", host="h1", raw_event_type="EVENT_READ", ), EventNode( event_id="event-send", raw_event_id="raw-5", timestamp=5.0, action="send", normalized_action=NormalizedAction.SEND.value, actor_entity_id="proc-child", object_entity_id="ip-c2", host="h1", raw_event_type="EVENT_SEND", raw_properties={"remote_ip": "8.8.8.8", "remote_port": 443}, ), ] return ProvenanceGraph(entities=entities, events=events) if __name__ == "__main__": from er_tp_dgp.metapaths import APTMetapathExtractor from er_tp_dgp.prompt import PromptBuilder from er_tp_dgp.trimming import TemporalSecurityAwareTrimmer graph = build_synthetic_graph() paths = APTMetapathExtractor(graph).extract_for_target("proc-child") selected = TemporalSecurityAwareTrimmer(graph, top_m_per_metapath=3).trim("proc-child", paths) bundle = PromptBuilder(graph).build("proc-child", selected) print(bundle.prompt_text)