330 lines
12 KiB
Python
330 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
annotate_kappa.py — 用两个 LLM 独立标注 benchmark 子集,计算 Cohen's kappa。
|
||
|
||
使用方式:
|
||
# 用两个不同模型标注
|
||
python annotate_kappa.py \
|
||
--model1 deepseek-v4-flash --api-base1 https://api.deepseek.com/v1 --api-key1 sk-xxx \
|
||
--model2 claude-sonnet-4-6 --api-base2 https://your-api/v1 --api-key2 sk-xxx \
|
||
--sample-size 70
|
||
|
||
# 只用一个模型标注(不传 --model2,只跟 ground truth 比)
|
||
python annotate_kappa.py \
|
||
--model1 deepseek-v4-flash --api-base1 https://api.deepseek.com/v1 --api-key1 sk-xxx \
|
||
--sample-size 70
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
import random
|
||
import argparse
|
||
import requests
|
||
from pathlib import Path
|
||
from collections import Counter
|
||
from typing import Dict, List, Any
|
||
|
||
# Directory containing this script (symlinks resolved). It is put at the front of
# sys.path so the project-local `src` package imported inside
# build_annotation_prompt can be resolved when the script is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[0]
sys.path.insert(0, os.fspath(PROJECT_ROOT))

# Where benchmark episodes are read from and where kappa results are written by default.
BENCHMARK_DIR = PROJECT_ROOT.joinpath("data", "benchmark")
RESULTS_DIR = PROJECT_ROOT.joinpath("results", "kappa")
|
||
|
||
# Prompts for the annotator models. They are deliberately simplified and do NOT use
# the EDRC/baseline analysis framework.
# ANNOTATOR_SYSTEM is sent as the system message on every annotation request.
ANNOTATOR_SYSTEM = """你是一位智能家居安全专家,正在审查设备日志标注数据。
请仔细阅读设备事件日志,独立判断是否存在安全异常。
不要过度解读正常的日常活动(温度波动、正常开关灯等)。
只有当你发现明确的异常证据时才判定为异常。"""

# ANNOTATOR_FORMAT is appended to every user prompt. It asks for a single fenced JSON
# object (is_anomaly / threat_type / confidence / brief_reason), which
# parse_annotation then extracts. threat_type is restricted to the listed labels or "none".
ANNOTATOR_FORMAT = """请只输出一个 JSON,不要输出其他内容:
```json
{
"is_anomaly": true或false,
"threat_type": "具体类型或none",
"confidence": "high/medium/low",
"brief_reason": "一句话理由"
}
```

threat_type 从以下选择:intrusion / tailgating / credential_theft / fire_risk / unattended_cooking / carbon_monoxide / sensor_stuck / sensor_drift / sensor_malfunction / actuator_stuck / lock_malfunction / safety_device_failure / water_leak / possible_fall / abnormal_inactivity / health_concern / child_safety / behavioral_anomaly / none"""
|
||
|
||
|
||
def stratified_sample(sample_size: int, seed: int = 42, benchmark_dir=None) -> List[str]:
    """Draw a reproducible stratified sample of benchmark episode files.

    Episodes are ``*.json`` files under ``<benchmark_dir>/sq1`` .. ``sq5``. A file
    belongs to stratum ``SQn_<V>``, where ``<V>`` is the first of TP/FP/TN whose
    ``_<V>_`` marker appears in the file stem. Files with no marker are ignored.
    Each stratum contributes ``max(2, sample_size // n_strata)`` files (or all of
    them, if it has fewer). Any shortfall is then filled uniformly from the files
    not yet chosen.

    Args:
        sample_size: Number of paths wanted. Fewer are returned if the pool is smaller.
        seed: Seed for a private RNG. The same seed yields the same sample, and the
            global ``random`` state is not touched.
        benchmark_dir: Root directory to scan. Defaults to ``BENCHMARK_DIR``.

    Returns:
        Up to ``sample_size`` distinct file paths as strings. The result is empty
        when ``sample_size <= 0`` or when no matching files exist.
    """
    if sample_size <= 0:
        return []
    root = BENCHMARK_DIR if benchmark_dir is None else Path(benchmark_dir)
    rng = random.Random(seed)

    strata: Dict[str, List[str]] = {}
    for sq in ["sq1", "sq2", "sq3", "sq4", "sq5"]:
        sq_dir = root / sq
        if not sq_dir.exists():
            continue
        # glob() order depends on the filesystem. Sorting makes the sample
        # reproducible across machines for a given seed.
        for f in sorted(sq_dir.glob("*.json")):
            for v in ["TP", "FP", "TN"]:
                if f"_{v}_" in f.stem:
                    strata.setdefault(f"{sq.upper()}_{v}", []).append(str(f))
                    break

    if not strata:
        return []

    # Take at least 2 from every stratum, then top up at random.
    per_strata = max(2, sample_size // len(strata))
    sampled: List[str] = []
    for key in sorted(strata):
        files = strata[key]
        sampled.extend(rng.sample(files, min(per_strata, len(files))))

    chosen = set(sampled)
    remaining_pool = [f for key in sorted(strata) for f in strata[key] if f not in chosen]
    shortfall = sample_size - len(sampled)
    if shortfall > 0 and remaining_pool:
        sampled.extend(rng.sample(remaining_pool, min(shortfall, len(remaining_pool))))

    # NOTE: if 2 * n_strata > sample_size, this truncation drops the last strata
    # in sorted order entirely. Full coverage is only guaranteed when
    # sample_size >= 2 * n_strata.
    return sampled[:sample_size]
|
||
|
||
|
||
def call_annotator(system, user, model, api_base, api_key, timeout=120, extra_body=None):
    """Send one chat-completions request and return the first choice's message text.

    Args:
        system: Text of the system message.
        user: Text of the user message.
        model: Model name placed in the request body.
        api_base: Base URL. Trailing slashes are stripped, and ``/chat/completions``
            is appended unless the URL already ends with it.
        api_key: Bearer token. No Authorization header is sent when it is falsy or
            equal to ``"not-needed"``.
        timeout: Seconds, forwarded to ``requests.post``.
        extra_body: Optional mapping merged into the payload last, so its keys
            override the defaults (temperature 0.0, max_tokens 1024).

    Returns:
        ``choices[0]["message"]["content"]`` from the decoded JSON response.

    Raises:
        requests.HTTPError: via ``raise_for_status`` on a non-2xx status.
        Any other error from the request, JSON decoding, or indexing an unexpected
        response shape propagates unchanged.
    """
    suffix = "/chat/completions"
    endpoint = api_base.rstrip("/")
    if not endpoint.endswith(suffix):
        endpoint = endpoint + suffix

    headers = {"Content-Type": "application/json"}
    send_auth = bool(api_key) and api_key != "not-needed"
    if send_auth:
        headers["Authorization"] = f"Bearer {api_key}"

    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    payload = dict(model=model, messages=conversation, temperature=0.0, max_tokens=1024)
    if extra_body:
        payload.update(extra_body)

    response = requests.post(endpoint, headers=headers, json=payload, timeout=timeout)
    response.raise_for_status()
    first_choice = response.json()["choices"][0]
    return first_choice["message"]["content"]
|
||
|
||
|
||
def _coerce_is_anomaly(value):
    """Normalise a model-reported ``is_anomaly`` value to True/False, or None if unrecognised.

    Models sometimes emit ``"true"``/``"false"`` strings or 0/1 instead of JSON
    booleans. Downstream labelling uses ``str(value)``, so without this step
    ``"true"`` and ``True`` would be counted as different classes.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, int) and value in (0, 1):
        return bool(value)
    if isinstance(value, str):
        text = value.strip().lower()
        if text in ("true", "yes", "1", "是"):
            return True
        if text in ("false", "no", "0", "否"):
            return False
    return None


def parse_annotation(raw: str) -> Dict:
    """Extract the annotation JSON object from a model reply.

    Tries, in order:
      1. the contents of a fenced code block (or the whole stripped text if there
         is no fence), parsed as JSON;
      2. the substring from the first ``{`` to the last ``}`` of that text.

    The first candidate that parses to a JSON *object* wins. If it contains
    ``is_anomaly``, that value is normalised to True/False, or None when
    unrecognisable.

    Args:
        raw: Model reply text. ``None``, non-strings, and blank strings are
            treated as failures.

    Returns:
        The parsed dict, or ``{"is_anomaly": None, "threat_type": "parse_failed",
        "_parse_failed": True}`` when nothing parses to a dict.
    """
    import re

    failed = {"is_anomaly": None, "threat_type": "parse_failed", "_parse_failed": True}
    if not isinstance(raw, str) or not raw.strip():
        return failed

    text = raw.strip()
    fence = re.search(r"```(?:json)?\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
    if fence:
        text = fence.group(1).strip()

    candidates = [text]
    first, last = text.find("{"), text.rfind("}")
    if first != -1 and last > first:
        candidates.append(text[first:last + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # A bare list/number/string is not a usable annotation; callers call .get() on the result.
        if isinstance(parsed, dict):
            if "is_anomaly" in parsed:
                parsed["is_anomaly"] = _coerce_is_anomaly(parsed["is_anomaly"])
            return parsed
    return failed
|
||
|
||
|
||
def _slim_event(evt: Dict) -> Dict:
    """Return a copy of one raw event reduced to the fields shown to annotators.

    timestamp, device_id and event_type are always kept. The type-specific payload
    is added for three event types: cluster/attribute/value for
    ``attribute_change``, event_name/fields for ``device_event``, and command for
    ``command``. Any other event type keeps only the three common fields.
    """
    kind = evt.get("event_type")
    slim = {
        "timestamp": evt.get("timestamp"),
        "device_id": evt.get("device_id"),
        "event_type": kind,
    }
    if kind == "attribute_change":
        slim["cluster"] = evt.get("cluster", "")
        slim["attribute"] = evt.get("attribute", "")
        slim["value"] = evt.get("value")
    elif kind == "device_event":
        slim["event_name"] = evt.get("event_name", "")
        slim["fields"] = evt.get("fields", {})
    elif kind == "command":
        slim["command"] = evt.get("command", "")
    return slim


def build_annotation_prompt(episode: Dict) -> str:
    """Build the user prompt for annotators (simplified; no EDRC framework).

    The prompt has these sections, joined with newlines: home-layout summary,
    slimmed event log, the episode's query, and ANNOTATOR_FORMAT. Missing
    ``home_state`` / ``event_sequence`` / ``query`` keys fall back to empty values.
    """
    from src.evaluation.prompt_builder import summarize_home_layout, format_event_log

    slim_events = [_slim_event(evt) for evt in episode.get("event_sequence", [])]
    sections = (
        "## 家庭环境\n",
        summarize_home_layout(episode.get("home_state", {})),
        "\n## 设备日志\n",
        format_event_log(slim_events),
        f"\n## 任务\n{episode.get('query', '')}\n",
        ANNOTATOR_FORMAT,
    )
    return "\n".join(sections)
|
||
|
||
|
||
def compute_kappa(labels1: List, labels2: List) -> float:
    """Compute Cohen's kappa between two raters who labelled the same items.

    kappa = (p_o - p_e) / (1 - p_e), where:
      - p_o is the observed agreement rate;
      - p_e = sum_c P1(c) * P2(c) is the agreement expected by chance, computed
        from each rater's marginal label frequencies.

    Args:
        labels1: Rater 1's labels, one per item. Any hashable values are allowed;
            they do not need to be comparable with each other. Lists and tuples
            both work.
        labels2: Rater 2's labels, aligned item-by-item with ``labels1``.

    Returns:
        Kappa in [-1, 1]. Returns 0.0 for empty input. Returns 1.0 when p_e == 1,
        i.e. both raters gave every item the same single label and the formula
        would be 0/0.

    Raises:
        ValueError: if the two sequences have different lengths. An explicit check
            is used rather than ``assert``, because ``python -O`` strips asserts and
            ``zip`` would then silently truncate.
    """
    n = len(labels1)
    if n != len(labels2):
        raise ValueError(f"label sequences differ in length: {n} vs {len(labels2)}")
    if n == 0:
        return 0.0

    agreements = sum(1 for a, b in zip(labels1, labels2) if a == b)
    counts1, counts2 = Counter(labels1), Counter(labels2)
    # Integer numerators keep p_e exact, so the p_e == 1.0 check below is reliable.
    chance_pairs = sum(counts1[label] * counts2[label] for label in counts1)

    po = agreements / n
    pe = chance_pairs / (n * n)
    if pe == 1.0:
        return 1.0
    return (po - pe) / (1 - pe)
|
||
|
||
|
||
def _annotate(prompt_text, model, api_base, api_key, extra_body):
    """Have one annotator model label one prompt; this helper never raises.

    Returns the dict produced by parse_annotation. If the request or parsing raises,
    returns a placeholder with ``is_anomaly`` None and the exception text under
    ``_error``. If parsing returns something other than a dict, returns a
    ``parse_failed`` placeholder instead.
    """
    try:
        raw = call_annotator(ANNOTATOR_SYSTEM, prompt_text, model, api_base, api_key, extra_body=extra_body)
        ann = parse_annotation(raw)
    except Exception as e:  # per-episode boundary: one failed request must not abort the run
        return {"is_anomaly": None, "threat_type": "error", "_error": str(e)}
    if not isinstance(ann, dict):
        return {"is_anomaly": None, "threat_type": "parse_failed", "_parse_failed": True}
    return ann


def _store_annotation(result, prefix, ann):
    """Copy an annotation's label fields into ``result`` under ``<prefix>_*`` keys."""
    result[f"{prefix}_is_anomaly"] = ann.get("is_anomaly")
    result[f"{prefix}_threat_type"] = ann.get("threat_type", "none")
    if "_error" in ann:
        # Keep the failure reason so errored episodes can be diagnosed from annotations.json.
        result[f"{prefix}_error"] = ann["_error"]


def _compare(rows, key_a, key_b):
    """Compute the agreement rate and Cohen's kappa between two label columns of ``rows``.

    Only rows where both values are non-None are used, and values are converted
    with str() before comparison.

    Returns:
        ``(n_pairs, agreement, kappa)``. ``agreement`` and ``kappa`` are None when
        there are no usable pairs, which avoids dividing by zero.
    """
    pairs = [
        (str(r.get(key_a)), str(r.get(key_b)))
        for r in rows
        if r.get(key_a) is not None and r.get(key_b) is not None
    ]
    if not pairs:
        return 0, None, None
    first = [a for a, _ in pairs]
    second = [b for _, b in pairs]
    agreement = sum(a == b for a, b in pairs) / len(pairs)
    return len(pairs), agreement, compute_kappa(first, second)


def _print_comparison(title, agreement, kappa):
    """Print one comparison section of the kappa report."""
    print(f"\n{title}:")
    if agreement is None:
        print(" No valid annotation pairs; skipped")
        return
    print(f" Agreement: {agreement:.1%}")
    print(f" Cohen's kappa: {kappa:.3f}")


def _round4(x):
    """Round to 4 decimals, passing None through unchanged (used for JSON summary fields)."""
    return None if x is None else round(x, 4)


def main():
    """CLI entry point: sample episodes, label them with one or two LLMs, report agreement.

    Writes ``annotations.json`` (per-episode GT and model labels) and
    ``kappa_summary.json`` (agreement and kappa for each comparison) to
    ``--output-dir``, which defaults to RESULTS_DIR. A second model is used only
    when ``--model2`` is given and ``--gt-only`` is not set.
    """
    parser = argparse.ArgumentParser(description="LLM 标注一致性验证 (Cohen's kappa)")
    parser.add_argument("--model1", required=True, help="标注模型1")
    parser.add_argument("--api-base1", required=True, help="模型1 API 地址")
    parser.add_argument("--api-key1", required=True, help="模型1 API Key")
    parser.add_argument("--model2", default=None, help="标注模型2(不指定则只跟 GT 比)")
    parser.add_argument("--api-base2", default=None, help="模型2 API 地址")
    parser.add_argument("--api-key2", default=None, help="模型2 API Key")
    parser.add_argument("--sample-size", type=int, default=70, help="抽样数量")
    parser.add_argument("--seed", type=int, default=42, help="随机种子")
    parser.add_argument("--output-dir", default=None, help="输出目录")
    parser.add_argument("--extra-json1", default=None, help="模型1额外参数JSON")
    parser.add_argument("--extra-json2", default=None, help="模型2额外参数JSON")
    parser.add_argument("--gt-only", action="store_true", help="只跟 ground truth 比(忽略模型2)")
    args = parser.parse_args()

    use_model2 = bool(args.model2) and not args.gt_only
    if use_model2 and not args.api_base2:
        # Without this check every model2 request fails and the model2 comparisons come out empty.
        parser.error("--api-base2 is required when --model2 is given")

    try:
        extra1 = json.loads(args.extra_json1) if args.extra_json1 else None
        extra2 = json.loads(args.extra_json2) if args.extra_json2 else None
    except json.JSONDecodeError as e:
        parser.error(f"invalid --extra-json value: {e}")

    output_dir = Path(args.output_dir) if args.output_dir else RESULTS_DIR
    output_dir.mkdir(parents=True, exist_ok=True)

    # Stratified sampling
    print(f"Stratified sampling {args.sample_size} episodes...")
    sampled_files = stratified_sample(args.sample_size, args.seed)
    print(f" Sampled {len(sampled_files)} episodes")
    if not sampled_files:
        sys.exit("No benchmark episodes found; nothing to annotate.")

    # Annotation
    results = []
    total = len(sampled_files)
    for i, ep_path in enumerate(sampled_files, start=1):
        with open(ep_path, "r", encoding="utf-8") as f:
            episode = json.load(f)

        prompt_text = build_annotation_prompt(episode)
        gt = episode["ground_truth"]
        meta = episode["metadata"]

        result = {
            "episode_id": episode.get("episode_id", ""),
            "sq_type": meta.get("sq_type", ""),
            "variant": meta.get("variant", ""),
            "category": gt.get("category", ""),
            "gt_is_anomaly": gt.get("is_anomaly"),
            "gt_threat_type": gt.get("threat_type", "none"),
        }

        ann1 = _annotate(prompt_text, args.model1, args.api_base1, args.api_key1, extra1)
        _store_annotation(result, "model1", ann1)
        if use_model2:
            ann2 = _annotate(prompt_text, args.model2, args.api_base2, args.api_key2, extra2)
            _store_annotation(result, "model2", ann2)

        results.append(result)
        m1 = result["model1_is_anomaly"]
        m1_label = "failed" if m1 is None else ("anomaly" if m1 else "normal")
        print(f" [{i}/{total}] {result['episode_id']}: GT={result['gt_is_anomaly']} M1={m1_label}")
        if i < total:
            time.sleep(0.5)  # light rate limiting between requests

    # Save the raw annotations
    with open(output_dir / "annotations.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    # Kappa analysis
    print(f"\n{'='*60}")
    print(" Cohen's Kappa Analysis")
    print(f"{'='*60}\n")

    valid = [r for r in results if r.get("model1_is_anomaly") is not None]
    print(f"Valid annotations: {len(valid)}/{len(results)}")

    # Each comparison filters its own pair of columns, so a failure of one model
    # does not shrink the sample used for the other model's comparison.
    n_m1_gt, agree_m1_gt, kappa_m1_gt = _compare(results, "gt_is_anomaly", "model1_is_anomaly")
    _print_comparison(f"{args.model1} vs Ground Truth", agree_m1_gt, kappa_m1_gt)

    summary = {
        "sample_size": len(results),
        "valid_annotations": len(valid),
        "model1": args.model1,
        "model1_vs_gt_agreement": _round4(agree_m1_gt),
        "model1_vs_gt_kappa": _round4(kappa_m1_gt),
        "model1_vs_gt_pairs": n_m1_gt,
    }

    if use_model2:
        n_m1_m2, agree_m1_m2, kappa_m1_m2 = _compare(results, "model1_is_anomaly", "model2_is_anomaly")
        _print_comparison(f"{args.model1} vs {args.model2}", agree_m1_m2, kappa_m1_m2)

        n_m2_gt, agree_m2_gt, kappa_m2_gt = _compare(results, "gt_is_anomaly", "model2_is_anomaly")
        _print_comparison(f"{args.model2} vs Ground Truth", agree_m2_gt, kappa_m2_gt)

        summary["model2"] = args.model2
        summary["model1_vs_model2_agreement"] = _round4(agree_m1_m2)
        summary["model1_vs_model2_kappa"] = _round4(kappa_m1_m2)
        summary["model1_vs_model2_pairs"] = n_m1_m2
        summary["model2_vs_gt_agreement"] = _round4(agree_m2_gt)
        summary["model2_vs_gt_kappa"] = _round4(kappa_m2_gt)
        summary["model2_vs_gt_pairs"] = n_m2_gt

    with open(output_dir / "kappa_summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    print(f"\nResults saved to {output_dir}")


if __name__ == "__main__":
    main()
|