from __future__ import annotations import os import shutil import socket import sys import tempfile import time as _time from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path from typing import Iterator import dpkt import numpy as np import pandas as pd from scipy.optimize import linear_sum_assignment _SCRIPT_DIR = Path(__file__).resolve().parent _REPO_ROOT = _SCRIPT_DIR.parent sys.path.insert(0, str(_REPO_ROOT / 'Packet_CFM')) from packet_store import PacketShardWriter PACKET_FEATURE_NAMES = ('log_size', 'log_dt_ms', 'direction', 'tcp_syn', 'tcp_fin', 'tcp_rst', 'tcp_psh', 'tcp_ack', 'log_win') PACKET_D = len(PACKET_FEATURE_NAMES) (FIN, SYN, RST, PSH, ACK) = (1, 2, 4, 8, 16) @dataclass(slots=True) class PacketRecord: timestamp: float src_ip: str dst_ip: str src_port: int dst_port: int protocol: int tcp_flags: int payload_len: int header_len: int total_len: int window_size: int def _try_open_pcap(f): try: return dpkt.pcap.Reader(f) except ValueError: f.seek(0) return dpkt.pcapng.Reader(f) def iter_packets(pcap_path: Path, max_packets: int | None=None) -> Iterator[PacketRecord]: n = 0 with open(pcap_path, 'rb') as f: reader = _try_open_pcap(f) link_type = reader.datalink() for (ts, buf) in reader: try: if link_type == dpkt.pcap.DLT_EN10MB: eth = dpkt.ethernet.Ethernet(buf) if eth.type != dpkt.ethernet.ETH_TYPE_IP: continue ip = eth.data elif link_type == dpkt.pcap.DLT_RAW: ip = dpkt.ip.IP(buf) elif link_type == dpkt.pcap.DLT_LINUX_SLL: sll = dpkt.sll.SLL(buf) if sll.ethtype != dpkt.ethernet.ETH_TYPE_IP: continue ip = sll.data else: continue if not isinstance(ip, dpkt.ip.IP): continue src_ip = socket.inet_ntoa(ip.src) dst_ip = socket.inet_ntoa(ip.dst) transport = ip.data if isinstance(transport, dpkt.tcp.TCP): yield PacketRecord(timestamp=ts, src_ip=src_ip, dst_ip=dst_ip, src_port=transport.sport, dst_port=transport.dport, protocol=6, tcp_flags=transport.flags, 
payload_len=len(transport.data), header_len=transport.off * 4, total_len=ip.len, window_size=transport.win) elif isinstance(transport, dpkt.udp.UDP): yield PacketRecord(timestamp=ts, src_ip=src_ip, dst_ip=dst_ip, src_port=transport.sport, dst_port=transport.dport, protocol=17, tcp_flags=0, payload_len=len(transport.data), header_len=8, total_len=ip.len, window_size=0) else: continue except (dpkt.NeedData, dpkt.UnpackError, AttributeError): continue n += 1 if max_packets is not None and n >= max_packets: return def _packet_token(pkt: PacketRecord, prev_ts: float | None, direction: int) -> np.ndarray: dt_ms = 0.0 if prev_ts is None else max(0.0, (pkt.timestamp - prev_ts) * 1000.0) syn = int(bool(pkt.tcp_flags & SYN)) fin = int(bool(pkt.tcp_flags & FIN)) rst = int(bool(pkt.tcp_flags & RST)) psh = int(bool(pkt.tcp_flags & PSH)) ack = int(bool(pkt.tcp_flags & ACK)) return np.array([float(np.log1p(max(pkt.total_len, 0))), float(np.log1p(dt_ms)), float(direction), syn, fin, rst, psh, ack, float(np.log1p(max(pkt.window_size, 0)))], dtype=np.float32) class _TokenFlow: __slots__ = ('key_fwd', 'start_ts', 'last_ts', 'fin_count', 'tokens', 'prev_ts', 'n_pkts') def __init__(self, key_fwd: tuple, start_ts: float) -> None: self.key_fwd = key_fwd self.start_ts = start_ts self.last_ts = start_ts self.fin_count = 0 self.tokens: list[np.ndarray] = [] self.prev_ts: float | None = None self.n_pkts: int = 0 def add(self, pkt: PacketRecord, is_forward: bool, max_len: int) -> None: direction = 0 if is_forward else 1 if len(self.tokens) < max_len: self.tokens.append(_packet_token(pkt, self.prev_ts, direction)) self.prev_ts = pkt.timestamp self.last_ts = pkt.timestamp self.n_pkts += 1 def stream_token_flows(packet_iter: Iterator[PacketRecord], idle_timeout: float, max_len: int, gc_every: int=200000) -> Iterator[_TokenFlow]: active: dict[tuple, _TokenFlow] = {} last_pkt_ts = 0.0 n_seen = 0 for pkt in packet_iter: last_pkt_ts = pkt.timestamp fwd_key = (pkt.src_ip, pkt.dst_ip, pkt.src_port, 
pkt.dst_port, pkt.protocol) bwd_key = (pkt.dst_ip, pkt.src_ip, pkt.dst_port, pkt.src_port, pkt.protocol) flow: _TokenFlow | None = None key = fwd_key is_forward = True if fwd_key in active: (flow, key, is_forward) = (active[fwd_key], fwd_key, True) elif bwd_key in active: (flow, key, is_forward) = (active[bwd_key], bwd_key, False) if flow is not None and pkt.timestamp - flow.last_ts > idle_timeout: old = active.pop(key) yield old flow = None if flow is None: flow = _TokenFlow(key_fwd=fwd_key, start_ts=pkt.timestamp) key = fwd_key is_forward = True active[key] = flow flow.add(pkt, is_forward, max_len) if pkt.protocol == 6: if pkt.tcp_flags & RST: yield active.pop(key) elif pkt.tcp_flags & FIN: flow.fin_count += 1 if flow.fin_count >= 2: yield active.pop(key) n_seen += 1 if n_seen % gc_every == 0: stale = [k for (k, fl) in active.items() if last_pkt_ts - fl.last_ts > idle_timeout] for k in stale: yield active.pop(k) for fl in list(active.values()): yield fl active.clear() def _canonical_key(src_ip: str, dst_ip: str, src_port: int, dst_port: int, proto: int) -> tuple: a = (src_ip, src_port) b = (dst_ip, dst_port) if a <= b: return (a[0], a[1], b[0], b[1], proto) return (b[0], b[1], a[0], a[1], proto) def _to_fixed_tensor(flow_tokens: list[np.ndarray], max_len: int) -> np.ndarray: out = np.zeros((max_len, PACKET_D), dtype=np.float32) n = min(len(flow_tokens), max_len) if n > 0: out[:n] = np.stack(flow_tokens[:n], axis=0) return out class _WorkerChunkWriter: def __init__(self, root: Path, *, prefix: str, T_full: int, chunk_size: int) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self.prefix = prefix self.T_full = T_full self.chunk_size = max(1, int(chunk_size)) self._tokens: list[np.ndarray] = [] self._records: list[dict] = [] self._next_chunk = 0 self.chunks: list[dict[str, str]] = [] def add_csv_match(self, row_i: int, tok: np.ndarray, ln: int, meta: dict) -> None: rec = dict(meta) rec['csv_row_idx'] = int(row_i) rec['packet_length'] = 
int(ln) self._add(tok, rec) def add_labeled(self, tok: np.ndarray, ln: int, meta: dict, label: str, extra: dict) -> None: rec = dict(meta) rec['packet_length'] = int(ln) rec['label'] = str(label) for (k, v) in extra.items(): rec[str(k)] = v self._add(tok, rec) def close(self) -> list[dict[str, str]]: if self._tokens: self._flush() return self.chunks def _add(self, tok: np.ndarray, rec: dict) -> None: self._tokens.append(tok.astype(np.float32, copy=False)) self._records.append(rec) if len(self._tokens) >= self.chunk_size: self._flush() def _flush(self) -> None: n = len(self._tokens) tokens = np.empty((n, self.T_full, PACKET_D), dtype=np.float32) for (i, tok) in enumerate(self._tokens): tokens[i] = tok stem = f'{self.prefix}-chunk-{self._next_chunk:06d}' token_path = self.root / f'{stem}.npy' meta_path = self.root / f'{stem}.parquet' np.save(token_path, tokens, allow_pickle=False) pd.DataFrame(self._records).to_parquet(meta_path, compression='snappy', index=False) self.chunks.append({'tokens': str(token_path), 'meta': str(meta_path)}) self._tokens.clear() self._records.clear() self._next_chunk += 1 def _flow_meta(fl: _TokenFlow) -> dict: (sip, dip, sp, dp, proto) = fl.key_fwd return {'start_ts': float(fl.start_ts), 'src_ip': str(sip), 'dst_ip': str(dip), 'src_port': int(sp), 'dst_port': int(dp), 'protocol': int(proto), 'n_pkts': int(fl.n_pkts)} def _build_stream_csv_index(csv_rows_for_day: dict[tuple, list[tuple[int, float]]]) -> dict[tuple, dict[str, np.ndarray]]: out: dict[tuple, dict[str, np.ndarray]] = {} for (ck, rows) in csv_rows_for_day.items(): finite = [(int(row_i), float(ts)) for (row_i, ts) in rows if not np.isnan(ts)] if not finite: continue finite.sort(key=lambda x: (x[1], x[0])) row_idx = np.asarray([r for (r, _) in finite], dtype=np.int64) ts = np.asarray([t for (_, t) in finite], dtype=np.float64) used = np.zeros(len(finite), dtype=bool) out[ck] = {'row_idx': row_idx, 'ts': ts, 'used': used} return out def _nearest_unused_row(entry: dict[str, 
np.ndarray], ts: float, tolerance: float) -> tuple[int | None, float | None]: csv_ts = entry['ts'] used = entry['used'] pos = int(np.searchsorted(csv_ts, ts, side='left')) best_i: int | None = None best_abs = float('inf') j = pos - 1 while j >= 0: diff = abs(float(csv_ts[j]) - ts) if diff > tolerance: break if not bool(used[j]) and diff < best_abs: best_i = j best_abs = diff j -= 1 j = pos n = len(csv_ts) while j < n: diff = abs(float(csv_ts[j]) - ts) if diff > tolerance: break if not bool(used[j]) and diff < best_abs: best_i = j best_abs = diff j += 1 if best_i is None: return (None, None) used[best_i] = True return (int(entry['row_idx'][best_i]), ts - float(csv_ts[best_i])) def _extract_day_worker(day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000, match_strategy: str='hungarian') -> dict: if match_strategy == 'stream_nearest': if spool_dir is None: raise ValueError('stream_nearest requires spool_dir') return _extract_day_worker_stream_nearest(day=day, pcap_files_str=pcap_files_str, csv_rows_for_day=csv_rows_for_day, max_len=max_len, idle_timeout=idle_timeout, time_tolerance_seconds=time_tolerance_seconds, max_packets_per_pcap=max_packets_per_pcap, spool_dir=spool_dir, worker_flush_size=worker_flush_size) pcap_by_key: dict[tuple, list[_TokenFlow]] = defaultdict(list) n_pkts = 0 t_start = _time.time() def _counting_iter(pkt_iter): nonlocal n_pkts for pkt in pkt_iter: n_pkts += 1 yield pkt for pcap_path_str in pcap_files_str: pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap) for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len): (sip, dip, sp, dp, proto) = fl.key_fwd ck = _canonical_key(sip, dip, sp, dp, proto) pcap_by_key[ck].append(fl) n_flows = sum((len(v) for v in pcap_by_key.values())) 
elapsed = _time.time() - t_start BIG = time_tolerance_seconds * 1000.0 results: list[tuple[int, np.ndarray, int, dict]] = [] chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None n_joined = 0 n_collision = 0 n_csv_keys = len(csv_rows_for_day) n_intersection = 0 def _emit(row_i: int, fl: _TokenFlow) -> None: nonlocal n_joined tok = _to_fixed_tensor(fl.tokens, max_len) ln = min(len(fl.tokens), max_len) meta = _flow_meta(fl) if chunk_writer is not None: chunk_writer.add_csv_match(row_i, tok, ln, meta) else: results.append((row_i, tok, ln, meta)) n_joined += 1 for (ck, rows) in sorted(csv_rows_for_day.items(), key=lambda kv: kv[1][0][0]): if ck not in pcap_by_key: continue n_intersection += 1 pcap_flows = pcap_by_key[ck] csv_ts = np.array([r[1] for r in rows], dtype=np.float64) pcap_ts = np.array([fl.start_ts for fl in pcap_flows], dtype=np.float64) (n_csv, n_pcap) = (len(csv_ts), len(pcap_ts)) if n_csv == 1 and n_pcap == 1: row_i = rows[0][0] ts = csv_ts[0] fl = pcap_flows[0] if not np.isnan(ts) and abs(fl.start_ts - ts) <= time_tolerance_seconds: _emit(row_i, fl) else: n_collision += 1 continue cost = np.abs(csv_ts[:, None] - pcap_ts[None, :]) cost[np.isnan(cost)] = BIG cost[cost > time_tolerance_seconds] = BIG (row_ind, col_ind) = linear_sum_assignment(cost) for (r, c) in zip(row_ind, col_ind): if cost[r, c] >= BIG: n_collision += 1 continue row_i = rows[r][0] fl = pcap_flows[c] _emit(row_i, fl) deltas: list[float] = [] sampled = 0 for (ck, rows) in csv_rows_for_day.items(): if sampled >= 10000 or ck not in pcap_by_key: if sampled >= 10000: break continue (row_i, ts) = rows[0] if np.isnan(ts): continue deltas.append(pcap_by_key[ck][0].start_ts - ts) sampled += 1 return {'day': day, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': 
len(pcap_by_key), 'n_csv_keys': n_csv_keys, 'n_intersection': n_intersection, 'n_collision': n_collision, 'deltas': deltas, 'match_strategy': match_strategy} def _extract_day_worker_stream_nearest(*, day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str, worker_flush_size: int) -> dict: t_start = _time.time() n_pkts = 0 n_flows = 0 n_joined = 0 n_collision = 0 seen_pcap_keys: set[tuple] = set() intersected_keys: set[tuple] = set() deltas: list[float] = [] csv_index = _build_stream_csv_index(csv_rows_for_day) chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size) def _counting_iter(pkt_iter): nonlocal n_pkts for pkt in pkt_iter: n_pkts += 1 yield pkt for pcap_path_str in pcap_files_str: pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap) for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len): n_flows += 1 (sip, dip, sp, dp, proto) = fl.key_fwd ck = _canonical_key(sip, dip, sp, dp, proto) seen_pcap_keys.add(ck) entry = csv_index.get(ck) if entry is None: continue intersected_keys.add(ck) (row_i, delta) = _nearest_unused_row(entry, float(fl.start_ts), time_tolerance_seconds) if row_i is None: n_collision += 1 continue tok = _to_fixed_tensor(fl.tokens, max_len) ln = min(len(fl.tokens), max_len) chunk_writer.add_csv_match(row_i, tok, ln, _flow_meta(fl)) n_joined += 1 if delta is not None and len(deltas) < 10000: deltas.append(float(delta)) elapsed = _time.time() - t_start return {'day': day, 'results': [], 'chunks': chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': len(seen_pcap_keys), 'n_csv_keys': len(csv_rows_for_day), 'n_intersection': len(intersected_keys), 'n_collision': n_collision, 'deltas': deltas, 
'match_strategy': 'stream_nearest'} def _print_day_stats(res: dict) -> None: day = res['day'] strategy = res.get('match_strategy', 'hungarian') print(f"[{day}] {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s match={strategy} ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)") print(f" pcap_keys={res['n_pcap_keys']:,} csv_keys={res['n_csv_keys']:,} intersection={res['n_intersection']:,} joined={int(res.get('n_joined', len(res.get('results', ())))):,} within-key-miss={res['n_collision']:,}") deltas = res.get('deltas') or [] if deltas: arr = np.asarray(deltas, dtype=np.float64) print(f' time-delta (pcap_start - csv_ts), seconds: median={np.median(arr):+.2f} mean={arr.mean():+.2f} std={arr.std():.2f} p05={np.percentile(arr, 5):+.2f} p95={np.percentile(arr, 95):+.2f}') med = float(np.median(arr)) if abs(med) > 2.0: print(f' -> median |{med:.1f}s| > 2s: rerun with --time-offset {med:.0f}') def extract_dataset(*, csv_rows_by_day: dict[str, dict[tuple, list[tuple[int, float]]]], labels_by_row: np.ndarray, pcap_files_by_day: dict[str, list[Path]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, match_strategy: str | None=None, T_full: int=256, idle_timeout: float=120.0, time_tolerance_seconds: float=2.0, max_packets_per_pcap: int | None=None, n_jobs: int=0) -> None: N_csv = len(labels_by_row) print(f'[extract_dataset] N_csv={N_csv:,} T_full={T_full} days={sorted(csv_rows_by_day.keys())}') if match_strategy is None: match_strategy = 'stream_nearest' if out_store is not None else 'hungarian' if match_strategy not in ('hungarian', 'stream_nearest'): raise ValueError("match_strategy must be 'hungarian' or 'stream_nearest'") if match_strategy == 'stream_nearest' and out_store is None: raise ValueError('stream_nearest is only supported with --out-store') print(f'[extract_dataset] match_strategy={match_strategy}') tasks: list[tuple] = 
[] for (day, rows_dict) in csv_rows_by_day.items(): pcap_files = pcap_files_by_day.get(day, []) if not pcap_files: print(f'[{day}] NO pcap files — skipping ({len(rows_dict):,} CSV keys unmatched)') continue tasks.append((day, [str(p) for p in pcap_files], dict(rows_dict))) if not tasks: raise RuntimeError('No days with pcap files — nothing to extract.') if n_jobs <= 0: n_jobs = min(len(tasks), os.cpu_count() or 1) print(f'[extract_dataset] running {len(tasks)} day(s) with {n_jobs} worker(s)') store_writer: PacketShardWriter | None = None spool_root: Path | None = None if out_store is not None: print(f'[extract_dataset] sharded output enabled: {out_store} shard_size={shard_size:,}') store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True) if spool_dir is None: out_store_parent = Path(out_store).parent out_store_parent.mkdir(parents=True, exist_ok=True) spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent)) else: spool_root = Path(spool_dir) if spool_root.exists(): shutil.rmtree(spool_root) spool_root.mkdir(parents=True, exist_ok=True) print(f'[extract_dataset] worker spool={spool_root} flush_size={worker_flush_size:,}') tok_chunks: list[np.ndarray] = [] len_chunks: list[np.ndarray] = [] row_chunks: list[np.ndarray] = [] meta_chunks: list[list[dict]] = [] total_joined = 0 def _materialize_results(results: list[tuple[int, np.ndarray, int, dict]]) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict]]: results = sorted(results, key=lambda x: x[0]) n = len(results) tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32) len_arr = np.empty(n, dtype=np.int32) row_arr = np.empty(n, dtype=np.int64) meta_arr: list[dict] = [None] * n for (i, (row_i, tok, ln, meta)) in enumerate(results): tok_arr[i] = tok len_arr[i] = ln row_arr[i] = row_i meta_arr[i] = meta return (tok_arr, len_arr, row_arr, meta_arr) def _flows_from_meta(row_arr: np.ndarray, meta_arr: list[dict]) -> 
pd.DataFrame: labels = labels_by_row[row_arr].astype(str) return pd.DataFrame({'label': labels, 'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32)}) def _append_spool_chunks(res: dict) -> None: chunks = res.get('chunks') or [] for chunk in chunks: tokens = np.load(chunk['tokens'], mmap_mode='r') meta_df = pd.read_parquet(chunk['meta']) if meta_df.empty: continue meta_df = meta_df.assign(__token_row=np.arange(len(meta_df), dtype=np.int64)) meta_df = meta_df.sort_values('csv_row_idx', kind='stable').reset_index(drop=True) row_arr = meta_df['csv_row_idx'].to_numpy(dtype=np.int64) lengths = meta_df['packet_length'].to_numpy(dtype=np.int32) order = meta_df['__token_row'].to_numpy(dtype=np.int64) labels = labels_by_row[row_arr].astype(str) flows = pd.DataFrame({'label': labels, 'start_ts': meta_df['start_ts'].to_numpy(dtype=np.float64), 'src_ip': meta_df['src_ip'].to_numpy(dtype=object), 'dst_ip': meta_df['dst_ip'].to_numpy(dtype=object), 'src_port': meta_df['src_port'].to_numpy(dtype=np.uint32), 'dst_port': meta_df['dst_port'].to_numpy(dtype=np.uint32), 'protocol': meta_df['protocol'].to_numpy(dtype=np.uint8), 'n_pkts': meta_df['n_pkts'].to_numpy(dtype=np.uint32)}) assert store_writer is not None store_writer.add_batch(np.asarray(tokens[order]), lengths, flows) def _absorb(res: dict, *, print_stats: bool=True) -> None: if print_stats: _print_day_stats(res) results = res['results'] if not results: return (tok_arr, len_arr, row_arr, meta_arr) = _materialize_results(results) if store_writer is not None: 
store_writer.add_batch(tok_arr, len_arr, _flows_from_meta(row_arr, meta_arr)) else: tok_chunks.append(tok_arr) len_chunks.append(len_arr) row_chunks.append(row_arr) meta_chunks.append(meta_arr) if n_jobs <= 1: try: for (i, (day, pcaps, rows)) in enumerate(tasks): task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{day}') res = _extract_day_worker(day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy) _print_day_stats(res) total_joined += int(res.get('n_joined', 0)) if store_writer is not None: _append_spool_chunks(res) else: _absorb(res, print_stats=False) finally: if spool_root is not None: shutil.rmtree(spool_root, ignore_errors=True) else: try: with ProcessPoolExecutor(max_workers=n_jobs) as pool: futs = [] for (i, (day, pcaps, rows)) in enumerate(tasks): task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{day}') futs.append(pool.submit(_extract_day_worker, day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy)) if store_writer is not None: completed: dict[str, dict] = {} for fut in as_completed(futs): res = fut.result() _print_day_stats(res) completed[res['day']] = res for (day, _, _) in tasks: if day in completed: total_joined += int(completed[day].get('n_joined', 0)) _append_spool_chunks(completed[day]) else: for fut in as_completed(futs): _absorb(fut.result()) finally: if spool_root is not None: shutil.rmtree(spool_root, ignore_errors=True) if store_writer is not None: if total_joined == 0: raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.') store_writer.close() print(f'[extract_dataset] wrote sharded store {out_store}') return if not tok_chunks: raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.') tokens = np.concatenate(tok_chunks, axis=0) lengths = 
np.concatenate(len_chunks, axis=0) csv_rows = np.concatenate(row_chunks, axis=0) meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk] del tok_chunks, len_chunks, row_chunks, meta_chunks order = np.argsort(csv_rows, kind='stable') tokens = tokens[order] lengths = lengths[order] csv_rows = csv_rows[order] meta_list = [meta_list[i] for i in order] N_matched = len(tokens) labels = labels_by_row[csv_rows].astype(str) flow_id = np.arange(N_matched, dtype=np.uint64) print(f'\n[extract_dataset] matched {N_matched:,}/{N_csv:,} ({100.0 * N_matched / max(N_csv, 1):.2f}%)') print(f'[extract_dataset] label distribution (matched rows):') (ulabels, counts) = np.unique(labels, return_counts=True) for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]): print(f' {lbl:<40s} {cnt:>10,}') out_packets.parent.mkdir(parents=True, exist_ok=True) np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id) print(f'[extract_dataset] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)') out_flows.parent.mkdir(parents=True, exist_ok=True) flow_df = pd.DataFrame({'flow_id': flow_id, 'label': labels, 'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32)}) flow_df.to_parquet(out_flows, compression='snappy', index=False) print(f'[extract_dataset] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB)') _write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels, out_path=out_flows.parent / 'flow_features.parquet') 
def _write_canonical_flow_features(
    *,
    tokens: np.ndarray,
    lengths: np.ndarray,
    flow_id: np.ndarray,
    labels: np.ndarray,
    out_path: Path,
) -> None:
    """Compute the canonical flow features from packet tokens and save them.

    The features come from ``common.data_contract.compute_flow_features_from_packets``
    and are written to *out_path* as parquet, one row per flow, with the
    columns ``flow_id``, ``label`` and one column per canonical feature name.

    Args:
        tokens: Packet tokens, passed through to the feature function.
        lengths: Per-flow packet counts, passed through to the feature function.
        flow_id: Flow ids, one per flow.
        labels: Labels, one per flow.
        out_path: Destination parquet file; its parent directory is created.
    """
    # `common` is imported from the directory two levels above this file.
    sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
    from common.data_contract import CANONICAL_FLOW_FEATURE_NAMES, compute_flow_features_from_packets

    print(f'[extract_dataset] computing canonical {len(CANONICAL_FLOW_FEATURE_NAMES)}-d flow features from packet tokens ...')
    feats = compute_flow_features_from_packets(tokens, lengths)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df = pd.DataFrame({'flow_id': flow_id, 'label': labels})
    for (i, name) in enumerate(CANONICAL_FLOW_FEATURE_NAMES):
        df[name] = feats[:, i]
    df.to_parquet(out_path, compression='snappy', index=False)
    print(f'[extract_dataset] wrote {out_path} ({out_path.stat().st_size / 1000000.0:.2f} MB)')


def _extract_single_pcap_worker(
    pcap_path_str: str,
    label: str,
    extra: dict,
    max_len: int,
    idle_timeout: float,
    max_packets_per_pcap: int | None,
    spool_dir: str | None = None,
    worker_flush_size: int = 10000,
) -> dict:
    """Extract every flow of one pcap; all of them share *label* and *extra*.

    Args:
        pcap_path_str: Path of the capture file.
        label: Label attached to every flow of this file.
        extra: Additional per-file values attached to every flow.
        max_len: Packets tokenised per flow (also the token array length).
        idle_timeout: Flow idle timeout passed to ``stream_token_flows``.
        max_packets_per_pcap: Packet limit, or ``None``.
        spool_dir: When given, flows are written there in chunks instead of
            being returned in ``'results'``.
        worker_flush_size: Flows per spool chunk.

    Returns:
        A dict with ``pcap``, ``label``, ``extra``, the flows (``results`` as
        ``(tokens, length, meta)`` tuples, or ``chunks``), ``n_pkts``,
        ``n_flows`` and ``elapsed``.
    """
    t_start = _time.time()
    n_pkts = 0
    n_flows = 0
    results: list[tuple[np.ndarray, int, dict]] = []
    chunk_writer = (
        _WorkerChunkWriter(
            Path(spool_dir),
            prefix=f'pcap-{Path(pcap_path_str).stem}',
            T_full=max_len,
            chunk_size=worker_flush_size,
        )
        if spool_dir is not None
        else None
    )

    def _counting_iter(pkt_iter):
        """Pass packets through while counting them in n_pkts."""
        nonlocal n_pkts
        for pkt in pkt_iter:
            n_pkts += 1
            yield pkt

    pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
    for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
        # Shared helper: same start_ts / 5-tuple / n_pkts dict as the CSV path.
        meta = _flow_meta(fl)
        tok = _to_fixed_tensor(fl.tokens, max_len)
        ln = min(len(fl.tokens), max_len)
        if chunk_writer is not None:
            chunk_writer.add_labeled(tok, ln, meta, label, extra)
        else:
            results.append((tok, ln, meta))
        n_flows += 1

    elapsed = _time.time() - t_start
    return {
        'pcap': pcap_path_str,
        'label': label,
        'extra': extra,
        'results': results,
        'chunks': [] if chunk_writer is None else chunk_writer.close(),
        'n_pkts': n_pkts,
        'n_flows': n_flows,
        'elapsed': elapsed,
    }


def extract_labeled_pcaps(
    *,
    pcap_files_with_labels: list[tuple[Path, str, dict]],
    out_packets: Path,
    out_flows: Path,
    out_store: Path | None = None,
    shard_size: int = 100000,
    worker_flush_size: int = 10000,
    spool_dir: Path | None = None,
    T_full: int = 256,
    idle_timeout: float = 120.0,
    max_packets_per_pcap: int | None = None,
    n_jobs: int = 0,
    extra_column_names: tuple[str, ...] = (),
) -> None:
    """Extract packet tokens from pcaps that each carry a single label.

    Every pcap is processed by :func:`_extract_single_pcap_worker`
    (in-process, or in a process pool when ``n_jobs > 1``); all flows of a
    file receive that file's label and extra values.

    Output, without *out_store*: ``out_packets`` (compressed npz with
    ``packet_tokens``, ``packet_lengths``, ``flow_id``), ``out_flows``
    (parquet) and ``flow_features.parquet`` next to ``out_flows``; flows are
    ordered by (label, src_ip, dst_ip, src_port, dst_port, protocol,
    start_ts).

    Output, with *out_store*: flows are streamed into a ``PacketShardWriter``
    via per-task spool chunks, in the order of *pcap_files_with_labels*, and
    the function returns without writing the npz / parquet files.

    Args:
        pcap_files_with_labels: ``(pcap path, label, extra dict)`` per file.
        out_packets: Destination of the npz file.
        out_flows: Destination of the flows parquet file.
        out_store: Destination of the sharded store, enabling store mode.
        shard_size: Shard size passed to ``PacketShardWriter``.
        worker_flush_size: Flows per worker spool chunk.
        spool_dir: Spool directory for store mode. NOTE: an existing directory
            at this path is deleted first, and the spool is removed when
            extraction finishes. Defaults to a temp dir next to *out_store*.
        T_full: Packets tokenised per flow.
        idle_timeout: Flow idle timeout.
        max_packets_per_pcap: Per-file packet limit, or ``None``.
        n_jobs: Worker processes; ``<= 0`` means one per pcap, capped at the
            CPU count.
        extra_column_names: Keys of the extra dicts to emit as columns; a key
            missing from a file's extra dict yields ``''``.

    Raises:
        RuntimeError: No flow was extracted from any pcap.
    """
    N_pcap = len(pcap_files_with_labels)
    print(f'[extract_labeled_pcaps] n_pcaps={N_pcap} T_full={T_full} extra_cols={extra_column_names}')
    for (p, lbl, extra) in pcap_files_with_labels[:10]:
        print(f' {lbl:<20s} {Path(p).name:<60s} extra={extra}')
    if N_pcap > 10:
        print(f' ... ({N_pcap - 10} more)')
    if n_jobs <= 0:
        n_jobs = min(N_pcap, os.cpu_count() or 1)
    print(f'[extract_labeled_pcaps] running {N_pcap} pcap(s) with {n_jobs} worker(s)')

    store_writer: PacketShardWriter | None = None
    spool_root: Path | None = None
    if out_store is not None:
        print(f'[extract_labeled_pcaps] sharded output enabled: {out_store} shard_size={shard_size:,}')
        store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
        if spool_dir is None:
            out_store_parent = Path(out_store).parent
            out_store_parent.mkdir(parents=True, exist_ok=True)
            spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent))
        else:
            spool_root = Path(spool_dir)
            # Start from an empty spool: leftovers of an earlier run are removed.
            if spool_root.exists():
                shutil.rmtree(spool_root)
            spool_root.mkdir(parents=True, exist_ok=True)
        print(f'[extract_labeled_pcaps] worker spool={spool_root} flush_size={worker_flush_size:,}')

    # In-memory accumulation, used only when no store is written.
    tok_chunks: list[np.ndarray] = []
    len_chunks: list[np.ndarray] = []
    meta_chunks: list[list[dict]] = []
    label_chunks: list[np.ndarray] = []
    extra_chunks: list[dict[str, list]] = []
    total_flows = 0

    def _print_pcap_stats(res: dict) -> None:
        """Print the packet / flow counts and throughput of one pcap."""
        pcap_name = Path(res['pcap']).name
        print(
            f"[pcap:{pcap_name}] label={res['label']} {res['n_pkts']:,} pkts → {res['n_flows']:,} flows "
            f"in {res['elapsed']:.1f}s "
            f"({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)"
        )

    def _flows_for_labeled_chunk(res: dict, meta_arr: list[dict], n: int) -> pd.DataFrame:
        """Build the per-flow frame of *n* in-memory flows of one pcap."""
        cols = {
            'label': np.full(n, res['label'], dtype=object),
            'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64),
            'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object),
            'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object),
            'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32),
            'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32),
            'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8),
            'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32),
        }
        for col in extra_column_names:
            cols[col] = np.full(n, res['extra'].get(col, ''), dtype=object)
        return pd.DataFrame(cols)

    def _append_labeled_spool_chunks(res: dict) -> None:
        """Feed every spool chunk of *res* into the store, in sorted order."""
        chunks = res.get('chunks') or []
        for chunk in chunks:
            tokens = np.load(chunk['tokens'], mmap_mode='r')
            flows = pd.read_parquet(chunk['meta'])
            if flows.empty:
                continue
            # Remember each row's position in the token file before sorting.
            flows = flows.assign(__token_row=np.arange(len(flows), dtype=np.int64))
            sort_keys = ['label', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol', 'start_ts']
            flows = flows.sort_values(sort_keys, kind='stable').reset_index(drop=True)
            order = flows['__token_row'].to_numpy(dtype=np.int64)
            lengths = flows['packet_length'].to_numpy(dtype=np.int32)
            flows = flows.drop(columns=['packet_length', '__token_row'])
            # A pcap whose extra dict lacks a declared column would otherwise
            # produce a frame without it; default to '' like the in-memory path.
            for col in extra_column_names:
                if col not in flows.columns:
                    flows[col] = ''
            assert store_writer is not None
            store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)

    def _absorb(res: dict, *, print_stats: bool = True) -> None:
        """Consume the in-memory results of *res* (store or accumulation lists)."""
        lbl = res['label']
        extra = res['extra']
        if print_stats:
            _print_pcap_stats(res)
        if not res['results']:
            return
        n = len(res['results'])
        tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32)
        len_arr = np.empty(n, dtype=np.int32)
        meta_arr: list[dict] = [None] * n
        for (i, (tok, ln, meta)) in enumerate(res['results']):
            tok_arr[i] = tok
            len_arr[i] = ln
            meta_arr[i] = meta
        if store_writer is not None:
            flows = _flows_for_labeled_chunk(res, meta_arr, n)
            # lexsort uses its LAST key as the primary one: label first,
            # start_ts last.
            order = np.lexsort((
                flows['start_ts'].to_numpy(dtype=np.float64),
                flows['protocol'].to_numpy(dtype=np.int64),
                flows['dst_port'].to_numpy(dtype=np.int64),
                flows['src_port'].to_numpy(dtype=np.int64),
                flows['dst_ip'].to_numpy(dtype=object),
                flows['src_ip'].to_numpy(dtype=object),
                flows['label'].to_numpy(dtype=object),
            ))
            store_writer.add_batch(tok_arr[order], len_arr[order], flows.iloc[order].reset_index(drop=True))
        else:
            tok_chunks.append(tok_arr)
            len_chunks.append(len_arr)
            meta_chunks.append(meta_arr)
            label_chunks.append(np.full(n, lbl, dtype=object))
            ex: dict[str, list] = {}
            for col in extra_column_names:
                val = extra.get(col, '')
                ex[col] = [val] * n
            extra_chunks.append(ex)

    if n_jobs <= 1:
        try:
            for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
                task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{Path(p).stem}')
                res = _extract_single_pcap_worker(
                    str(p), lbl, extra, T_full, idle_timeout,
                    max_packets_per_pcap, task_spool, worker_flush_size,
                )
                _absorb(res)
                total_flows += int(res.get('n_flows', 0))
                if store_writer is not None:
                    _append_labeled_spool_chunks(res)
        finally:
            if spool_root is not None:
                shutil.rmtree(spool_root, ignore_errors=True)
    else:
        try:
            with ProcessPoolExecutor(max_workers=n_jobs) as pool:
                futs = []
                for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
                    task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{Path(p).stem}')
                    futs.append(pool.submit(
                        _extract_single_pcap_worker,
                        str(p), lbl, extra, T_full, idle_timeout,
                        max_packets_per_pcap, task_spool, worker_flush_size,
                    ))
                if store_writer is not None:
                    # Stats are printed as workers finish ...
                    for fut in as_completed(futs):
                        _print_pcap_stats(fut.result())
                    # ... but the store is filled in submission order, one
                    # result per task. Going through the futures (rather than
                    # a dict keyed by pcap path) keeps tasks apart even when
                    # the same path is listed more than once.
                    for fut in futs:
                        res = fut.result()
                        total_flows += int(res.get('n_flows', 0))
                        _append_labeled_spool_chunks(res)
                else:
                    for fut in as_completed(futs):
                        _absorb(fut.result())
        finally:
            if spool_root is not None:
                shutil.rmtree(spool_root, ignore_errors=True)

    if store_writer is not None:
        if total_flows == 0:
            raise RuntimeError('No flows emitted — check pcap contents.')
        store_writer.close()
        print(f'[extract_labeled_pcaps] wrote sharded store {out_store}')
        return

    if not tok_chunks:
        raise RuntimeError('No flows emitted — check pcap contents.')

    tokens = np.concatenate(tok_chunks, axis=0)
    lengths = np.concatenate(len_chunks, axis=0)
    meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
    labels = np.concatenate(label_chunks, axis=0)
    extra_dict: dict[str, list] = {col: [] for col in extra_column_names}
    for chunk in extra_chunks:
        for col in extra_column_names:
            extra_dict[col].extend(chunk[col])
    del tok_chunks, len_chunks, meta_chunks, label_chunks, extra_chunks

    # Global order, independent of worker completion order. lexsort uses its
    # LAST key as the primary one: label first, start_ts last.
    sip_arr = np.asarray([m['src_ip'] for m in meta_list], dtype=object)
    dip_arr = np.asarray([m['dst_ip'] for m in meta_list], dtype=object)
    sp_arr = np.asarray([m['src_port'] for m in meta_list], dtype=np.int64)
    dp_arr = np.asarray([m['dst_port'] for m in meta_list], dtype=np.int64)
    pr_arr = np.asarray([m['protocol'] for m in meta_list], dtype=np.int64)
    ts_arr = np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64)
    order = np.lexsort((ts_arr, pr_arr, dp_arr, sp_arr, dip_arr, sip_arr, labels))
    tokens = tokens[order]
    lengths = lengths[order]
    labels = labels[order]
    meta_list = [meta_list[i] for i in order]
    for col in extra_column_names:
        extra_dict[col] = [extra_dict[col][i] for i in order]

    N = len(tokens)
    flow_id = np.arange(N, dtype=np.uint64)
    print(f'\n[extract_labeled_pcaps] total flows: {N:,}')
    print(f'[extract_labeled_pcaps] label distribution:')
    (ulabels, counts) = np.unique(labels, return_counts=True)
    for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
        print(f' {lbl:<40s} {cnt:>10,}')

    out_packets.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
    print(f'[extract_labeled_pcaps] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')

    out_flows.parent.mkdir(parents=True, exist_ok=True)
    cols = {
        'flow_id': flow_id,
        'label': labels.astype(str),
        'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64),
        'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object),
        'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object),
        'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32),
        'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32),
        'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8),
        'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32),
    }
    for col in extra_column_names:
        cols[col] = np.asarray(extra_dict[col], dtype=object)
    flow_df = pd.DataFrame(cols)
    flow_df.to_parquet(out_flows, compression='snappy', index=False)
    print(f'[extract_labeled_pcaps] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB) cols={list(flow_df.columns)}')

    _write_canonical_flow_features(
        tokens=tokens,
        lengths=lengths,
        flow_id=flow_id,
        labels=labels.astype(str),
        out_path=out_flows.parent / 'flow_features.parquet',
    )