import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

# Substring -> weight table used by _event_salience. A term contributes its
# weight when it occurs (case-sensitively) anywhere in "<cluster>.<attribute>".
IMPORTANT_ATTR_TERMS = {
    "Alarm": 10,
    "Error": 9,
    "Fault": 9,
    "Battery": 6,
    "LockState": 5,
    "Occupancy": 4,
    "BooleanState": 4,
    "OperationalStatus": 5,
    "Position": 5,
    "OnOff": 3,
    "Smoke": 6,
    "CO": 6,
    "Water": 6,
}

# Matches the Chinese phrase "过去 N 小时" ("the past N hours") in a query.
_QUERY_WINDOW_RE = re.compile(r"过去\s*(\d+)\s*小时")


def _format_value(attribute: str, value) -> str:
    """Render an attribute value for a log line.

    Numeric values whose attribute name contains "Temperature" and whose
    magnitude exceeds 100 are divided by 100 and shown with a "C" suffix
    (e.g. 2150 -> "21.50C"); everything else is rendered with ``str()``.
    """
    # bool is a subclass of int; keep it out of the numeric branch.
    if isinstance(value, bool):
        return str(value)
    if isinstance(value, (int, float)) and "Temperature" in attribute:
        # NOTE(review): the >100 heuristic leaves readings with magnitude
        # <= 100 unscaled; confirm the data never mixes raw and scaled units.
        return f"{value / 100:.2f}C" if abs(value) > 100 else str(value)
    return str(value)


def format_event_line(event: Dict) -> str:
    """Render one event as a single ``[<timestamp>] <device_id> | <details>`` line.

    The details depend on ``event_type``: "attribute_change" shows
    ``cluster.attribute = value``, "device_event" shows ``Event: name(k=v, ...)``,
    "command" shows ``Command: <command>``; any other type is echoed verbatim.
    Missing timestamp/device id are shown as "?".
    """
    ts = event.get("timestamp", "?")
    dev = event.get("device_id", "?")
    etype = event.get("event_type", "")
    if etype == "attribute_change":
        cluster = event.get("cluster", "")
        attr = event.get("attribute", "")
        left = f"{cluster}.{attr}" if cluster else attr
        # Format against the qualified name so temperature clusters such as
        # TemperatureMeasurement.MeasuredValue are recognised -- the same way
        # _temperature_patterns identifies temperature readings (by cluster).
        value = _format_value(left, event.get("value"))
        return f"[{ts}] {dev} | {left} = {value}"
    if etype == "device_event":
        name = event.get("event_name", "")
        fields = event.get("fields") or {}
        field_text = ", ".join(f"{k}={v}" for k, v in fields.items())
        return f"[{ts}] {dev} | Event: {name}({field_text})"
    if etype == "command":
        return f"[{ts}] {dev} | Command: {event.get('command', '')}"
    return f"[{ts}] {dev} | {etype}"


def _device_room_map(home_state: Dict) -> Dict[str, str]:
    """Map each device id in ``home_state["devices"]`` to its ``room_id`` ("unknown" if absent)."""
    return {
        device_id: info.get("room_id", "unknown")
        for device_id, info in home_state.get("devices", {}).items()
    }


def _parse_timestamp(ts) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string; return None if missing or malformed."""
    if not isinstance(ts, str) or not ts:
        return None
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(ts)
    except ValueError:
        return None


def _filter_events_by_query_window(events: List[Dict], query: str) -> List[Dict]:
    """Keep only events inside a "past N hours" window named in ``query``.

    The window ends at the latest parseable event timestamp (the last event
    when the sequence is chronologically ordered) and starts N hours earlier.
    Timestamps are compared as datetimes, not strings, so formatting
    differences ("T" vs " " separator, "Z" vs "+00:00") do not matter.
    Events without a parseable timestamp cannot be placed in the window and
    are dropped. The input is returned unchanged when the query names no
    window, there are no events, no timestamp parses, or timestamps mix
    naive and timezone-aware values (which cannot be ordered).
    """
    match = _QUERY_WINDOW_RE.search(query or "")
    if not match or not events:
        return events

    parsed = [_parse_timestamp(evt.get("timestamp")) for evt in events]
    known = [dt for dt in parsed if dt is not None]
    if not known:
        return events

    try:
        cutoff = max(known) - timedelta(hours=int(match.group(1)))
        return [evt for evt, dt in zip(events, parsed) if dt is not None and dt >= cutoff]
    except TypeError:
        # Mixed naive/aware datetimes are not comparable; skip filtering.
        return events


def _event_salience(event: Dict) -> int:
    """Heuristic importance score for an event (0 means "not salient").

    Device events add 8 and commands add 4; every IMPORTANT_ATTR_TERMS term
    found in "<cluster>.<attribute>" adds its weight; a "MeasuredValue"
    attribute adds 1.
    """
    score = 0
    etype = event.get("event_type", "")
    if etype == "device_event":
        score += 8
    elif etype == "command":
        score += 4
    qualified = f"{event.get('cluster', '')}.{event.get('attribute', '')}"
    score += sum(weight for term, weight in IMPORTANT_ATTR_TERMS.items() if term in qualified)
    if event.get("attribute") == "MeasuredValue":
        score += 1
    return score


def _temperature_patterns(events: List[Dict]) -> List[str]:
    """Flag suspicious per-device temperature series.

    Collects numeric ``MeasuredValue`` readings from clusters whose name
    contains "Temperature", grouped by device in event order. For devices
    with at least 12 samples it reports either:
      * a near-flat series: raw span <= 2 and one value accounts for >= 90%
        of samples; or
      * a strong monotonic trend: raw span >= 80 and >= 85% of consecutive
        steps are non-decreasing (or non-increasing).
    The reported span is the raw span divided by 100, suffixed with "C".
    """
    series: Dict[str, List[float]] = defaultdict(list)
    for evt in events:
        if evt.get("event_type") != "attribute_change":
            continue
        if evt.get("attribute") != "MeasuredValue":
            continue
        if "Temperature" not in (evt.get("cluster") or ""):
            continue
        value = evt.get("value")
        # bool is an int subclass but is never a temperature reading.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            series[evt.get("device_id", "?")].append(value)

    findings = []
    for device_id, values in series.items():
        if len(values) < 12:
            continue
        span = max(values) - min(values)
        same_ratio = max(Counter(values).values()) / len(values)
        steps = list(zip(values, values[1:]))
        # Equal neighbours count toward both directions.
        increasing = sum(1 for a, b in steps if b >= a)
        decreasing = sum(1 for a, b in steps if b <= a)
        trend_ratio = max(increasing, decreasing) / max(1, len(steps))
        if span <= 2 and same_ratio >= 0.9:
            findings.append(f"{device_id}: near-flat temperature readings across {len(values)} samples")
        elif span >= 80 and trend_ratio >= 0.85:
            direction = "upward" if increasing >= decreasing else "downward"
            findings.append(f"{device_id}: strong {direction} monotonic temperature trend, span={span/100:.2f}C")
    return findings


def _summarize_layout(home_state: Dict) -> str:
    """Describe the home layout as "Layout: <name>" plus one sorted line per room."""
    by_room = defaultdict(list)
    for device_id, info in home_state.get("devices", {}).items():
        by_room[info.get("room_id", "unknown")].append(
            f"{info.get('display_name', device_id)}<{info.get('device_type', '?')}>"
        )
    lines = [f"Layout: {home_state.get('layout_name', 'unknown')}"]
    for room in sorted(by_room):
        lines.append(f"- {room}: {', '.join(sorted(by_room[room]))}")
    return "\n".join(lines)


def extract_evidence_packet(
    episode: Dict,
    max_salient_events: int = 60,
    max_focus_events: int = 120,
    max_signals: int = 20,
) -> Dict:
    """Condense an episode into a compact evidence packet.

    Steps:
      1. Restrict ``episode["event_sequence"]`` to the "past N hours" window
         named in ``episode["query"]``, if any.
      2. Score every event with ``_event_salience`` and keep the
         ``max_salient_events`` highest-scoring ones (ties broken by position).
      3. Expand each salient event with its immediate neighbours and keep the
         first ``max_focus_events`` of those, in sequence order.

    Args:
        episode: Episode dict; reads "event_sequence", "query", "home_state",
            "episode_id" and "metadata"["sq_type"].
        max_salient_events: Maximum number of positively-scored events kept.
        max_focus_events: Maximum number of events in the focus log.
        max_signals: Maximum number of top-scored events rendered into
            ``suspicious_signals``.

    Returns:
        Dict with keys episode_id, sq_type, query, layout_summary,
        event_count, salient_event_count, focus_event_count, room_activity,
        temperature_patterns, suspicious_signals and focus_log_text.
    """
    events = _filter_events_by_query_window(
        episode.get("event_sequence") or [], episode.get("query") or ""
    )
    home_state = episode.get("home_state") or {}
    room_map = _device_room_map(home_state)

    scored: List[Tuple[int, int, Dict]] = []
    for idx, evt in enumerate(events):
        score = _event_salience(evt)
        if score > 0:
            scored.append((score, idx, evt))
    # Highest score first; earlier events win ties.
    scored.sort(key=lambda item: (-item[0], item[1]))
    top = scored[:max_salient_events]
    salient_indices = sorted({idx for _, idx, _ in top})

    # Each salient event plus its immediate predecessor and successor.
    focus_indices = set()
    for idx in salient_indices:
        focus_indices.update(range(max(0, idx - 1), min(len(events), idx + 2)))
    # NOTE(review): truncation keeps the earliest focus events, so salient
    # events late in the sequence may be dropped when the budget is exceeded.
    focus_events = [events[i] for i in sorted(focus_indices)[:max_focus_events]]

    room_activity = Counter(room_map.get(evt.get("device_id", ""), "unknown") for evt in focus_events)
    suspicious_signals = [format_event_line(evt) for _, _, evt in top[:max_signals]]
    focus_log_text = "\n".join(format_event_line(evt) for evt in focus_events)

    return {
        "episode_id": episode.get("episode_id", ""),
        "sq_type": (episode.get("metadata") or {}).get("sq_type", ""),
        "query": episode.get("query", ""),
        "layout_summary": _summarize_layout(home_state),
        "event_count": len(events),
        "salient_event_count": len(salient_indices),
        "focus_event_count": len(focus_events),
        "room_activity": dict(room_activity),
        "temperature_patterns": _temperature_patterns(events),
        "suspicious_signals": suspicious_signals,
        "focus_log_text": focus_log_text,
    }