import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional

# Labels for the raw DoorLock.LockState codes rendered by _format_int_state.
LOCK_STATE_LABELS = {
    0: "not_fully_locked",
    1: "locked",
    2: "unlocked",
    3: "unlatched",
}

# Matches "past N hours" in a Chinese query. The second alternative in each
# group appears to be a mis-decoded (mojibake) spelling of the first, kept so
# that queries which went through a wrong decode still match -- TODO confirm.
_QUERY_WINDOW_RE = re.compile(r"(?:过去|杩囧幓)\s*(\d+)\s*(?:小时|灏忔椂)")

# Substrings of a formatted event line that make chunk_events count the event
# as "alarmish".
_ALARMISH_TOKENS = ("Alarm", "Fault", "Error", "LockState", "Occupancy", "Battery")

# Chunk fields copied into the lightweight chunk index (everything except the
# raw event payload).
_CHUNK_INDEX_KEYS = (
    "chunk_id",
    "start_ts",
    "end_ts",
    "event_count",
    "rooms",
    "top_devices",
    "alarmish_count",
    "preview_lines",
)

# Raw temperature readings whose magnitude reaches this threshold are treated
# as centi-degrees Celsius and divided by 100; smaller ones are used as-is.
_CENTI_DEGREE_THRESHOLD = 100


def _device_room_map(home_state: Dict) -> Dict[str, str]:
    """Map each device id in ``home_state["devices"]`` to its ``room_id``.

    Devices without a ``room_id`` map to ``"unknown"``.
    """
    return {
        device_id: info.get("room_id", "unknown")
        for device_id, info in home_state.get("devices", {}).items()
    }


def _format_temperature_value(value) -> str:
    """Render a temperature reading as ``"<celsius> C (raw=<value>)"``.

    Non-numeric values are returned via ``str``. Readings with magnitude of at
    least 100 are divided by 100; smaller readings are shown unscaled.
    """
    if not isinstance(value, (int, float)):
        return str(value)
    celsius = value / 100 if abs(value) >= _CENTI_DEGREE_THRESHOLD else value
    return f"{celsius:.2f} C (raw={value})"


def _format_boolean_value(attribute: str, value) -> str:
    """Render a boolean attribute as ``"<label> (raw=<value>)"``.

    ``OnOff`` is labelled ``on``/``off``; any other attribute is labelled
    ``true``/``false``. Non-bool values are returned via ``str``.
    """
    if not isinstance(value, bool):
        return str(value)
    label_map = {
        "OnOff": {True: "on", False: "off"},
    }
    if attribute in label_map:
        return f"{label_map[attribute][value]} (raw={value})"
    return f"{str(value).lower()} (raw={value})"


def _format_int_state(cluster: str, attribute: str, value) -> str:
    """Render an integer attribute, labelling occupancy and lock-state codes.

    Other cluster/attribute pairs, and non-int values, are returned via
    ``str``.
    """
    if not isinstance(value, int):
        return str(value)
    if cluster == "OccupancySensing" and attribute == "Occupancy":
        label = "occupied" if value else "unoccupied"
        return f"{label} (raw={value})"
    if cluster == "DoorLock" and attribute == "LockState":
        label = LOCK_STATE_LABELS.get(value, "unknown_lock_state")
        return f"{label} (raw={value})"
    return str(value)


def _format_value(cluster: str, attribute: str, value) -> str:
    """Dispatch ``value`` to the formatter matching its cluster and type."""
    if attribute == "MeasuredValue" and "Temperature" in cluster:
        return _format_temperature_value(value)
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return _format_boolean_value(attribute, value)
    if isinstance(value, int):
        return _format_int_state(cluster, attribute, value)
    return str(value)


def _format_event_line(event: Dict) -> str:
    """Render one event as a single ``"[timestamp] device | ..."`` log line.

    ``attribute_change``, ``device_event`` and ``command`` events get a
    dedicated layout; any other event type is shown by its type name only.
    """
    ts = event.get("timestamp", "?")
    dev = event.get("device_id", "?")
    etype = event.get("event_type", "")
    prefix = f"[{ts}] {dev} | "
    if etype == "attribute_change":
        cluster = event.get("cluster", "")
        attr = event.get("attribute", "")
        value = _format_value(cluster, attr, event.get("value", ""))
        left = f"{cluster}.{attr}" if cluster else attr
        return f"{prefix}{left} = {value}"
    if etype == "device_event":
        # "fields" may be present but None; treat that like an empty mapping.
        raw_fields = event.get("fields") or {}
        fields = ", ".join(f"{k}={v}" for k, v in raw_fields.items())
        return f"{prefix}Event: {event.get('event_name', '')}({fields})"
    if etype == "command":
        return f"{prefix}Command: {event.get('command', '')}"
    return f"{prefix}{etype}"


def _parse_timestamp(raw) -> Optional[datetime]:
    """Parse an ISO-8601 string, returning ``None`` when it cannot be parsed."""
    if not isinstance(raw, str):
        return None
    try:
        return datetime.fromisoformat(raw)
    except ValueError:
        return None


def _filter_query_window(events: List[Dict], query: str) -> List[Dict]:
    """Keep only events inside the "past N hours" window named in ``query``.

    The window ends at the timestamp of the last event in ``events``. The
    input list is returned unchanged when the query names no window, when
    there are no events, or when the last timestamp or the cutoff cannot be
    computed.
    """
    match = _QUERY_WINDOW_RE.search(query or "")
    if not match or not events:
        return events
    last_dt = _parse_timestamp(events[-1].get("timestamp"))
    if last_dt is None:
        return events
    try:
        cutoff = last_dt - timedelta(hours=int(match.group(1)))
    except (OverflowError, ValueError):
        # The window reaches further back than datetime can represent, so
        # nothing can fall outside it.
        return events
    cutoff_ts = cutoff.isoformat()

    kept = []
    for evt in events:
        raw_ts = evt.get("timestamp", "")
        evt_dt = _parse_timestamp(raw_ts)
        if evt_dt is None:
            # Unparseable timestamp: fall back to lexicographic comparison.
            in_window = isinstance(raw_ts, str) and raw_ts >= cutoff_ts
        else:
            # Compare datetimes, not strings: a space-separated timestamp
            # would otherwise always sort below the "T"-separated cutoff.
            try:
                in_window = evt_dt >= cutoff
            except TypeError:
                # Mixed naive/aware timestamps cannot be ordered.
                in_window = raw_ts >= cutoff_ts
        if in_window:
            kept.append(evt)
    return kept


def build_layout_summary(home_state: Dict) -> str:
    """Return a text summary of the layout: one line per room with its devices.

    Rooms and the devices inside each room are listed in sorted order; each
    device is shown as ``display_name<device_type>``.
    """
    by_room = defaultdict(list)
    for device_id, info in home_state.get("devices", {}).items():
        by_room[info.get("room_id", "unknown")].append(
            f"{info.get('display_name', device_id)}<{info.get('device_type', '?')}>"
        )
    lines = [f"Layout: {home_state.get('layout_name', 'unknown')}"]
    lines.extend(
        f"- {room}: {', '.join(sorted(by_room[room]))}" for room in sorted(by_room)
    )
    return "\n".join(lines)


def build_protocol_notes() -> List[str]:
    """Return the fixed list of notes explaining how to read the event logs."""
    return [
        "These logs follow Matter-style smart-home event semantics.",
        "TemperatureMeasurement.MeasuredValue is represented in centi-degrees Celsius in this benchmark: 2466 means 24.66 C, not 2466 C.",
        "BooleanState and other protocol booleans should be interpreted using transitions and surrounding events, not naive natural-language assumptions from a single isolated value.",
        "OccupancySensing.Occupancy uses 0=unoccupied and 1=occupied.",
        "DoorLock.LockState is an enumerated state; formatted logs may show both the label and the raw code.",
        "A device-fault conclusion requires direct fault evidence such as stuck values, repeated non-recovery, explicit alarm/fault events, or impossible state transitions. Scaled temperature values alone are not enough.",
    ]


def chunk_events(events: List[Dict], room_map: Dict[str, str], chunk_size: int = 80) -> List[Dict]:
    """Split ``events`` into consecutive chunks with per-chunk statistics.

    Each chunk dict carries its id (``C00``, ``C01``, ...), first and last
    timestamps, event count, per-room counts, the six most frequent devices,
    the number of "alarmish" events, the first 12 formatted lines and the raw
    events themselves.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    chunks = []
    for start in range(0, len(events), chunk_size):
        block = events[start : start + chunk_size]
        # Format once; the lines feed both the alarm scan and the preview.
        lines = [_format_event_line(evt) for evt in block]
        rooms = Counter()
        devices = Counter()
        alarmish = 0
        for evt, line in zip(block, lines):
            device_id = evt.get("device_id", "?")
            devices[device_id] += 1
            if any(token in line for token in _ALARMISH_TOKENS):
                alarmish += 1
            rooms[room_map.get(device_id, "unknown")] += 1
        chunks.append(
            {
                "chunk_id": f"C{len(chunks):02d}",
                "start_ts": block[0].get("timestamp", "?"),
                "end_ts": block[-1].get("timestamp", "?"),
                "event_count": len(block),
                "rooms": dict(rooms),
                "top_devices": [k for k, _ in devices.most_common(6)],
                "alarmish_count": alarmish,
                "preview_lines": lines[:12],
                "raw_events": block,
            }
        )
    return chunks


def _describe_temperature_pattern(device_id: str, values: List[float]) -> Optional[str]:
    """Describe a near-flat or strongly trending temperature series.

    Returns ``None`` for series shorter than 12 samples and for series that
    are neither near-flat nor mostly monotonic.
    """
    if len(values) < 12:
        return None
    span_raw = max(values) - min(values)
    # Pick the scale from the readings, not from the span: a centi-degree
    # series spanning 11 raw units covers 0.11 C, not 11 C.
    centi_degrees = max(abs(v) for v in values) >= _CENTI_DEGREE_THRESHOLD
    span_c = span_raw / 100 if centi_degrees else span_raw
    if span_c <= 0.2:
        return (
            f"{device_id}: near-flat temperature readings across "
            f"{len(values)} samples, span={span_c:.2f} C"
        )
    steps = list(zip(values, values[1:]))
    monotonic_up = sum(1 for a, b in steps if b >= a)
    monotonic_down = sum(1 for a, b in steps if b <= a)
    if max(monotonic_up, monotonic_down) / max(1, len(steps)) >= 0.85:
        direction = "upward" if monotonic_up >= monotonic_down else "downward"
        return f"{device_id}: mostly {direction} temperature trend, span={span_c:.2f} C"
    return None


def compute_structured_signals(events: List[Dict], room_map: Dict[str, str]) -> Dict:
    """Aggregate coarse statistics over ``events``.

    Returns per-room event counts, the 12 most frequent changed attributes,
    counts of ``LockState`` and ``Occupancy`` changes, textual temperature
    patterns per device, and fixed hints on how values are formatted.
    """
    room_activity = Counter()
    attr_counter = Counter()
    lock_state_flips = 0
    occupancy_events = 0
    temperature_series = defaultdict(list)

    for evt in events:
        room_activity[room_map.get(evt.get("device_id", ""), "unknown")] += 1
        if evt.get("event_type") != "attribute_change":
            continue
        attr = evt.get("attribute", "")
        attr_counter[attr] += 1
        if attr == "LockState":
            lock_state_flips += 1
        if attr == "Occupancy":
            occupancy_events += 1
        if attr == "MeasuredValue" and "Temperature" in evt.get("cluster", ""):
            value = evt.get("value")
            if isinstance(value, (int, float)):
                # .get() so an event without a device id cannot raise KeyError.
                temperature_series[evt.get("device_id", "?")].append(value)

    temp_patterns = []
    for device_id, values in temperature_series.items():
        pattern = _describe_temperature_pattern(device_id, values)
        if pattern is not None:
            temp_patterns.append(pattern)

    return {
        "room_activity": dict(room_activity),
        "attribute_frequency": dict(attr_counter.most_common(12)),
        "lock_state_flips": lock_state_flips,
        "occupancy_events": occupancy_events,
        "temperature_patterns": temp_patterns,
        "value_format_hints": {
            "temperature": "MeasuredValue is shown as Celsius with the raw integer preserved in parentheses.",
            "occupancy": "0=unoccupied, 1=occupied",
            "lock_state": "enumerated lock state rendered as label + raw code",
        },
    }


def build_case_material(episode: Dict, chunk_size: int = 80) -> Dict:
    """Assemble everything derived from one episode.

    The event sequence is first restricted to the time window named in the
    query (if any), then chunked and summarised. The result holds the episode
    id and query, the layout summary, the protocol notes, the full chunks, a
    chunk index without raw events, the structured signals and the number of
    events kept.
    """
    query = episode.get("query", "")
    events = _filter_query_window(episode.get("event_sequence") or [], query)
    home_state = episode.get("home_state") or {}
    room_map = _device_room_map(home_state)
    chunks = chunk_events(events, room_map=room_map, chunk_size=chunk_size)
    signals = compute_structured_signals(events, room_map=room_map)
    chunk_index = [{key: chunk[key] for key in _CHUNK_INDEX_KEYS} for chunk in chunks]
    return {
        "episode_id": episode.get("episode_id", ""),
        "query": query,
        "layout_summary": build_layout_summary(home_state),
        "protocol_notes": build_protocol_notes(),
        "chunk_index": chunk_index,
        "chunks": chunks,
        "signals": signals,
        "event_count": len(events),
    }


def materialize_chunks(chunks: List[Dict], chunk_ids: List[str], max_chunks: int = 5) -> str:
    """Render the requested chunks as text: a header, then one line per event.

    Chunks are emitted in their original order. When none of ``chunk_ids``
    matches, the leading chunks are used instead. At most ``max_chunks``
    chunks are rendered.
    """
    wanted = set(chunk_ids or ())  # built once, not once per chunk
    selected = [chunk for chunk in chunks if chunk["chunk_id"] in wanted]
    if not selected:
        selected = chunks[:max_chunks]
    selected = selected[:max_chunks]

    parts = []
    for chunk in selected:
        parts.append(f"## {chunk['chunk_id']} {chunk['start_ts']} -> {chunk['end_ts']}")
        parts.extend(_format_event_line(evt) for evt in chunk["raw_events"])
    return "\n".join(parts)