775 lines
41 KiB
Python
775 lines
41 KiB
Python
from __future__ import annotations
|
||
import os
|
||
import shutil
|
||
import socket
|
||
import sys
|
||
import tempfile
|
||
import time as _time
|
||
from collections import defaultdict
|
||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Iterator
|
||
import dpkt
|
||
import numpy as np
|
||
import pandas as pd
|
||
from scipy.optimize import linear_sum_assignment
|
||
# Directory holding this script, and the repository root one level above it.
_SCRIPT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _SCRIPT_DIR.parent
# Make the sibling Packet_CFM directory importable; it provides the sharded
# store writer that extract_dataset / extract_labeled_pcaps use when an
# out_store path is given.
sys.path.insert(0, str(_REPO_ROOT / 'Packet_CFM'))
from packet_store import PacketShardWriter
|
||
# Per-packet token features, in the exact column order _packet_token fills.
PACKET_FEATURE_NAMES = (
    'log_size',   # log1p(IP total length)
    'log_dt_ms',  # log1p(gap to the previous tokenised packet of the flow, ms)
    'direction',  # 0 = same direction as the flow's first packet, 1 = reverse
    'tcp_syn',
    'tcp_fin',
    'tcp_rst',
    'tcp_psh',
    'tcp_ack',
    'log_win',    # log1p(TCP receive window)
)

# Width (feature count) of one packet token.
PACKET_D = len(PACKET_FEATURE_NAMES)

# TCP header flag bits: FIN=0x01, SYN=0x02, RST=0x04, PSH=0x08, ACK=0x10.
FIN, SYN, RST, PSH, ACK = (1 << bit for bit in range(5))
|
||
|
||
@dataclass(slots=True)
|
||
class PacketRecord:
|
||
timestamp: float
|
||
src_ip: str
|
||
dst_ip: str
|
||
src_port: int
|
||
dst_port: int
|
||
protocol: int
|
||
tcp_flags: int
|
||
payload_len: int
|
||
header_len: int
|
||
total_len: int
|
||
window_size: int
|
||
|
||
def _try_open_pcap(f):
    """Return a dpkt reader for the binary stream *f*.

    A classic libpcap reader is tried first. If constructing it raises
    ``ValueError`` (e.g. the header is not a libpcap header), the stream is
    rewound and opened as pcapng instead; errors from that second attempt
    propagate to the caller.
    """
    try:
        reader = dpkt.pcap.Reader(f)
    except ValueError:
        f.seek(0)
        reader = dpkt.pcapng.Reader(f)
    return reader
|
||
|
||
def iter_packets(pcap_path: Path, max_packets: int | None=None) -> Iterator[PacketRecord]:
    """Yield decoded IPv4 TCP/UDP packets from a pcap or pcapng file.

    Ethernet, Linux cooked (SLL) and raw-IP link types are decoded. Other
    link types, non-IPv4 frames, non-TCP/UDP transports and frames dpkt
    cannot parse are skipped silently.

    Args:
        pcap_path: capture file to read.
        max_packets: stop after this many packets have been *yielded*
            (skipped frames do not count). ``None`` means no limit; ``0`` or a
            negative value yields nothing.

    Yields:
        One ``PacketRecord`` per decoded packet, in file order.
    """
    if max_packets is not None and max_packets <= 0:
        return
    # A raw-IP capture may carry the platform DLT value dpkt exposes (12, or
    # 14 on OpenBSD), or the on-disk link-type codes LINKTYPE_RAW (101) /
    # LINKTYPE_IPV4 (228) that libpcap writes into pcap/pcapng headers.
    raw_link_types = {dpkt.pcap.DLT_RAW, 101, 228}
    n = 0
    with open(pcap_path, 'rb') as f:
        reader = _try_open_pcap(f)
        link_type = reader.datalink()
        for (ts, buf) in reader:
            try:
                if link_type == dpkt.pcap.DLT_EN10MB:
                    eth = dpkt.ethernet.Ethernet(buf)
                    if eth.type != dpkt.ethernet.ETH_TYPE_IP:
                        continue
                    ip = eth.data
                elif link_type in raw_link_types:
                    # Raw captures can interleave IPv6; only IPv4 (version
                    # nibble 4) may be handed to the IPv4 parser.
                    if not buf or (buf[0] >> 4) != 4:
                        continue
                    ip = dpkt.ip.IP(buf)
                elif link_type == dpkt.pcap.DLT_LINUX_SLL:
                    sll = dpkt.sll.SLL(buf)
                    if sll.ethtype != dpkt.ethernet.ETH_TYPE_IP:
                        continue
                    ip = sll.data
                else:
                    continue
                if not isinstance(ip, dpkt.ip.IP):
                    continue
                src_ip = socket.inet_ntoa(ip.src)
                dst_ip = socket.inet_ntoa(ip.dst)
                transport = ip.data
                if isinstance(transport, dpkt.tcp.TCP):
                    record = PacketRecord(timestamp=ts, src_ip=src_ip, dst_ip=dst_ip, src_port=transport.sport, dst_port=transport.dport, protocol=6, tcp_flags=transport.flags, payload_len=len(transport.data), header_len=transport.off * 4, total_len=ip.len, window_size=transport.win)
                elif isinstance(transport, dpkt.udp.UDP):
                    record = PacketRecord(timestamp=ts, src_ip=src_ip, dst_ip=dst_ip, src_port=transport.sport, dst_port=transport.dport, protocol=17, tcp_flags=0, payload_len=len(transport.data), header_len=8, total_len=ip.len, window_size=0)
                else:
                    continue
            except (dpkt.NeedData, dpkt.UnpackError, AttributeError):
                # Truncated or malformed frame: skip it.
                continue
            # Yield outside the try so only decoding errors are swallowed.
            yield record
            n += 1
            if max_packets is not None and n >= max_packets:
                return
|
||
|
||
def _packet_token(pkt: PacketRecord, prev_ts: float | None, direction: int) -> np.ndarray:
    """Encode one packet as a float32 feature vector.

    The layout follows PACKET_FEATURE_NAMES:
    - log1p of the IP total length;
    - log1p of the gap since *prev_ts* in milliseconds (0 when *prev_ts* is
      None; negative gaps are clamped to 0);
    - *direction*;
    - 0/1 indicators for SYN, FIN, RST, PSH and ACK;
    - log1p of the TCP window.
    """
    if prev_ts is None:
        gap_ms = 0.0
    else:
        gap_ms = max(0.0, (pkt.timestamp - prev_ts) * 1000.0)
    size_feat = float(np.log1p(max(pkt.total_len, 0)))
    gap_feat = float(np.log1p(gap_ms))
    win_feat = float(np.log1p(max(pkt.window_size, 0)))
    flags = pkt.tcp_flags
    flag_bits = [int(bool(flags & bit)) for bit in (SYN, FIN, RST, PSH, ACK)]
    return np.array([size_feat, gap_feat, float(direction), *flag_bits, win_feat], dtype=np.float32)
|
||
|
||
class _TokenFlow:
|
||
__slots__ = ('key_fwd', 'start_ts', 'last_ts', 'fin_count', 'tokens', 'prev_ts', 'n_pkts')
|
||
|
||
def __init__(self, key_fwd: tuple, start_ts: float) -> None:
|
||
self.key_fwd = key_fwd
|
||
self.start_ts = start_ts
|
||
self.last_ts = start_ts
|
||
self.fin_count = 0
|
||
self.tokens: list[np.ndarray] = []
|
||
self.prev_ts: float | None = None
|
||
self.n_pkts: int = 0
|
||
|
||
def add(self, pkt: PacketRecord, is_forward: bool, max_len: int) -> None:
|
||
direction = 0 if is_forward else 1
|
||
if len(self.tokens) < max_len:
|
||
self.tokens.append(_packet_token(pkt, self.prev_ts, direction))
|
||
self.prev_ts = pkt.timestamp
|
||
self.last_ts = pkt.timestamp
|
||
self.n_pkts += 1
|
||
|
||
def stream_token_flows(packet_iter: Iterator[PacketRecord], idle_timeout: float, max_len: int, gc_every: int=200000) -> Iterator[_TokenFlow]:
    """Group a packet stream into bidirectional flows and yield each finished flow.

    A packet joins the active flow stored under its own 5-tuple or under the
    reversed 5-tuple. Otherwise it starts a new flow keyed by its own
    5-tuple, so a flow's ``key_fwd`` is the direction of its first packet.

    A flow is yielded, and removed from the active table, when:

    * the next packet for it arrives more than ``idle_timeout`` seconds after
      its last packet (that packet then starts a fresh flow);
    * a TCP packet on it carries RST;
    * it receives its second FIN-flagged TCP packet (either direction,
      retransmissions included);
    * at every ``gc_every``-th packet, its last packet is more than
      ``idle_timeout`` older than the newest packet seen;
    * the input is exhausted (remaining flows, in dict insertion order).

    ``max_len`` caps the tokens kept per flow (see ``_TokenFlow.add``).
    NOTE(review): timeouts compare raw timestamps, so this presumably assumes
    packets arrive in roughly non-decreasing time order — confirm.
    """
    active: dict[tuple, _TokenFlow] = {}
    last_pkt_ts = 0.0
    n_seen = 0
    for pkt in packet_iter:
        last_pkt_ts = pkt.timestamp
        fwd_key = (pkt.src_ip, pkt.dst_ip, pkt.src_port, pkt.dst_port, pkt.protocol)
        bwd_key = (pkt.dst_ip, pkt.src_ip, pkt.dst_port, pkt.src_port, pkt.protocol)
        flow: _TokenFlow | None = None
        key = fwd_key
        is_forward = True
        # At most one of fwd_key / bwd_key is ever present: new flows are only
        # created when neither is active (or the active one just expired).
        if fwd_key in active:
            (flow, key, is_forward) = (active[fwd_key], fwd_key, True)
        elif bwd_key in active:
            (flow, key, is_forward) = (active[bwd_key], bwd_key, False)
        # Idle expiry: close the old flow before this packet opens a new one.
        if flow is not None and pkt.timestamp - flow.last_ts > idle_timeout:
            old = active.pop(key)
            yield old
            flow = None
        if flow is None:
            # The new flow's forward direction is this packet's direction.
            flow = _TokenFlow(key_fwd=fwd_key, start_ts=pkt.timestamp)
            key = fwd_key
            is_forward = True
            active[key] = flow
        flow.add(pkt, is_forward, max_len)
        if pkt.protocol == 6:
            if pkt.tcp_flags & RST:
                yield active.pop(key)
            elif pkt.tcp_flags & FIN:
                flow.fin_count += 1
                if flow.fin_count >= 2:
                    yield active.pop(key)
        n_seen += 1
        # Periodic sweep for flows that went quiet without a closing packet.
        # NOTE(review): gc_every must be > 0; 0 raises ZeroDivisionError here.
        if n_seen % gc_every == 0:
            stale = [k for (k, fl) in active.items() if last_pkt_ts - fl.last_ts > idle_timeout]
            for k in stale:
                yield active.pop(k)
    # End of input: flush everything still open.
    for fl in list(active.values()):
        yield fl
    active.clear()
|
||
|
||
def _canonical_key(src_ip: str, dst_ip: str, src_port: int, dst_port: int, proto: int) -> tuple:
|
||
a = (src_ip, src_port)
|
||
b = (dst_ip, dst_port)
|
||
if a <= b:
|
||
return (a[0], a[1], b[0], b[1], proto)
|
||
return (b[0], b[1], a[0], a[1], proto)
|
||
|
||
def _to_fixed_tensor(flow_tokens: list[np.ndarray], max_len: int) -> np.ndarray:
    """Stack up to *max_len* token vectors into a zero-padded float32 array.

    The result has shape (max_len, PACKET_D). Rows beyond the number of
    available tokens stay zero.
    """
    padded = np.zeros((max_len, PACKET_D), dtype=np.float32)
    kept = flow_tokens[:max_len]
    if len(kept) > 0:
        padded[:len(kept)] = np.stack(kept, axis=0)
    return padded
|
||
|
||
class _WorkerChunkWriter:
|
||
|
||
def __init__(self, root: Path, *, prefix: str, T_full: int, chunk_size: int) -> None:
|
||
self.root = Path(root)
|
||
self.root.mkdir(parents=True, exist_ok=True)
|
||
self.prefix = prefix
|
||
self.T_full = T_full
|
||
self.chunk_size = max(1, int(chunk_size))
|
||
self._tokens: list[np.ndarray] = []
|
||
self._records: list[dict] = []
|
||
self._next_chunk = 0
|
||
self.chunks: list[dict[str, str]] = []
|
||
|
||
def add_csv_match(self, row_i: int, tok: np.ndarray, ln: int, meta: dict) -> None:
|
||
rec = dict(meta)
|
||
rec['csv_row_idx'] = int(row_i)
|
||
rec['packet_length'] = int(ln)
|
||
self._add(tok, rec)
|
||
|
||
def add_labeled(self, tok: np.ndarray, ln: int, meta: dict, label: str, extra: dict) -> None:
|
||
rec = dict(meta)
|
||
rec['packet_length'] = int(ln)
|
||
rec['label'] = str(label)
|
||
for (k, v) in extra.items():
|
||
rec[str(k)] = v
|
||
self._add(tok, rec)
|
||
|
||
def close(self) -> list[dict[str, str]]:
|
||
if self._tokens:
|
||
self._flush()
|
||
return self.chunks
|
||
|
||
def _add(self, tok: np.ndarray, rec: dict) -> None:
|
||
self._tokens.append(tok.astype(np.float32, copy=False))
|
||
self._records.append(rec)
|
||
if len(self._tokens) >= self.chunk_size:
|
||
self._flush()
|
||
|
||
def _flush(self) -> None:
|
||
n = len(self._tokens)
|
||
tokens = np.empty((n, self.T_full, PACKET_D), dtype=np.float32)
|
||
for (i, tok) in enumerate(self._tokens):
|
||
tokens[i] = tok
|
||
stem = f'{self.prefix}-chunk-{self._next_chunk:06d}'
|
||
token_path = self.root / f'{stem}.npy'
|
||
meta_path = self.root / f'{stem}.parquet'
|
||
np.save(token_path, tokens, allow_pickle=False)
|
||
pd.DataFrame(self._records).to_parquet(meta_path, compression='snappy', index=False)
|
||
self.chunks.append({'tokens': str(token_path), 'meta': str(meta_path)})
|
||
self._tokens.clear()
|
||
self._records.clear()
|
||
self._next_chunk += 1
|
||
|
||
def _flow_meta(fl: _TokenFlow) -> dict:
|
||
(sip, dip, sp, dp, proto) = fl.key_fwd
|
||
return {'start_ts': float(fl.start_ts), 'src_ip': str(sip), 'dst_ip': str(dip), 'src_port': int(sp), 'dst_port': int(dp), 'protocol': int(proto), 'n_pkts': int(fl.n_pkts)}
|
||
|
||
def _build_stream_csv_index(csv_rows_for_day: dict[tuple, list[tuple[int, float]]]) -> dict[tuple, dict[str, np.ndarray]]:
|
||
out: dict[tuple, dict[str, np.ndarray]] = {}
|
||
for (ck, rows) in csv_rows_for_day.items():
|
||
finite = [(int(row_i), float(ts)) for (row_i, ts) in rows if not np.isnan(ts)]
|
||
if not finite:
|
||
continue
|
||
finite.sort(key=lambda x: (x[1], x[0]))
|
||
row_idx = np.asarray([r for (r, _) in finite], dtype=np.int64)
|
||
ts = np.asarray([t for (_, t) in finite], dtype=np.float64)
|
||
used = np.zeros(len(finite), dtype=bool)
|
||
out[ck] = {'row_idx': row_idx, 'ts': ts, 'used': used}
|
||
return out
|
||
|
||
def _nearest_unused_row(entry: dict[str, np.ndarray], ts: float, tolerance: float) -> tuple[int | None, float | None]:
|
||
csv_ts = entry['ts']
|
||
used = entry['used']
|
||
pos = int(np.searchsorted(csv_ts, ts, side='left'))
|
||
best_i: int | None = None
|
||
best_abs = float('inf')
|
||
j = pos - 1
|
||
while j >= 0:
|
||
diff = abs(float(csv_ts[j]) - ts)
|
||
if diff > tolerance:
|
||
break
|
||
if not bool(used[j]) and diff < best_abs:
|
||
best_i = j
|
||
best_abs = diff
|
||
j -= 1
|
||
j = pos
|
||
n = len(csv_ts)
|
||
while j < n:
|
||
diff = abs(float(csv_ts[j]) - ts)
|
||
if diff > tolerance:
|
||
break
|
||
if not bool(used[j]) and diff < best_abs:
|
||
best_i = j
|
||
best_abs = diff
|
||
j += 1
|
||
if best_i is None:
|
||
return (None, None)
|
||
used[best_i] = True
|
||
return (int(entry['row_idx'][best_i]), ts - float(csv_ts[best_i]))
|
||
|
||
def _extract_day_worker(day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000, match_strategy: str='hungarian') -> dict:
|
||
if match_strategy == 'stream_nearest':
|
||
if spool_dir is None:
|
||
raise ValueError('stream_nearest requires spool_dir')
|
||
return _extract_day_worker_stream_nearest(day=day, pcap_files_str=pcap_files_str, csv_rows_for_day=csv_rows_for_day, max_len=max_len, idle_timeout=idle_timeout, time_tolerance_seconds=time_tolerance_seconds, max_packets_per_pcap=max_packets_per_pcap, spool_dir=spool_dir, worker_flush_size=worker_flush_size)
|
||
pcap_by_key: dict[tuple, list[_TokenFlow]] = defaultdict(list)
|
||
n_pkts = 0
|
||
t_start = _time.time()
|
||
|
||
def _counting_iter(pkt_iter):
|
||
nonlocal n_pkts
|
||
for pkt in pkt_iter:
|
||
n_pkts += 1
|
||
yield pkt
|
||
for pcap_path_str in pcap_files_str:
|
||
pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
|
||
for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
|
||
(sip, dip, sp, dp, proto) = fl.key_fwd
|
||
ck = _canonical_key(sip, dip, sp, dp, proto)
|
||
pcap_by_key[ck].append(fl)
|
||
n_flows = sum((len(v) for v in pcap_by_key.values()))
|
||
elapsed = _time.time() - t_start
|
||
BIG = time_tolerance_seconds * 1000.0
|
||
results: list[tuple[int, np.ndarray, int, dict]] = []
|
||
chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None
|
||
n_joined = 0
|
||
n_collision = 0
|
||
n_csv_keys = len(csv_rows_for_day)
|
||
n_intersection = 0
|
||
|
||
def _emit(row_i: int, fl: _TokenFlow) -> None:
|
||
nonlocal n_joined
|
||
tok = _to_fixed_tensor(fl.tokens, max_len)
|
||
ln = min(len(fl.tokens), max_len)
|
||
meta = _flow_meta(fl)
|
||
if chunk_writer is not None:
|
||
chunk_writer.add_csv_match(row_i, tok, ln, meta)
|
||
else:
|
||
results.append((row_i, tok, ln, meta))
|
||
n_joined += 1
|
||
for (ck, rows) in sorted(csv_rows_for_day.items(), key=lambda kv: kv[1][0][0]):
|
||
if ck not in pcap_by_key:
|
||
continue
|
||
n_intersection += 1
|
||
pcap_flows = pcap_by_key[ck]
|
||
csv_ts = np.array([r[1] for r in rows], dtype=np.float64)
|
||
pcap_ts = np.array([fl.start_ts for fl in pcap_flows], dtype=np.float64)
|
||
(n_csv, n_pcap) = (len(csv_ts), len(pcap_ts))
|
||
if n_csv == 1 and n_pcap == 1:
|
||
row_i = rows[0][0]
|
||
ts = csv_ts[0]
|
||
fl = pcap_flows[0]
|
||
if not np.isnan(ts) and abs(fl.start_ts - ts) <= time_tolerance_seconds:
|
||
_emit(row_i, fl)
|
||
else:
|
||
n_collision += 1
|
||
continue
|
||
cost = np.abs(csv_ts[:, None] - pcap_ts[None, :])
|
||
cost[np.isnan(cost)] = BIG
|
||
cost[cost > time_tolerance_seconds] = BIG
|
||
(row_ind, col_ind) = linear_sum_assignment(cost)
|
||
for (r, c) in zip(row_ind, col_ind):
|
||
if cost[r, c] >= BIG:
|
||
n_collision += 1
|
||
continue
|
||
row_i = rows[r][0]
|
||
fl = pcap_flows[c]
|
||
_emit(row_i, fl)
|
||
deltas: list[float] = []
|
||
sampled = 0
|
||
for (ck, rows) in csv_rows_for_day.items():
|
||
if sampled >= 10000 or ck not in pcap_by_key:
|
||
if sampled >= 10000:
|
||
break
|
||
continue
|
||
(row_i, ts) = rows[0]
|
||
if np.isnan(ts):
|
||
continue
|
||
deltas.append(pcap_by_key[ck][0].start_ts - ts)
|
||
sampled += 1
|
||
return {'day': day, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': len(pcap_by_key), 'n_csv_keys': n_csv_keys, 'n_intersection': n_intersection, 'n_collision': n_collision, 'deltas': deltas, 'match_strategy': match_strategy}
|
||
|
||
def _extract_day_worker_stream_nearest(*, day: str, pcap_files_str: list[str], csv_rows_for_day: dict[tuple, list[tuple[int, float]]], max_len: int, idle_timeout: float, time_tolerance_seconds: float, max_packets_per_pcap: int | None, spool_dir: str, worker_flush_size: int) -> dict:
    """Join one day's pcap flows to CSV rows greedily, as flows are emitted.

    Each flow from ``stream_token_flows`` claims the unused CSV row of the same
    canonical key whose timestamp is nearest its start time, within
    ``time_tolerance_seconds`` (see ``_nearest_unused_row``). Matches are
    written through a ``_WorkerChunkWriter`` under *spool_dir*, so the worker
    never holds a whole day's flows at once.

    ``'deltas'`` is a clock-offset diagnostic, sampled the same way as in the
    Hungarian path. For the first flow seen on each key shared with the CSV,
    it records ``flow start - first CSV row timestamp``, whether or not the
    flow matched. At most 10,000 deltas are kept.

    Sampling only matched flows would bound every delta by the tolerance. The
    ``--time-offset`` hint in ``_print_day_stats`` could then never fire.

    Returns the same stats dict as ``_extract_day_worker``; ``'results'`` is
    always empty because output goes to ``'chunks'``.
    """
    t_start = _time.time()
    n_pkts = 0
    n_flows = 0
    n_joined = 0
    n_collision = 0
    seen_pcap_keys: set[tuple] = set()
    intersected_keys: set[tuple] = set()
    deltas: list[float] = []
    csv_index = _build_stream_csv_index(csv_rows_for_day)
    chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'day-{day}', T_full=max_len, chunk_size=worker_flush_size)

    def _counting_iter(pkt_iter):
        nonlocal n_pkts
        for pkt in pkt_iter:
            n_pkts += 1
            yield pkt

    for pcap_path_str in pcap_files_str:
        pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
        for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
            n_flows += 1
            ck = _canonical_key(*fl.key_fwd)
            seen_pcap_keys.add(ck)
            entry = csv_index.get(ck)
            if entry is None:
                continue
            if ck not in intersected_keys:
                intersected_keys.add(ck)
                # Keys in csv_index always have at least one CSV row.
                first_csv_ts = float(csv_rows_for_day[ck][0][1])
                if len(deltas) < 10000 and not np.isnan(first_csv_ts):
                    deltas.append(float(fl.start_ts) - first_csv_ts)
            (row_i, _delta) = _nearest_unused_row(entry, float(fl.start_ts), time_tolerance_seconds)
            if row_i is None:
                n_collision += 1
                continue
            tok = _to_fixed_tensor(fl.tokens, max_len)
            ln = min(len(fl.tokens), max_len)
            chunk_writer.add_csv_match(row_i, tok, ln, _flow_meta(fl))
            n_joined += 1
    elapsed = _time.time() - t_start
    return {'day': day, 'results': [], 'chunks': chunk_writer.close(), 'n_joined': n_joined, 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed, 'n_pcap_keys': len(seen_pcap_keys), 'n_csv_keys': len(csv_rows_for_day), 'n_intersection': len(intersected_keys), 'n_collision': n_collision, 'deltas': deltas, 'match_strategy': 'stream_nearest'}
|
||
|
||
def _print_day_stats(res: dict) -> None:
|
||
day = res['day']
|
||
strategy = res.get('match_strategy', 'hungarian')
|
||
print(f"[{day}] {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s match={strategy} ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
|
||
print(f" pcap_keys={res['n_pcap_keys']:,} csv_keys={res['n_csv_keys']:,} intersection={res['n_intersection']:,} joined={int(res.get('n_joined', len(res.get('results', ())))):,} within-key-miss={res['n_collision']:,}")
|
||
deltas = res.get('deltas') or []
|
||
if deltas:
|
||
arr = np.asarray(deltas, dtype=np.float64)
|
||
print(f' time-delta (pcap_start - csv_ts), seconds: median={np.median(arr):+.2f} mean={arr.mean():+.2f} std={arr.std():.2f} p05={np.percentile(arr, 5):+.2f} p95={np.percentile(arr, 95):+.2f}')
|
||
med = float(np.median(arr))
|
||
if abs(med) > 2.0:
|
||
print(f' -> median |{med:.1f}s| > 2s: rerun with --time-offset {med:.0f}')
|
||
|
||
def extract_dataset(*, csv_rows_by_day: dict[str, dict[tuple, list[tuple[int, float]]]], labels_by_row: np.ndarray, pcap_files_by_day: dict[str, list[Path]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, match_strategy: str | None=None, T_full: int=256, idle_timeout: float=120.0, time_tolerance_seconds: float=2.0, max_packets_per_pcap: int | None=None, n_jobs: int=0) -> None:
    """Join pcap flows to labelled CSV rows for every day and write the dataset.

    One ``_extract_day_worker`` task runs per day that has pcap files, either
    sequentially (``n_jobs <= 1``) or in a process pool.
    ``n_jobs <= 0`` means ``min(#tasks, cpu_count)``.

    Two output modes:

    * ``out_store`` given: matched flows are spooled by the workers and
      appended to a ``PacketShardWriter`` in task order.
      ``match_strategy`` defaults to ``'stream_nearest'``.
    * otherwise: results are gathered in memory and sorted by CSV row, then
      written to ``out_packets`` (npz: packet_tokens, packet_lengths,
      flow_id) and ``out_flows`` (parquet flow metadata). Canonical flow
      features also go to ``flow_features.parquet`` next to ``out_flows``.
      ``match_strategy`` defaults to ``'hungarian'``.

    ``labels_by_row`` is indexed by CSV row index to label each matched flow.

    Raises:
        ValueError: unknown ``match_strategy``, or ``stream_nearest`` without
            ``out_store``.
        RuntimeError: no day has pcap files, or no flow matched at all.
    """
    N_csv = len(labels_by_row)
    print(f'[extract_dataset] N_csv={N_csv:,} T_full={T_full} days={sorted(csv_rows_by_day.keys())}')
    if match_strategy is None:
        match_strategy = 'stream_nearest' if out_store is not None else 'hungarian'
    if match_strategy not in ('hungarian', 'stream_nearest'):
        raise ValueError("match_strategy must be 'hungarian' or 'stream_nearest'")
    if match_strategy == 'stream_nearest' and out_store is None:
        raise ValueError('stream_nearest is only supported with --out-store')
    print(f'[extract_dataset] match_strategy={match_strategy}')
    # One task per day that has pcap files: (day, pcap paths as str, rows dict).
    tasks: list[tuple] = []
    for (day, rows_dict) in csv_rows_by_day.items():
        pcap_files = pcap_files_by_day.get(day, [])
        if not pcap_files:
            print(f'[{day}] NO pcap files — skipping ({len(rows_dict):,} CSV keys unmatched)')
            continue
        tasks.append((day, [str(p) for p in pcap_files], dict(rows_dict)))
    if not tasks:
        raise RuntimeError('No days with pcap files — nothing to extract.')
    if n_jobs <= 0:
        n_jobs = min(len(tasks), os.cpu_count() or 1)
    print(f'[extract_dataset] running {len(tasks)} day(s) with {n_jobs} worker(s)')
    store_writer: PacketShardWriter | None = None
    spool_root: Path | None = None
    if out_store is not None:
        print(f'[extract_dataset] sharded output enabled: {out_store} shard_size={shard_size:,}')
        store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
        if spool_dir is None:
            # Private temp spool next to the store, removed in the finally blocks below.
            out_store_parent = Path(out_store).parent
            out_store_parent.mkdir(parents=True, exist_ok=True)
            spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent))
        else:
            spool_root = Path(spool_dir)
            # NOTE(review): a caller-supplied spool_dir is deleted here if it
            # exists, and again after extraction; never point it at a
            # directory holding other data.
            if spool_root.exists():
                shutil.rmtree(spool_root)
            spool_root.mkdir(parents=True, exist_ok=True)
        print(f'[extract_dataset] worker spool={spool_root} flush_size={worker_flush_size:,}')
    # In-memory accumulators (used only when out_store is None).
    tok_chunks: list[np.ndarray] = []
    len_chunks: list[np.ndarray] = []
    row_chunks: list[np.ndarray] = []
    meta_chunks: list[list[dict]] = []
    total_joined = 0

    def _materialize_results(results: list[tuple[int, np.ndarray, int, dict]]) -> tuple[np.ndarray, np.ndarray, np.ndarray, list[dict]]:
        """Sort in-memory worker results by CSV row and pack them into arrays."""
        results = sorted(results, key=lambda x: x[0])
        n = len(results)
        tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32)
        len_arr = np.empty(n, dtype=np.int32)
        row_arr = np.empty(n, dtype=np.int64)
        meta_arr: list[dict] = [None] * n
        for (i, (row_i, tok, ln, meta)) in enumerate(results):
            tok_arr[i] = tok
            len_arr[i] = ln
            row_arr[i] = row_i
            meta_arr[i] = meta
        return (tok_arr, len_arr, row_arr, meta_arr)

    def _flows_from_meta(row_arr: np.ndarray, meta_arr: list[dict]) -> pd.DataFrame:
        """Build the store's flow table: CSV label plus per-flow metadata columns."""
        labels = labels_by_row[row_arr].astype(str)
        return pd.DataFrame({'label': labels, 'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32)})

    def _append_spool_chunks(res: dict) -> None:
        """Stream a worker's spooled chunks into the store, each sorted by CSV row."""
        chunks = res.get('chunks') or []
        for chunk in chunks:
            tokens = np.load(chunk['tokens'], mmap_mode='r')
            meta_df = pd.read_parquet(chunk['meta'])
            if meta_df.empty:
                continue
            # Remember each metadata row's position in the token file before sorting.
            meta_df = meta_df.assign(__token_row=np.arange(len(meta_df), dtype=np.int64))
            meta_df = meta_df.sort_values('csv_row_idx', kind='stable').reset_index(drop=True)
            row_arr = meta_df['csv_row_idx'].to_numpy(dtype=np.int64)
            lengths = meta_df['packet_length'].to_numpy(dtype=np.int32)
            order = meta_df['__token_row'].to_numpy(dtype=np.int64)
            labels = labels_by_row[row_arr].astype(str)
            flows = pd.DataFrame({'label': labels, 'start_ts': meta_df['start_ts'].to_numpy(dtype=np.float64), 'src_ip': meta_df['src_ip'].to_numpy(dtype=object), 'dst_ip': meta_df['dst_ip'].to_numpy(dtype=object), 'src_port': meta_df['src_port'].to_numpy(dtype=np.uint32), 'dst_port': meta_df['dst_port'].to_numpy(dtype=np.uint32), 'protocol': meta_df['protocol'].to_numpy(dtype=np.uint8), 'n_pkts': meta_df['n_pkts'].to_numpy(dtype=np.uint32)})
            assert store_writer is not None
            store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)

    def _absorb(res: dict, *, print_stats: bool=True) -> None:
        """Fold a worker's in-memory results into the accumulators (or the store)."""
        if print_stats:
            _print_day_stats(res)
        results = res['results']
        if not results:
            return
        (tok_arr, len_arr, row_arr, meta_arr) = _materialize_results(results)
        # NOTE(review): _absorb is only called when store_writer is None, so
        # this store branch looks unreachable.
        if store_writer is not None:
            store_writer.add_batch(tok_arr, len_arr, _flows_from_meta(row_arr, meta_arr))
        else:
            tok_chunks.append(tok_arr)
            len_chunks.append(len_arr)
            row_chunks.append(row_arr)
            meta_chunks.append(meta_arr)
    if n_jobs <= 1:
        try:
            for (i, (day, pcaps, rows)) in enumerate(tasks):
                task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{day}')
                res = _extract_day_worker(day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy)
                _print_day_stats(res)
                total_joined += int(res.get('n_joined', 0))
                if store_writer is not None:
                    _append_spool_chunks(res)
                else:
                    _absorb(res, print_stats=False)
        finally:
            if spool_root is not None:
                shutil.rmtree(spool_root, ignore_errors=True)
    else:
        try:
            with ProcessPoolExecutor(max_workers=n_jobs) as pool:
                futs = []
                for (i, (day, pcaps, rows)) in enumerate(tasks):
                    task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{day}')
                    futs.append(pool.submit(_extract_day_worker, day, pcaps, rows, T_full, idle_timeout, time_tolerance_seconds, max_packets_per_pcap, task_spool, worker_flush_size, match_strategy))
                if store_writer is not None:
                    # Wait for every day, then append in task order so the
                    # store's row order does not depend on completion order.
                    completed: dict[str, dict] = {}
                    for fut in as_completed(futs):
                        res = fut.result()
                        _print_day_stats(res)
                        completed[res['day']] = res
                    for (day, _, _) in tasks:
                        if day in completed:
                            total_joined += int(completed[day].get('n_joined', 0))
                            _append_spool_chunks(completed[day])
                else:
                    # Completion order is arbitrary here; the global argsort below restores CSV order.
                    for fut in as_completed(futs):
                        _absorb(fut.result())
        finally:
            if spool_root is not None:
                shutil.rmtree(spool_root, ignore_errors=True)
    if store_writer is not None:
        # NOTE(review): on this error path store_writer is never closed.
        if total_joined == 0:
            raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.')
        store_writer.close()
        print(f'[extract_dataset] wrote sharded store {out_store}')
        return
    if not tok_chunks:
        raise RuntimeError('No matched flows — check timestamps (--time-offset) and pcap×CSV correspondence.')
    tokens = np.concatenate(tok_chunks, axis=0)
    lengths = np.concatenate(len_chunks, axis=0)
    csv_rows = np.concatenate(row_chunks, axis=0)
    meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
    # Drop the per-day pieces before reordering to limit peak memory.
    del tok_chunks, len_chunks, row_chunks, meta_chunks
    # Global CSV-row order; flow_id is the position in this order.
    order = np.argsort(csv_rows, kind='stable')
    tokens = tokens[order]
    lengths = lengths[order]
    csv_rows = csv_rows[order]
    meta_list = [meta_list[i] for i in order]
    N_matched = len(tokens)
    labels = labels_by_row[csv_rows].astype(str)
    flow_id = np.arange(N_matched, dtype=np.uint64)
    print(f'\n[extract_dataset] matched {N_matched:,}/{N_csv:,} ({100.0 * N_matched / max(N_csv, 1):.2f}%)')
    print(f'[extract_dataset] label distribution (matched rows):')
    (ulabels, counts) = np.unique(labels, return_counts=True)
    for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
        print(f' {lbl:<40s} {cnt:>10,}')
    out_packets.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
    print(f'[extract_dataset] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')
    out_flows.parent.mkdir(parents=True, exist_ok=True)
    flow_df = pd.DataFrame({'flow_id': flow_id, 'label': labels, 'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32)})
    flow_df.to_parquet(out_flows, compression='snappy', index=False)
    print(f'[extract_dataset] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB)')
    _write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels, out_path=out_flows.parent / 'flow_features.parquet')
|
||
|
||
def _write_canonical_flow_features(*, tokens: np.ndarray, lengths: np.ndarray, flow_id: np.ndarray, labels: np.ndarray, out_path: Path) -> None:
    """Derive flow-level features from packet tokens and write them to parquet.

    Columns, in order:
    - ``flow_id`` and ``label``;
    - one column per name in
      ``common.data_contract.CANONICAL_FLOW_FEATURE_NAMES``, taken from the
      matching column of ``compute_flow_features_from_packets(tokens, lengths)``.
    """
    # ``common`` lives at the repository root (one level above this script).
    # Insert it once; repeated calls previously grew sys.path every time.
    repo_root = str(Path(__file__).resolve().parents[1])
    if repo_root not in sys.path:
        sys.path.insert(0, repo_root)
    from common.data_contract import CANONICAL_FLOW_FEATURE_NAMES, compute_flow_features_from_packets
    print(f'[extract_dataset] computing canonical {len(CANONICAL_FLOW_FEATURE_NAMES)}-d flow features from packet tokens ...')
    feats = compute_flow_features_from_packets(tokens, lengths)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Build all columns in one constructor call; inserting wide feature sets
    # column by column fragments the frame (pandas PerformanceWarning).
    columns = {'flow_id': flow_id, 'label': labels}
    columns.update({name: feats[:, i] for (i, name) in enumerate(CANONICAL_FLOW_FEATURE_NAMES)})
    df = pd.DataFrame(columns)
    df.to_parquet(out_path, compression='snappy', index=False)
    print(f'[extract_dataset] wrote {out_path} ({out_path.stat().st_size / 1000000.0:.2f} MB)')
|
||
|
||
def _extract_single_pcap_worker(pcap_path_str: str, label: str, extra: dict, max_len: int, idle_timeout: float, max_packets_per_pcap: int | None, spool_dir: str | None=None, worker_flush_size: int=10000) -> dict:
    """Tokenise every flow of one pcap and tag it with a fixed label.

    There is no CSV join: every flow emitted by ``stream_token_flows`` is kept.
    - With *spool_dir*, flows go through a ``_WorkerChunkWriter`` with *label*
      and the *extra* columns attached, and ``'results'`` stays empty.
    - Otherwise ``'results'`` holds ``(tokens, length, meta)`` tuples in
      emission order.

    Returns a dict with these keys: ``pcap``, ``label``, ``extra``,
    ``results``, ``chunks``, ``n_pkts``, ``n_flows`` and ``elapsed``.
    """
    t_start = _time.time()
    n_pkts = 0
    n_flows = 0
    results: list[tuple[np.ndarray, int, dict]] = []
    chunk_writer = _WorkerChunkWriter(Path(spool_dir), prefix=f'pcap-{Path(pcap_path_str).stem}', T_full=max_len, chunk_size=worker_flush_size) if spool_dir is not None else None

    def _counting_iter(pkt_iter):
        nonlocal n_pkts
        for pkt in pkt_iter:
            n_pkts += 1
            yield pkt

    pkt_iter = iter_packets(Path(pcap_path_str), max_packets=max_packets_per_pcap)
    for fl in stream_token_flows(_counting_iter(pkt_iter), idle_timeout=idle_timeout, max_len=max_len):
        # Same metadata schema as the CSV-joined workers, via the shared helper.
        meta = _flow_meta(fl)
        tok = _to_fixed_tensor(fl.tokens, max_len)
        ln = min(len(fl.tokens), max_len)
        if chunk_writer is not None:
            chunk_writer.add_labeled(tok, ln, meta, label, extra)
        else:
            results.append((tok, ln, meta))
        n_flows += 1
    elapsed = _time.time() - t_start
    return {'pcap': pcap_path_str, 'label': label, 'extra': extra, 'results': results, 'chunks': [] if chunk_writer is None else chunk_writer.close(), 'n_pkts': n_pkts, 'n_flows': n_flows, 'elapsed': elapsed}
|
||
|
||
def extract_labeled_pcaps(*, pcap_files_with_labels: list[tuple[Path, str, dict]], out_packets: Path, out_flows: Path, out_store: Path | None=None, shard_size: int=100000, worker_flush_size: int=10000, spool_dir: Path | None=None, T_full: int=256, idle_timeout: float=120.0, max_packets_per_pcap: int | None=None, n_jobs: int=0, extra_column_names: tuple[str, ...]=()) -> None:
|
||
N_pcap = len(pcap_files_with_labels)
|
||
print(f'[extract_labeled_pcaps] n_pcaps={N_pcap} T_full={T_full} extra_cols={extra_column_names}')
|
||
for (p, lbl, extra) in pcap_files_with_labels[:10]:
|
||
print(f' {lbl:<20s} {Path(p).name:<60s} extra={extra}')
|
||
if N_pcap > 10:
|
||
print(f' ... ({N_pcap - 10} more)')
|
||
if n_jobs <= 0:
|
||
n_jobs = min(N_pcap, os.cpu_count() or 1)
|
||
print(f'[extract_labeled_pcaps] running {N_pcap} pcap(s) with {n_jobs} worker(s)')
|
||
store_writer: PacketShardWriter | None = None
|
||
spool_root: Path | None = None
|
||
if out_store is not None:
|
||
print(f'[extract_labeled_pcaps] sharded output enabled: {out_store} shard_size={shard_size:,}')
|
||
store_writer = PacketShardWriter(out_store, shard_size=shard_size, T_full=T_full, D=PACKET_D, overwrite=True)
|
||
if spool_dir is None:
|
||
out_store_parent = Path(out_store).parent
|
||
out_store_parent.mkdir(parents=True, exist_ok=True)
|
||
spool_root = Path(tempfile.mkdtemp(prefix=f'.{Path(out_store).name}.spool.', dir=out_store_parent))
|
||
else:
|
||
spool_root = Path(spool_dir)
|
||
if spool_root.exists():
|
||
shutil.rmtree(spool_root)
|
||
spool_root.mkdir(parents=True, exist_ok=True)
|
||
print(f'[extract_labeled_pcaps] worker spool={spool_root} flush_size={worker_flush_size:,}')
|
||
tok_chunks: list[np.ndarray] = []
|
||
len_chunks: list[np.ndarray] = []
|
||
meta_chunks: list[list[dict]] = []
|
||
label_chunks: list[np.ndarray] = []
|
||
extra_chunks: list[dict[str, list]] = []
|
||
total_flows = 0
|
||
|
||
def _flows_for_labeled_chunk(res: dict, meta_arr: list[dict], n: int) -> pd.DataFrame:
|
||
cols = {'label': np.full(n, res['label'], dtype=object), 'start_ts': np.asarray([m['start_ts'] for m in meta_arr], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_arr], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_arr], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_arr], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_arr], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_arr], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_arr], dtype=np.uint32)}
|
||
for col in extra_column_names:
|
||
cols[col] = np.full(n, res['extra'].get(col, ''), dtype=object)
|
||
return pd.DataFrame(cols)
|
||
|
||
def _append_labeled_spool_chunks(res: dict) -> None:
|
||
chunks = res.get('chunks') or []
|
||
for chunk in chunks:
|
||
tokens = np.load(chunk['tokens'], mmap_mode='r')
|
||
flows = pd.read_parquet(chunk['meta'])
|
||
if flows.empty:
|
||
continue
|
||
flows = flows.assign(__token_row=np.arange(len(flows), dtype=np.int64))
|
||
sort_keys = ['label', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol', 'start_ts']
|
||
flows = flows.sort_values(sort_keys, kind='stable').reset_index(drop=True)
|
||
order = flows['__token_row'].to_numpy(dtype=np.int64)
|
||
lengths = flows['packet_length'].to_numpy(dtype=np.int32)
|
||
flows = flows.drop(columns=['packet_length', '__token_row'])
|
||
assert store_writer is not None
|
||
store_writer.add_batch(np.asarray(tokens[order]), lengths, flows)
|
||
|
||
def _absorb(res: dict, *, print_stats: bool=True) -> None:
|
||
pcap_name = Path(res['pcap']).name
|
||
lbl = res['label']
|
||
extra = res['extra']
|
||
if print_stats:
|
||
print(f"[pcap:{pcap_name}] label={lbl} {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
|
||
if not res['results']:
|
||
return
|
||
n = len(res['results'])
|
||
tok_arr = np.empty((n, T_full, PACKET_D), dtype=np.float32)
|
||
len_arr = np.empty(n, dtype=np.int32)
|
||
meta_arr: list[dict] = [None] * n
|
||
for (i, (tok, ln, meta)) in enumerate(res['results']):
|
||
tok_arr[i] = tok
|
||
len_arr[i] = ln
|
||
meta_arr[i] = meta
|
||
if store_writer is not None:
|
||
flows = _flows_for_labeled_chunk(res, meta_arr, n)
|
||
order = np.lexsort((flows['start_ts'].to_numpy(dtype=np.float64), flows['protocol'].to_numpy(dtype=np.int64), flows['dst_port'].to_numpy(dtype=np.int64), flows['src_port'].to_numpy(dtype=np.int64), flows['dst_ip'].to_numpy(dtype=object), flows['src_ip'].to_numpy(dtype=object), flows['label'].to_numpy(dtype=object)))
|
||
store_writer.add_batch(tok_arr[order], len_arr[order], flows.iloc[order].reset_index(drop=True))
|
||
else:
|
||
tok_chunks.append(tok_arr)
|
||
len_chunks.append(len_arr)
|
||
meta_chunks.append(meta_arr)
|
||
label_chunks.append(np.full(n, lbl, dtype=object))
|
||
ex: dict[str, list] = {}
|
||
for col in extra_column_names:
|
||
val = extra.get(col, '')
|
||
ex[col] = [val] * n
|
||
extra_chunks.append(ex)
|
||
if n_jobs <= 1:
|
||
try:
|
||
for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
|
||
task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{Path(p).stem}')
|
||
res = _extract_single_pcap_worker(str(p), lbl, extra, T_full, idle_timeout, max_packets_per_pcap, task_spool, worker_flush_size)
|
||
_absorb(res)
|
||
total_flows += int(res.get('n_flows', 0))
|
||
if store_writer is not None:
|
||
_append_labeled_spool_chunks(res)
|
||
finally:
|
||
if spool_root is not None:
|
||
shutil.rmtree(spool_root, ignore_errors=True)
|
||
else:
|
||
try:
|
||
with ProcessPoolExecutor(max_workers=n_jobs) as pool:
|
||
futs = []
|
||
for (i, (p, lbl, extra)) in enumerate(pcap_files_with_labels):
|
||
task_spool = None if spool_root is None else str(spool_root / f'task-{i:04d}-{Path(p).stem}')
|
||
futs.append(pool.submit(_extract_single_pcap_worker, str(p), lbl, extra, T_full, idle_timeout, max_packets_per_pcap, task_spool, worker_flush_size))
|
||
if store_writer is not None:
|
||
completed: dict[str, dict] = {}
|
||
for fut in as_completed(futs):
|
||
res = fut.result()
|
||
pcap_name = Path(res['pcap']).name
|
||
print(f"[pcap:{pcap_name}] label={res['label']} {res['n_pkts']:,} pkts → {res['n_flows']:,} flows in {res['elapsed']:.1f}s ({res['n_pkts'] / max(res['elapsed'], 0.001) / 1000000.0:.2f}M pkts/s)")
|
||
completed[str(res['pcap'])] = res
|
||
for (p, _, _) in pcap_files_with_labels:
|
||
res = completed.get(str(p))
|
||
if res is not None:
|
||
total_flows += int(res.get('n_flows', 0))
|
||
_append_labeled_spool_chunks(res)
|
||
else:
|
||
for fut in as_completed(futs):
|
||
_absorb(fut.result())
|
||
finally:
|
||
if spool_root is not None:
|
||
shutil.rmtree(spool_root, ignore_errors=True)
|
||
if store_writer is not None:
|
||
if total_flows == 0:
|
||
raise RuntimeError('No flows emitted — check pcap contents.')
|
||
store_writer.close()
|
||
print(f'[extract_labeled_pcaps] wrote sharded store {out_store}')
|
||
return
|
||
if not tok_chunks:
|
||
raise RuntimeError('No flows emitted — check pcap contents.')
|
||
tokens = np.concatenate(tok_chunks, axis=0)
|
||
lengths = np.concatenate(len_chunks, axis=0)
|
||
meta_list: list[dict] = [m for chunk in meta_chunks for m in chunk]
|
||
labels = np.concatenate(label_chunks, axis=0)
|
||
extra_dict: dict[str, list] = {col: [] for col in extra_column_names}
|
||
for chunk in extra_chunks:
|
||
for col in extra_column_names:
|
||
extra_dict[col].extend(chunk[col])
|
||
del tok_chunks, len_chunks, meta_chunks, label_chunks, extra_chunks
|
||
sip_arr = np.asarray([m['src_ip'] for m in meta_list], dtype=object)
|
||
dip_arr = np.asarray([m['dst_ip'] for m in meta_list], dtype=object)
|
||
sp_arr = np.asarray([m['src_port'] for m in meta_list], dtype=np.int64)
|
||
dp_arr = np.asarray([m['dst_port'] for m in meta_list], dtype=np.int64)
|
||
pr_arr = np.asarray([m['protocol'] for m in meta_list], dtype=np.int64)
|
||
ts_arr = np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64)
|
||
order = np.lexsort((ts_arr, pr_arr, dp_arr, sp_arr, dip_arr, sip_arr, labels))
|
||
tokens = tokens[order]
|
||
lengths = lengths[order]
|
||
labels = labels[order]
|
||
meta_list = [meta_list[i] for i in order]
|
||
for col in extra_column_names:
|
||
extra_dict[col] = [extra_dict[col][i] for i in order]
|
||
N = len(tokens)
|
||
flow_id = np.arange(N, dtype=np.uint64)
|
||
print(f'\n[extract_labeled_pcaps] total flows: {N:,}')
|
||
print(f'[extract_labeled_pcaps] label distribution:')
|
||
(ulabels, counts) = np.unique(labels, return_counts=True)
|
||
for (lbl, cnt) in sorted(zip(ulabels, counts), key=lambda x: -x[1]):
|
||
print(f' {lbl:<40s} {cnt:>10,}')
|
||
out_packets.parent.mkdir(parents=True, exist_ok=True)
|
||
np.savez_compressed(out_packets, packet_tokens=tokens, packet_lengths=lengths, flow_id=flow_id)
|
||
print(f'[extract_labeled_pcaps] wrote {out_packets} ({out_packets.stat().st_size / 1000000000.0:.2f} GB)')
|
||
out_flows.parent.mkdir(parents=True, exist_ok=True)
|
||
cols = {'flow_id': flow_id, 'label': labels.astype(str), 'start_ts': np.asarray([m['start_ts'] for m in meta_list], dtype=np.float64), 'src_ip': np.asarray([m['src_ip'] for m in meta_list], dtype=object), 'dst_ip': np.asarray([m['dst_ip'] for m in meta_list], dtype=object), 'src_port': np.asarray([m['src_port'] for m in meta_list], dtype=np.uint32), 'dst_port': np.asarray([m['dst_port'] for m in meta_list], dtype=np.uint32), 'protocol': np.asarray([m['protocol'] for m in meta_list], dtype=np.uint8), 'n_pkts': np.asarray([m['n_pkts'] for m in meta_list], dtype=np.uint32)}
|
||
for col in extra_column_names:
|
||
cols[col] = np.asarray(extra_dict[col], dtype=object)
|
||
flow_df = pd.DataFrame(cols)
|
||
flow_df.to_parquet(out_flows, compression='snappy', index=False)
|
||
print(f'[extract_labeled_pcaps] wrote {out_flows} ({out_flows.stat().st_size / 1000000.0:.2f} MB) cols={list(flow_df.columns)}')
|
||
_write_canonical_flow_features(tokens=tokens, lengths=lengths, flow_id=flow_id, labels=labels.astype(str), out_path=out_flows.parent / 'flow_features.parquet')
|