Files
JANUS/scripts/extract_lib.py

775 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import os
import shutil
import socket
import sys
import tempfile
import time as _time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
import dpkt
import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment
# Locate the repository relative to this script and put its Packet_CFM
# directory first on sys.path before importing packet_store.
# NOTE(review): packet_store presumably lives in Packet_CFM - confirm.
_SCRIPT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _SCRIPT_DIR.parent
sys.path.insert(0, str(_REPO_ROOT / 'Packet_CFM'))
from packet_store import PacketShardWriter
# Names of the per-packet features, in the order _packet_token emits them.
PACKET_FEATURE_NAMES = ('log_size', 'log_dt_ms', 'direction', 'tcp_syn', 'tcp_fin', 'tcp_rst', 'tcp_psh', 'tcp_ack', 'log_win')
# Width of one packet token (number of per-packet features).
PACKET_D = len(PACKET_FEATURE_NAMES)
# Bit masks tested against PacketRecord.tcp_flags.
(FIN, SYN, RST, PSH, ACK) = (1, 2, 4, 8, 16)
@dataclass(slots=True)
class PacketRecord:
    """One IPv4 TCP or UDP packet as emitted by iter_packets."""
    # Capture timestamp of the packet, in seconds (differences are scaled
    # by 1000 to obtain milliseconds in _packet_token).
    timestamp: float
    # Dotted-quad source / destination addresses.
    src_ip: str
    dst_ip: str
    # Transport-layer source / destination ports.
    src_port: int
    dst_port: int
    # IP protocol number: iter_packets sets 6 for TCP and 17 for UDP.
    protocol: int
    # Raw TCP flag bits; iter_packets sets 0 for UDP.
    tcp_flags: int
    # Length of the transport payload in bytes.
    payload_len: int
    # Transport header length in bytes (TCP data offset * 4, or 8 for UDP).
    header_len: int
    # Length field of the IP header.
    total_len: int
    # TCP window field; iter_packets sets 0 for UDP.
    window_size: int
def _try_open_pcap(f):
    """Return a dpkt reader for an open binary capture file.

    The classic pcap reader is tried first. If it rejects the file with a
    ValueError, the file is rewound and parsed as pcapng instead.
    """
    try:
        reader = dpkt.pcap.Reader(f)
    except ValueError:
        # The failed attempt consumed part of the header; start over.
        f.seek(0)
        reader = dpkt.pcapng.Reader(f)
    return reader
def iter_packets(pcap_path: Path, max_packets: int | None=None) -> Iterator[PacketRecord]:
    """Yield a PacketRecord for every IPv4 TCP or UDP packet in a capture.

    Ethernet, raw-IP and Linux cooked (SLL) captures are understood. Frames
    with another link type, non-IPv4 frames, payloads that are neither TCP
    nor UDP, and frames dpkt fails to parse are skipped silently.

    Args:
        pcap_path: Capture file in pcap or pcapng format.
        max_packets: Stop after this many records have been yielded. None
            means no limit; zero or a negative value yields nothing.

    Yields:
        PacketRecord objects in file order.
    """
    # A non-positive limit must produce no records at all (checking the limit
    # only after the first yield would leak one packet for max_packets=0).
    if max_packets is not None and max_packets <= 0:
        return
    n_yielded = 0
    with open(pcap_path, 'rb') as f:
        reader = _try_open_pcap(f)
        link_type = reader.datalink()
        for ts, buf in reader:
            # Only parsing happens inside the try block; the yield is kept
            # outside so exceptions raised by the consumer are not swallowed.
            try:
                if link_type == dpkt.pcap.DLT_EN10MB:
                    eth = dpkt.ethernet.Ethernet(buf)
                    if eth.type != dpkt.ethernet.ETH_TYPE_IP:
                        continue
                    ip = eth.data
                elif link_type == dpkt.pcap.DLT_RAW:
                    ip = dpkt.ip.IP(buf)
                elif link_type == dpkt.pcap.DLT_LINUX_SLL:
                    sll = dpkt.sll.SLL(buf)
                    if sll.ethtype != dpkt.ethernet.ETH_TYPE_IP:
                        continue
                    ip = sll.data
                else:
                    continue
                if not isinstance(ip, dpkt.ip.IP):
                    continue
                src_ip = socket.inet_ntoa(ip.src)
                dst_ip = socket.inet_ntoa(ip.dst)
                transport = ip.data
                if isinstance(transport, dpkt.tcp.TCP):
                    record = PacketRecord(
                        timestamp=ts,
                        src_ip=src_ip,
                        dst_ip=dst_ip,
                        src_port=transport.sport,
                        dst_port=transport.dport,
                        protocol=6,
                        tcp_flags=transport.flags,
                        payload_len=len(transport.data),
                        header_len=transport.off * 4,
                        total_len=ip.len,
                        window_size=transport.win,
                    )
                elif isinstance(transport, dpkt.udp.UDP):
                    record = PacketRecord(
                        timestamp=ts,
                        src_ip=src_ip,
                        dst_ip=dst_ip,
                        src_port=transport.sport,
                        dst_port=transport.dport,
                        protocol=17,
                        tcp_flags=0,
                        payload_len=len(transport.data),
                        header_len=8,
                        total_len=ip.len,
                        window_size=0,
                    )
                else:
                    continue
            except (dpkt.NeedData, dpkt.UnpackError, AttributeError):
                continue
            yield record
            n_yielded += 1
            if max_packets is not None and n_yielded >= max_packets:
                return
def _packet_token(pkt: PacketRecord, prev_ts: float | None, direction: int) -> np.ndarray:
    """Encode one packet as a float32 feature vector.

    The layout follows PACKET_FEATURE_NAMES: log1p of the IP length, log1p of
    the gap to the previous packet in milliseconds (0 for the first packet,
    negative gaps clamped to 0), the direction value, five 0/1 TCP flag
    indicators (SYN, FIN, RST, PSH, ACK) and log1p of the TCP window.
    """
    if prev_ts is None:
        dt_ms = 0.0
    else:
        dt_ms = max(0.0, (pkt.timestamp - prev_ts) * 1000.0)
    flags = pkt.tcp_flags
    flag_bits = [int(bool(flags & mask)) for mask in (SYN, FIN, RST, PSH, ACK)]
    features = [
        float(np.log1p(max(pkt.total_len, 0))),
        float(np.log1p(dt_ms)),
        float(direction),
        *flag_bits,
        float(np.log1p(max(pkt.window_size, 0))),
    ]
    return np.array(features, dtype=np.float32)
class _TokenFlow:
__slots__ = ('key_fwd', 'start_ts', 'last_ts', 'fin_count', 'tokens', 'prev_ts', 'n_pkts')
def __init__(self, key_fwd: tuple, start_ts: float) -> None:
self.key_fwd = key_fwd
self.start_ts = start_ts
self.last_ts = start_ts
self.fin_count = 0
self.tokens: list[np.ndarray] = []
self.prev_ts: float | None = None
self.n_pkts: int = 0
def add(self, pkt: PacketRecord, is_forward: bool, max_len: int) -> None:
direction = 0 if is_forward else 1
if len(self.tokens) < max_len:
self.tokens.append(_packet_token(pkt, self.prev_ts, direction))
self.prev_ts = pkt.timestamp
self.last_ts = pkt.timestamp
self.n_pkts += 1
def stream_token_flows(packet_iter: Iterator[PacketRecord], idle_timeout: float, max_len: int, gc_every: int=200000) -> Iterator[_TokenFlow]:
    """Group a packet stream into bidirectional flows and yield finished ones.

    A flow is emitted when a TCP RST is seen, when it has accumulated two FIN
    packets, when a packet for the same 5-tuple arrives after more than
    idle_timeout seconds of silence, or when the periodic sweep (every
    gc_every packets) finds it idle. Flows still open when the input ends are
    emitted last.

    Args:
        packet_iter: Packets in capture order.
        idle_timeout: Seconds of inactivity after which a flow is closed.
        max_len: Maximum number of packets tokenized per flow.
        gc_every: Number of packets between idle sweeps.
    """
    active: dict[tuple, _TokenFlow] = {}
    last_pkt_ts = 0.0
    n_seen = 0
    for pkt in packet_iter:
        now = pkt.timestamp
        last_pkt_ts = now
        fwd_key = (pkt.src_ip, pkt.dst_ip, pkt.src_port, pkt.dst_port, pkt.protocol)
        bwd_key = (pkt.dst_ip, pkt.src_ip, pkt.dst_port, pkt.src_port, pkt.protocol)

        # Look the packet up under its own orientation first, then reversed.
        key, is_forward = fwd_key, True
        flow = active.get(fwd_key)
        if flow is None:
            flow = active.get(bwd_key)
            if flow is not None:
                key, is_forward = bwd_key, False

        # An existing flow that has been idle too long is closed, and this
        # packet starts a fresh one.
        if flow is not None and now - flow.last_ts > idle_timeout:
            yield active.pop(key)
            flow = None
        if flow is None:
            key, is_forward = fwd_key, True
            flow = _TokenFlow(key_fwd=fwd_key, start_ts=now)
            active[key] = flow

        flow.add(pkt, is_forward, max_len)

        if pkt.protocol == 6:
            flags = pkt.tcp_flags
            if flags & RST:
                yield active.pop(key)
            elif flags & FIN:
                flow.fin_count += 1
                if flow.fin_count >= 2:
                    yield active.pop(key)

        n_seen += 1
        if n_seen % gc_every == 0:
            # Periodic sweep so idle flows do not accumulate in memory.
            stale = [k for k, fl in active.items() if last_pkt_ts - fl.last_ts > idle_timeout]
            for k in stale:
                yield active.pop(k)

    # End of input: flush whatever is still open.
    remaining = list(active.values())
    for fl in remaining:
        yield fl
    active.clear()
def _canonical_key(src_ip: str, dst_ip: str, src_port: int, dst_port: int, proto: int) -> tuple:
a = (src_ip, src_port)
b = (dst_ip, dst_port)
if a <= b:
return (a[0], a[1], b[0], b[1], proto)
return (b[0], b[1], a[0], a[1], proto)
def _to_fixed_tensor(flow_tokens: list[np.ndarray], max_len: int) -> np.ndarray:
    """Stack packet tokens into a zero-padded (max_len, PACKET_D) float32 array.

    Tokens beyond max_len are dropped; unused trailing rows stay zero.
    """
    padded = np.zeros((max_len, PACKET_D), dtype=np.float32)
    kept = flow_tokens[:max_len]
    if kept:
        padded[:len(kept)] = np.stack(kept, axis=0)
    return padded
class _WorkerChunkWriter:
    """Buffer flows inside a worker and spill them to disk chunk by chunk.

    Each chunk is a pair of files in root: an .npy array of shape
    (n, T_full, PACKET_D) holding the tokens and a snappy-compressed parquet
    file holding one metadata record per flow, in the same order. The paths of
    all written chunks are collected in self.chunks.
    """

    def __init__(self, root: Path, *, prefix: str, T_full: int, chunk_size: int) -> None:
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        self.prefix = prefix
        self.T_full = T_full
        # A chunk always holds at least one flow.
        self.chunk_size = max(1, int(chunk_size))
        self.chunks: list[dict[str, str]] = []
        self._next_chunk = 0
        self._tokens: list[np.ndarray] = []
        self._records: list[dict] = []

    def add_csv_match(self, row_i: int, tok: np.ndarray, ln: int, meta: dict) -> None:
        """Queue a flow that was matched to CSV row row_i."""
        record = {**meta, 'csv_row_idx': int(row_i), 'packet_length': int(ln)}
        self._add(tok, record)

    def add_labeled(self, tok: np.ndarray, ln: int, meta: dict, label: str, extra: dict) -> None:
        """Queue a flow carrying an explicit label plus extra columns."""
        record = {**meta, 'packet_length': int(ln), 'label': str(label)}
        record.update((str(name), value) for name, value in extra.items())
        self._add(tok, record)

    def close(self) -> list[dict[str, str]]:
        """Flush any buffered flows and return the list of written chunks."""
        if self._tokens:
            self._flush()
        return self.chunks

    def _add(self, tok: np.ndarray, rec: dict) -> None:
        self._tokens.append(tok.astype(np.float32, copy=False))
        self._records.append(rec)
        if len(self._tokens) >= self.chunk_size:
            self._flush()

    def _flush(self) -> None:
        """Write the buffered flows as the next chunk and reset the buffers."""
        count = len(self._tokens)
        batch = np.empty((count, self.T_full, PACKET_D), dtype=np.float32)
        for slot, tok in enumerate(self._tokens):
            batch[slot] = tok
        stem = f'{self.prefix}-chunk-{self._next_chunk:06d}'
        token_path = self.root / f'{stem}.npy'
        meta_path = self.root / f'{stem}.parquet'
        np.save(token_path, batch, allow_pickle=False)
        frame = pd.DataFrame(self._records)
        frame.to_parquet(meta_path, compression='snappy', index=False)
        self.chunks.append({'tokens': str(token_path), 'meta': str(meta_path)})
        self._next_chunk += 1
        self._tokens.clear()
        self._records.clear()
def _flow_meta(fl: _TokenFlow) -> dict:
(sip, dip, sp, dp, proto) = fl.key_fwd
return {'start_ts': float(fl.start_ts), 'src_ip': str(sip), 'dst_ip': str(dip), 'src_port': int(sp), 'dst_port': int(dp), 'protocol': int(proto), 'n_pkts': int(fl.n_pkts)}
def _build_stream_csv_index(csv_rows_for_day: dict[tuple, list[tuple[int, float]]]) -> dict[tuple, dict[str, np.ndarray]]:
out: dict[tuple, dict[str, np.ndarray]] = {}
for (ck, rows) in csv_rows_for_day.items():
finite = [(int(row_i), float(ts)) for (row_i, ts) in rows if not np.isnan(ts)]
if not finite:
continue
finite.sort(key=lambda x: (x[1], x[0]))
row_idx = np.asarray([r for (r, _) in finite], dtype=np.int64)
ts = np.asarray([t for (_, t) in finite], dtype=np.float64)
used = np.zeros(len(finite), dtype=bool)
out[ck] = {'row_idx': row_idx, 'ts': ts, 'used': used}
return out
def _nearest_unused_row(entry: dict[str, np.ndarray], ts: float, tolerance: float) -> tuple[int | None, float | None]:
csv_ts = entry['ts']
used = entry['used']
pos = int(np.searchsorted(csv_ts, ts, side='left'))
best_i: int | None = None
best_abs = float('inf')
j = pos - 1
while j >= 0:
diff = abs(float(csv_ts[j]) - ts)
if diff > tolerance:
break
if not bool(used[j]) and diff < best_abs:
best_i = j
best_abs = diff
j -= 1
j = pos
n = len(csv_ts)
while j < n:
diff = abs(float(csv_ts[j]) - ts)
if diff > tolerance:
break
if not bool(used[j]) and diff < best_abs:
best_i = j
best_abs = diff
j += 1
if best_i is None:
return (None, None)
used[best_i] = True
return (int(entry['row_idx'][best_i]), ts - float(csv_ts[best_i]))
def _extract_day_worker(day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000, match_strategy: str='hungarian') -> dict:
    """Extract the flows of one day and match them to that day's CSV rows.

    With match_strategy='stream_nearest' the work is delegated to
    _extract_day_worker_stream_nearest. Otherwise all flows of the day are
    collected first and, per canonical 5-tuple key, CSV rows are assigned to
    pcap flows by minimising the total timestamp difference
    (scipy linear_sum_assignment); pairs further apart than
    time_tolerance_seconds are rejected.

    Args:
        day: Label of the day, used in chunk names and in the result.
        pcap_files_str: Paths of the capture files belonging to the day.
        csv_rows_for_day: Canonical key -> list of (csv row index, timestamp).
        max_len: Maximum number of packets tokenized per flow.
        idle_timeout: Flow idle timeout in seconds.
        time_tolerance_seconds: Largest accepted |flow start - csv timestamp|.
        max_packets_per_pcap: Optional per-file packet limit.
        spool_dir: When given, matches are spilled to chunk files there
            instead of being returned in memory.
        worker_flush_size: Number of flows per spool chunk.
        match_strategy: 'hungarian' or 'stream_nearest'.

    Returns:
        A dict with the matches ('results' in memory or 'chunks' on disk),
        counters and a sample of timestamp deltas for diagnostics.

    Raises:
        ValueError: If 'stream_nearest' is requested without spool_dir.
    """
    if match_strategy == 'stream_nearest':
        if spool_dir is None:
            raise ValueError('stream_nearest requires spool_dir')
        return _extract_day_worker_stream_nearest(day=day, pcap_files_str=pcap_files_str, csv_rows_for_day=csv_rows_for_day, max_len=max_len, idle_timeout=idle_timeout, time_tolerance_seconds=time_tolerance_seconds, max_packets_per_pcap=max_packets_per_pcap, spool_dir=spool_dir, worker_flush_size=worker_flush_size)

    pcap_by_key: dict[tuple, list[_TokenFlow]] = defaultdict(list)
    n_pkts = 0
    t_start = _time.time()

    def _counting_iter(pkt_iter):
        nonlocal n_pkts
        for pkt in pkt_iter:
            n_pkts += 1
            yield pkt

    for pcap_path_str in pcap_files_str:
        pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
        for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
            (sip, dip, sp, dp, proto) = fl.key_fwd
            pcap_by_key[_canonical_key(sip, dip, sp, dp, proto)].append(fl)
    n_flows = sum(len(v) for v in pcap_by_key.values())
    # 'elapsed' covers packet parsing and flow assembly only, not matching.
    elapsed = _time.time() - t_start

    # Cost given to forbidden pairs so the solver avoids them. It only steers
    # the optimisation: acceptance is decided by an explicit validity mask, so
    # a tolerance of 0 still accepts exact matches.
    # NOTE(review): with ~1000+ valid pairs under one key the solver could in
    # principle trade a valid pair for a smaller total cost - confirm whether
    # such keys occur before relying on maximal matching.
    big = time_tolerance_seconds * 1000.0 if time_tolerance_seconds > 0 else 1.0

    results: list[tuple[int, np.ndarray, int, dict]] = []
    chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None
    n_joined = 0
    n_collision = 0
    n_csv_keys = len(csv_rows_for_day)
    n_intersection = 0

    def _emit(row_i: int, fl: _TokenFlow) -> None:
        nonlocal n_joined
        tok = _to_fixed_tensor(fl.tokens, max_len)
        ln = min(len(fl.tokens), max_len)
        meta = _flow_meta(fl)
        if chunk_writer is not None:
            chunk_writer.add_csv_match(row_i, tok, ln, meta)
        else:
            results.append((row_i, tok, ln, meta))
        n_joined += 1

    # Keys are visited in order of their first CSV row index. Keys with an
    # empty row list have nothing to match and are skipped (they would
    # otherwise break the sort key).
    keyed_rows = [(ck, rows) for (ck, rows) in csv_rows_for_day.items() if rows]
    keyed_rows.sort(key=lambda kv: kv[1][0][0])
    for (ck, rows) in keyed_rows:
        if ck not in pcap_by_key:
            continue
        n_intersection += 1
        pcap_flows = pcap_by_key[ck]
        csv_ts = np.array([r[1] for r in rows], dtype=np.float64)
        pcap_ts = np.array([fl.start_ts for fl in pcap_flows], dtype=np.float64)
        (n_csv, n_pcap) = (len(csv_ts), len(pcap_ts))

        # Fast path: one candidate on each side needs no assignment solver.
        if n_csv == 1 and n_pcap == 1:
            row_i = rows[0][0]
            ts = csv_ts[0]
            fl = pcap_flows[0]
            if not np.isnan(ts) and abs(fl.start_ts - ts) <= time_tolerance_seconds:
                _emit(row_i, fl)
            else:
                n_collision += 1
            continue

        cost = np.abs(csv_ts[:, None] - pcap_ts[None, :])
        # NaN timestamps compare False here and are therefore invalid too.
        valid = cost <= time_tolerance_seconds
        cost[~valid] = big
        (row_ind, col_ind) = linear_sum_assignment(cost)
        for (r, c) in zip(row_ind, col_ind):
            if not valid[r, c]:
                n_collision += 1
                continue
            _emit(rows[r][0], pcap_flows[c])

    # Sample up to 10000 (first flow start - first csv timestamp) deltas so
    # the caller can detect a systematic clock offset.
    deltas: list[float] = []
    sampled = 0
    for (ck, rows) in csv_rows_for_day.items():
        if sampled >= 10000:
            break
        if not rows or ck not in pcap_by_key:
            continue
        (_, ts) = rows[0]
        if np.isnan(ts):
            continue
        deltas.append(pcap_by_key[ck][0].start_ts - ts)
        sampled += 1

    return {'day': day, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': len(pcap_by_key), 'n_csv_keys': n_csv_keys, 'n_intersection': n_intersection, 'n_collision': n_collision, 'deltas': deltas, 'match_strategy': match_strategy}
def _extract_day_worker_stream_nearest(*, day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str, worker_flush_size: int) -> dict:
    """Match one day's flows to CSV rows as they stream out of the pcaps.

    Every finished flow is matched immediately against the nearest unused CSV
    row of the same canonical key (within time_tolerance_seconds) and spilled
    to spool_dir, so flows are never all held in memory. The returned dict has
    the same keys as the one produced by _extract_day_worker, with 'results'
    always empty.
    """
    started = _time.time()
    n_pkts = 0
    n_flows = 0
    n_joined = 0
    n_collision = 0
    pcap_keys_seen: set[tuple] = set()
    keys_in_both: set[tuple] = set()
    deltas: list[float] = []
    csv_index = _build_stream_csv_index(csv_rows_for_day)
    writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size)

    def _counted(packets):
        nonlocal n_pkts
        for pkt in packets:
            n_pkts += 1
            yield pkt

    for pcap_path in pcap_files_str:
        packets = iter_packets(Path(pcap_path), max_packets=max_packets_per_pcap)
        for flow in stream_token_flows(_counted(packets), idle_timeout=idle_timeout, max_len=max_len):
            n_flows += 1
            key = _canonical_key(*flow.key_fwd)
            pcap_keys_seen.add(key)
            entry = csv_index.get(key)
            if entry is None:
                continue
            keys_in_both.add(key)
            row_i, delta = _nearest_unused_row(entry, float(flow.start_ts), time_tolerance_seconds)
            if row_i is None:
                # Key exists in the CSV but no free row is close enough.
                n_collision += 1
                continue
            tensor = _to_fixed_tensor(flow.tokens, max_len)
            length = min(len(flow.tokens), max_len)
            writer.add_csv_match(row_i, tensor, length, _flow_meta(flow))
            n_joined += 1
            # Keep at most 10000 deltas for the offset diagnostics.
            if delta is not None and len(deltas) < 10000:
                deltas.append(float(delta))

    elapsed = _time.time() - started
    return {
        'day': day,
        'results': [],
        'chunks': writer.close(),
        'n_joined': n_joined,
        'n_pkts': n_pkts,
        'n_flows': n_flows,
        'elapsed': elapsed,
        'n_pcap_keys': len(pcap_keys_seen),
        'n_csv_keys': len(csv_rows_for_day),
        'n_intersection': len(keys_in_both),
        'n_collision': n_collision,
        'deltas': deltas,
        'match_strategy': 'stream_nearest',
    }
def _print_day_stats(res: dict) -> None:
day = res['day']
strategy = res.get('match_strategy', 'hungarian')
print(f"[{day}] {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s match={strategy} ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
print(f" pcap_keys={res['n_pcap_keys']:,} csv_keys={res['n_csv_keys']:,} intersection={res['n_intersection']:,} joined={int(res.get('n_joined', len(res.get('results', ())))):,} within-key-miss={res['n_collision']:,}")
deltas = res.get('deltas') or []
if deltas:
arr = np.asarray(deltas, dtype=np.float64)
print(f' time-delta (pcap_start - csv_ts), seconds: median={np.median(arr):+.2f} mean={arr.mean():+.2f} std={arr.std():.2f} p05={np.percentile(arr, 5):+.2f} p95={np.percentile(arr, 95):+.2f}')
med = float(np.median(arr))
if abs(med) > 2.0:
print(f' -> median |{med:.1f}s| > 2s: rerun with --time-offset {med:.0f}')
def extract_dataset(*, csv_rows_by_day: dict[str, dict[tuple, list[tuple[int, float]]]], labels_by_row: np.ndarray, pcap_files_by_day: dict[str, list[Path]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, match_strategy: str | None=None, T_full: int=256, idle_timeout: float=120.0, time_tolerance_seconds: float=2.0, max_packets_per_pcap: int | None=None, n_jobs: int=0) -> None:
    """Extract packet tokens for CSV-labelled flows and write them to disk.

    One worker task is run per day that has pcap files. Two output modes
    exist:

    * out_store given: workers spill matches to a spool directory and the
      chunks are appended, in task order, to a PacketShardWriter store.
    * out_store is None: matches are gathered in memory, sorted by CSV row
      index and written as a compressed .npz (out_packets), a flows parquet
      (out_flows) and a canonical flow_features.parquet next to out_flows.

    Args:
        csv_rows_by_day: day -> canonical key -> list of (csv row, timestamp).
        labels_by_row: Label for every CSV row, indexed by row number.
        pcap_files_by_day: day -> capture files of that day.
        out_packets, out_flows: Output paths for the in-memory mode.
        out_store: Root of the sharded store; enables the sharded mode.
        shard_size: Flows per shard in the sharded store.
        worker_flush_size: Flows per worker spool chunk.
        spool_dir: Spool directory for the sharded mode; it is deleted and
            recreated if it exists, and removed again at the end. A temporary
            directory next to out_store is used when omitted.
        match_strategy: 'hungarian' or 'stream_nearest'; defaults to
            'stream_nearest' in sharded mode and 'hungarian' otherwise.
        T_full: Number of packet slots per flow.
        idle_timeout: Flow idle timeout in seconds.
        time_tolerance_seconds: Matching tolerance in seconds.
        max_packets_per_pcap: Optional per-file packet limit.
        n_jobs: Worker processes; <= 0 picks min(number of days, CPU count).

    Raises:
        ValueError: On an unknown strategy, or 'stream_nearest' without
            out_store.
        RuntimeError: If no day has pcap files or nothing was matched.
    """
    n_csv_rows = len(labels_by_row)
    print(f'[extract_dataset] N_csv={n_csv_rows:,} T_full={T_full} days={sorted(csv_rows_by_day)}')

    if match_strategy is None:
        match_strategy = 'hungarian' if out_store is None else 'stream_nearest'
    if match_strategy not in ('hungarian', 'stream_nearest'):
        raise ValueError("match_strategy must be 'hungarian' or 'stream_nearest'")
    if out_store is None and match_strategy == 'stream_nearest':
        raise ValueError('stream_nearest is only supported with --out-store')
    print(f'[extract_dataset] match_strategy={match_strategy}')

    # One task per day that actually has capture files.
    tasks: list[tuple] = []
    for day, rows_dict in csv_rows_by_day.items():
        day_pcaps = pcap_files_by_day.get(day, [])
        if day_pcaps:
            tasks.append((day, [str(p) for p in day_pcaps], dict(rows_dict)))
        else:
            print(f'[{day}] NO pcap files — skipping ({len(rows_dict):,} CSV keys unmatched)')
    if not tasks:
        raise RuntimeError('No days with pcap files — nothing to extract.')
    if n_jobs <= 0:
        n_jobs = min(len(tasks), os.cpu_count() or 1)
    print(f'[extract_dataset] running {len(tasks)} day(s) with {n_jobs} worker(s)')

    store_writer: PacketShardWriter | None = None
    spool_root: Path | None = None
    if out_store is not None:
        print(f'[extract_dataset] sharded output enabled: {out_store} shard_size={shard_size:,}')
        store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
        if spool_dir is not None:
            # A caller-supplied spool directory is wiped before use.
            spool_root = Path(spool_dir)
            if spool_root.exists():
                shutil.rmtree(spool_root)
            spool_root.mkdir(parents=True, exist_ok=True)
        else:
            store_parent = Path(out_store).parent
            store_parent.mkdir(parents=True, exist_ok=True)
            spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=store_parent))
        print(f'[extract_dataset] worker spool={spool_root} flush_size={worker_flush_size:,}')

    # Accumulators for the in-memory mode.
    tok_chunks: list[np.ndarray] = []
    len_chunks: list[np.ndarray] = []
    row_chunks: list[np.ndarray] = []
    meta_chunks: list[list[dict]] = []
    total_joined = 0

    def _materialize_results(results: list[tuple[int, np.ndarray, int, dict]]) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict]]:
        """Turn a worker's result tuples into arrays ordered by CSV row."""
        ordered = sorted(results, key=lambda item: item[0])
        count = len(ordered)
        tok_arr = np.empty((count, T_full, PACKET_D), dtype=np.float32)
        len_arr = np.empty(count, dtype=np.int32)
        row_arr = np.empty(count, dtype=np.int64)
        meta_arr: list[dict] = [None] * count
        for pos, (row_i, tok, ln, meta) in enumerate(ordered):
            tok_arr[pos] = tok
            len_arr[pos] = ln
            row_arr[pos] = row_i
            meta_arr[pos] = meta
        return (tok_arr, len_arr, row_arr, meta_arr)

    def _flows_from_meta(row_arr: np.ndarray, meta_arr: list[dict]) -> pd.DataFrame:
        """Build the flows frame for a batch from its metadata dicts."""

        def _column(name: str, dtype) -> np.ndarray:
            return np.asarray([m[name] for m in meta_arr], dtype=dtype)

        return pd.DataFrame({
            'label': labels_by_row[row_arr].astype(str),
            'start_ts': _column('start_ts', np.float64),
            'src_ip': _column('src_ip', object),
            'dst_ip': _column('dst_ip', object),
            'src_port': _column('src_port', np.uint32),
            'dst_port': _column('dst_port', np.uint32),
            'protocol': _column('protocol', np.uint8),
            'n_pkts': _column('n_pkts', np.uint32),
        })

    def _append_spool_chunks(res: dict) -> None:
        """Append a worker's spooled chunks to the sharded store."""
        for chunk in res.get('chunks') or []:
            tokens = np.load(chunk['tokens'], mmap_mode='r')
            meta_df = pd.read_parquet(chunk['meta'])
            if meta_df.empty:
                continue
            # Remember the original position of each record so the token
            # rows can be reordered in step with the metadata.
            meta_df = meta_df.assign(__token_row=np.arange(len(meta_df), dtype=np.int64))
            meta_df = meta_df.sort_values('csv_row_idx', kind='stable').reset_index(drop=True)
            row_arr = meta_df['csv_row_idx'].to_numpy(dtype=np.int64)
            lengths = meta_df['packet_length'].to_numpy(dtype=np.int32)
            order = meta_df['__token_row'].to_numpy(dtype=np.int64)
            flows = pd.DataFrame({
                'label': labels_by_row[row_arr].astype(str),
                'start_ts': meta_df['start_ts'].to_numpy(dtype=np.float64),
                'src_ip': meta_df['src_ip'].to_numpy(dtype=object),
                'dst_ip': meta_df['dst_ip'].to_numpy(dtype=object),
                'src_port': meta_df['src_port'].to_numpy(dtype=np.uint32),
                'dst_port': meta_df['dst_port'].to_numpy(dtype=np.uint32),
                'protocol': meta_df['protocol'].to_numpy(dtype=np.uint8),
                'n_pkts': meta_df['n_pkts'].to_numpy(dtype=np.uint32),
            })
            assert store_writer is not None
            store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)

    def _absorb(res: dict, *, print_stats: bool=True) -> None:
        """Take over a worker's in-memory results."""
        if print_stats:
            _print_day_stats(res)
        results = res['results']
        if not results:
            return
        (tok_arr, len_arr, row_arr, meta_arr) = _materialize_results(results)
        if store_writer is not None:
            store_writer.add_batch(tok_arr, len_arr, _flows_from_meta(row_arr, meta_arr))
            return
        tok_chunks.append(tok_arr)
        len_chunks.append(len_arr)
        row_chunks.append(row_arr)
        meta_chunks.append(meta_arr)

    def _worker_args(index: int, task: tuple) -> tuple:
        """Positional arguments for _extract_day_worker for one task."""
        (day, pcaps, rows) = task
        task_spool = None if spool_root is None else str(spool_root / f'task-{index:04d}-{day}')
        return (day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy)

    try:
        if n_jobs <= 1:
            for index, task in enumerate(tasks):
                res = _extract_day_worker(*_worker_args(index, task))
                _print_day_stats(res)
                total_joined += int(res.get('n_joined', 0))
                if store_writer is not None:
                    _append_spool_chunks(res)
                else:
                    _absorb(res, print_stats=False)
        else:
            with ProcessPoolExecutor(max_workers=n_jobs) as pool:
                futs = [pool.submit(_extract_day_worker, *_worker_args(index, task)) for index, task in enumerate(tasks)]
                if store_writer is not None:
                    # Report as workers finish, but append to the store in
                    # task order.
                    completed: dict[str, dict] = {}
                    for fut in as_completed(futs):
                        res = fut.result()
                        _print_day_stats(res)
                        completed[res['day']] = res
                    for (day, _, _) in tasks:
                        if day in completed:
                            total_joined += int(completed[day].get('n_joined', 0))
                            _append_spool_chunks(completed[day])
                else:
                    for fut in as_completed(futs):
                        _absorb(fut.result())
    finally:
        # The spool only exists to hand data from workers to this process.
        if spool_root is not None:
            shutil.rmtree(spool_root, ignore_errors=True)

    if store_writer is not None:
        if total_joined == 0:
            raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.')
        store_writer.close()
        print(f'[extract_dataset] wrote sharded store {out_store}')
        return

    if not tok_chunks:
        raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.')
    tokens = np.concatenate(tok_chunks, axis=0)
    lengths = np.concatenate(len_chunks, axis=0)
    csv_rows = np.concatenate(row_chunks, axis=0)
    meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
    del tok_chunks, len_chunks, row_chunks, meta_chunks

    # Global order: ascending CSV row index.
    order = np.argsort(csv_rows, kind='stable')
    tokens = tokens[order]
    lengths = lengths[order]
    csv_rows = csv_rows[order]
    meta_list = [meta_list[i] for i in order]
    n_matched = len(tokens)
    labels = labels_by_row[csv_rows].astype(str)
    flow_id = np.arange(n_matched, dtype=np.uint64)

    print(f'\n[extract_dataset] matched {n_matched:,}/{n_csv_rows:,} ({100.0 * n_matched / max(n_csv_rows, 1):.2f}%)')
    print(f'[extract_dataset] label distribution (matched rows):')
    (ulabels, counts) = np.unique(labels, return_counts=True)
    for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
        print(f' {lbl:<40s} {cnt:>10,}')

    out_packets.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
    print(f'[extract_dataset] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')

    out_flows.parent.mkdir(parents=True, exist_ok=True)

    def _meta_column(name: str, dtype) -> np.ndarray:
        return np.asarray([m[name] for m in meta_list], dtype=dtype)

    flow_df = pd.DataFrame({
        'flow_id': flow_id,
        'label': labels,
        'start_ts': _meta_column('start_ts', np.float64),
        'src_ip': _meta_column('src_ip', object),
        'dst_ip': _meta_column('dst_ip', object),
        'src_port': _meta_column('src_port', np.uint32),
        'dst_port': _meta_column('dst_port', np.uint32),
        'protocol': _meta_column('protocol', np.uint8),
        'n_pkts': _meta_column('n_pkts', np.uint32),
    })
    flow_df.to_parquet(out_flows, compression='snappy', index=False)
    print(f'[extract_dataset] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB)')
    _write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels, out_path=out_flows.parent / 'flow_features.parquet')
def _write_canonical_flow_features(*, tokens: np.ndarray, lengths: np.ndarray, flow_id: np.ndarray, labels: np.ndarray, out_path: Path) -> None:
    """Compute canonical flow features from packet tokens and save as parquet.

    The feature names and the computation come from common.data_contract,
    which is imported lazily after the directory two levels above this file
    has been put at the front of sys.path. The output has the columns
    flow_id, label and one column per canonical feature.
    """
    repo_root = Path(__file__).resolve().parents[1]
    sys.path.insert(0, str(repo_root))
    from common.data_contract import CANONICAL_FLOW_FEATURE_NAMES, compute_flow_features_from_packets

    n_features = len(CANONICAL_FLOW_FEATURE_NAMES)
    print(f'[extract_dataset] computing canonical {n_features}-d flow features from packet tokens ...')
    feats = compute_flow_features_from_packets(tokens, lengths)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    frame = pd.DataFrame({'flow_id': flow_id, 'label': labels})
    for column, name in enumerate(CANONICAL_FLOW_FEATURE_NAMES):
        frame[name] = feats[:, column]
    frame.to_parquet(out_path, compression='snappy', index=False)
    size_mb = out_path.stat().st_size / 1000000.0
    print(f'[extract_dataset] wrote {out_path} ({size_mb:.2f} MB)')
def _extract_single_pcap_worker(pcap_path_str: str, label: str, extra: dict, max_len: int, idle_timeout: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000) -> dict:
    """Extract every flow of one labelled pcap file.

    Args:
        pcap_path_str: Path of the capture file.
        label: Label attached to every flow of this file.
        extra: Extra per-file columns, passed through to the output.
        max_len: Maximum number of packets tokenized per flow.
        idle_timeout: Flow idle timeout in seconds.
        max_packets_per_pcap: Optional packet limit for the file.
        spool_dir: When given, flows are spilled to chunk files there instead
            of being returned in memory.
        worker_flush_size: Number of flows per spool chunk.

    Returns:
        A dict with the flows ('results' as (tokens, length, meta) tuples, or
        'chunks' on disk), the input label/extra and packet/flow counters.
    """
    t_start = _time.time()
    n_pkts = 0
    n_flows = 0
    results: list[tuple[np.ndarray, int, dict]] = []
    chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'pcap-{Path(pcap_path_str).stem}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None

    def _counting_iter(pkt_iter):
        nonlocal n_pkts
        for pkt in pkt_iter:
            n_pkts += 1
            yield pkt

    pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
    for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
        # Same metadata record as the CSV-matching workers produce.
        meta = _flow_meta(fl)
        tok = _to_fixed_tensor(fl.tokens, max_len)
        ln = min(len(fl.tokens), max_len)
        if chunk_writer is not None:
            chunk_writer.add_labeled(tok, ln, meta, label, extra)
        else:
            results.append((tok, ln, meta))
        n_flows += 1
    elapsed = _time.time() - t_start
    return {'pcap': pcap_path_str, 'label': label, 'extra': extra, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed}
def extract_labeled_pcaps(*, pcap_files_with_labels: list[tuple[Path, str, dict]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, T_full: int=256, idle_timeout: float=120.0, max_packets_per_pcap: int | None=None, n_jobs: int=0, extra_column_names: tuple[str, ...]=()) -> None:
    """Extract packet tokens from pcaps whose label is known per file.

    One worker task is run per (pcap, label, extra) entry. Two output modes
    exist:

    * out_store given: workers spill flows to a spool directory and the
      chunks are appended, in input order, to a PacketShardWriter store.
    * out_store is None: flows are gathered in memory, sorted by
      (label, src_ip, dst_ip, src_port, dst_port, protocol, start_ts) and
      written as a compressed .npz (out_packets), a flows parquet (out_flows)
      and a canonical flow_features.parquet next to out_flows.

    Args:
        pcap_files_with_labels: (path, label, extra columns) per capture.
        out_packets, out_flows: Output paths for the in-memory mode.
        out_store: Root of the sharded store; enables the sharded mode.
        shard_size: Flows per shard in the sharded store.
        worker_flush_size: Flows per worker spool chunk.
        spool_dir: Spool directory for the sharded mode; it is deleted and
            recreated if it exists, and removed again at the end. A temporary
            directory next to out_store is used when omitted.
        T_full: Number of packet slots per flow.
        idle_timeout: Flow idle timeout in seconds.
        max_packets_per_pcap: Optional per-file packet limit.
        n_jobs: Worker processes; <= 0 picks min(number of pcaps, CPU count).
        extra_column_names: Names of the extra columns written in the
            in-memory mode; missing values default to ''.

    Raises:
        RuntimeError: If no flow was produced.
    """
    N_pcap = len(pcap_files_with_labels)
    print(f'[extract_labeled_pcaps] n_pcaps={N_pcap} T_full={T_full} extra_cols={extra_column_names}')
    for (p, lbl, extra) in pcap_files_with_labels[:10]:
        print(f' {lbl:<20s} {Path(p).name:<60s} extra={extra}')
    if N_pcap > 10:
        print(f' ... ({N_pcap - 10} more)')
    if n_jobs <= 0:
        n_jobs = min(N_pcap, os.cpu_count() or 1)
    print(f'[extract_labeled_pcaps] running {N_pcap} pcap(s) with {n_jobs} worker(s)')

    store_writer: PacketShardWriter | None = None
    spool_root: Path | None = None
    if out_store is not None:
        print(f'[extract_labeled_pcaps] sharded output enabled: {out_store} shard_size={shard_size:,}')
        store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
        if spool_dir is None:
            out_store_parent = Path(out_store).parent
            out_store_parent.mkdir(parents=True, exist_ok=True)
            spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent))
        else:
            # A caller-supplied spool directory is wiped before use.
            spool_root = Path(spool_dir)
            if spool_root.exists():
                shutil.rmtree(spool_root)
            spool_root.mkdir(parents=True, exist_ok=True)
        print(f'[extract_labeled_pcaps] worker spool={spool_root} flush_size={worker_flush_size:,}')

    # Accumulators for the in-memory mode.
    tok_chunks: list[np.ndarray] = []
    len_chunks: list[np.ndarray] = []
    meta_chunks: list[list[dict]] = []
    label_chunks: list[np.ndarray] = []
    extra_chunks: list[dict[str, list]] = []
    total_flows = 0

    def _print_pcap_stats(res: dict) -> None:
        """Print the throughput line for one finished pcap."""
        pcap_name = Path(res['pcap']).name
        print(f"[pcap:{pcap_name}] label={res['label']} {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")

    def _flows_for_labeled_chunk(res: dict, meta_arr: list[dict], n: int) -> pd.DataFrame:
        """Build the flows frame for one pcap's in-memory results."""
        cols = {
            'label': np.full(n, res['label'], dtype=object),
            'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64),
            'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object),
            'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object),
            'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32),
            'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32),
            'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8),
            'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32),
        }
        for col in extra_column_names:
            cols[col] = np.full(n, res['extra'].get(col, ''), dtype=object)
        return pd.DataFrame(cols)

    def _append_labeled_spool_chunks(res: dict) -> None:
        """Append a worker's spooled chunks to the sharded store."""
        for chunk in res.get('chunks') or []:
            tokens = np.load(chunk['tokens'], mmap_mode='r')
            flows = pd.read_parquet(chunk['meta'])
            if flows.empty:
                continue
            # Remember the original position of each record so the token
            # rows can be reordered in step with the metadata.
            flows = flows.assign(__token_row=np.arange(len(flows), dtype=np.int64))
            sort_keys = ['label', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol', 'start_ts']
            flows = flows.sort_values(sort_keys, kind='stable').reset_index(drop=True)
            order = flows['__token_row'].to_numpy(dtype=np.int64)
            lengths = flows['packet_length'].to_numpy(dtype=np.int32)
            flows = flows.drop(columns=['packet_length', '__token_row'])
            assert store_writer is not None
            store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)

    def _absorb(res: dict, *, print_stats: bool=True) -> None:
        """Take over a worker's in-memory results."""
        lbl = res['label']
        extra = res['extra']
        if print_stats:
            _print_pcap_stats(res)
        if not res['results']:
            return
        n = len(res['results'])
        tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32)
        len_arr = np.empty(n, dtype=np.int32)
        meta_arr: list[dict] = [None] * n
        for (i, (tok, ln, meta)) in enumerate(res['results']):
            tok_arr[i] = tok
            len_arr[i] = ln
            meta_arr[i] = meta
        if store_writer is not None:
            flows = _flows_for_labeled_chunk(res, meta_arr, n)
            # lexsort treats the LAST key as the primary one.
            order = np.lexsort((flows['start_ts'].to_numpy(dtype=np.float64), flows['protocol'].to_numpy(dtype=np.int64), flows['dst_port'].to_numpy(dtype=np.int64), flows['src_port'].to_numpy(dtype=np.int64), flows['dst_ip'].to_numpy(dtype=object), flows['src_ip'].to_numpy(dtype=object), flows['label'].to_numpy(dtype=object)))
            store_writer.add_batch(tok_arr[order], len_arr[order], flows.iloc[order].reset_index(drop=True))
        else:
            tok_chunks.append(tok_arr)
            len_chunks.append(len_arr)
            meta_chunks.append(meta_arr)
            label_chunks.append(np.full(n, lbl, dtype=object))
            extra_chunks.append({col: [extra.get(col, '')] * n for col in extra_column_names})

    def _worker_args(index: int, p, lbl: str, extra: dict) -> tuple:
        """Positional arguments for _extract_single_pcap_worker."""
        task_spool = None if spool_root is None else str(spool_root / f'task-{index:04d}-{Path(p).stem}')
        return (str(p), lbl, extra, T_full, idle_timeout, max_packets_per_pcap, task_spool, worker_flush_size)

    try:
        if n_jobs <= 1:
            for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
                res = _extract_single_pcap_worker(*_worker_args(i, p, lbl, extra))
                _absorb(res)
                total_flows += int(res.get('n_flows', 0))
                if store_writer is not None:
                    _append_labeled_spool_chunks(res)
        else:
            with ProcessPoolExecutor(max_workers=n_jobs) as pool:
                futs = [pool.submit(_extract_single_pcap_worker, *_worker_args(i, p, lbl, extra)) for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels)]
                if store_writer is not None:
                    # Results are tracked by task index, not by pcap path: the
                    # same file may be listed more than once, and keying by
                    # path would append one result twice and drop the other.
                    task_index = {fut: i for (i, fut) in enumerate(futs)}
                    completed: dict[int, dict] = {}
                    for fut in as_completed(futs):
                        res = fut.result()
                        _print_pcap_stats(res)
                        completed[task_index[fut]] = res
                    # Append in input order so the store layout does not
                    # depend on which worker finished first.
                    for i in range(len(futs)):
                        res = completed[i]
                        total_flows += int(res.get('n_flows', 0))
                        _append_labeled_spool_chunks(res)
                else:
                    for fut in as_completed(futs):
                        _absorb(fut.result())
    finally:
        # The spool only exists to hand data from workers to this process.
        if spool_root is not None:
            shutil.rmtree(spool_root, ignore_errors=True)

    if store_writer is not None:
        if total_flows == 0:
            raise RuntimeError('No flows emitted — check pcap contents.')
        store_writer.close()
        print(f'[extract_labeled_pcaps] wrote sharded store {out_store}')
        return

    if not tok_chunks:
        raise RuntimeError('No flows emitted — check pcap contents.')
    tokens = np.concatenate(tok_chunks, axis=0)
    lengths = np.concatenate(len_chunks, axis=0)
    meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
    labels = np.concatenate(label_chunks, axis=0)
    extra_dict: dict[str, list] = {col: [] for col in extra_column_names}
    for chunk in extra_chunks:
        for col in extra_column_names:
            extra_dict[col].extend(chunk[col])
    del tok_chunks, len_chunks, meta_chunks, label_chunks, extra_chunks

    # Global order: label, then 5-tuple, then start time (lexsort treats the
    # last key as the primary one).
    sip_arr = np.asarray([m['src_ip'] for m in meta_list], dtype=object)
    dip_arr = np.asarray([m['dst_ip'] for m in meta_list], dtype=object)
    sp_arr = np.asarray([m['src_port'] for m in meta_list], dtype=np.int64)
    dp_arr = np.asarray([m['dst_port'] for m in meta_list], dtype=np.int64)
    pr_arr = np.asarray([m['protocol'] for m in meta_list], dtype=np.int64)
    ts_arr = np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64)
    order = np.lexsort((ts_arr, pr_arr, dp_arr, sp_arr, dip_arr, sip_arr, labels))
    tokens = tokens[order]
    lengths = lengths[order]
    labels = labels[order]
    meta_list = [meta_list[i] for i in order]
    for col in extra_column_names:
        extra_dict[col] = [extra_dict[col][i] for i in order]

    N = len(tokens)
    flow_id = np.arange(N, dtype=np.uint64)
    print(f'\n[extract_labeled_pcaps] total flows: {N:,}')
    print(f'[extract_labeled_pcaps] label distribution:')
    (ulabels, counts) = np.unique(labels, return_counts=True)
    for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
        print(f' {lbl:<40s} {cnt:>10,}')

    out_packets.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
    print(f'[extract_labeled_pcaps] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')

    out_flows.parent.mkdir(parents=True, exist_ok=True)
    cols = {
        'flow_id': flow_id,
        'label': labels.astype(str),
        'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64),
        'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object),
        'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object),
        'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32),
        'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32),
        'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8),
        'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32),
    }
    for col in extra_column_names:
        cols[col] = np.asarray(extra_dict[col], dtype=object)
    flow_df = pd.DataFrame(cols)
    flow_df.to_parquet(out_flows, compression='snappy', index=False)
    print(f'[extract_labeled_pcaps] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB) cols={list(flow_df.columns)}')
    _write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels.astype(str), out_path=out_flows.parent / 'flow_features.parquet')