276 lines
15 KiB
Python
276 lines
15 KiB
Python
from __future__ import annotations
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
import numpy as np
|
|
import pandas as pd
|
|
import sys as _sys
|
|
from pathlib import Path as _Path
|
|
_sys.path.insert(0, str(_Path(__file__).resolve().parents[1]))
|
|
from common.data_contract import PACKET_FEATURE_NAMES, PACKET_CONTINUOUS_CHANNEL_IDX as CONTINUOUS_CHANNEL_IDX, PACKET_BINARY_CHANNEL_IDX as BINARY_CHANNEL_IDX, canonical_5tuple as _canonical_key, fit_packet_stats as _fit_packet_stats, zscore as _zscore, apply_mixed_dequant as _apply_mixed_dequant
|
|
# Column names treated as metadata rather than features: when no explicit
# feature column list is supplied, numeric columns whose name appears here are
# excluded from the automatically selected flow-feature columns.
DEFAULT_FLOW_META_COLUMNS = {'flow_id', 'label', 'day', 'service', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol', 'timestamp', 'start_ts', 'n_pkts'}
|
|
# Names of the per-flow features computed by `_derive_flow_features`, in output
# column order; the length of this tuple fixes the width of that feature matrix.
DERIVED_FLOW_FEATURE_NAMES = ('log_len', 'fwd_frac', 'bwd_frac', 'log_size_mean', 'log_size_std', 'log_size_min', 'log_size_max', 'log_dt_mean', 'log_dt_std', 'log_dt_max', 'syn_frac', 'fin_frac', 'rst_frac', 'psh_frac', 'ack_frac', 'log_win_mean')
|
|
|
|
@dataclass
class UnifiedData:
    """Bundle of preprocessed arrays produced by ``load_unified_data``.

    Holds, for the train / validation / attack splits, the normalised flow
    feature matrices, the preprocessed packet token tensors and the per-flow
    packet lengths, together with the attack labels and the statistics that
    were used for normalisation.
    """

    # Flow-level feature matrices, one row per flow.
    train_flow: np.ndarray
    val_flow: np.ndarray
    attack_flow: np.ndarray

    # Packet token tensors; axis 1 is the packet position, axis 2 the channel.
    train_packets: np.ndarray
    val_packets: np.ndarray
    attack_packets: np.ndarray

    # Number of valid packets per flow for each split.
    train_len: np.ndarray
    val_len: np.ndarray
    attack_len: np.ndarray

    # Label of every flow in the attack split.
    attack_labels: np.ndarray

    # Normalisation statistics fitted on the training split.
    packet_mean: np.ndarray
    packet_std: np.ndarray
    flow_mean: np.ndarray
    flow_std: np.ndarray

    # Name of the packet preprocessing mode that was applied.
    packet_preprocess: str

    # Column names for the flow matrices and channel names for the packets.
    flow_feature_names: tuple[str, ...]
    packet_feature_names: tuple[str, ...] = PACKET_FEATURE_NAMES

    @property
    def T(self) -> int:
        """Size of axis 1 (packet positions) of ``train_packets``."""
        n_positions = self.train_packets.shape[1]
        return int(n_positions)

    @property
    def packet_dim(self) -> int:
        """Size of axis 2 (channels per packet) of ``train_packets``."""
        n_channels = self.train_packets.shape[2]
        return int(n_channels)

    @property
    def flow_dim(self) -> int:
        """Size of axis 1 (features per flow) of ``train_flow``."""
        n_features = self.train_flow.shape[1]
        return int(n_features)
|
|
|
|
def _preprocess_packets(
    train_x: np.ndarray,
    val_x: np.ndarray,
    attack_x: np.ndarray,
    train_l: np.ndarray,
    val_l: np.ndarray,
    attack_l: np.ndarray,
    preprocess: str,
    seed: int,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Normalise the packet tensors of all three splits with train statistics.

    The mean/std are fitted on the training split only (via
    ``_fit_packet_stats``) and then applied to train, val and attack.

    Args:
        train_x, val_x, attack_x: packet token tensors; axis 1 is the packet
            position.
        train_l, val_l, attack_l: number of valid packets per flow.
        preprocess: ``'zscore'`` (z-score, then zero out positions at or
            beyond each flow's length) or ``'mixed_dequant'`` (delegated to
            ``_apply_mixed_dequant``).
        seed: forwarded to ``_apply_mixed_dequant``.

    Returns:
        ``(train, val, attack, mean, std)``.

    Raises:
        ValueError: if ``preprocess`` is not one of the two supported modes.
    """
    supported_modes = ('zscore', 'mixed_dequant')
    if preprocess not in supported_modes:
        raise ValueError("packet_preprocess must be 'zscore' or 'mixed_dequant'")

    mean, std = _fit_packet_stats(train_x, train_l)
    use_zscore = preprocess == 'zscore'

    def _transform(tokens: np.ndarray, lengths: np.ndarray, split_name: str) -> np.ndarray:
        if not use_zscore:
            return _apply_mixed_dequant(tokens, lengths, mean, std, split_tag=split_name, seed=seed)
        scaled = _zscore(tokens, mean, std)
        # True where the packet position lies inside the flow's valid length.
        positions = np.arange(tokens.shape[1])[None, :]
        valid = positions < lengths[:, None]
        return (scaled * valid[:, :, None]).astype(np.float32)

    train_out = _transform(train_x, train_l, 'train')
    val_out = _transform(val_x, val_l, 'val')
    attack_out = _transform(attack_x, attack_l, 'attack')
    return (train_out, val_out, attack_out, mean, std)
|
|
|
|
def _derive_flow_features(tokens: np.ndarray, lens: np.ndarray) -> np.ndarray:
    """Summarise each flow's packet tokens into a fixed-width feature row.

    For every flow only the first ``max(lens[i], 1)`` packets are used, so a
    flow with length 0 is summarised from its first row. Channels are read by
    position: 0 = size, 1 = inter-arrival dt, 2 = direction, 3..7 = the flag
    channels averaged into syn/fin/rst/psh/ack fractions, 8 = window.

    NOTE(review): most output names carry a ``log_`` prefix but only the
    length is log-transformed here; presumably the size/dt/window channels
    are already log-scaled upstream -- confirm against the packet extractor.

    Args:
        tokens: array with three axes (flows, packet positions, channels).
        lens: number of valid packets per flow.

    Returns:
        float32 array with one row per flow and one column per entry of
        ``DERIVED_FLOW_FEATURE_NAMES``.
    """
    # Unpacking three values also enforces that `tokens` has exactly 3 axes.
    n_flows, _, _ = tokens.shape
    features = np.zeros((n_flows, len(DERIVED_FLOW_FEATURE_NAMES)), dtype=np.float32)

    for row in range(n_flows):
        n_valid = int(max(lens[row], 1))
        pkts = tokens[row, :n_valid]
        size = pkts[:, 0]
        dt = pkts[:, 1]
        direction = pkts[:, 2]
        win = pkts[:, 8]

        row_values = (
            np.log1p(n_valid),
            np.mean(direction < 0.5),
            np.mean(direction >= 0.5),
            size.mean(),
            size.std(),
            size.min(),
            size.max(),
            dt.mean(),
            dt.std(),
            dt.max(),
            pkts[:, 3].mean(),
            pkts[:, 4].mean(),
            pkts[:, 5].mean(),
            pkts[:, 6].mean(),
            pkts[:, 7].mean(),
            win.mean(),
        )
        for col, value in enumerate(row_values):
            features[row, col] = value

    return features
|
|
|
|
def _read_flow_features(
    path: Path,
    *,
    expected_rows: int,
    feature_columns: Optional[list[str]] = None,
) -> tuple[np.ndarray, tuple[str, ...], np.ndarray | None]:
    """Load a flow-feature matrix from an ``.npz`` archive or a parquet file.

    Args:
        path: ``.npz`` file holding a ``features`` array (optionally
            ``feature_names`` and ``flow_id``), or a ``.parquet``/``.pq``
            table.
        expected_rows: number of rows the matrix must have.
        feature_columns: parquet columns to use. When empty/None, every
            numeric column not listed in ``DEFAULT_FLOW_META_COLUMNS`` is
            used. Ignored for ``.npz`` input.

    Returns:
        ``(x, names, flow_id)`` where ``x`` is float32 with NaN and +/-inf
        replaced by 0, ``names`` are the feature names (column indices as
        strings when an npz has no ``feature_names``), and ``flow_id`` is the
        stored id array or ``None`` when the file has none.

    Raises:
        ValueError: unsupported file suffix, no numeric feature columns, or a
            row count different from ``expected_rows``.
    """
    path = Path(path)
    suffix = path.suffix

    if suffix == '.npz':
        # SECURITY NOTE: allow_pickle=True lets np.load unpickle object-dtype
        # arrays, which can execute arbitrary code. It is kept so existing
        # archives that store such arrays keep loading; only use trusted files.
        # The context manager closes the archive's file handle, which the
        # bare np.load call used to leave open.
        with np.load(path, allow_pickle=True) as data:
            stored = data.files
            x = data['features'].astype(np.float32)
            raw_names = data['feature_names'] if 'feature_names' in stored else np.arange(x.shape[1])
            flow_id = data['flow_id'] if 'flow_id' in stored else None
        names = tuple(str(v) for v in raw_names)
    elif suffix in ('.parquet', '.pq'):
        df = pd.read_parquet(path)
        flow_id = df['flow_id'].to_numpy() if 'flow_id' in df.columns else None
        if feature_columns:
            cols = feature_columns
        else:
            cols = [
                c for c in df.columns
                if c not in DEFAULT_FLOW_META_COLUMNS and pd.api.types.is_numeric_dtype(df[c])
            ]
            if not cols:
                raise ValueError(f'no numeric flow feature columns found in {path}')
        x = df[cols].to_numpy(dtype=np.float32)
        names = tuple(cols)
    else:
        raise ValueError(f'unsupported flow feature file: {path}')

    if len(x) != expected_rows:
        raise ValueError(f'flow feature row count {len(x):,} != packet row count {expected_rows:,}')

    # Non-finite values would poison the mean/std statistics computed later.
    x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)
    return (x, names, flow_id)
|
|
|
|
def _feature_columns_from_df(df: pd.DataFrame, requested: Optional[list[str]]) -> list[str]:
    """Pick the feature columns to read from ``df``.

    A non-empty ``requested`` list is returned unchanged (it is not validated
    against ``df``). Otherwise every numeric column whose name is not in
    ``DEFAULT_FLOW_META_COLUMNS`` is selected, in the frame's column order.
    """
    if not requested:
        selected = []
        for column in df.columns:
            if column in DEFAULT_FLOW_META_COLUMNS:
                continue
            if pd.api.types.is_numeric_dtype(df[column]):
                selected.append(column)
        return selected
    return requested
|
|
|
|
def _align_flow_features_by_scan(
    feature_df: pd.DataFrame,
    packet_flows: pd.DataFrame,
    *,
    feature_columns: list[str],
) -> tuple[np.ndarray, tuple[str, ...]]:
    """Align feature rows to packet flows with a single forward scan.

    Every row of both tables is keyed by ``(str(label), canonical 5-tuple)``.
    Packet flows are visited in order and each one takes the next
    not-yet-consumed feature row with an equal key; feature rows passed over
    on the way are dropped. The scan never moves backwards, so both tables
    must list the shared flows in the same relative order.

    Args:
        feature_df: feature table with label + 5-tuple columns and the
            requested feature columns.
        packet_flows: packet flow table with label + 5-tuple columns.
        feature_columns: columns of ``feature_df`` to return.

    Returns:
        ``(x, names)``: float32 matrix with one row per packet flow (NaN and
        +/-inf replaced by 0) and the feature column names. Zero packet flows
        yield a matrix with zero rows.

    Raises:
        ValueError: a required metadata column is missing from either table,
            or some packet flow has no matching feature row left.
    """
    required = ['label', 'src_ip', 'src_port', 'dst_ip', 'dst_port', 'protocol']
    missing_feature = [c for c in required if c not in feature_df.columns]
    missing_packet = [c for c in required if c not in packet_flows.columns]
    if missing_feature or missing_packet:
        raise ValueError(f'scan alignment requires label + 5-tuple metadata. missing in feature_df={missing_feature}, packet_flows={missing_packet}')

    def _row_keys(df: pd.DataFrame):
        # Lazily yield the alignment key for each row of `df`, in row order.
        columns = [df[name].to_numpy() for name in required]
        for lbl, src, sp, dst, dp, proto in zip(*columns):
            yield (str(lbl), _canonical_key(src, sp, dst, dp, proto))

    packet_keys = list(_row_keys(packet_flows))
    n_csv = len(feature_df)

    # One shared iterator: each packet flow resumes the scan where the
    # previous match stopped, so feature rows are visited at most once.
    candidates = enumerate(_row_keys(feature_df))
    matched: list[int] = []
    for i, target in enumerate(packet_keys):
        for j, cand in candidates:
            if cand == target:
                matched.append(j)
                break
        else:
            raise ValueError(f'failed to align packet flow row {i:,}/{len(packet_keys):,}; the CSV cache may not be the same one used for packet extraction')

    # With no packet flows nothing was scanned; `matched[-1]` would raise.
    skipped = matched[-1] + 1 - len(matched) if matched else 0
    print(f'[data] scan-aligned CSV flow features: matched={len(matched):,} from csv_rows={n_csv:,} skipped={skipped:,}')

    picked = np.asarray(matched, dtype=np.int64)
    x = feature_df.iloc[picked][feature_columns].to_numpy(dtype=np.float32)
    x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)
    return (x, tuple(feature_columns))
|
|
|
|
def _read_aligned_flow_features(
    path: Path,
    packet_flows: pd.DataFrame,
    *,
    feature_columns: Optional[list[str]] = None,
    align: str = 'auto',
) -> tuple[np.ndarray, tuple[str, ...]]:
    """Read external flow features and align them with ``packet_flows``.

    ``.npz`` input is taken as already row-aligned: it must have the same row
    count as ``packet_flows`` and, when both sides carry ``flow_id``, equal
    ids. Parquet input is used row-for-row when the row counts match and the
    ``flow_id`` columns are equal (or missing on either side); otherwise it is
    scan-aligned via ``_align_flow_features_by_scan`` unless ``align='row'``
    forbids that. The row-for-row shortcut also applies when
    ``align='scan'``.

    Args:
        path: ``.npz``, ``.parquet`` or ``.pq`` feature file.
        packet_flows: flow table the features must line up with.
        feature_columns: explicit parquet columns; auto-selected when empty.
        align: ``'auto'``, ``'row'`` or ``'scan'``.

    Returns:
        ``(x, names)``: float32 feature matrix and its column names.

    Raises:
        ValueError: bad ``align`` value, unsupported suffix, no numeric
            feature columns, or an alignment that cannot be satisfied.
    """
    path = Path(path)
    if align not in ('auto', 'row', 'scan'):
        raise ValueError("flow_features_align must be 'auto', 'row', or 'scan'")

    def _packet_ids():
        # flow_id of the packet table, or None when the column is absent.
        return packet_flows['flow_id'].to_numpy() if 'flow_id' in packet_flows else None

    suffix = path.suffix
    if suffix == '.npz':
        x, names, flow_id = _read_flow_features(path, expected_rows=len(packet_flows), feature_columns=feature_columns)
        packet_id = _packet_ids()
        ids_comparable = flow_id is not None and packet_id is not None
        if ids_comparable and not np.array_equal(flow_id, packet_id):
            raise ValueError('NPZ flow_id does not align with Packet_CFM flows')
        return (x, names)

    if suffix not in ('.parquet', '.pq'):
        raise ValueError(f'unsupported flow feature file: {path}')

    feature_df = pd.read_parquet(path)
    cols = _feature_columns_from_df(feature_df, feature_columns)
    if not cols:
        raise ValueError(f'no numeric flow feature columns found in {path}')

    packet_id = _packet_ids()
    same_length = len(feature_df) == len(packet_flows)
    if same_length:
        feature_id = feature_df['flow_id'].to_numpy() if 'flow_id' in feature_df.columns else None
        ids_unverifiable = feature_id is None or packet_id is None
        if ids_unverifiable or np.array_equal(feature_id, packet_id):
            values = feature_df[cols].to_numpy(dtype=np.float32)
            values = np.nan_to_num(values, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)
            return (values, tuple(cols))

    if align == 'row':
        # Row alignment was demanded but could not be established above.
        if same_length:
            raise ValueError("flow_id mismatch with flow_features_align='row'")
        raise ValueError(f'row alignment requested but feature rows={len(feature_df):,} packet rows={len(packet_flows):,}')

    return _align_flow_features_by_scan(feature_df, packet_flows, feature_columns=cols)
|
|
|
|
def _preprocess_flow(
    train: np.ndarray,
    val: np.ndarray,
    attack: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Z-score the three flow-feature matrices with training statistics.

    The per-column mean and standard deviation are computed over axis 0 of
    ``train`` only, cast to float32, and applied to every split through
    ``_zscore``.

    Returns:
        ``(train, val, attack, mean, std)``.
    """
    train_mean = train.mean(axis=0).astype(np.float32)
    train_std = train.std(axis=0).astype(np.float32)
    scaled = tuple(_zscore(split, train_mean, train_std) for split in (train, val, attack))
    return (*scaled, train_mean, train_std)
|
|
|
|
def load_unified_data(
    *,
    packets_npz: Path | None = None,
    source_store: Path | None = None,
    flows_parquet: Path,
    flow_features_path: Path | None = None,
    flow_feature_columns: Optional[list[str]] = None,
    flow_features_align: str = 'auto',
    T: int = 128,
    split_seed: int = 42,
    train_ratio: float = 0.8,
    benign_label: str = 'normal',
    min_len: int = 2,
    packet_preprocess: str = 'mixed_dequant',
    attack_cap: int | None = None,
    val_cap: int | None = None,
) -> UnifiedData:
    """Load packets and flow features, split them, and normalise every split.

    Flows whose (truncated) packet length is below ``min_len`` are dropped.
    Benign flows are shuffled and divided into train/val by ``train_ratio``;
    every other flow goes to the attack split. All statistics are fitted on
    the training split only.

    Args:
        packets_npz: archive with ``packet_tokens``, ``packet_lengths`` and
            optionally ``flow_id``. Mutually exclusive with ``source_store``.
        source_store: packet shard store directory; packets are read lazily
            per split. Requires ``flow_features_path``.
        flows_parquet: flow table providing ``flow_id`` and ``label`` (plus
            the 5-tuple columns when external flow features are used).
        flow_features_path: external flow-feature file; when None the flow
            features are derived from the in-memory packet tokens.
        flow_feature_columns: explicit columns to read from that file.
        flow_features_align: ``'auto'``, ``'row'`` or ``'scan'``.
        T: number of packet positions kept per flow.
        split_seed: seed for the split RNG, also passed to the packet
            preprocessing.
        train_ratio: fraction of benign flows assigned to train.
        benign_label: label value identifying benign flows.
        min_len: minimum packet length for a flow to be kept.
        packet_preprocess: ``'zscore'`` or ``'mixed_dequant'``.
        attack_cap: optional maximum number of attack flows (random subset).
        val_cap: optional maximum number of validation flows (random subset).

    Returns:
        A populated ``UnifiedData``.

    Raises:
        ValueError: not exactly one packet source given, ``T`` larger than
            the stored packet axis, a store used without external flow
            features, or packet rows that do not line up with the flow table.
    """
    if (packets_npz is None) == (source_store is None):
        raise ValueError('pass exactly one of packets_npz or source_store')

    flows_parquet = Path(flows_parquet)
    print(f'[data] flows={flows_parquet} packets_source={(packets_npz if packets_npz else source_store)}')

    flow_cols = ['flow_id', 'label']
    if flow_features_path is not None:
        # The 5-tuple is only needed to scan-align external flow features.
        flow_cols += ['src_ip', 'src_port', 'dst_ip', 'dst_port', 'protocol']
    flows = pd.read_parquet(flows_parquet, columns=flow_cols)
    labels_full = flows['label'].to_numpy().astype(str)
    flow_id = flows['flow_id'].to_numpy()

    tokens_full: np.ndarray | None = None
    store = None
    if packets_npz is not None:
        # Context manager closes the archive's file handle once the arrays
        # have been read into memory.
        with np.load(Path(packets_npz)) as pz:
            tokens_full = pz['packet_tokens'].astype(np.float32)
            lens_full = pz['packet_lengths'].astype(np.int32)
            packet_flow_id = pz['flow_id'] if 'flow_id' in pz.files else None
        if T > tokens_full.shape[1]:
            raise ValueError(f'requested T={T} > stored T_full={tokens_full.shape[1]}')
        tokens_full = tokens_full[:, :T].copy()
        lens_full = np.minimum(lens_full, T).astype(np.int32)
        if packet_flow_id is not None and (not np.array_equal(packet_flow_id, flow_id)):
            raise ValueError('packets_npz and flows_parquet are not row-aligned by flow_id')
    else:
        if flow_features_path is None:
            raise ValueError('source_store path requires flow_features_path (derived features need tokens in memory)')
        from common.packet_store import PacketShardStore

        store = PacketShardStore.open(Path(source_store))
        store_flow_id = store.read_flows(columns=['flow_id'])['flow_id'].to_numpy()
        if not np.array_equal(store_flow_id, flow_id):
            raise ValueError('source_store and flows_parquet are not row-aligned by flow_id')
        lens_full = np.minimum(store.manifest['packet_length'].to_numpy(dtype=np.int32), T)

    # Without this check a mismatch (possible when the npz carries no
    # flow_id) only surfaced later as an opaque boolean-index IndexError.
    if len(lens_full) != len(flows):
        raise ValueError(f'packet row count {len(lens_full):,} != flows_parquet row count {len(flows):,}')

    if flow_features_path is None:
        # Invariant: the store branch above rejects a missing flow_features_path.
        assert tokens_full is not None
        flow_features = _derive_flow_features(tokens_full, lens_full)
        flow_names = DERIVED_FLOW_FEATURE_NAMES
        print(f'[data] using derived flow features D={flow_features.shape[1]}')
    else:
        flow_features, flow_names = _read_aligned_flow_features(
            Path(flow_features_path),
            flows,
            feature_columns=flow_feature_columns,
            align=flow_features_align,
        )
        print(f'[data] using external flow features D={flow_features.shape[1]}')

    keep = lens_full >= min_len
    labels = labels_full[keep]
    flow_features = flow_features[keep]
    lens = lens_full[keep]
    # Maps positions in the filtered arrays back to rows of the full store.
    global_idx = np.flatnonzero(keep).astype(np.int64)
    materialized_tokens = tokens_full[keep] if tokens_full is not None else None
    print(f'[data] rows total={len(keep):,} keep len>={min_len}: {keep.sum():,}')

    benign_local = np.where(labels == benign_label)[0]
    attack_local = np.where(labels != benign_label)[0]
    # RNG call order (shuffle, val cap, attack cap) determines the split for
    # a given seed and must not be reordered.
    rng = np.random.default_rng(split_seed)
    rng.shuffle(benign_local)
    n_train = int(len(benign_local) * train_ratio)
    train_local = benign_local[:n_train]
    val_local = benign_local[n_train:]
    if val_cap is not None and len(val_local) > val_cap:
        val_local = np.sort(rng.choice(val_local, size=val_cap, replace=False))
    if attack_cap is not None and len(attack_local) > attack_cap:
        attack_local = np.sort(rng.choice(attack_local, size=attack_cap, replace=False))
    print(f'[data] benign={len(benign_local):,} attack={len(attack_local):,} -> train={len(train_local):,} val={len(val_local):,}')

    def _materialize(local_indices: np.ndarray) -> np.ndarray:
        # Fetch packet tokens for the given filtered-row indices, either from
        # the in-memory array or from the shard store.
        if materialized_tokens is not None:
            return materialized_tokens[local_indices].astype(np.float32, copy=False)
        assert store is not None
        g = global_idx[local_indices]
        tok, _ = store.read_packets(g.astype(np.int64), T=T)
        return tok.astype(np.float32, copy=False)

    tr_p_raw = _materialize(train_local)
    va_p_raw = _materialize(val_local)
    at_p_raw = _materialize(attack_local)
    tr_l = lens[train_local]
    va_l = lens[val_local]
    at_l = lens[attack_local]
    tr_f_raw = flow_features[train_local]
    va_f_raw = flow_features[val_local]
    at_f_raw = flow_features[attack_local]

    tr_p, va_p, at_p, p_mean, p_std = _preprocess_packets(
        tr_p_raw, va_p_raw, at_p_raw, tr_l, va_l, at_l,
        preprocess=packet_preprocess, seed=split_seed,
    )
    tr_f, va_f, at_f, f_mean, f_std = _preprocess_flow(tr_f_raw, va_f_raw, at_f_raw)

    return UnifiedData(
        train_flow=tr_f,
        val_flow=va_f,
        attack_flow=at_f,
        train_packets=tr_p,
        val_packets=va_p,
        attack_packets=at_p,
        train_len=tr_l,
        val_len=va_l,
        attack_len=at_l,
        attack_labels=labels[attack_local],
        packet_mean=p_mean,
        packet_std=p_std,
        flow_mean=f_mean,
        flow_std=f_std,
        packet_preprocess=packet_preprocess,
        flow_feature_names=tuple(flow_names),
    )
|
|
|
|
def subsample_train(data: UnifiedData, n_train: int, seed: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Return a seeded random subset of the training split.

    Args:
        data: container providing ``train_flow``, ``train_packets`` and
            ``train_len``.
        n_train: number of training rows to keep. A value ``<= 0`` or at
            least the number of training rows returns the full arrays
            unchanged (the same objects, not copies).
        seed: seed for the sampling RNG.

    Returns:
        ``(train_flow, train_packets, train_len)`` restricted to the same
        sampled rows, in ascending row order.
    """
    everything = (data.train_flow, data.train_packets, data.train_len)
    if n_train <= 0:
        return everything
    total = len(data.train_flow)
    if n_train >= total:
        return everything

    sampler = np.random.default_rng(seed)
    picks = np.sort(sampler.choice(total, n_train, replace=False))
    return (data.train_flow[picks], data.train_packets[picks], data.train_len[picks])
|