from __future__ import annotations import argparse import csv import sys from datetime import datetime from pathlib import Path import numpy as np sys.path.insert(0, str(Path(__file__).resolve().parent)) from extract_lib import extract_dataset, _canonical_key from csv_adapter import CsvFlowAdapter, parse_csv_rows JOIN_COLS = {'src_ip': 'Source IP', 'src_port': 'Source Port', 'dst_ip': 'Destination IP', 'dst_port': 'Destination Port', 'protocol': 'Protocol', 'timestamp': 'Timestamp'} LABEL_COL = 'Label' TIMESTAMP_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S') BENIGN_ALIASES = {'BENIGN', 'Benign', 'benign'} BENIGN_TOKEN = 'normal' DROP_LABEL_PATTERNS: tuple[str, ...] = () LABEL_ALIASES = {'UDP-lag': 'UDPLag'} SHARDS = {'01-12': 'SAT-01-12-2018', '03-11': 'SAT-03-11-2018'} SHARD_OFFSETS_DEFAULT = {'01-12': 43200.0, '03-11': 39600.0} DEFAULT_CSV_DIR = Path('datasets/cicddos2019/raw/csv') DEFAULT_PCAP_DIR = Path('datasets/cicddos2019/raw/pcap') DEFAULT_OUT_PACKETS = Path('datasets/cicddos2019/processed/packets.npz') DEFAULT_OUT_FLOWS = Path('datasets/cicddos2019/processed/flows.parquet') CICDDOS2019_ADAPTER = CsvFlowAdapter(join_cols=JOIN_COLS, label_col=LABEL_COL, timestamp_formats=TIMESTAMP_FORMATS, benign_aliases=frozenset(BENIGN_ALIASES), benign_token=BENIGN_TOKEN, drop_label_patterns=DROP_LABEL_PATTERNS, label_aliases=LABEL_ALIASES) def _normalize_label(raw: str) -> str: s = raw.strip() if s in BENIGN_ALIASES: return BENIGN_TOKEN return LABEL_ALIASES.get(s, s) def _parse_timestamp(ts: str) -> float | None: s = ts.strip() if not s: return None for fmt in TIMESTAMP_FORMATS: try: return datetime.strptime(s, fmt).timestamp() except ValueError: continue return None def _find_pcaps_for_shard(pcap_dir: Path, prefix: str) -> list[Path]: found: list[Path] = [] seen = set() for pat in (f'{prefix}*', f'{prefix}*.pcap', f'{prefix}*.pcapng'): for p in sorted(pcap_dir.glob(pat)): if p.is_file() and p not in seen: found.append(p) seen.add(p) return found def _parse_csv(csv_path: Path, row_idx_start: int, time_offset_seconds: float, max_per_class: int | None, max_benign: int | None, rng: np.random.Generator) -> tuple[dict[tuple, list[tuple[int, float]]], list[str], int, int, dict[str, int]]: return parse_csv_rows(csv_path=csv_path, row_idx_start=row_idx_start, time_offset_seconds=time_offset_seconds, adapter=CICDDOS2019_ADAPTER, max_per_class=max_per_class, max_benign=max_benign, rng=rng) def main() -> None: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument('--csv-dir', type=Path, default=DEFAULT_CSV_DIR) ap.add_argument('--pcap-dir', type=Path, default=DEFAULT_PCAP_DIR) ap.add_argument('--out-packets', type=Path, default=DEFAULT_OUT_PACKETS) ap.add_argument('--out-flows', type=Path, default=DEFAULT_OUT_FLOWS) ap.add_argument('--out-store', type=Path, default=None, help='Optional sharded packet store output. When set, writes store_root/{metadata,manifest,flows,packets/*} instead of the monolithic packets.npz/flows.parquet pair.') ap.add_argument('--shard-size', type=int, default=100000, help='Rows per packet shard when --out-store is set.') ap.add_argument('--worker-flush-size', type=int, default=10000, help='Matched flows per temporary worker chunk when --out-store is set.') ap.add_argument('--spool-dir', type=Path, default=None, help='Optional temporary spool directory for worker chunks.') ap.add_argument('--match-strategy', choices=('auto', 'hungarian', 'stream_nearest'), default='auto', help='CSV↔pcap matching strategy. auto uses stream_nearest for --out-store and hungarian for legacy npz output.') ap.add_argument('--T-full', type=int, default=256) ap.add_argument('--idle-timeout', type=float, default=120.0) ap.add_argument('--time-tolerance', type=float, default=2.0) ap.add_argument('--time-offset', type=float, default=0.0, help='Extra seconds added to per-shard SHARD_OFFSETS_DEFAULT. Default 0 assumes a UTC+8 host (matches the SHARD_OFFSETS_DEFAULT values: 03-11=39600, 01-12=43200). If the per-shard time-delta diagnostic shows a non-zero median, add that to this flag.') ap.add_argument('--jobs', type=int, default=0, help='0=auto (min(n_shards, cpu_count)). 1=serial.') ap.add_argument('--shards', type=str, nargs='*', default=None, choices=sorted(SHARDS.keys()), help='Subset of shards to process (default: all).') ap.add_argument('--max-per-class', type=int, default=500000, help='Per-file, per-attack-class row cap (random subsample). Default 500k. Pass 0 to disable.') ap.add_argument('--max-benign', type=int, default=None, help='Per-file benign row cap. Default: uncapped (keep all).') ap.add_argument('--max-packets-per-pcap', type=int, default=None, help='Cap per-pcap packets (smoke only).') ap.add_argument('--max-pcap-files-per-shard', type=int, default=None, help='Only process the first N pcap chunks per shard (smoke only).') ap.add_argument('--sample-seed', type=int, default=42) args = ap.parse_args() max_per_class = args.max_per_class or None max_benign = args.max_benign or None rng = np.random.default_rng(args.sample_seed) shards = args.shards or sorted(SHARDS.keys()) csv_rows_by_day: dict[str, dict] = {} all_labels: list[str] = [] total_rows = 0 total_skip = 0 aggregate_counts: dict[str, int] = {} print(f'=== parsing CSVs in {args.csv_dir} ===') print(f' max_per_class={max_per_class} max_benign={max_benign}') print(f' additive time_offset={args.time_offset}s (on top of per-shard defaults)') for shard in shards: shard_offset = SHARD_OFFSETS_DEFAULT.get(shard, 0.0) + args.time_offset print(f'[{shard}] effective time_offset={shard_offset}s (= default {SHARD_OFFSETS_DEFAULT.get(shard, 0.0)} + CLI {args.time_offset})') shard_dir = args.csv_dir / shard if not shard_dir.is_dir(): print(f'[{shard}] {shard_dir} not found — skipping') continue csvs = sorted(shard_dir.glob('*.csv')) if not csvs: print(f'[{shard}] no CSVs under {shard_dir}') continue shard_rows: dict[tuple, list[tuple[int, float]]] = {} for csv_path in csvs: (day_rows, labels, n_emit, n_skip, cls_counts) = _parse_csv(csv_path, row_idx_start=total_rows, time_offset_seconds=shard_offset, max_per_class=max_per_class, max_benign=max_benign, rng=rng) for (ck, rs) in day_rows.items(): shard_rows.setdefault(ck, []).extend(rs) all_labels.extend(labels) total_rows += n_emit total_skip += n_skip for (lbl, c) in cls_counts.items(): aggregate_counts[lbl] = aggregate_counts.get(lbl, 0) + c print(f'[{shard}/{csv_path.name}] emitted {n_emit:,} skipped {n_skip:,} cls={dict(sorted(cls_counts.items()))}') csv_rows_by_day[shard] = shard_rows print(f'[{shard}] shard total: {sum((len(v) for v in shard_rows.values())):,} canonical keys') labels_by_row = np.asarray(all_labels, dtype=object) print(f'\nTotal CSV rows emitted: {total_rows:,} skipped: {total_skip:,}') print(f'Aggregate label distribution (post-subsample):') for (lbl, cnt) in sorted(aggregate_counts.items(), key=lambda x: -x[1]): print(f' {lbl:<40s} {cnt:>12,}') print(f'\n=== locating pcap chunks in {args.pcap_dir} ===') pcap_files_by_day: dict[str, list[Path]] = {} for shard in shards: prefix = SHARDS[shard] files = _find_pcaps_for_shard(args.pcap_dir, prefix) if args.max_pcap_files_per_shard is not None: files = files[:args.max_pcap_files_per_shard] pcap_files_by_day[shard] = files print(f'[{shard}] prefix {prefix!r} → {len(files):,} pcap chunks') print(f'\n=== extracting packet sequences ===') extract_dataset(csv_rows_by_day=csv_rows_by_day, labels_by_row=labels_by_row, pcap_files_by_day=pcap_files_by_day, out_packets=args.out_packets, out_flows=args.out_flows, out_store=args.out_store, shard_size=args.shard_size, worker_flush_size=args.worker_flush_size, spool_dir=args.spool_dir, match_strategy=None if args.match_strategy == 'auto' else args.match_strategy, T_full=args.T_full, idle_timeout=args.idle_timeout, time_tolerance_seconds=args.time_tolerance, max_packets_per_pcap=args.max_packets_per_pcap, n_jobs=args.jobs) if __name__ == '__main__': main()