Files
llmiotsafe/annotate_kappa.py
2026-05-12 17:01:39 +08:00

330 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
annotate_kappa.py — 用两个 LLM 独立标注 benchmark 子集,计算 Cohen's kappa。
使用方式:
# 用两个不同模型标注
python annotate_kappa.py \
--model1 deepseek-v4-flash --api-base1 https://api.deepseek.com/v1 --api-key1 sk-xxx \
--model2 claude-sonnet-4-6 --api-base2 https://your-api/v1 --api-key2 sk-xxx \
--sample-size 70
# 只用一个模型标注(跟 ground truth 比)
python annotate_kappa.py \
--model1 deepseek-v4-flash --api-base1 https://api.deepseek.com/v1 --api-key1 sk-xxx \
--sample-size 70 --gt-only
"""
import json
import os
import sys
import time
import random
import argparse
import requests
from pathlib import Path
from collections import Counter
from typing import Dict, List, Any
# Directory containing this script. It is put first on sys.path, presumably so
# the project-local ``src`` package (imported in build_annotation_prompt)
# resolves regardless of the current working directory.
PROJECT_ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(PROJECT_ROOT))
# Root of the benchmark episodes; stratified_sample looks for sq1..sq5
# sub-directories underneath it.
BENCHMARK_DIR = PROJECT_ROOT / "data" / "benchmark"
# Default output directory, used by main() when --output-dir is not given.
RESULTS_DIR = PROJECT_ROOT / "results" / "kappa"
# Prompts used for annotation — deliberately simplified; they do not use the
# EDRC/baseline analysis framework.
# System prompt sent with every annotation request.
ANNOTATOR_SYSTEM = """你是一位智能家居安全专家,正在审查设备日志标注数据。
请仔细阅读设备事件日志,独立判断是否存在安全异常。
不要过度解读正常的日常活动(温度波动、正常开关灯等)。
只有当你发现明确的异常证据时才判定为异常。"""
# Output-format instructions appended to the end of every user prompt; asks
# for a single JSON object and lists the allowed threat_type values.
ANNOTATOR_FORMAT = """请只输出一个 JSON不要输出其他内容
```json
{
"is_anomaly": true或false,
"threat_type": "具体类型或none",
"confidence": "high/medium/low",
"brief_reason": "一句话理由"
}
```
threat_type 从以下选择intrusion / tailgating / credential_theft / fire_risk / unattended_cooking / carbon_monoxide / sensor_stuck / sensor_drift / sensor_malfunction / actuator_stuck / lock_malfunction / safety_device_failure / water_leak / possible_fall / abnormal_inactivity / health_concern / child_safety / behavioral_anomaly / none"""
def stratified_sample(sample_size: int, seed: int = 42, benchmark_dir=None) -> List[str]:
    """Draw a stratified sample of benchmark episode files.

    Strata are the (SQ type, variant) combinations found on disk: for each of
    the ``sq1``..``sq5`` sub-directories, every ``*.json`` file whose stem
    contains ``_TP_``, ``_FP_`` or ``_TN_`` is assigned to the stratum
    ``"<SQ>_<variant>"``. Files matching none of the variants are ignored.

    Args:
        sample_size: Number of episode paths to return (upper bound).
        seed: Seed for the module-level ``random`` generator.
        benchmark_dir: Root directory to scan. Defaults to ``BENCHMARK_DIR``.

    Returns:
        A list of at most ``sample_size`` file paths (as strings). The list is
        empty when ``sample_size`` is not positive or no episode files exist.
    """
    root = Path(BENCHMARK_DIR if benchmark_dir is None else benchmark_dir)
    random.seed(seed)
    if sample_size <= 0:
        return []

    strata = {}
    for sq in ("sq1", "sq2", "sq3", "sq4", "sq5"):
        sq_dir = root / sq
        if not sq_dir.exists():
            continue
        # glob() order is filesystem-dependent; sort so that a fixed seed
        # always yields the same sample.
        for path in sorted(sq_dir.glob("*.json")):
            for variant in ("TP", "FP", "TN"):
                if f"_{variant}_" in path.stem:
                    strata.setdefault(f"{sq.upper()}_{variant}", []).append(str(path))
                    break

    # Nothing to sample from: avoid dividing by zero below.
    if not strata:
        return []

    ordered_strata = sorted(strata.items())

    # Take at least 2 per stratum (or an even share of sample_size if larger),
    # capped by the number of files the stratum actually has.
    per_stratum = max(2, sample_size // len(strata))
    sampled = []
    for _, files in ordered_strata:
        sampled.extend(random.sample(files, min(per_stratum, len(files))))

    # If still short, top up uniformly at random from the files not yet chosen.
    chosen = set(sampled)
    remaining_pool = [f for _, files in ordered_strata for f in files if f not in chosen]
    while len(sampled) < sample_size and remaining_pool:
        pick = random.choice(remaining_pool)
        remaining_pool.remove(pick)
        sampled.append(pick)

    # The per-stratum minimum can overshoot a small sample_size; truncate.
    return sampled[:sample_size]
def call_annotator(system, user, model, api_base, api_key, timeout=120, extra_body=None):
    """Send one chat-completion request to an annotator model and return its reply text.

    Args:
        system: System prompt.
        user: User prompt.
        model: Model name placed in the request payload.
        api_base: Base URL of the API; ``/chat/completions`` is appended
            unless the URL already ends with it.
        api_key: Bearer token. No Authorization header is sent when it is
            empty or equal to ``"not-needed"``.
        timeout: Request timeout passed to ``requests.post``.
        extra_body: Optional dict merged into the request payload.

    Returns:
        The ``content`` of the first choice's message in the JSON response.

    Raises:
        requests.HTTPError: If the response status indicates failure.
    """
    endpoint = api_base.rstrip("/")
    if not endpoint.endswith("/chat/completions"):
        endpoint = endpoint + "/chat/completions"

    request_headers = {"Content-Type": "application/json"}
    if api_key and api_key != "not-needed":
        request_headers["Authorization"] = f"Bearer {api_key}"

    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    body = {
        "model": model,
        "messages": conversation,
        "temperature": 0.0,
        "max_tokens": 1024,
    }
    # Caller-supplied fields are merged last, so they can override the defaults above.
    if extra_body:
        body.update(extra_body)

    response = requests.post(endpoint, headers=request_headers, json=body, timeout=timeout)
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]
def parse_annotation(raw: str) -> Dict:
    """Extract the annotation JSON object from a model reply.

    The reply may be bare JSON, JSON wrapped in a Markdown code fence, or JSON
    surrounded by extra prose. The fenced content (if any) is tried first,
    then the span from the first ``{`` to the last ``}``.

    Args:
        raw: Text returned by the annotator model.

    Returns:
        The parsed JSON object as a dict. If ``raw`` is not a string, or no
        JSON *object* can be extracted, a sentinel dict with
        ``is_anomaly=None``, ``threat_type="parse_failed"`` and
        ``_parse_failed=True`` is returned instead.
    """
    import re

    failed = {"is_anomaly": None, "threat_type": "parse_failed", "_parse_failed": True}
    # A null/absent message content must not raise here.
    if not isinstance(raw, str):
        return failed

    text = raw.strip()
    fenced = re.search(r"```(?:json)?\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1).strip()

    candidates = [text]
    first = text.find("{")
    last = text.rfind("}")
    if first != -1 and last > first:
        candidates.append(text[first:last + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        # Callers use .get(); a JSON list/string/number is not a usable annotation.
        if isinstance(parsed, dict):
            return parsed
    return failed
def build_annotation_prompt(episode: Dict) -> str:
    """Build the user prompt for annotating one episode (simplified; no EDRC framing).

    Each event is reduced to its timestamp, device id, event type and the
    fields specific to that event type before being formatted, then the home
    layout summary, the event log, the episode's query and the output-format
    instructions are joined into one prompt string.
    """
    from src.evaluation.prompt_builder import summarize_home_layout, format_event_log

    layout = episode.get("home_state", {})
    raw_events = episode.get("event_sequence", [])
    task = episode.get("query", "")

    slim_events = []
    for event in raw_events:
        kind = event.get("event_type")
        slim = {
            "timestamp": event.get("timestamp"),
            "device_id": event.get("device_id"),
            "event_type": kind,
        }
        # Copy only the fields that belong to this event type.
        if kind == "attribute_change":
            slim["cluster"] = event.get("cluster", "")
            slim["attribute"] = event.get("attribute", "")
            slim["value"] = event.get("value")
        elif kind == "device_event":
            slim["event_name"] = event.get("event_name", "")
            slim["fields"] = event.get("fields", {})
        elif kind == "command":
            slim["command"] = event.get("command", "")
        slim_events.append(slim)

    sections = []
    sections.append("## 家庭环境\n")
    sections.append(summarize_home_layout(layout))
    sections.append("\n## 设备日志\n")
    sections.append(format_event_log(slim_events))
    sections.append(f"\n## 任务\n{task}\n")
    sections.append(ANNOTATOR_FORMAT)
    return "\n".join(sections)
def compute_kappa(labels1: List, labels2: List) -> float:
    """Compute Cohen's kappa between two raters' label sequences.

    kappa = (po - pe) / (1 - pe), where ``po`` is the observed agreement rate
    and ``pe`` is the agreement expected by chance from each rater's marginal
    label frequencies.

    Args:
        labels1: Labels from rater 1 (any iterable of hashable values).
        labels2: Labels from rater 2, aligned item-by-item with ``labels1``.

    Returns:
        The kappa coefficient. ``0.0`` for empty input; ``1.0`` when both
        raters use one and the same label for every item (``pe == 1``, where
        the formula is otherwise undefined).

    Raises:
        ValueError: If the two sequences differ in length.
    """
    labels1 = list(labels1)
    labels2 = list(labels2)
    # Explicit check rather than assert: asserts vanish under ``python -O`` and
    # zip() would then silently truncate to the shorter sequence.
    if len(labels1) != len(labels2):
        raise ValueError(
            f"label sequences differ in length: {len(labels1)} != {len(labels2)}"
        )
    n = len(labels1)
    if n == 0:
        return 0.0

    agreements = sum(1 for a, b in zip(labels1, labels2) if a == b)
    po = agreements / n

    # Marginal counts per rater; Counter returns 0 for labels a rater never used.
    counts1 = Counter(labels1)
    counts2 = Counter(labels2)
    pe = sum(count * counts2[label] for label, count in counts1.items()) / (n * n)

    if pe == 1.0:
        return 1.0
    return (po - pe) / (1 - pe)
def _annotate_episode(prompt_text, model, api_base, api_key, extra_body):
    """Run one annotator on one prompt and return its annotation dict; never raises."""
    try:
        raw = call_annotator(ANNOTATOR_SYSTEM, prompt_text, model, api_base, api_key, extra_body=extra_body)
        annotation = parse_annotation(raw)
        if not isinstance(annotation, dict):
            raise ValueError(f"annotation is not a JSON object: {annotation!r}")
        return annotation
    except Exception as exc:  # best-effort: one failed episode must not abort the whole run
        print(f"  WARNING: {model} annotation failed: {exc}")
        return {"is_anomaly": None, "threat_type": "error", "_error": str(exc)}


def _agreement_and_kappa(labels_a, labels_b):
    """Return (raw agreement rate, Cohen's kappa), or (None, None) when there are no label pairs."""
    if not labels_a:
        return None, None
    matches = sum(1 for a, b in zip(labels_a, labels_b) if a == b)
    return matches / len(labels_a), compute_kappa(labels_a, labels_b)


def _print_comparison(title, agreement, kappa):
    """Print one agreement/kappa comparison, or a notice when it is undefined."""
    print(f"\n{title}:")
    if agreement is None:
        print(" No valid annotation pairs; agreement and kappa are undefined.")
        return
    print(f" Agreement: {agreement:.1%}")
    print(f" Cohen's kappa: {kappa:.3f}")


def _status_label(is_anomaly):
    """Map an annotation's is_anomaly value to a short progress label."""
    if is_anomaly is None:
        return "failed"
    return "anomaly" if is_anomaly else "normal"


def _rounded(value):
    """Round a metric to 4 decimals, passing None (undefined) through."""
    return None if value is None else round(value, 4)


def main():
    """Command-line entry point.

    Samples benchmark episodes, has one or two LLMs annotate each of them,
    writes the raw annotations to ``annotations.json`` and the agreement /
    Cohen's kappa figures (model vs. ground truth, and model vs. model when a
    second model is used) to ``kappa_summary.json`` in the output directory.
    Metrics that cannot be computed because no valid annotation pairs exist
    are reported as undefined (``null`` in the summary) instead of crashing.
    """
    parser = argparse.ArgumentParser(description="LLM 标注一致性验证 (Cohen's kappa)")
    parser.add_argument("--model1", required=True, help="标注模型1")
    parser.add_argument("--api-base1", required=True, help="模型1 API 地址")
    parser.add_argument("--api-key1", required=True, help="模型1 API Key")
    parser.add_argument("--model2", default=None, help="标注模型2不指定则只跟 GT 比)")
    parser.add_argument("--api-base2", default=None, help="模型2 API 地址")
    parser.add_argument("--api-key2", default=None, help="模型2 API Key")
    parser.add_argument("--sample-size", type=int, default=70, help="抽样数量")
    parser.add_argument("--seed", type=int, default=42, help="随机种子")
    parser.add_argument("--output-dir", default=None, help="输出目录")
    parser.add_argument("--extra-json1", default=None, help="模型1额外参数JSON")
    parser.add_argument("--extra-json2", default=None, help="模型2额外参数JSON")
    # Documented in the module usage text; compare model1 against ground truth only.
    parser.add_argument("--gt-only", action="store_true", help="只与 GT 比较 (忽略 --model2)")
    args = parser.parse_args()

    use_model2 = bool(args.model2) and not args.gt_only
    # Fail fast: without a base URL every model2 request would fail per-episode.
    if use_model2 and not args.api_base2:
        parser.error("--api-base2 is required when --model2 is given")

    extra1 = json.loads(args.extra_json1) if args.extra_json1 else None
    extra2 = json.loads(args.extra_json2) if args.extra_json2 else None
    output_dir = Path(args.output_dir) if args.output_dir else RESULTS_DIR
    output_dir.mkdir(parents=True, exist_ok=True)

    # Stratified sampling
    print(f"Stratified sampling {args.sample_size} episodes...")
    sampled_files = stratified_sample(args.sample_size, args.seed)
    print(f" Sampled {len(sampled_files)} episodes")

    # Annotation
    results = []
    for i, ep_path in enumerate(sampled_files):
        with open(ep_path, "r", encoding="utf-8") as f:
            episode = json.load(f)
        prompt_text = build_annotation_prompt(episode)
        gt = episode["ground_truth"]
        meta = episode["metadata"]
        result = {
            "episode_id": episode.get("episode_id", ""),
            "sq_type": meta.get("sq_type", ""),
            "variant": meta.get("variant", ""),
            "category": gt.get("category", ""),
            "gt_is_anomaly": gt.get("is_anomaly"),
            "gt_threat_type": gt.get("threat_type", "none"),
        }

        # Model 1 annotation
        ann1 = _annotate_episode(prompt_text, args.model1, args.api_base1, args.api_key1, extra1)
        result["model1_is_anomaly"] = ann1.get("is_anomaly")
        result["model1_threat_type"] = ann1.get("threat_type", "none")
        progress = f"GT={result['gt_is_anomaly']} M1={_status_label(result['model1_is_anomaly'])}"

        # Model 2 annotation (if requested)
        if use_model2:
            ann2 = _annotate_episode(prompt_text, args.model2, args.api_base2, args.api_key2, extra2)
            result["model2_is_anomaly"] = ann2.get("is_anomaly")
            result["model2_threat_type"] = ann2.get("threat_type", "none")
            progress += f" M2={_status_label(result['model2_is_anomaly'])}"

        results.append(result)
        print(f" [{i+1}/{len(sampled_files)}] {result['episode_id']}: {progress}")
        time.sleep(0.5)  # pause between episodes

    # Save raw annotations
    with open(output_dir / "annotations.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    # Compute kappa
    print(f"\n{'='*60}")
    print(f" Cohen's Kappa Analysis")
    print(f"{'='*60}\n")

    # Drop episodes where model 1 produced no usable verdict
    valid = [r for r in results if r.get("model1_is_anomaly") is not None]
    print(f"Valid annotations: {len(valid)}/{len(results)}")

    # Model1 vs GT
    gt_labels = [str(r["gt_is_anomaly"]) for r in valid]
    m1_labels = [str(r["model1_is_anomaly"]) for r in valid]
    agree_m1_gt, kappa_m1_gt = _agreement_and_kappa(gt_labels, m1_labels)
    _print_comparison(f"{args.model1} vs Ground Truth", agree_m1_gt, kappa_m1_gt)

    summary = {
        "sample_size": len(results),
        "valid_annotations": len(valid),
        "model1": args.model1,
        "model1_vs_gt_agreement": _rounded(agree_m1_gt),
        "model1_vs_gt_kappa": _rounded(kappa_m1_gt),
    }

    if use_model2:
        # Only episodes where both models produced a verdict are comparable.
        valid2 = [r for r in valid if r.get("model2_is_anomaly") is not None]
        m1_labels2 = [str(r["model1_is_anomaly"]) for r in valid2]
        m2_labels2 = [str(r["model2_is_anomaly"]) for r in valid2]
        gt_labels2 = [str(r["gt_is_anomaly"]) for r in valid2]

        # Model1 vs Model2
        agree_m1_m2, kappa_m1_m2 = _agreement_and_kappa(m1_labels2, m2_labels2)
        _print_comparison(f"{args.model1} vs {args.model2}", agree_m1_m2, kappa_m1_m2)

        # Model2 vs GT
        agree_m2_gt, kappa_m2_gt = _agreement_and_kappa(gt_labels2, m2_labels2)
        _print_comparison(f"{args.model2} vs Ground Truth", agree_m2_gt, kappa_m2_gt)

        summary["model2"] = args.model2
        summary["model1_vs_model2_agreement"] = _rounded(agree_m1_m2)
        summary["model1_vs_model2_kappa"] = _rounded(kappa_m1_m2)
        summary["model2_vs_gt_agreement"] = _rounded(agree_m2_gt)
        summary["model2_vs_gt_kappa"] = _rounded(kappa_m2_gt)

    # Save summary
    with open(output_dir / "kappa_summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
    print(f"\nResults saved to {output_dir}")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()