105 lines
6.1 KiB
Python
105 lines
6.1 KiB
Python
from __future__ import annotations
|
|
import argparse
|
|
import csv
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import numpy as np
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
from extract_lib import extract_dataset, _canonical_key
|
|
from csv_adapter import CsvFlowAdapter, parse_csv_rows
|
|
# --- CsvFlowAdapter configuration -------------------------------------------
# Everything in this section is handed to CsvFlowAdapter below.
# NOTE(review): keys look like internal field names and values like the CSV
# header text they are read from -- confirm against csv_adapter.
JOIN_COLS = {
    'src_ip': 'Src IP',
    'src_port': 'Src Port',
    'dst_ip': 'Dst IP',
    'dst_port': 'Dst Port',
    'protocol': 'Protocol',
    'timestamp': 'Timestamp',
}
LABEL_COL = 'Label'

# strptime() formats; _parse_timestamp tries them in this order.
TIMESTAMP_FORMATS = (
    '%Y-%m-%d %H:%M:%S.%f',
    '%Y-%m-%d %H:%M:%S',
    '%d/%m/%Y %H:%M:%S',
    '%d/%m/%Y %H:%M',
)

# _normalize_label rewrites any of these (exact, case-sensitive) spellings
# to BENIGN_TOKEN.
BENIGN_ALIASES = {'BENIGN', 'Benign', 'benign'}
BENIGN_TOKEN = 'normal'

# Passed to the adapter as drop_label_patterns.
# NOTE(review): presumably rows whose label matches are dropped -- confirm.
DROP_LABEL_PATTERNS = ('- Attempted',)

# Day names processed by main() when --days is not given.
SHARDS = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday')

# --- Command-line defaults used by main() -----------------------------------
DEFAULT_CSV_DIR = Path('datasets/cicids2017/raw/csv')
DEFAULT_PCAP_DIR = Path('datasets/cicids2017/raw/pcap')
DEFAULT_OUT_PACKETS = Path('datasets/cicids2017/processed/packets.npz')
DEFAULT_OUT_FLOWS = Path('datasets/cicids2017/processed/flows.parquet')

# Adapter instance that _parse_day_csv hands to parse_csv_rows.
CICIDS2017_ADAPTER = CsvFlowAdapter(
    join_cols=JOIN_COLS,
    label_col=LABEL_COL,
    timestamp_formats=TIMESTAMP_FORMATS,
    benign_aliases=frozenset(BENIGN_ALIASES),
    benign_token=BENIGN_TOKEN,
    drop_label_patterns=DROP_LABEL_PATTERNS,
)
|
|
|
|
def _normalize_label(raw: str) -> str:
    """Return *raw* stripped of surrounding whitespace, with benign aliases unified.

    Any stripped value that is a member of ``BENIGN_ALIASES`` is replaced by
    ``BENIGN_TOKEN``; every other label is returned stripped but otherwise
    unchanged.
    """
    label = raw.strip()
    if label in BENIGN_ALIASES:
        return BENIGN_TOKEN
    return label
|
|
|
|
def _parse_timestamp(ts: str) -> float | None:
    """Convert a timestamp string to POSIX seconds, or ``None`` if unparseable.

    The stripped text is tried against each entry of ``TIMESTAMP_FORMATS`` in
    order and the first successful parse wins. Blank input and text matching
    no format both yield ``None``.

    None of the formats carries a UTC offset, so the parsed datetime is naive
    and ``datetime.timestamp()`` interprets it in the machine's local time zone.
    """
    text = ts.strip()
    if text:
        for pattern in TIMESTAMP_FORMATS:
            try:
                epoch = datetime.strptime(text, pattern).timestamp()
            except ValueError:
                # Wrong format for this string; fall through to the next one.
                continue
            return epoch
    return None
|
|
|
|
def _find_pcaps_for_day(pcap_dir: Path, day: str) -> list[Path]:
    """Return the capture files in *pcap_dir* whose name mentions *day*.

    The directory is searched non-recursively for ``*.pcap`` and ``*.pcapng``
    names containing the day in lower-case or capitalized form (``monday`` /
    ``Monday``).

    Args:
        pcap_dir: Directory to search.
        day: Day name; its casing does not matter.

    Returns:
        Matching regular files, without duplicates. Results are grouped by
        pattern (lower-case ``.pcap``, lower-case ``.pcapng``, capitalized
        ``.pcap``, capitalized ``.pcapng``) and sorted within each group.
        An empty list is returned when nothing matches.
    """
    day_lc = day.lower()
    day_cap = day.capitalize()
    patterns = (
        f'*{day_lc}*.pcap',
        f'*{day_lc}*.pcapng',
        f'*{day_cap}*.pcap',
        f'*{day_cap}*.pcapng',
    )
    # A dict keeps first-insertion order, so it doubles as an ordered set.
    # Deduplication is needed because several patterns can hit the same path
    # (e.g. on a case-insensitive filesystem).
    matches: dict[Path, None] = {}
    for pattern in patterns:
        for path in sorted(pcap_dir.glob(pattern)):
            # glob() also matches directories; only real files are captures.
            if path.is_file():
                matches.setdefault(path, None)
    return list(matches)
|
|
|
|
def _parse_day_csv(csv_path: Path, row_idx_start: int, time_offset_seconds: float) -> tuple[dict[tuple, list[tuple[int, float]]], list[str], int, int]:
    """Parse one day's CSV with the CICIDS2017 adapter.

    Forwards all arguments to ``parse_csv_rows`` together with
    ``CICIDS2017_ADAPTER`` and returns the first four of the five values it
    produces, as ``(day_rows, labels, n_emit, n_skip)``. The fifth value is
    discarded.
    """
    parsed = parse_csv_rows(
        csv_path=csv_path,
        row_idx_start=row_idx_start,
        time_offset_seconds=time_offset_seconds,
        adapter=CICIDS2017_ADAPTER,
    )
    # Exact five-way unpack on purpose: a changed return arity should fail here.
    rows_by_key, row_labels, emitted, skipped, _extra = parsed
    return (rows_by_key, row_labels, emitted, skipped)
|
|
|
|
def main() -> None:
    """Command-line entry point: parse day CSVs, locate pcaps, run extraction.

    Steps, in order:
      1. Parse ``<csv-dir>/<day>.csv`` for each selected day; a missing CSV is
         reported and skipped. Row indices continue across days.
      2. Look up the pcap files for every selected day.
      3. Pass everything to ``extract_dataset``.
    """
    opts = _build_arg_parser().parse_args()
    selected_days = tuple(opts.days) if opts.days else SHARDS

    # ---- 1. CSV parsing ----------------------------------------------------
    csv_rows_by_day: dict[str, dict] = {}
    label_acc: list[str] = []
    emitted_total = 0
    skipped_total = 0
    print(f'=== parsing CSVs in {opts.csv_dir} ===')
    for day in selected_days:
        day_csv = opts.csv_dir / f'{day}.csv'
        if not day_csv.exists():
            print(f'[{day}] {day_csv} not found, skipping')
            continue
        # row_idx_start is the running total so indices stay unique across days.
        day_rows, day_labels, n_emit, n_skip = _parse_day_csv(
            day_csv,
            row_idx_start=emitted_total,
            time_offset_seconds=opts.time_offset,
        )
        csv_rows_by_day[day] = day_rows
        label_acc.extend(day_labels)
        emitted_total += n_emit
        skipped_total += n_skip
        print(f'[{day}] emitted {n_emit:,} rows skipped {n_skip:,} canonical keys {len(day_rows):,}')
    labels_by_row = np.asarray(label_acc, dtype=object)
    print(f'Total CSV rows emitted: {emitted_total:,} (skipped {skipped_total:,})')

    # ---- 2. pcap discovery -------------------------------------------------
    print(f'\n=== locating pcap files in {opts.pcap_dir} ===')
    pcap_files_by_day: dict[str, list[Path]] = {}
    for day in selected_days:
        day_pcaps = _find_pcaps_for_day(opts.pcap_dir, day)
        pcap_files_by_day[day] = day_pcaps
        print(f'[{day}] {len(day_pcaps)} pcap(s): {[p.name for p in day_pcaps]}')

    # ---- 3. extraction -----------------------------------------------------
    print('\n=== extracting packet sequences ===')
    # 'auto' is forwarded as None so extract_dataset picks the strategy itself.
    strategy = None if opts.match_strategy == 'auto' else opts.match_strategy
    extract_dataset(
        csv_rows_by_day=csv_rows_by_day,
        labels_by_row=labels_by_row,
        pcap_files_by_day=pcap_files_by_day,
        out_packets=opts.out_packets,
        out_flows=opts.out_flows,
        out_store=opts.out_store,
        shard_size=opts.shard_size,
        worker_flush_size=opts.worker_flush_size,
        spool_dir=opts.spool_dir,
        match_strategy=strategy,
        T_full=opts.T_full,
        idle_timeout=opts.idle_timeout,
        time_tolerance_seconds=opts.time_tolerance,
        max_packets_per_pcap=opts.max_packets_per_pcap,
        n_jobs=opts.jobs,
    )


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the argument parser used by main()."""
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Input / output locations.
    add('--csv-dir', type=Path, default=DEFAULT_CSV_DIR)
    add('--pcap-dir', type=Path, default=DEFAULT_PCAP_DIR)
    add('--out-packets', type=Path, default=DEFAULT_OUT_PACKETS)
    add('--out-flows', type=Path, default=DEFAULT_OUT_FLOWS)
    add(
        '--out-store',
        type=Path,
        default=None,
        help=(
            'Optional sharded packet store output. When set, writes '
            'store_root/{metadata,manifest,flows,packets/*} instead of the '
            'monolithic packets.npz/flows.parquet pair.'
        ),
    )
    add(
        '--shard-size',
        type=int,
        default=100_000,
        help='Rows per packet shard when --out-store is set.',
    )
    add(
        '--worker-flush-size',
        type=int,
        default=10_000,
        help='Matched flows per temporary worker chunk when --out-store is set.',
    )
    add(
        '--spool-dir',
        type=Path,
        default=None,
        help='Optional temporary spool directory for worker chunks.',
    )

    # Matching and extraction parameters.
    add(
        '--match-strategy',
        choices=('auto', 'hungarian', 'stream_nearest'),
        default='auto',
        help=(
            'CSV↔pcap matching strategy. auto uses stream_nearest for '
            '--out-store and hungarian for legacy npz output.'
        ),
    )
    add('--T-full', type=int, default=256)
    add('--idle-timeout', type=float, default=120.0)
    add(
        '--time-tolerance',
        type=float,
        default=2.0,
        help='Max |t_csv - t_pcap| seconds for flow match.',
    )
    add(
        '--time-offset',
        type=float,
        default=0.0,
        help='Seconds added to CSV timestamps before matching.',
    )

    # Run control.
    add(
        '--jobs',
        type=int,
        default=0,
        help='0 = auto (min(n_days, cpu_count)). 1 = serial.',
    )
    add(
        '--days',
        type=str,
        nargs='*',
        default=None,
        help='Subset of shards to process (default: all 5).',
    )
    add(
        '--max-packets-per-pcap',
        type=int,
        default=None,
        help='Cap per-pcap packets (smoke tests only).',
    )
    return parser
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|