Initial commit: code, paper, small artifacts

This commit is contained in:
2026-05-07 20:47:30 +08:00
commit fae2db8cff
322 changed files with 33159 additions and 0 deletions

774
scripts/extract_lib.py Normal file
View File

@@ -0,0 +1,774 @@
from __future__ import annotations
import os
import shutil
import socket
import sys
import tempfile
import time as _time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
import dpkt
import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment
_SCRIPT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _SCRIPT_DIR.parent
sys.path.insert(0, str(_REPO_ROOT / 'Packet_CFM'))
from packet_store import PacketShardWriter
PACKET_FEATURE_NAMES = ('log_size', 'log_dt_ms', 'direction', 'tcp_syn', 'tcp_fin', 'tcp_rst', 'tcp_psh', 'tcp_ack', 'log_win')
PACKET_D = len(PACKET_FEATURE_NAMES)
(FIN, SYN, RST, PSH, ACK) = (1, 2, 4, 8, 16)
@dataclass(slots=True)
class PacketRecord:
timestamp: float
src_ip: str
dst_ip: str
src_port: int
dst_port: int
protocol: int
tcp_flags: int
payload_len: int
header_len: int
total_len: int
window_size: int
def _try_open_pcap(f):
try:
return dpkt.pcap.Reader(f)
except ValueError:
f.seek(0)
return dpkt.pcapng.Reader(f)
def iter_packets(pcap_path: Path, max_packets: int | None=None) -> Iterator[PacketRecord]:
n = 0
with open(pcap_path, 'rb') as f:
reader = _try_open_pcap(f)
link_type = reader.datalink()
for (ts, buf) in reader:
try:
if link_type == dpkt.pcap.DLT_EN10MB:
eth = dpkt.ethernet.Ethernet(buf)
if eth.type != dpkt.ethernet.ETH_TYPE_IP:
continue
ip = eth.data
elif link_type == dpkt.pcap.DLT_RAW:
ip = dpkt.ip.IP(buf)
elif link_type == dpkt.pcap.DLT_LINUX_SLL:
sll = dpkt.sll.SLL(buf)
if sll.ethtype != dpkt.ethernet.ETH_TYPE_IP:
continue
ip = sll.data
else:
continue
if not isinstance(ip, dpkt.ip.IP):
continue
src_ip = socket.inet_ntoa(ip.src)
dst_ip = socket.inet_ntoa(ip.dst)
transport = ip.data
if isinstance(transport, dpkt.tcp.TCP):
yield PacketRecord(timestamp=ts, src_ip=src_ip, dst_ip=dst_ip, src_port=transport.sport, dst_port=transport.dport, protocol=6, tcp_flags=transport.flags, payload_len=len(transport.data), header_len=transport.off * 4, total_len=ip.len, window_size=transport.win)
elif isinstance(transport, dpkt.udp.UDP):
yield PacketRecord(timestamp=ts, src_ip=src_ip, dst_ip=dst_ip, src_port=transport.sport, dst_port=transport.dport, protocol=17, tcp_flags=0, payload_len=len(transport.data), header_len=8, total_len=ip.len, window_size=0)
else:
continue
except (dpkt.NeedData, dpkt.UnpackError, AttributeError):
continue
n += 1
if max_packets is not None and n >= max_packets:
return
def _packet_token(pkt: PacketRecord, prev_ts: float | None, direction: int) -> np.ndarray:
dt_ms = 0.0 if prev_ts is None else max(0.0, (pkt.timestamp - prev_ts) * 1000.0)
syn = int(bool(pkt.tcp_flags & SYN))
fin = int(bool(pkt.tcp_flags & FIN))
rst = int(bool(pkt.tcp_flags & RST))
psh = int(bool(pkt.tcp_flags & PSH))
ack = int(bool(pkt.tcp_flags & ACK))
return np.array([float(np.log1p(max(pkt.total_len, 0))), float(np.log1p(dt_ms)), float(direction), syn, fin, rst, psh, ack, float(np.log1p(max(pkt.window_size, 0)))], dtype=np.float32)
class _TokenFlow:
__slots__ = ('key_fwd', 'start_ts', 'last_ts', 'fin_count', 'tokens', 'prev_ts', 'n_pkts')
def __init__(self, key_fwd: tuple, start_ts: float) -> None:
self.key_fwd = key_fwd
self.start_ts = start_ts
self.last_ts = start_ts
self.fin_count = 0
self.tokens: list[np.ndarray] = []
self.prev_ts: float | None = None
self.n_pkts: int = 0
def add(self, pkt: PacketRecord, is_forward: bool, max_len: int) -> None:
direction = 0 if is_forward else 1
if len(self.tokens) < max_len:
self.tokens.append(_packet_token(pkt, self.prev_ts, direction))
self.prev_ts = pkt.timestamp
self.last_ts = pkt.timestamp
self.n_pkts += 1
def stream_token_flows(packet_iter: Iterator[PacketRecord], idle_timeout: float, max_len: int, gc_every: int=200000) -> Iterator[_TokenFlow]:
active: dict[tuple, _TokenFlow] = {}
last_pkt_ts = 0.0
n_seen = 0
for pkt in packet_iter:
last_pkt_ts = pkt.timestamp
fwd_key = (pkt.src_ip, pkt.dst_ip, pkt.src_port, pkt.dst_port, pkt.protocol)
bwd_key = (pkt.dst_ip, pkt.src_ip, pkt.dst_port, pkt.src_port, pkt.protocol)
flow: _TokenFlow | None = None
key = fwd_key
is_forward = True
if fwd_key in active:
(flow, key, is_forward) = (active[fwd_key], fwd_key, True)
elif bwd_key in active:
(flow, key, is_forward) = (active[bwd_key], bwd_key, False)
if flow is not None and pkt.timestamp - flow.last_ts > idle_timeout:
old = active.pop(key)
yield old
flow = None
if flow is None:
flow = _TokenFlow(key_fwd=fwd_key, start_ts=pkt.timestamp)
key = fwd_key
is_forward = True
active[key] = flow
flow.add(pkt, is_forward, max_len)
if pkt.protocol == 6:
if pkt.tcp_flags & RST:
yield active.pop(key)
elif pkt.tcp_flags & FIN:
flow.fin_count += 1
if flow.fin_count >= 2:
yield active.pop(key)
n_seen += 1
if n_seen % gc_every == 0:
stale = [k for (k, fl) in active.items() if last_pkt_ts - fl.last_ts > idle_timeout]
for k in stale:
yield active.pop(k)
for fl in list(active.values()):
yield fl
active.clear()
def _canonical_key(src_ip: str, dst_ip: str, src_port: int, dst_port: int, proto: int) -> tuple:
a = (src_ip, src_port)
b = (dst_ip, dst_port)
if a <= b:
return (a[0], a[1], b[0], b[1], proto)
return (b[0], b[1], a[0], a[1], proto)
def _to_fixed_tensor(flow_tokens: list[np.ndarray], max_len: int) -> np.ndarray:
out = np.zeros((max_len, PACKET_D), dtype=np.float32)
n = min(len(flow_tokens), max_len)
if n > 0:
out[:n] = np.stack(flow_tokens[:n], axis=0)
return out
class _WorkerChunkWriter:
def __init__(self, root: Path, *, prefix: str, T_full: int, chunk_size: int) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self.prefix = prefix
self.T_full = T_full
self.chunk_size = max(1, int(chunk_size))
self._tokens: list[np.ndarray] = []
self._records: list[dict] = []
self._next_chunk = 0
self.chunks: list[dict[str, str]] = []
def add_csv_match(self, row_i: int, tok: np.ndarray, ln: int, meta: dict) -> None:
rec = dict(meta)
rec['csv_row_idx'] = int(row_i)
rec['packet_length'] = int(ln)
self._add(tok, rec)
def add_labeled(self, tok: np.ndarray, ln: int, meta: dict, label: str, extra: dict) -> None:
rec = dict(meta)
rec['packet_length'] = int(ln)
rec['label'] = str(label)
for (k, v) in extra.items():
rec[str(k)] = v
self._add(tok, rec)
def close(self) -> list[dict[str, str]]:
if self._tokens:
self._flush()
return self.chunks
def _add(self, tok: np.ndarray, rec: dict) -> None:
self._tokens.append(tok.astype(np.float32, copy=False))
self._records.append(rec)
if len(self._tokens) >= self.chunk_size:
self._flush()
def _flush(self) -> None:
n = len(self._tokens)
tokens = np.empty((n, self.T_full, PACKET_D), dtype=np.float32)
for (i, tok) in enumerate(self._tokens):
tokens[i] = tok
stem = f'{self.prefix}-chunk-{self._next_chunk:06d}'
token_path = self.root / f'{stem}.npy'
meta_path = self.root / f'{stem}.parquet'
np.save(token_path, tokens, allow_pickle=False)
pd.DataFrame(self._records).to_parquet(meta_path, compression='snappy', index=False)
self.chunks.append({'tokens': str(token_path), 'meta': str(meta_path)})
self._tokens.clear()
self._records.clear()
self._next_chunk += 1
def _flow_meta(fl: _TokenFlow) -> dict:
(sip, dip, sp, dp, proto) = fl.key_fwd
return {'start_ts': float(fl.start_ts), 'src_ip': str(sip), 'dst_ip': str(dip), 'src_port': int(sp), 'dst_port': int(dp), 'protocol': int(proto), 'n_pkts': int(fl.n_pkts)}
def _build_stream_csv_index(csv_rows_for_day: dict[tuple, list[tuple[int, float]]]) -> dict[tuple, dict[str, np.ndarray]]:
out: dict[tuple, dict[str, np.ndarray]] = {}
for (ck, rows) in csv_rows_for_day.items():
finite = [(int(row_i), float(ts)) for (row_i, ts) in rows if not np.isnan(ts)]
if not finite:
continue
finite.sort(key=lambda x: (x[1], x[0]))
row_idx = np.asarray([r for (r, _) in finite], dtype=np.int64)
ts = np.asarray([t for (_, t) in finite], dtype=np.float64)
used = np.zeros(len(finite), dtype=bool)
out[ck] = {'row_idx': row_idx, 'ts': ts, 'used': used}
return out
def _nearest_unused_row(entry: dict[str, np.ndarray], ts: float, tolerance: float) -> tuple[int | None, float | None]:
csv_ts = entry['ts']
used = entry['used']
pos = int(np.searchsorted(csv_ts, ts, side='left'))
best_i: int | None = None
best_abs = float('inf')
j = pos - 1
while j >= 0:
diff = abs(float(csv_ts[j]) - ts)
if diff > tolerance:
break
if not bool(used[j]) and diff < best_abs:
best_i = j
best_abs = diff
j -= 1
j = pos
n = len(csv_ts)
while j < n:
diff = abs(float(csv_ts[j]) - ts)
if diff > tolerance:
break
if not bool(used[j]) and diff < best_abs:
best_i = j
best_abs = diff
j += 1
if best_i is None:
return (None, None)
used[best_i] = True
return (int(entry['row_idx'][best_i]), ts - float(csv_ts[best_i]))
def _extract_day_worker(day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000, match_strategy: str='hungarian') -> dict:
if match_strategy == 'stream_nearest':
if spool_dir is None:
raise ValueError('stream_nearest requires spool_dir')
return _extract_day_worker_stream_nearest(day=day, pcap_files_str=pcap_files_str, csv_rows_for_day=csv_rows_for_day, max_len=max_len, idle_timeout=idle_timeout, time_tolerance_seconds=time_tolerance_seconds, max_packets_per_pcap=max_packets_per_pcap, spool_dir=spool_dir, worker_flush_size=worker_flush_size)
pcap_by_key: dict[tuple, list[_TokenFlow]] = defaultdict(list)
n_pkts = 0
t_start = _time.time()
def _counting_iter(pkt_iter):
nonlocal n_pkts
for pkt in pkt_iter:
n_pkts += 1
yield pkt
for pcap_path_str in pcap_files_str:
pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
(sip, dip, sp, dp, proto) = fl.key_fwd
ck = _canonical_key(sip, dip, sp, dp, proto)
pcap_by_key[ck].append(fl)
n_flows = sum((len(v) for v in pcap_by_key.values()))
elapsed = _time.time() - t_start
BIG = time_tolerance_seconds * 1000.0
results: list[tuple[int, np.ndarray, int, dict]] = []
chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None
n_joined = 0
n_collision = 0
n_csv_keys = len(csv_rows_for_day)
n_intersection = 0
def _emit(row_i: int, fl: _TokenFlow) -> None:
nonlocal n_joined
tok = _to_fixed_tensor(fl.tokens, max_len)
ln = min(len(fl.tokens), max_len)
meta = _flow_meta(fl)
if chunk_writer is not None:
chunk_writer.add_csv_match(row_i, tok, ln, meta)
else:
results.append((row_i, tok, ln, meta))
n_joined += 1
for (ck, rows) in sorted(csv_rows_for_day.items(), key=lambda kv: kv[1][0][0]):
if ck not in pcap_by_key:
continue
n_intersection += 1
pcap_flows = pcap_by_key[ck]
csv_ts = np.array([r[1] for r in rows], dtype=np.float64)
pcap_ts = np.array([fl.start_ts for fl in pcap_flows], dtype=np.float64)
(n_csv, n_pcap) = (len(csv_ts), len(pcap_ts))
if n_csv == 1 and n_pcap == 1:
row_i = rows[0][0]
ts = csv_ts[0]
fl = pcap_flows[0]
if not np.isnan(ts) and abs(fl.start_ts - ts) <= time_tolerance_seconds:
_emit(row_i, fl)
else:
n_collision += 1
continue
cost = np.abs(csv_ts[:, None] - pcap_ts[None, :])
cost[np.isnan(cost)] = BIG
cost[cost > time_tolerance_seconds] = BIG
(row_ind, col_ind) = linear_sum_assignment(cost)
for (r, c) in zip(row_ind, col_ind):
if cost[r, c] >= BIG:
n_collision += 1
continue
row_i = rows[r][0]
fl = pcap_flows[c]
_emit(row_i, fl)
deltas: list[float] = []
sampled = 0
for (ck, rows) in csv_rows_for_day.items():
if sampled >= 10000 or ck not in pcap_by_key:
if sampled >= 10000:
break
continue
(row_i, ts) = rows[0]
if np.isnan(ts):
continue
deltas.append(pcap_by_key[ck][0].start_ts - ts)
sampled += 1
return {'day': day, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': len(pcap_by_key), 'n_csv_keys': n_csv_keys, 'n_intersection': n_intersection, 'n_collision': n_collision, 'deltas': deltas, 'match_strategy': match_strategy}
def _extract_day_worker_stream_nearest(*, day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str, worker_flush_size: int) -> dict:
t_start = _time.time()
n_pkts = 0
n_flows = 0
n_joined = 0
n_collision = 0
seen_pcap_keys: set[tuple] = set()
intersected_keys: set[tuple] = set()
deltas: list[float] = []
csv_index = _build_stream_csv_index(csv_rows_for_day)
chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size)
def _counting_iter(pkt_iter):
nonlocal n_pkts
for pkt in pkt_iter:
n_pkts += 1
yield pkt
for pcap_path_str in pcap_files_str:
pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
n_flows += 1
(sip, dip, sp, dp, proto) = fl.key_fwd
ck = _canonical_key(sip, dip, sp, dp, proto)
seen_pcap_keys.add(ck)
entry = csv_index.get(ck)
if entry is None:
continue
intersected_keys.add(ck)
(row_i, delta) = _nearest_unused_row(entry, float(fl.start_ts), time_tolerance_seconds)
if row_i is None:
n_collision += 1
continue
tok = _to_fixed_tensor(fl.tokens, max_len)
ln = min(len(fl.tokens), max_len)
chunk_writer.add_csv_match(row_i, tok, ln, _flow_meta(fl))
n_joined += 1
if delta is not None and len(deltas) < 10000:
deltas.append(float(delta))
elapsed = _time.time() - t_start
return {'day': day, 'results': [], 'chunks': chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': len(seen_pcap_keys), 'n_csv_keys': len(csv_rows_for_day), 'n_intersection': len(intersected_keys), 'n_collision': n_collision, 'deltas': deltas, 'match_strategy': 'stream_nearest'}
def _print_day_stats(res: dict) -> None:
day = res['day']
strategy = res.get('match_strategy', 'hungarian')
print(f"[{day}] {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s match={strategy} ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
print(f" pcap_keys={res['n_pcap_keys']:,} csv_keys={res['n_csv_keys']:,} intersection={res['n_intersection']:,} joined={int(res.get('n_joined', len(res.get('results', ())))):,} within-key-miss={res['n_collision']:,}")
deltas = res.get('deltas') or []
if deltas:
arr = np.asarray(deltas, dtype=np.float64)
print(f' time-delta (pcap_start - csv_ts), seconds: median={np.median(arr):+.2f} mean={arr.mean():+.2f} std={arr.std():.2f} p05={np.percentile(arr, 5):+.2f} p95={np.percentile(arr, 95):+.2f}')
med = float(np.median(arr))
if abs(med) > 2.0:
print(f' -> median |{med:.1f}s| > 2s: rerun with --time-offset {med:.0f}')
def extract_dataset(*, csv_rows_by_day: dict[str, dict[tuple, list[tuple[int, float]]]], labels_by_row: np.ndarray, pcap_files_by_day: dict[str, list[Path]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, match_strategy: str | None=None, T_full: int=256, idle_timeout: float=120.0, time_tolerance_seconds: float=2.0, max_packets_per_pcap: int | None=None, n_jobs: int=0) -> None:
N_csv = len(labels_by_row)
print(f'[extract_dataset] N_csv={N_csv:,} T_full={T_full} days={sorted(csv_rows_by_day.keys())}')
if match_strategy is None:
match_strategy = 'stream_nearest' if out_store is not None else 'hungarian'
if match_strategy not in ('hungarian', 'stream_nearest'):
raise ValueError("match_strategy must be 'hungarian' or 'stream_nearest'")
if match_strategy == 'stream_nearest' and out_store is None:
raise ValueError('stream_nearest is only supported with --out-store')
print(f'[extract_dataset] match_strategy={match_strategy}')
tasks: list[tuple] = []
for (day, rows_dict) in csv_rows_by_day.items():
pcap_files = pcap_files_by_day.get(day, [])
if not pcap_files:
print(f'[{day}] NO pcap files — skipping ({len(rows_dict):,} CSV keys unmatched)')
continue
tasks.append((day, [str(p) for p in pcap_files], dict(rows_dict)))
if not tasks:
raise RuntimeError('No days with pcap files — nothing to extract.')
if n_jobs <= 0:
n_jobs = min(len(tasks), os.cpu_count() or 1)
print(f'[extract_dataset] running {len(tasks)} day(s) with {n_jobs} worker(s)')
store_writer: PacketShardWriter | None = None
spool_root: Path | None = None
if out_store is not None:
print(f'[extract_dataset] sharded output enabled: {out_store} shard_size={shard_size:,}')
store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
if spool_dir is None:
out_store_parent = Path(out_store).parent
out_store_parent.mkdir(parents=True, exist_ok=True)
spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent))
else:
spool_root = Path(spool_dir)
if spool_root.exists():
shutil.rmtree(spool_root)
spool_root.mkdir(parents=True, exist_ok=True)
print(f'[extract_dataset] worker spool={spool_root} flush_size={worker_flush_size:,}')
tok_chunks: list[np.ndarray] = []
len_chunks: list[np.ndarray] = []
row_chunks: list[np.ndarray] = []
meta_chunks: list[list[dict]] = []
total_joined = 0
def _materialize_results(results: list[tuple[int, np.ndarray, int, dict]]) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict]]:
results = sorted(results, key=lambda x: x[0])
n = len(results)
tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32)
len_arr = np.empty(n, dtype=np.int32)
row_arr = np.empty(n, dtype=np.int64)
meta_arr: list[dict] = [None] * n
for (i, (row_i, tok, ln, meta)) in enumerate(results):
tok_arr[i] = tok
len_arr[i] = ln
row_arr[i] = row_i
meta_arr[i] = meta
return (tok_arr, len_arr, row_arr, meta_arr)
def _flows_from_meta(row_arr: np.ndarray, meta_arr: list[dict]) -> pd.DataFrame:
labels = labels_by_row[row_arr].astype(str)
return pd.DataFrame({'label': labels, 'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32)})
def _append_spool_chunks(res: dict) -> None:
chunks = res.get('chunks') or []
for chunk in chunks:
tokens = np.load(chunk['tokens'], mmap_mode='r')
meta_df = pd.read_parquet(chunk['meta'])
if meta_df.empty:
continue
meta_df = meta_df.assign(__token_row=np.arange(len(meta_df), dtype=np.int64))
meta_df = meta_df.sort_values('csv_row_idx', kind='stable').reset_index(drop=True)
row_arr = meta_df['csv_row_idx'].to_numpy(dtype=np.int64)
lengths = meta_df['packet_length'].to_numpy(dtype=np.int32)
order = meta_df['__token_row'].to_numpy(dtype=np.int64)
labels = labels_by_row[row_arr].astype(str)
flows = pd.DataFrame({'label': labels, 'start_ts': meta_df['start_ts'].to_numpy(dtype=np.float64), 'src_ip': meta_df['src_ip'].to_numpy(dtype=object), 'dst_ip': meta_df['dst_ip'].to_numpy(dtype=object), 'src_port': meta_df['src_port'].to_numpy(dtype=np.uint32), 'dst_port': meta_df['dst_port'].to_numpy(dtype=np.uint32), 'protocol': meta_df['protocol'].to_numpy(dtype=np.uint8), 'n_pkts': meta_df['n_pkts'].to_numpy(dtype=np.uint32)})
assert store_writer is not None
store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)
def _absorb(res: dict, *, print_stats: bool=True) -> None:
if print_stats:
_print_day_stats(res)
results = res['results']
if not results:
return
(tok_arr, len_arr, row_arr, meta_arr) = _materialize_results(results)
if store_writer is not None:
store_writer.add_batch(tok_arr, len_arr, _flows_from_meta(row_arr, meta_arr))
else:
tok_chunks.append(tok_arr)
len_chunks.append(len_arr)
row_chunks.append(row_arr)
meta_chunks.append(meta_arr)
if n_jobs <= 1:
try:
for (i, (day, pcaps, rows)) in enumerate(tasks):
task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{day}')
res = _extract_day_worker(day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy)
_print_day_stats(res)
total_joined += int(res.get('n_joined', 0))
if store_writer is not None:
_append_spool_chunks(res)
else:
_absorb(res, print_stats=False)
finally:
if spool_root is not None:
shutil.rmtree(spool_root, ignore_errors=True)
else:
try:
with ProcessPoolExecutor(max_workers=n_jobs) as pool:
futs = []
for (i, (day, pcaps, rows)) in enumerate(tasks):
task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{day}')
futs.append(pool.submit(_extract_day_worker, day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy))
if store_writer is not None:
completed: dict[str, dict] = {}
for fut in as_completed(futs):
res = fut.result()
_print_day_stats(res)
completed[res['day']] = res
for (day, _, _) in tasks:
if day in completed:
total_joined += int(completed[day].get('n_joined', 0))
_append_spool_chunks(completed[day])
else:
for fut in as_completed(futs):
_absorb(fut.result())
finally:
if spool_root is not None:
shutil.rmtree(spool_root, ignore_errors=True)
if store_writer is not None:
if total_joined == 0:
raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.')
store_writer.close()
print(f'[extract_dataset] wrote sharded store {out_store}')
return
if not tok_chunks:
raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.')
tokens = np.concatenate(tok_chunks, axis=0)
lengths = np.concatenate(len_chunks, axis=0)
csv_rows = np.concatenate(row_chunks, axis=0)
meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
del tok_chunks, len_chunks, row_chunks, meta_chunks
order = np.argsort(csv_rows, kind='stable')
tokens = tokens[order]
lengths = lengths[order]
csv_rows = csv_rows[order]
meta_list = [meta_list[i] for i in order]
N_matched = len(tokens)
labels = labels_by_row[csv_rows].astype(str)
flow_id = np.arange(N_matched, dtype=np.uint64)
print(f'\n[extract_dataset] matched {N_matched:,}/{N_csv:,} ({100.0 * N_matched / max(N_csv, 1):.2f}%)')
print(f'[extract_dataset] label distribution (matched rows):')
(ulabels, counts) = np.unique(labels, return_counts=True)
for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
print(f' {lbl:<40s} {cnt:>10,}')
out_packets.parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
print(f'[extract_dataset] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')
out_flows.parent.mkdir(parents=True, exist_ok=True)
flow_df = pd.DataFrame({'flow_id': flow_id, 'label': labels, 'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32)})
flow_df.to_parquet(out_flows, compression='snappy', index=False)
print(f'[extract_dataset] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB)')
_write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels, out_path=out_flows.parent / 'flow_features.parquet')
def _write_canonical_flow_features(*, tokens: np.ndarray, lengths: np.ndarray, flow_id: np.ndarray, labels: np.ndarray, out_path: Path) -> None:
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from common.data_contract import CANONICAL_FLOW_FEATURE_NAMES, compute_flow_features_from_packets
print(f'[extract_dataset] computing canonical {len(CANONICAL_FLOW_FEATURE_NAMES)}-d flow features from packet tokens ...')
feats = compute_flow_features_from_packets(tokens, lengths)
out_path.parent.mkdir(parents=True, exist_ok=True)
df = pd.DataFrame({'flow_id': flow_id, 'label': labels})
for (i, name) in enumerate(CANONICAL_FLOW_FEATURE_NAMES):
df[name] = feats[:, i]
df.to_parquet(out_path, compression='snappy', index=False)
print(f'[extract_dataset] wrote {out_path} ({out_path.stat().st_size / 1000000.0:.2f} MB)')
def _extract_single_pcap_worker(pcap_path_str: str, label: str, extra: dict, max_len: int, idle_timeout: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000) -> dict:
t_start = _time.time()
n_pkts = 0
n_flows = 0
results: list[tuple[np.ndarray, int, dict]] = []
chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'pcap-{Path(pcap_path_str).stem}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None
def _counting_iter(pkt_iter):
nonlocal n_pkts
for pkt in pkt_iter:
n_pkts += 1
yield pkt
pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
(sip, dip, sp, dp, proto) = fl.key_fwd
meta = {'start_ts': float(fl.start_ts), 'src_ip': str(sip), 'dst_ip': str(dip), 'src_port': int(sp), 'dst_port': int(dp), 'protocol': int(proto), 'n_pkts': int(fl.n_pkts)}
tok = _to_fixed_tensor(fl.tokens, max_len)
ln = min(len(fl.tokens), max_len)
if chunk_writer is not None:
chunk_writer.add_labeled(tok, ln, meta, label, extra)
else:
results.append((tok, ln, meta))
n_flows += 1
elapsed = _time.time() - t_start
return {'pcap': pcap_path_str, 'label': label, 'extra': extra, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed}
def extract_labeled_pcaps(*, pcap_files_with_labels: list[tuple[Path, str, dict]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, T_full: int=256, idle_timeout: float=120.0, max_packets_per_pcap: int | None=None, n_jobs: int=0, extra_column_names: tuple[str, ...]=()) -> None:
N_pcap = len(pcap_files_with_labels)
print(f'[extract_labeled_pcaps] n_pcaps={N_pcap} T_full={T_full} extra_cols={extra_column_names}')
for (p, lbl, extra) in pcap_files_with_labels[:10]:
print(f' {lbl:<20s} {Path(p).name:<60s} extra={extra}')
if N_pcap > 10:
print(f' ... ({N_pcap - 10} more)')
if n_jobs <= 0:
n_jobs = min(N_pcap, os.cpu_count() or 1)
print(f'[extract_labeled_pcaps] running {N_pcap} pcap(s) with {n_jobs} worker(s)')
store_writer: PacketShardWriter | None = None
spool_root: Path | None = None
if out_store is not None:
print(f'[extract_labeled_pcaps] sharded output enabled: {out_store} shard_size={shard_size:,}')
store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
if spool_dir is None:
out_store_parent = Path(out_store).parent
out_store_parent.mkdir(parents=True, exist_ok=True)
spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent))
else:
spool_root = Path(spool_dir)
if spool_root.exists():
shutil.rmtree(spool_root)
spool_root.mkdir(parents=True, exist_ok=True)
print(f'[extract_labeled_pcaps] worker spool={spool_root} flush_size={worker_flush_size:,}')
tok_chunks: list[np.ndarray] = []
len_chunks: list[np.ndarray] = []
meta_chunks: list[list[dict]] = []
label_chunks: list[np.ndarray] = []
extra_chunks: list[dict[str, list]] = []
total_flows = 0
def _flows_for_labeled_chunk(res: dict, meta_arr: list[dict], n: int) -> pd.DataFrame:
cols = {'label': np.full(n, res['label'], dtype=object), 'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32)}
for col in extra_column_names:
cols[col] = np.full(n, res['extra'].get(col, ''), dtype=object)
return pd.DataFrame(cols)
def _append_labeled_spool_chunks(res: dict) -> None:
chunks = res.get('chunks') or []
for chunk in chunks:
tokens = np.load(chunk['tokens'], mmap_mode='r')
flows = pd.read_parquet(chunk['meta'])
if flows.empty:
continue
flows = flows.assign(__token_row=np.arange(len(flows), dtype=np.int64))
sort_keys = ['label', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol', 'start_ts']
flows = flows.sort_values(sort_keys, kind='stable').reset_index(drop=True)
order = flows['__token_row'].to_numpy(dtype=np.int64)
lengths = flows['packet_length'].to_numpy(dtype=np.int32)
flows = flows.drop(columns=['packet_length', '__token_row'])
assert store_writer is not None
store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)
def _absorb(res: dict, *, print_stats: bool=True) -> None:
pcap_name = Path(res['pcap']).name
lbl = res['label']
extra = res['extra']
if print_stats:
print(f"[pcap:{pcap_name}] label={lbl} {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
if not res['results']:
return
n = len(res['results'])
tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32)
len_arr = np.empty(n, dtype=np.int32)
meta_arr: list[dict] = [None] * n
for (i, (tok, ln, meta)) in enumerate(res['results']):
tok_arr[i] = tok
len_arr[i] = ln
meta_arr[i] = meta
if store_writer is not None:
flows = _flows_for_labeled_chunk(res, meta_arr, n)
order = np.lexsort((flows['start_ts'].to_numpy(dtype=np.float64), flows['protocol'].to_numpy(dtype=np.int64), flows['dst_port'].to_numpy(dtype=np.int64), flows['src_port'].to_numpy(dtype=np.int64), flows['dst_ip'].to_numpy(dtype=object), flows['src_ip'].to_numpy(dtype=object), flows['label'].to_numpy(dtype=object)))
store_writer.add_batch(tok_arr[order], len_arr[order], flows.iloc[order].reset_index(drop=True))
else:
tok_chunks.append(tok_arr)
len_chunks.append(len_arr)
meta_chunks.append(meta_arr)
label_chunks.append(np.full(n, lbl, dtype=object))
ex: dict[str, list] = {}
for col in extra_column_names:
val = extra.get(col, '')
ex[col] = [val] * n
extra_chunks.append(ex)
if n_jobs <= 1:
try:
for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{Path(p).stem}')
res = _extract_single_pcap_worker(str(p), lbl, extra, T_full, idle_timeout, max_packets_per_pcap, task_spool, worker_flush_size)
_absorb(res)
total_flows += int(res.get('n_flows', 0))
if store_writer is not None:
_append_labeled_spool_chunks(res)
finally:
if spool_root is not None:
shutil.rmtree(spool_root, ignore_errors=True)
else:
try:
with ProcessPoolExecutor(max_workers=n_jobs) as pool:
futs = []
for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{Path(p).stem}')
futs.append(pool.submit(_extract_single_pcap_worker, str(p), lbl, extra, T_full, idle_timeout, max_packets_per_pcap, task_spool, worker_flush_size))
if store_writer is not None:
completed: dict[str, dict] = {}
for fut in as_completed(futs):
res = fut.result()
pcap_name = Path(res['pcap']).name
print(f"[pcap:{pcap_name}] label={res['label']} {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
completed[str(res['pcap'])] = res
for (p, _, _) in pcap_files_with_labels:
res = completed.get(str(p))
if res is not None:
total_flows += int(res.get('n_flows', 0))
_append_labeled_spool_chunks(res)
else:
for fut in as_completed(futs):
_absorb(fut.result())
finally:
if spool_root is not None:
shutil.rmtree(spool_root, ignore_errors=True)
if store_writer is not None:
if total_flows == 0:
raise RuntimeError('No flows emitted — check pcap contents.')
store_writer.close()
print(f'[extract_labeled_pcaps] wrote sharded store {out_store}')
return
if not tok_chunks:
raise RuntimeError('No flows emitted — check pcap contents.')
tokens = np.concatenate(tok_chunks, axis=0)
lengths = np.concatenate(len_chunks, axis=0)
meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
labels = np.concatenate(label_chunks, axis=0)
extra_dict: dict[str, list] = {col: [] for col in extra_column_names}
for chunk in extra_chunks:
for col in extra_column_names:
extra_dict[col].extend(chunk[col])
del tok_chunks, len_chunks, meta_chunks, label_chunks, extra_chunks
sip_arr = np.asarray([m['src_ip'] for m in meta_list], dtype=object)
dip_arr = np.asarray([m['dst_ip'] for m in meta_list], dtype=object)
sp_arr = np.asarray([m['src_port'] for m in meta_list], dtype=np.int64)
dp_arr = np.asarray([m['dst_port'] for m in meta_list], dtype=np.int64)
pr_arr = np.asarray([m['protocol'] for m in meta_list], dtype=np.int64)
ts_arr = np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64)
order = np.lexsort((ts_arr, pr_arr, dp_arr, sp_arr, dip_arr, sip_arr, labels))
tokens = tokens[order]
lengths = lengths[order]
labels = labels[order]
meta_list = [meta_list[i] for i in order]
for col in extra_column_names:
extra_dict[col] = [extra_dict[col][i] for i in order]
N = len(tokens)
flow_id = np.arange(N, dtype=np.uint64)
print(f'\n[extract_labeled_pcaps] total flows: {N:,}')
print(f'[extract_labeled_pcaps] label distribution:')
(ulabels, counts) = np.unique(labels, return_counts=True)
for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
print(f' {lbl:<40s} {cnt:>10,}')
out_packets.parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
print(f'[extract_labeled_pcaps] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')
out_flows.parent.mkdir(parents=True, exist_ok=True)
cols = {'flow_id': flow_id, 'label': labels.astype(str), 'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32)}
for col in extra_column_names:
cols[col] = np.asarray(extra_dict[col], dtype=object)
flow_df = pd.DataFrame(cols)
flow_df.to_parquet(out_flows, compression='snappy', index=False)
print(f'[extract_labeled_pcaps] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB) cols={list(flow_df.columns)}')
_write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels.astype(str), out_path=out_flows.parent / 'flow_features.parquet')