Files
llmiotsafe/EGPv3/signals.py
2026-05-12 17:01:39 +08:00

238 lines
9.5 KiB
Python

import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
LOCK_STATE_LABELS = {
0: "not_fully_locked",
1: "locked",
2: "unlocked",
3: "unlatched",
}
def _device_room_map(home_state: Dict) -> Dict[str, str]:
mapping = {}
for device_id, info in home_state.get("devices", {}).items():
mapping[device_id] = info.get("room_id", "unknown")
return mapping
def _format_temperature_value(value) -> str:
if not isinstance(value, (int, float)):
return str(value)
celsius = value / 100 if abs(value) >= 100 else value
return f"{celsius:.2f} C (raw={value})"
def _format_boolean_value(attribute: str, value) -> str:
if not isinstance(value, bool):
return str(value)
label_map = {
"OnOff": {True: "on", False: "off"},
}
if attribute in label_map:
return f"{label_map[attribute][value]} (raw={value})"
return f"{str(value).lower()} (raw={value})"
def _format_int_state(cluster: str, attribute: str, value) -> str:
if not isinstance(value, int):
return str(value)
if cluster == "OccupancySensing" and attribute == "Occupancy":
label = "occupied" if value else "unoccupied"
return f"{label} (raw={value})"
if cluster == "DoorLock" and attribute == "LockState":
label = LOCK_STATE_LABELS.get(value, "unknown_lock_state")
return f"{label} (raw={value})"
return str(value)
def _format_value(cluster: str, attribute: str, value) -> str:
if attribute == "MeasuredValue" and "Temperature" in cluster:
return _format_temperature_value(value)
if isinstance(value, bool):
return _format_boolean_value(attribute, value)
if isinstance(value, int):
return _format_int_state(cluster, attribute, value)
return str(value)
def _format_event_line(event: Dict) -> str:
ts = event.get("timestamp", "?")
dev = event.get("device_id", "?")
etype = event.get("event_type", "")
if etype == "attribute_change":
cluster = event.get("cluster", "")
attr = event.get("attribute", "")
value = _format_value(cluster, attr, event.get("value", ""))
left = f"{cluster}.{attr}" if cluster else attr
return f"[{ts}] {dev} | {left} = {value}"
if etype == "device_event":
fields = ", ".join(f"{k}={v}" for k, v in event.get("fields", {}).items())
return f"[{ts}] {dev} | Event: {event.get('event_name', '')}({fields})"
if etype == "command":
return f"[{ts}] {dev} | Command: {event.get('command', '')}"
return f"[{ts}] {dev} | {etype}"
def _filter_query_window(events: List[Dict], query: str) -> List[Dict]:
match = re.search(r"(?:过去|杩囧幓)\s*(\d+)\s*(?:小时|灏忔椂)", query)
if not match or not events:
return events
try:
last_dt = datetime.fromisoformat(events[-1]["timestamp"])
except Exception:
return events
cutoff = last_dt - timedelta(hours=int(match.group(1)))
cutoff_ts = cutoff.isoformat()
return [evt for evt in events if evt.get("timestamp", "") >= cutoff_ts]
def build_layout_summary(home_state: Dict) -> str:
by_room = defaultdict(list)
for device_id, info in home_state.get("devices", {}).items():
by_room[info.get("room_id", "unknown")].append(
f"{info.get('display_name', device_id)}<{info.get('device_type', '?')}>"
)
lines = [f"Layout: {home_state.get('layout_name', 'unknown')}"]
for room in sorted(by_room):
lines.append(f"- {room}: {', '.join(sorted(by_room[room]))}")
return "\n".join(lines)
def build_protocol_notes() -> List[str]:
return [
"These logs follow Matter-style smart-home event semantics.",
"TemperatureMeasurement.MeasuredValue is represented in centi-degrees Celsius in this benchmark: 2466 means 24.66 C, not 2466 C.",
"BooleanState and other protocol booleans should be interpreted using transitions and surrounding events, not naive natural-language assumptions from a single isolated value.",
"OccupancySensing.Occupancy uses 0=unoccupied and 1=occupied.",
"DoorLock.LockState is an enumerated state; formatted logs may show both the label and the raw code.",
"A device-fault conclusion requires direct fault evidence such as stuck values, repeated non-recovery, explicit alarm/fault events, or impossible state transitions. Scaled temperature values alone are not enough.",
]
def chunk_events(events: List[Dict], room_map: Dict[str, str], chunk_size: int = 80) -> List[Dict]:
chunks = []
for start in range(0, len(events), chunk_size):
block = events[start : start + chunk_size]
rooms = Counter()
devices = Counter()
alarmish = 0
for evt in block:
device_id = evt.get("device_id", "?")
devices[device_id] += 1
if any(token in _format_event_line(evt) for token in ["Alarm", "Fault", "Error", "LockState", "Occupancy", "Battery"]):
alarmish += 1
rooms[room_map.get(device_id, "unknown")] += 1
chunks.append(
{
"chunk_id": f"C{len(chunks):02d}",
"start_ts": block[0].get("timestamp", "?"),
"end_ts": block[-1].get("timestamp", "?"),
"event_count": len(block),
"rooms": dict(rooms),
"top_devices": [k for k, _ in devices.most_common(6)],
"alarmish_count": alarmish,
"preview_lines": [_format_event_line(evt) for evt in block[:12]],
"raw_events": block,
}
)
return chunks
def compute_structured_signals(events: List[Dict], room_map: Dict[str, str]) -> Dict:
room_activity = Counter()
attr_counter = Counter()
lock_state_flips = 0
occupancy_events = 0
temperature_series = defaultdict(list)
for evt in events:
room_activity[room_map.get(evt.get("device_id", ""), "unknown")] += 1
if evt.get("event_type") != "attribute_change":
continue
attr = evt.get("attribute", "")
attr_counter[attr] += 1
if attr == "LockState":
lock_state_flips += 1
if attr == "Occupancy":
occupancy_events += 1
if attr == "MeasuredValue" and "Temperature" in evt.get("cluster", ""):
value = evt.get("value")
if isinstance(value, (int, float)):
temperature_series[evt["device_id"]].append(value)
temp_patterns = []
for device_id, values in temperature_series.items():
if len(values) < 12:
continue
span_raw = max(values) - min(values)
span_c = span_raw / 100 if abs(span_raw) >= 100 else span_raw
monotonic_up = sum(1 for a, b in zip(values, values[1:]) if b >= a)
monotonic_down = sum(1 for a, b in zip(values, values[1:]) if b <= a)
if span_c <= 0.2:
temp_patterns.append(f"{device_id}: near-flat temperature readings across {len(values)} samples, span={span_c:.2f} C")
elif max(monotonic_up, monotonic_down) / max(1, len(values) - 1) >= 0.85:
direction = "upward" if monotonic_up >= monotonic_down else "downward"
temp_patterns.append(f"{device_id}: mostly {direction} temperature trend, span={span_c:.2f} C")
return {
"room_activity": dict(room_activity),
"attribute_frequency": dict(attr_counter.most_common(12)),
"lock_state_flips": lock_state_flips,
"occupancy_events": occupancy_events,
"temperature_patterns": temp_patterns,
"value_format_hints": {
"temperature": "MeasuredValue is shown as Celsius with the raw integer preserved in parentheses.",
"occupancy": "0=unoccupied, 1=occupied",
"lock_state": "enumerated lock state rendered as label + raw code",
},
}
def build_case_material(episode: Dict, chunk_size: int = 80) -> Dict:
events = _filter_query_window(episode.get("event_sequence", []), episode.get("query", ""))
home_state = episode.get("home_state", {})
room_map = _device_room_map(home_state)
chunks = chunk_events(events, room_map=room_map, chunk_size=chunk_size)
signals = compute_structured_signals(events, room_map=room_map)
chunk_index = []
for chunk in chunks:
chunk_index.append(
{
"chunk_id": chunk["chunk_id"],
"start_ts": chunk["start_ts"],
"end_ts": chunk["end_ts"],
"event_count": chunk["event_count"],
"rooms": chunk["rooms"],
"top_devices": chunk["top_devices"],
"alarmish_count": chunk["alarmish_count"],
"preview_lines": chunk["preview_lines"],
}
)
return {
"episode_id": episode.get("episode_id", ""),
"query": episode.get("query", ""),
"layout_summary": build_layout_summary(home_state),
"protocol_notes": build_protocol_notes(),
"chunk_index": chunk_index,
"chunks": chunks,
"signals": signals,
"event_count": len(events),
}
def materialize_chunks(chunks: List[Dict], chunk_ids: List[str], max_chunks: int = 5) -> str:
selected = [chunk for chunk in chunks if chunk["chunk_id"] in set(chunk_ids)]
if not selected:
selected = chunks[:max_chunks]
selected = selected[:max_chunks]
parts = []
for chunk in selected:
parts.append(f"## {chunk['chunk_id']} {chunk['start_ts']} -> {chunk['end_ts']}")
for evt in chunk["raw_events"]:
parts.append(_format_event_line(evt))
return "\n".join(parts)