Files
JANUS/scripts/extract_cicddos2019.py

133 lines
8.4 KiB
Python

from __future__ import annotations

import argparse
import csv
import re
import sys
from datetime import datetime
from pathlib import Path

import numpy as np

# Make sibling modules in this scripts/ directory importable.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from extract_lib import extract_dataset, _canonical_key  # noqa: E402
from csv_adapter import CsvFlowAdapter, parse_csv_rows  # noqa: E402
# Maps the internal flow-key field names to the CIC-DDoS2019 CSV column headers.
JOIN_COLS = {
    'src_ip': 'Source IP',
    'src_port': 'Source Port',
    'dst_ip': 'Destination IP',
    'dst_port': 'Destination Port',
    'protocol': 'Protocol',
    'timestamp': 'Timestamp',
}
LABEL_COL = 'Label'

# Timestamp formats tried in order: fractional seconds first, then whole seconds.
TIMESTAMP_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S')

# Every spelling of the benign class collapses to BENIGN_TOKEN.
BENIGN_ALIASES = {'BENIGN', 'Benign', 'benign'}
BENIGN_TOKEN = 'normal'

# No label patterns are dropped for this dataset.
DROP_LABEL_PATTERNS: tuple[str, ...] = ()

# Alternate attack-label spellings rewritten to a single canonical form.
LABEL_ALIASES = {'UDP-lag': 'UDPLag'}

# Shard name (CSV sub-directory under --csv-dir) -> pcap filename prefix.
SHARDS = {
    '01-12': 'SAT-01-12-2018',
    '03-11': 'SAT-03-11-2018',
}

# Seconds added to parsed CSV timestamps per shard (43200 s = 12 h, 39600 s = 11 h);
# --time-offset is added on top of these.
SHARD_OFFSETS_DEFAULT = {
    '01-12': 43200.0,
    '03-11': 39600.0,
}

DEFAULT_CSV_DIR = Path('datasets/cicddos2019/raw/csv')
DEFAULT_PCAP_DIR = Path('datasets/cicddos2019/raw/pcap')
DEFAULT_OUT_PACKETS = Path('datasets/cicddos2019/processed/packets.npz')
DEFAULT_OUT_FLOWS = Path('datasets/cicddos2019/processed/flows.parquet')

# Bundles the dataset-specific CSV conventions above for parse_csv_rows().
CICDDOS2019_ADAPTER = CsvFlowAdapter(
    join_cols=JOIN_COLS,
    label_col=LABEL_COL,
    timestamp_formats=TIMESTAMP_FORMATS,
    benign_aliases=frozenset(BENIGN_ALIASES),
    benign_token=BENIGN_TOKEN,
    drop_label_patterns=DROP_LABEL_PATTERNS,
    label_aliases=LABEL_ALIASES,
)
def _normalize_label(raw: str) -> str:
    """Map a raw CSV label onto the canonical label vocabulary.

    Surrounding whitespace is stripped first. Any spelling listed in
    BENIGN_ALIASES becomes BENIGN_TOKEN. Spellings listed in LABEL_ALIASES
    are rewritten to their canonical form. Anything else is returned
    stripped but otherwise unchanged.

    Not called by main(); CSV parsing there goes through CICDDOS2019_ADAPTER.
    """
    label = raw.strip()
    return BENIGN_TOKEN if label in BENIGN_ALIASES else LABEL_ALIASES.get(label, label)
def _parse_timestamp(ts: str) -> float | None:
    """Convert a CSV timestamp string to a POSIX epoch float.

    Each pattern in TIMESTAMP_FORMATS is tried in order, and the first one
    that parses wins. The result is a naive ``datetime``, so ``.timestamp()``
    interprets it in the host's local timezone.

    Returns None for blank input or when no format matches.
    """
    text = ts.strip()
    if not text:
        return None
    for pattern in TIMESTAMP_FORMATS:
        try:
            epoch = datetime.strptime(text, pattern).timestamp()
        except ValueError:
            continue
        return epoch
    return None
def _pcap_sort_key(path: Path) -> tuple[list[str | int], str]:
    """Natural-sort key: digit runs compare numerically, so ``_2`` sorts before ``_10``."""
    # re.split with one capturing group yields text at even indices and digit
    # runs at odd indices, so element types line up position-by-position.
    parts: list[str | int] = re.split(r'(\d+)', path.name)
    parts[1::2] = [int(run) for run in parts[1::2]]
    # The full name breaks ties such as '_01' vs '_1'.
    return parts, path.name


def _find_pcaps_for_shard(pcap_dir: Path, prefix: str) -> list[Path]:
    """Return the regular files in *pcap_dir* whose names start with *prefix*.

    A single ``{prefix}*`` glob is enough. It already matches the ``.pcap`` and
    ``.pcapng`` variants as well as extension-less chunk names, so separate
    patterns for those would only re-find the same files. Directories are
    skipped.

    Files are returned in natural order, with digit runs compared
    numerically: ``..._0, ..._01, ..._02, ..._010``. Plain lexical sorting
    would give ``..._0, ..._01, ..._010, ..._0100, ..., ..._02``. That would
    make ``--max-pcap-files-per-shard N`` select a non-contiguous set of
    chunks.

    A missing *pcap_dir* yields an empty list.
    """
    matches = (p for p in pcap_dir.glob(f'{prefix}*') if p.is_file())
    return sorted(matches, key=_pcap_sort_key)
def _parse_csv(
    csv_path: Path,
    row_idx_start: int,
    time_offset_seconds: float,
    max_per_class: int | None,
    max_benign: int | None,
    rng: np.random.Generator,
) -> tuple[dict[tuple, list[tuple[int, float]]], list[str], int, int, dict[str, int]]:
    """Parse one CIC-DDoS2019 CSV with the dataset adapter.

    Thin wrapper around ``parse_csv_rows`` that pins ``adapter`` to
    CICDDOS2019_ADAPTER. Every other argument is forwarded unchanged, and the
    5-tuple returned by ``parse_csv_rows`` is passed straight back.
    """
    sampling = dict(max_per_class=max_per_class, max_benign=max_benign, rng=rng)
    return parse_csv_rows(
        csv_path=csv_path,
        row_idx_start=row_idx_start,
        time_offset_seconds=time_offset_seconds,
        adapter=CICDDOS2019_ADAPTER,
        **sampling,
    )
def _build_arg_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for this script."""
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument('--csv-dir', type=Path, default=DEFAULT_CSV_DIR)
    ap.add_argument('--pcap-dir', type=Path, default=DEFAULT_PCAP_DIR)
    ap.add_argument('--out-packets', type=Path, default=DEFAULT_OUT_PACKETS)
    ap.add_argument('--out-flows', type=Path, default=DEFAULT_OUT_FLOWS)
    ap.add_argument('--out-store', type=Path, default=None, help='Optional sharded packet store output. When set, writes store_root/{metadata,manifest,flows,packets/*} instead of the monolithic packets.npz/flows.parquet pair.')
    ap.add_argument('--shard-size', type=int, default=100000, help='Rows per packet shard when --out-store is set.')
    ap.add_argument('--worker-flush-size', type=int, default=10000, help='Matched flows per temporary worker chunk when --out-store is set.')
    ap.add_argument('--spool-dir', type=Path, default=None, help='Optional temporary spool directory for worker chunks.')
    ap.add_argument('--match-strategy', choices=('auto', 'hungarian', 'stream_nearest'), default='auto', help='CSV↔pcap matching strategy. auto uses stream_nearest for --out-store and hungarian for legacy npz output.')
    ap.add_argument('--T-full', type=int, default=256)
    ap.add_argument('--idle-timeout', type=float, default=120.0)
    ap.add_argument('--time-tolerance', type=float, default=2.0)
    ap.add_argument('--time-offset', type=float, default=0.0, help='Extra seconds added to per-shard SHARD_OFFSETS_DEFAULT. Default 0 assumes a UTC+8 host (matches the SHARD_OFFSETS_DEFAULT values: 03-11=39600, 01-12=43200). If the per-shard time-delta diagnostic shows a non-zero median, add that to this flag.')
    ap.add_argument('--jobs', type=int, default=0, help='0=auto (min(n_shards, cpu_count)). 1=serial.')
    ap.add_argument('--shards', type=str, nargs='*', default=None, choices=sorted(SHARDS.keys()), help='Subset of shards to process (default: all).')
    ap.add_argument('--max-per-class', type=int, default=500000, help='Per-file, per-attack-class row cap (random subsample). Default 500k. Pass 0 to disable.')
    ap.add_argument('--max-benign', type=int, default=None, help='Per-file benign row cap. Default: uncapped (keep all).')
    ap.add_argument('--max-packets-per-pcap', type=int, default=None, help='Cap per-pcap packets (smoke only).')
    ap.add_argument('--max-pcap-files-per-shard', type=int, default=None, help='Only process the first N pcap chunks per shard (smoke only).')
    ap.add_argument('--sample-seed', type=int, default=42)
    return ap


def _load_csv_shards(
    csv_dir: Path,
    shards: list[str],
    time_offset: float,
    max_per_class: int | None,
    max_benign: int | None,
    rng: np.random.Generator,
) -> tuple[dict[str, dict], list[str], int, int, dict[str, int]]:
    """Parse every CSV of each shard; return (rows_by_shard, labels, n_emitted, n_skipped, label_counts)."""
    csv_rows_by_day: dict[str, dict] = {}
    all_labels: list[str] = []
    total_rows = 0
    total_skip = 0
    aggregate_counts: dict[str, int] = {}
    for shard in shards:
        default_offset = SHARD_OFFSETS_DEFAULT.get(shard, 0.0)
        shard_offset = default_offset + time_offset
        print(f'[{shard}] effective time_offset={shard_offset}s (= default {default_offset} + CLI {time_offset})')
        shard_dir = csv_dir / shard
        if not shard_dir.is_dir():
            print(f'[{shard}] {shard_dir} not found — skipping')
            continue
        csvs = sorted(shard_dir.glob('*.csv'))
        if not csvs:
            print(f'[{shard}] no CSVs under {shard_dir}')
            continue
        shard_rows: dict[tuple, list[tuple[int, float]]] = {}
        for csv_path in csvs:
            day_rows, labels, n_emit, n_skip, cls_counts = _parse_csv(
                csv_path,
                # Row indices continue across files and shards so that they
                # index into the single concatenated label array.
                row_idx_start=total_rows,
                time_offset_seconds=shard_offset,
                max_per_class=max_per_class,
                max_benign=max_benign,
                rng=rng,
            )
            for ck, rs in day_rows.items():
                shard_rows.setdefault(ck, []).extend(rs)
            all_labels.extend(labels)
            total_rows += n_emit
            total_skip += n_skip
            for lbl, c in cls_counts.items():
                aggregate_counts[lbl] = aggregate_counts.get(lbl, 0) + c
            print(f'[{shard}/{csv_path.name}] emitted {n_emit:,} skipped {n_skip:,} cls={dict(sorted(cls_counts.items()))}')
        csv_rows_by_day[shard] = shard_rows
        n_entries = sum(len(v) for v in shard_rows.values())
        print(f'[{shard}] shard total: {n_entries:,} row entries across {len(shard_rows):,} canonical keys')
    return csv_rows_by_day, all_labels, total_rows, total_skip, aggregate_counts


def _locate_pcaps(pcap_dir: Path, shards: list[str], max_files: int | None) -> dict[str, list[Path]]:
    """Find each shard's pcap chunks, optionally keeping only the first *max_files*."""
    pcap_files_by_day: dict[str, list[Path]] = {}
    for shard in shards:
        prefix = SHARDS[shard]
        files = _find_pcaps_for_shard(pcap_dir, prefix)
        if max_files is not None:
            files = files[:max_files]
        pcap_files_by_day[shard] = files
        print(f'[{shard}] prefix {prefix!r} → {len(files):,} pcap chunks')
    return pcap_files_by_day


def main() -> None:
    """Parse CIC-DDoS2019 CSVs, locate the matching pcap chunks, and run extraction.

    1. Parse every CSV under ``--csv-dir/<shard>/`` for each selected shard.
       This applies the per-shard time offset (SHARD_OFFSETS_DEFAULT plus
       ``--time-offset``) and the optional per-class and benign row caps.
    2. Locate the pcap chunks only for shards that produced CSV data; pcaps
       of a skipped shard have no CSV rows to be matched against.
    3. Hand both to ``extract_dataset`` together with the output and
       matching options.

    Exits with a non-zero status if no CSV shard could be parsed.
    """
    args = _build_arg_parser().parse_args()
    # A cap of 0 (or None) means "no cap".
    max_per_class = args.max_per_class or None
    max_benign = args.max_benign or None
    rng = np.random.default_rng(args.sample_seed)
    shards = args.shards or sorted(SHARDS.keys())

    print(f'=== parsing CSVs in {args.csv_dir} ===')
    print(f' max_per_class={max_per_class} max_benign={max_benign}')
    print(f' additive time_offset={args.time_offset}s (on top of per-shard defaults)')
    csv_rows_by_day, all_labels, total_rows, total_skip, aggregate_counts = _load_csv_shards(
        args.csv_dir, shards, args.time_offset, max_per_class, max_benign, rng,
    )
    if not csv_rows_by_day:
        sys.exit(f'no CSV shards found under {args.csv_dir}; nothing to extract')

    labels_by_row = np.asarray(all_labels, dtype=object)
    print(f'\nTotal CSV rows emitted: {total_rows:,} skipped: {total_skip:,}')
    print('Aggregate label distribution (post-subsample):')
    for lbl, cnt in sorted(aggregate_counts.items(), key=lambda x: -x[1]):
        print(f' {lbl:<40s} {cnt:>12,}')

    print(f'\n=== locating pcap chunks in {args.pcap_dir} ===')
    # Iterating csv_rows_by_day keeps the user's shard order and drops shards
    # whose CSVs were skipped, so both dicts passed on share the same keys.
    pcap_files_by_day = _locate_pcaps(args.pcap_dir, list(csv_rows_by_day), args.max_pcap_files_per_shard)

    print('\n=== extracting packet sequences ===')
    extract_dataset(
        csv_rows_by_day=csv_rows_by_day,
        labels_by_row=labels_by_row,
        pcap_files_by_day=pcap_files_by_day,
        out_packets=args.out_packets,
        out_flows=args.out_flows,
        out_store=args.out_store,
        shard_size=args.shard_size,
        worker_flush_size=args.worker_flush_size,
        spool_dir=args.spool_dir,
        # 'auto' is expressed as None so extract_dataset picks the strategy.
        match_strategy=None if args.match_strategy == 'auto' else args.match_strategy,
        T_full=args.T_full,
        idle_timeout=args.idle_timeout,
        time_tolerance_seconds=args.time_tolerance,
        max_packets_per_pcap=args.max_packets_per_pcap,
        n_jobs=args.jobs,
    )
if __name__ == '__main__':
main()