174 lines
6.1 KiB
Python
174 lines
6.1 KiB
Python
from collections import Counter, defaultdict
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Tuple
|
|
|
|
|
|
# Salience weights keyed by name fragment. _event_salience adds a fragment's
# weight whenever the fragment occurs (case-sensitive substring match) in an
# event's "<cluster>.<attribute>" string; several fragments may match at once.
IMPORTANT_ATTR_TERMS: Dict[str, int] = {
    "Alarm": 10,
    "Error": 9,
    "Fault": 9,
    "Battery": 6,
    "LockState": 5,
    "Occupancy": 4,
    "BooleanState": 4,
    "OperationalStatus": 5,
    "Position": 5,
    "OnOff": 3,
    "Smoke": 6,
    "CO": 6,
    "Water": 6,
}
|
|
|
|
|
|
def _format_value(attribute: str, value) -> str:
|
|
if isinstance(value, bool):
|
|
return str(value)
|
|
if isinstance(value, (int, float)) and "Temperature" in attribute:
|
|
return f"{value / 100:.2f}C" if abs(value) > 100 else str(value)
|
|
return str(value)
|
|
|
|
|
|
def format_event_line(event: Dict) -> str:
    """Render one event dict as a single log line.

    Every line starts with ``[<timestamp>] <device_id> | `` (``?`` stands in
    for a missing timestamp or device id). What follows depends on
    ``event_type``:

    * ``attribute_change`` -> ``<cluster>.<attribute> = <value>`` (just the
      attribute name when the cluster is empty)
    * ``device_event``     -> ``Event: <event_name>(<k>=<v>, ...)``
    * ``command``          -> ``Command: <command>``
    * anything else        -> the raw event type

    Args:
        event: Event record; all keys are optional.

    Returns:
        The formatted line, without a trailing newline.
    """
    ts = event.get("timestamp", "?")
    dev = event.get("device_id", "?")
    etype = event.get("event_type", "")
    prefix = f"[{ts}] {dev} | "

    if etype == "attribute_change":
        cluster = event.get("cluster", "")
        attr = event.get("attribute", "")
        # NOTE(review): only the bare attribute name is handed to
        # _format_value, so its "Temperature" check never sees the cluster
        # name (e.g. a "MeasuredValue" attribute is printed unscaled).
        # Confirm that this is intended.
        value = _format_value(attr, event.get("value"))
        left = f"{cluster}.{attr}" if cluster else attr
        return f"{prefix}{left} = {value}"

    if etype == "device_event":
        name = event.get("event_name", "")
        # An explicit "fields": None must render like a missing key instead
        # of failing on None.items().
        fields = event.get("fields") or {}
        field_text = ", ".join(f"{k}={v}" for k, v in fields.items())
        return f"{prefix}Event: {name}({field_text})"

    if etype == "command":
        return f"{prefix}Command: {event.get('command', '')}"

    return f"{prefix}{etype}"
|
|
|
|
|
|
def _device_room_map(home_state: Dict) -> Dict[str, str]:
|
|
mapping = {}
|
|
for device_id, info in home_state.get("devices", {}).items():
|
|
mapping[device_id] = info.get("room_id", "unknown")
|
|
return mapping
|
|
|
|
|
|
def _filter_events_by_query_window(events: List[Dict], query: str) -> List[Dict]:
|
|
import re
|
|
|
|
match = re.search(r"过去\s*(\d+)\s*小时", query)
|
|
if not match or not events:
|
|
return events
|
|
try:
|
|
last_dt = datetime.fromisoformat(events[-1]["timestamp"])
|
|
except Exception:
|
|
return events
|
|
cutoff = last_dt - timedelta(hours=int(match.group(1)))
|
|
cutoff_ts = cutoff.isoformat()
|
|
return [evt for evt in events if evt.get("timestamp", "") >= cutoff_ts]
|
|
|
|
|
|
def _event_salience(event: Dict) -> int:
    """Score how noteworthy *event* is; 0 means nothing marked it as salient.

    The score is the sum of three parts:

    * 8 for a ``device_event``, 4 for a ``command``, 0 for any other type;
    * the weight of every IMPORTANT_ATTR_TERMS key that occurs as a
      substring of ``"<cluster>.<attribute>"``;
    * 1 when the attribute is exactly ``MeasuredValue``.
    """
    kind = event.get("event_type", "")
    type_score = 8 if kind == "device_event" else 4 if kind == "command" else 0

    qualified_name = f"{event.get('cluster', '')}.{event.get('attribute', '')}"
    term_score = sum(
        weight
        for term, weight in IMPORTANT_ATTR_TERMS.items()
        if term in qualified_name
    )

    measured_score = 1 if event.get("attribute") == "MeasuredValue" else 0
    return type_score + term_score + measured_score
|
|
|
|
|
|
def _temperature_patterns(events: List[Dict]) -> List[str]:
|
|
series = defaultdict(list)
|
|
for evt in events:
|
|
if evt.get("event_type") != "attribute_change":
|
|
continue
|
|
if evt.get("attribute") != "MeasuredValue":
|
|
continue
|
|
if "Temperature" not in evt.get("cluster", ""):
|
|
continue
|
|
value = evt.get("value")
|
|
if isinstance(value, (int, float)):
|
|
series[evt["device_id"]].append(value)
|
|
|
|
findings = []
|
|
for device_id, values in series.items():
|
|
if len(values) < 12:
|
|
continue
|
|
span = max(values) - min(values)
|
|
same_ratio = max(Counter(values).values()) / len(values)
|
|
increasing = sum(1 for a, b in zip(values, values[1:]) if b >= a)
|
|
decreasing = sum(1 for a, b in zip(values, values[1:]) if b <= a)
|
|
trend_ratio = max(increasing, decreasing) / max(1, len(values) - 1)
|
|
if span <= 2 and same_ratio >= 0.9:
|
|
findings.append(f"{device_id}: near-flat temperature readings across {len(values)} samples")
|
|
elif span >= 80 and trend_ratio >= 0.85:
|
|
direction = "upward" if increasing >= decreasing else "downward"
|
|
findings.append(f"{device_id}: strong {direction} monotonic temperature trend, span={span/100:.2f}C")
|
|
return findings
|
|
|
|
|
|
def _summarize_layout(home_state: Dict) -> str:
|
|
by_room = defaultdict(list)
|
|
for device_id, info in home_state.get("devices", {}).items():
|
|
by_room[info.get("room_id", "unknown")].append(
|
|
f"{info.get('display_name', device_id)}<{info.get('device_type', '?')}>"
|
|
)
|
|
lines = [f"Layout: {home_state.get('layout_name', 'unknown')}"]
|
|
for room in sorted(by_room):
|
|
lines.append(f"- {room}: {', '.join(sorted(by_room[room]))}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _select_focus_indices(
    ranked_indices: List[int], event_count: int, limit: int
) -> List[int]:
    """Pick at most *limit* event indices, salient events before neighbours.

    *ranked_indices* lists the salient events from most to least important.
    The first pass takes those events themselves, the second pass adds the
    event directly before and after each of them. A tight budget therefore
    trims surrounding context, never a salient event in favour of a
    neighbour. The result is in ascending index order.
    """
    chosen = set()
    for offsets in ((0,), (-1, 1)):
        for idx in ranked_indices:
            for offset in offsets:
                near = idx + offset
                if near in chosen or not 0 <= near < event_count:
                    continue
                if len(chosen) >= limit:
                    return sorted(chosen)
                chosen.add(near)
    return sorted(chosen)


def extract_evidence_packet(
    episode: Dict,
    max_salient_events: int = 60,
    max_focus_events: int = 120,
) -> Dict:
    """Condense an episode's event log into a compact evidence packet.

    Steps:

    1. Restrict the events to the time window named in the query, if any.
    2. Score every event and keep the ``max_salient_events`` best ones with
       a positive score (ties go to the earlier event).
    3. Build the focus log from those events plus their immediate
       neighbours, capped at ``max_focus_events``. When the cap bites, the
       salient events (best first) are kept ahead of neighbour context.

    ``event_sequence``, ``home_state``, ``metadata`` and ``query`` may be
    missing or ``None``; they are then treated as empty.

    Args:
        episode: Episode record.
        max_salient_events: Upper bound on the number of salient events.
        max_focus_events: Upper bound on the number of focus-log events.

    Returns:
        Dict with the keys ``episode_id``, ``sq_type``, ``query``,
        ``layout_summary``, ``event_count``, ``salient_event_count``,
        ``focus_event_count``, ``room_activity`` (focus events per room),
        ``temperature_patterns``, ``suspicious_signals`` (formatted lines of
        the 20 best-scored events) and ``focus_log_text`` (the focus events,
        chronological, one formatted line each).
    """
    events = _filter_events_by_query_window(
        episode.get("event_sequence") or [], episode.get("query") or ""
    )
    home_state = episode.get("home_state") or {}
    room_map = _device_room_map(home_state)

    scored: List[Tuple[int, int, Dict]] = []
    for idx, evt in enumerate(events):
        score = _event_salience(evt)
        if score > 0:
            scored.append((score, idx, evt))
    # Best score first; among equal scores the earlier event wins.
    scored.sort(key=lambda item: (-item[0], item[1]))
    top = scored[:max_salient_events]

    ranked_indices = [idx for _, idx, _ in top]
    salient_indices = sorted(ranked_indices)
    focus_indices = _select_focus_indices(
        ranked_indices, len(events), max_focus_events
    )
    focus_events = [events[i] for i in focus_indices]

    room_activity = Counter(
        room_map.get(evt.get("device_id", ""), "unknown") for evt in focus_events
    )
    suspicious_signals = [format_event_line(evt) for _, _, evt in top[:20]]
    focus_log_text = "\n".join(format_event_line(evt) for evt in focus_events)

    packet = {
        "episode_id": episode.get("episode_id", ""),
        "sq_type": (episode.get("metadata") or {}).get("sq_type", ""),
        "query": episode.get("query", ""),
        "layout_summary": _summarize_layout(home_state),
        "event_count": len(events),
        "salient_event_count": len(salient_indices),
        "focus_event_count": len(focus_events),
        "room_activity": dict(room_activity),
        "temperature_patterns": _temperature_patterns(events),
        "suspicious_signals": suspicious_signals,
        "focus_log_text": focus_log_text,
    }
    return packet
|
|
|