from __future__ import annotations import argparse import csv import sys from datetime import datetime from pathlib import Path import numpy as np sys.path.insert(0, str(Path(__file__).resolve().parent)) from extract_lib import extract_dataset, _canonical_key from csv_adapter import CsvFlowAdapter, parse_csv_rows JOIN_COLS = {'src_ip': 'Src IP', 'src_port': 'Src Port', 'dst_ip': 'Dst IP', 'dst_port': 'Dst Port', 'protocol': 'Protocol', 'timestamp': 'Timestamp'} LABEL_COL = 'Label' TIMESTAMP_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S', '%d/%m/%Y %H:%M:%S', '%d/%m/%Y %H:%M') BENIGN_ALIASES = {'BENIGN', 'Benign', 'benign'} BENIGN_TOKEN = 'normal' DROP_LABEL_PATTERNS = ('- Attempted',) SHARDS = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday') DEFAULT_CSV_DIR = Path('datasets/cicids2017/raw/csv') DEFAULT_PCAP_DIR = Path('datasets/cicids2017/raw/pcap') DEFAULT_OUT_PACKETS = Path('datasets/cicids2017/processed/packets.npz') DEFAULT_OUT_FLOWS = Path('datasets/cicids2017/processed/flows.parquet') CICIDS2017_ADAPTER = CsvFlowAdapter(join_cols=JOIN_COLS, label_col=LABEL_COL, timestamp_formats=TIMESTAMP_FORMATS, benign_aliases=frozenset(BENIGN_ALIASES), benign_token=BENIGN_TOKEN, drop_label_patterns=DROP_LABEL_PATTERNS) def _normalize_label(raw: str) -> str: s = raw.strip() return BENIGN_TOKEN if s in BENIGN_ALIASES else s def _parse_timestamp(ts: str) -> float | None: s = ts.strip() if not s: return None for fmt in TIMESTAMP_FORMATS: try: return datetime.strptime(s, fmt).timestamp() except ValueError: continue return None def _find_pcaps_for_day(pcap_dir: Path, day: str) -> list[Path]: day_lc = day.lower() day_cap = day.capitalize() pats = [f'*{day_lc}*.pcap', f'*{day_lc}*.pcapng', f'*{day_cap}*.pcap', f'*{day_cap}*.pcapng'] found: list[Path] = [] seen = set() for pat in pats: for p in sorted(pcap_dir.glob(pat)): if p not in seen: found.append(p) seen.add(p) return found def _parse_day_csv(csv_path: Path, row_idx_start: int, time_offset_seconds: float) -> tuple[dict[tuple, list[tuple[int, float]]], list[str], int, int]: (day_rows, labels, n_emit, n_skip, _) = parse_csv_rows(csv_path=csv_path, row_idx_start=row_idx_start, time_offset_seconds=time_offset_seconds, adapter=CICIDS2017_ADAPTER) return (day_rows, labels, n_emit, n_skip) def main() -> None: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument('--csv-dir', type=Path, default=DEFAULT_CSV_DIR) ap.add_argument('--pcap-dir', type=Path, default=DEFAULT_PCAP_DIR) ap.add_argument('--out-packets', type=Path, default=DEFAULT_OUT_PACKETS) ap.add_argument('--out-flows', type=Path, default=DEFAULT_OUT_FLOWS) ap.add_argument('--out-store', type=Path, default=None, help='Optional sharded packet store output. When set, writes store_root/{metadata,manifest,flows,packets/*} instead of the monolithic packets.npz/flows.parquet pair.') ap.add_argument('--shard-size', type=int, default=100000, help='Rows per packet shard when --out-store is set.') ap.add_argument('--worker-flush-size', type=int, default=10000, help='Matched flows per temporary worker chunk when --out-store is set.') ap.add_argument('--spool-dir', type=Path, default=None, help='Optional temporary spool directory for worker chunks.') ap.add_argument('--match-strategy', choices=('auto', 'hungarian', 'stream_nearest'), default='auto', help='CSV↔pcap matching strategy. auto uses stream_nearest for --out-store and hungarian for legacy npz output.') ap.add_argument('--T-full', type=int, default=256) ap.add_argument('--idle-timeout', type=float, default=120.0) ap.add_argument('--time-tolerance', type=float, default=2.0, help='Max |t_csv - t_pcap| seconds for flow match.') ap.add_argument('--time-offset', type=float, default=0.0, help='Seconds added to CSV timestamps before matching.') ap.add_argument('--jobs', type=int, default=0, help='0 = auto (min(n_days, cpu_count)). 1 = serial.') ap.add_argument('--days', type=str, nargs='*', default=None, help='Subset of shards to process (default: all 5).') ap.add_argument('--max-packets-per-pcap', type=int, default=None, help='Cap per-pcap packets (smoke tests only).') args = ap.parse_args() days = tuple(args.days) if args.days else SHARDS csv_rows_by_day: dict[str, dict] = {} all_labels: list[str] = [] total_rows = 0 total_skip = 0 print(f'=== parsing CSVs in {args.csv_dir} ===') for day in days: csv_path = args.csv_dir / f'{day}.csv' if not csv_path.exists(): print(f'[{day}] {csv_path} not found, skipping') continue (day_rows, labels, n_emit, n_skip) = _parse_day_csv(csv_path, row_idx_start=total_rows, time_offset_seconds=args.time_offset) csv_rows_by_day[day] = day_rows all_labels.extend(labels) total_rows += n_emit total_skip += n_skip print(f'[{day}] emitted {n_emit:,} rows skipped {n_skip:,} canonical keys {len(day_rows):,}') labels_by_row = np.asarray(all_labels, dtype=object) print(f'Total CSV rows emitted: {total_rows:,} (skipped {total_skip:,})') print(f'\n=== locating pcap files in {args.pcap_dir} ===') pcap_files_by_day: dict[str, list[Path]] = {} for day in days: files = _find_pcaps_for_day(args.pcap_dir, day) pcap_files_by_day[day] = files names = [p.name for p in files] print(f'[{day}] {len(files)} pcap(s): {names}') print(f'\n=== extracting packet sequences ===') extract_dataset(csv_rows_by_day=csv_rows_by_day, labels_by_row=labels_by_row, pcap_files_by_day=pcap_files_by_day, out_packets=args.out_packets, out_flows=args.out_flows, out_store=args.out_store, shard_size=args.shard_size, worker_flush_size=args.worker_flush_size, spool_dir=args.spool_dir, match_strategy=None if args.match_strategy == 'auto' else args.match_strategy, T_full=args.T_full, idle_timeout=args.idle_timeout, time_tolerance_seconds=args.time_tolerance, max_packets_per_pcap=args.max_packets_per_pcap, n_jobs=args.jobs) if __name__ == '__main__': main()