Files
llmiotsafe/EGP/evidence.py
2026-05-12 17:01:39 +08:00

174 lines
6.1 KiB
Python

from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
# Salience weights, keyed by a case-sensitive substring. An event earns the
# weight of every term that occurs in its "<cluster>.<attribute>" name.
IMPORTANT_ATTR_TERMS = {
    "Alarm": 10,
    "Error": 9,
    "Fault": 9,
    "Battery": 6,
    "LockState": 5,
    "Occupancy": 4,
    "BooleanState": 4,
    "OperationalStatus": 5,
    "Position": 5,
    "OnOff": 3,
    "Smoke": 6,
    "CO": 6,
    "Water": 6,
}
def _format_value(attribute: str, value) -> str:
    """Render an event value as text for a log line.

    Args:
        attribute: Name used to decide whether the value is a temperature;
            any name containing "Temperature" qualifies.
        value: The raw event value.

    Returns:
        For numeric temperature values with magnitude above 100, the value
        divided by 100 with two decimals and a "C" suffix (presumably the
        source reports hundredths of a degree Celsius -- TODO confirm).
        Otherwise ``str(value)``.
    """
    # bool is a subclass of int, so rule it out before the numeric branch.
    if isinstance(value, bool):
        return str(value)
    if isinstance(value, (int, float)) and "Temperature" in attribute:
        return f"{value / 100:.2f}C" if abs(value) > 100 else str(value)
    return str(value)


def format_event_line(event: Dict) -> str:
    """Format one event dict as a single human-readable log line.

    Every line starts with ``[<timestamp>] <device_id> | `` ("?" is used when
    either key is missing), followed by a part that depends on
    ``event["event_type"]``:

    * ``attribute_change`` -> ``<cluster>.<attribute> = <value>``
    * ``device_event``     -> ``Event: <event_name>(<k>=<v>, ...)``
    * ``command``          -> ``Command: <command>``
    * anything else        -> the raw event type

    Args:
        event: Event record; all keys are optional.

    Returns:
        The formatted line.
    """
    ts = event.get("timestamp", "?")
    dev = event.get("device_id", "?")
    etype = event.get("event_type", "")

    if etype == "attribute_change":
        cluster = event.get("cluster", "")
        attr = event.get("attribute", "")
        left = f"{cluster}.{attr}" if cluster else attr
        # Pass the cluster-qualified name: temperature readings are usually
        # identified by the cluster ("Temperature...") with a generic
        # attribute such as "MeasuredValue", so the bare attribute name alone
        # would never trigger the temperature formatting.
        value = _format_value(left, event.get("value"))
        return f"[{ts}] {dev} | {left} = {value}"

    if etype == "device_event":
        name = event.get("event_name", "")
        # An explicit None for "fields" is treated the same as no fields.
        fields = event.get("fields") or {}
        field_text = ", ".join(f"{k}={v}" for k, v in fields.items())
        return f"[{ts}] {dev} | Event: {name}({field_text})"

    if etype == "command":
        return f"[{ts}] {dev} | Command: {event.get('command', '')}"

    return f"[{ts}] {dev} | {etype}"
def _device_room_map(home_state: Dict) -> Dict[str, str]:
    """Map every device id under ``home_state["devices"]`` to its room id.

    Devices whose info has no ``room_id`` map to ``"unknown"``; a missing
    ``devices`` section yields an empty mapping.
    """
    devices = home_state.get("devices", {})
    return {
        dev_id: meta.get("room_id", "unknown")
        for dev_id, meta in devices.items()
    }
def _filter_events_by_query_window(events: List[Dict], query: str) -> List[Dict]:
    """Keep only the events inside the "past N hours" window named in ``query``.

    The window is recognised by the pattern ``过去 N 小时`` ("past N hours") and
    is measured back from the timestamp of the last event in ``events`` (the
    list is assumed to be in chronological order -- TODO confirm with callers).

    Args:
        events: Event dicts carrying an ISO-format ``"timestamp"`` string.
        query: Free-text query that may mention a time window.

    Returns:
        ``events`` itself when the query names no window, when there are no
        events, or when the cutoff cannot be computed from the last event;
        otherwise a new list with the events at or after the cutoff.
    """
    import re

    match = re.search(r"过去\s*(\d+)\s*小时", query)
    if not match or not events:
        return events
    try:
        last_dt = datetime.fromisoformat(events[-1]["timestamp"])
        cutoff = last_dt - timedelta(hours=int(match.group(1)))
    except (KeyError, TypeError, ValueError, OverflowError):
        # Missing/unparsable anchor timestamp or an absurd hour count:
        # filtering is best-effort, so keep everything.
        return events
    cutoff_ts = cutoff.isoformat()

    def _in_window(evt: Dict) -> bool:
        raw = evt.get("timestamp", "")
        try:
            # Compare real datetimes: comparing strings breaks as soon as the
            # source format differs from isoformat() output (for example a
            # space instead of "T" sorts before every cutoff on the same day).
            return datetime.fromisoformat(raw) >= cutoff
        except (TypeError, ValueError):
            # Unparsable timestamp, or naive/aware mismatch: fall back to
            # plain string ordering against the ISO cutoff.
            try:
                return raw >= cutoff_ts
            except TypeError:
                return False

    return [evt for evt in events if _in_window(evt)]
def _event_salience(event: Dict) -> int:
    """Score how noteworthy an event is; higher means more salient.

    The score is the sum of:

    * 8 for a ``device_event`` or 4 for a ``command`` event type,
    * the weight of every ``IMPORTANT_ATTR_TERMS`` term that occurs in the
      event's ``"<cluster>.<attribute>"`` name,
    * 1 when the attribute is exactly ``MeasuredValue``.
    """
    kind = event.get("event_type", "")
    if kind == "device_event":
        total = 8
    elif kind == "command":
        total = 4
    else:
        total = 0

    qualified = "{}.{}".format(event.get("cluster", ""), event.get("attribute", ""))
    total += sum(
        weight
        for term, weight in IMPORTANT_ATTR_TERMS.items()
        if term in qualified
    )

    if event.get("attribute") == "MeasuredValue":
        total += 1
    return total
def _temperature_patterns(events: List[Dict]) -> List[str]:
    """Describe suspicious shapes in each device's temperature readings.

    Only ``attribute_change`` events for attribute ``MeasuredValue`` on a
    cluster whose name contains "Temperature" are considered, and only numeric
    (non-bool) values. Readings are grouped per ``device_id`` in input order;
    events without a ``device_id`` are grouped under ``"?"``.

    A device with at least 12 readings is reported when either:

    * the readings are near-flat: value span <= 2 and the most common value
      makes up at least 90% of the samples, or
    * the readings trend strongly one way: span >= 80 and at least 85% of
      consecutive pairs are non-decreasing (or non-increasing).

    Args:
        events: Event dicts to scan.

    Returns:
        One finding string per flagged device, in first-seen device order.
    """
    series = defaultdict(list)
    for evt in events:
        if evt.get("event_type") != "attribute_change":
            continue
        if evt.get("attribute") != "MeasuredValue":
            continue
        # "or ''" guards against an explicit None cluster.
        if "Temperature" not in (evt.get("cluster") or ""):
            continue
        value = evt.get("value")
        # bool is an int subclass but is never a temperature reading.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        # Use the same "?" fallback as the log formatter instead of raising
        # KeyError on a record that lacks a device id.
        series[evt.get("device_id", "?")].append(value)

    findings = []
    for device_id, values in series.items():
        if len(values) < 12:
            continue
        span = max(values) - min(values)
        same_ratio = max(Counter(values).values()) / len(values)
        pairs = list(zip(values, values[1:]))
        # Equal neighbours count towards both directions.
        increasing = sum(1 for a, b in pairs if b >= a)
        decreasing = sum(1 for a, b in pairs if b <= a)
        trend_ratio = max(increasing, decreasing) / max(1, len(values) - 1)
        if span <= 2 and same_ratio >= 0.9:
            findings.append(
                f"{device_id}: near-flat temperature readings across {len(values)} samples"
            )
        elif span >= 80 and trend_ratio >= 0.85:
            direction = "upward" if increasing >= decreasing else "downward"
            findings.append(
                f"{device_id}: strong {direction} monotonic temperature trend, span={span/100:.2f}C"
            )
    return findings
def _summarize_layout(home_state: Dict) -> str:
    """Build a multi-line text summary of which devices sit in which room.

    The first line is ``Layout: <layout_name>``. It is followed by one
    ``- <room>: <device>, ...`` line per room, rooms sorted by id and devices
    sorted by their label. A device label is ``<display_name><<device_type>>``,
    falling back to the device id and ``?`` respectively; devices without a
    ``room_id`` are listed under ``unknown``.
    """
    rooms = {}
    for dev_id, meta in home_state.get("devices", {}).items():
        label = "{}<{}>".format(
            meta.get("display_name", dev_id),
            meta.get("device_type", "?"),
        )
        rooms.setdefault(meta.get("room_id", "unknown"), []).append(label)

    header = "Layout: {}".format(home_state.get("layout_name", "unknown"))
    room_lines = [
        "- {}: {}".format(room, ", ".join(sorted(rooms[room])))
        for room in sorted(rooms)
    ]
    return "\n".join([header] + room_lines)
def extract_evidence_packet(
    episode: Dict,
    max_salient_events: int = 60,
    max_focus_events: int = 120,
) -> Dict:
    """Condense an episode's event log into a compact evidence packet.

    Steps:

    1. Restrict the episode's ``event_sequence`` to the time window named in
       its ``query`` (if any).
    2. Score every event and keep the ``max_salient_events`` highest-scoring
       ones (ties broken by earlier position); zero-score events are ignored.
    3. Build the focus set from each salient event plus its immediate
       neighbours, capped at ``max_focus_events``. The budget is filled in
       salience order, so when the cap bites it is the context of the
       lowest-scoring events that is dropped, not whatever happens to come
       last in the log. The focus events are emitted in log order.

    Args:
        episode: Episode dict; reads ``event_sequence``, ``query``,
            ``home_state``, ``episode_id`` and ``metadata.sq_type``.
        max_salient_events: Maximum number of scored events to keep.
        max_focus_events: Maximum number of events in the focus log; a
            non-positive value yields an empty focus log.

    Returns:
        A dict with the keys ``episode_id``, ``sq_type``, ``query``,
        ``layout_summary``, ``event_count``, ``salient_event_count``,
        ``focus_event_count``, ``room_activity``, ``temperature_patterns``,
        ``suspicious_signals`` (up to 20 formatted top events) and
        ``focus_log_text``.
    """
    query = episode.get("query", "")
    events = _filter_events_by_query_window(episode.get("event_sequence", []), query)
    home_state = episode.get("home_state", {})
    room_map = _device_room_map(home_state)

    scored: List[Tuple[int, int, Dict]] = []
    for idx, evt in enumerate(events):
        score = _event_salience(evt)
        if score > 0:
            scored.append((score, idx, evt))
    # Highest score first; earlier events win ties.
    scored.sort(key=lambda item: (-item[0], item[1]))
    top = scored[:max_salient_events]

    # Spend the focus budget on the most salient events first: the event
    # itself, then its previous and next neighbour.
    focus_indices = set()
    for _, idx, _ in top:
        if len(focus_indices) >= max_focus_events:
            break
        for near in (idx, idx - 1, idx + 1):
            if 0 <= near < len(events) and len(focus_indices) < max_focus_events:
                focus_indices.add(near)
    focus_events = [events[i] for i in sorted(focus_indices)]

    room_activity = Counter(
        room_map.get(evt.get("device_id", ""), "unknown") for evt in focus_events
    )
    suspicious_signals = [format_event_line(evt) for _, _, evt in top[:20]]
    focus_log_text = "\n".join(format_event_line(evt) for evt in focus_events)

    return {
        "episode_id": episode.get("episode_id", ""),
        # Tolerate an explicit None for "metadata".
        "sq_type": (episode.get("metadata") or {}).get("sq_type", ""),
        "query": query,
        "layout_summary": _summarize_layout(home_state),
        "event_count": len(events),
        # Each entry of ``top`` has a distinct event index, so its length is
        # the number of salient events.
        "salient_event_count": len(top),
        "focus_event_count": len(focus_events),
        "room_activity": dict(room_activity),
        "temperature_patterns": _temperature_patterns(events),
        "suspicious_signals": suspicious_signals,
        "focus_log_text": focus_log_text,
    }