252 lines
11 KiB
Python
252 lines
11 KiB
Python
import json
|
|
from typing import Dict
|
|
|
|
|
|
THREAT_TYPES = (
|
|
"intrusion / tailgating / credential_theft / fire_risk / unattended_cooking / "
|
|
"carbon_monoxide / sensor_stuck / sensor_drift / sensor_malfunction / actuator_stuck / "
|
|
"lock_malfunction / safety_device_failure / water_leak / possible_fall / "
|
|
"abnormal_inactivity / health_concern / child_safety / behavioral_anomaly / none"
|
|
)
|
|
|
|
|
|
def _protocol_notes_block(material: Dict) -> str:
|
|
notes = material.get("protocol_notes", [])
|
|
return "\n".join(f"- {note}" for note in notes)
|
|
|
|
|
|
def build_triage_prompt(material: Dict) -> Dict[str, str]:
|
|
system = (
|
|
"You are the triage coordinator for smart-home log analysis. "
|
|
"Infer the task from the query without hidden benchmark labels. "
|
|
"Anchor to the query target first, then choose a small number of chunks for inspection."
|
|
)
|
|
user = f"""## Query
|
|
{material['query']}
|
|
|
|
## Matter Notes
|
|
{_protocol_notes_block(material)}
|
|
|
|
## Layout
|
|
{material['layout_summary']}
|
|
|
|
## Deterministic Signals
|
|
{json.dumps(material['signals'], ensure_ascii=False, indent=2)}
|
|
|
|
## Chunk Index
|
|
{json.dumps(material['chunk_index'], ensure_ascii=False, indent=2)}
|
|
|
|
Rules:
|
|
- Choose exactly one `primary_task_profile`.
|
|
- Use `secondary_task_profile` only if it materially helps.
|
|
- If the query asks whether a specific device/room works normally, prefer `device-health`.
|
|
- If the query asks for an action or response plan, prefer `emergency-response`.
|
|
- Select only 1-4 `focus_chunk_ids` in round 1.
|
|
- For `composite-safety` or `emergency-response`, do not narrow the case to a single suspicious device too early. Prefer chunk selection that covers:
|
|
- the possible hazard trigger,
|
|
- nearby human/activity context,
|
|
- and any recovery or consequence evidence.
|
|
- Missing logs for a device are not themselves evidence of failure.
|
|
|
|
Return JSON only:
|
|
{{
|
|
"primary_task_profile": "device-health | single-event-safety | behavior-sequence | composite-safety | emergency-response",
|
|
"secondary_task_profile": "none | device-health | single-event-safety | behavior-sequence | composite-safety | emergency-response",
|
|
"query_anchor": {{
|
|
"target_rooms": ["..."],
|
|
"target_devices": ["..."],
|
|
"target_question": "..."
|
|
}},
|
|
"focus_rooms": ["..."],
|
|
"focus_devices": ["..."],
|
|
"focus_chunk_ids": ["C00", "C01"],
|
|
"suspected_patterns": ["..."],
|
|
"why_these_chunks": ["..."]
|
|
}}"""
|
|
return {"system": system, "user": user}
|
|
|
|
|
|
def build_investigator_prompt(
|
|
material: Dict,
|
|
triage_text: str,
|
|
focused_chunks: str,
|
|
supervisor_text: str = "",
|
|
round_index: int = 1,
|
|
) -> Dict[str, str]:
|
|
system = (
|
|
"You are the investigator. Work only from the query, Matter notes, structured signals, and focused raw chunks. "
|
|
"Construct competing normal and anomaly hypotheses with explicit evidence. "
|
|
"Distinguish device faults from behavior or safety anomalies carefully."
|
|
)
|
|
supervisor_block = ""
|
|
if supervisor_text:
|
|
supervisor_block = f"""
|
|
## Supervisor Feedback
|
|
{supervisor_text}
|
|
|
|
Apply the supervisor feedback explicitly in this round.
|
|
"""
|
|
user = f"""## Query
|
|
{material['query']}
|
|
|
|
## Matter Notes
|
|
{_protocol_notes_block(material)}
|
|
|
|
## Structured Signals
|
|
{json.dumps(material['signals'], ensure_ascii=False, indent=2)}
|
|
|
|
## Triage Output
|
|
{triage_text}
|
|
{supervisor_block}
|
|
## Focused Chunks
|
|
{focused_chunks}
|
|
|
|
Round: {round_index}
|
|
|
|
Rules:
|
|
- A raw value like `2466` for `TemperatureMeasurement.MeasuredValue` means `24.66 C`, not `2466 C`.
|
|
- Do not infer `sensor_malfunction` or `sensor_drift` from scaled temperature values alone.
|
|
- Device-fault hypotheses require direct evidence such as explicit alarm/fault events, repeated non-recovery, stuck values, impossible state transitions, or actuator commands failing to take effect.
|
|
- Behavior, intrusion, safety, and emergency hypotheses may be supported by coherent temporal patterns, cross-device inconsistencies, absence where an event should appear, or risky multi-step sequences even when no explicit fault code exists.
|
|
- If the query asks for abnormal behavior or safety risk, do not discard the anomaly path just because the system eventually recovered.
|
|
- Keep the normal hypothesis competitive, but also keep at least one anomaly hypothesis whenever the logs contain a plausible risk pattern that still needs explanation.
|
|
- A single transient `None`, one brief telemetry dropout, or the mere absence of logs for a device is not enough to claim `sensor_malfunction`, `safety_device_failure`, or a monitoring blind spot.
|
|
- For `sensor_malfunction` / `sensor_stuck` / `sensor_drift`, require persistence, repetition, failed recovery, or direct contradiction with other signals.
|
|
- For `unattended_cooking` / `fire_risk`, require not only heat/cook activity but also evidence of missing supervision, dangerous duration, failed mitigation, or hazardous escalation.
|
|
- For `intrusion` / `tailgating` / `credential_theft`, occupancy alone is insufficient. Prefer corroboration from access-control inconsistencies, motion progression, lock/contact conflicts, or impossible entry timing.
|
|
- For `child_safety`, `possible_fall`, and `abnormal_inactivity`, require subject-specific risky context, not just generic household quietness or sparse logs.
|
|
- For `composite-safety` or `emergency-response`, explicitly check whether your story is overly dependent on a single device or a single chunk. If yes, weaken the anomaly claim.
|
|
|
|
Return JSON only:
|
|
{{
|
|
"normal_hypotheses": [
|
|
{{"id": "N1", "description": "...", "evidence": ["..."], "weaknesses": ["..."]}}
|
|
],
|
|
"anomaly_hypotheses": [
|
|
{{"id": "A1", "description": "...", "threat_type": "{THREAT_TYPES}", "evidence": ["..."], "weaknesses": ["..."]}}
|
|
],
|
|
"most_discriminative_evidence": ["..."],
|
|
"missing_information": ["..."]
|
|
}}"""
|
|
return {"system": system, "user": user}
|
|
|
|
|
|
def build_supervisor_prompt(
|
|
material: Dict,
|
|
triage_text: str,
|
|
investigator_text: str,
|
|
focused_chunks: str,
|
|
round_index: int = 1,
|
|
) -> Dict[str, str]:
|
|
system = (
|
|
"You are the supervisor. Check whether the current evidence collection is on-topic and sufficient. "
|
|
"Flag protocol-format misunderstandings, false-alarm risk, and missing checks."
|
|
)
|
|
user = f"""## Query
|
|
{material['query']}
|
|
|
|
## Matter Notes
|
|
{_protocol_notes_block(material)}
|
|
|
|
## Triage Output
|
|
{triage_text}
|
|
|
|
## Investigator Output
|
|
{investigator_text}
|
|
|
|
## Focused Chunks
|
|
{focused_chunks}
|
|
|
|
## Available Chunk IDs
|
|
{json.dumps([chunk['chunk_id'] for chunk in material['chunk_index']], ensure_ascii=False)}
|
|
|
|
Round: {round_index}
|
|
|
|
Rules:
|
|
- If the investigator seems to misread Matter-scaled values as literal impossible temperatures, mark false-alarm risk high.
|
|
- If the analysis drifted away from the query target, mark `on_topic=false`.
|
|
- Distinguish two standards of evidence:
|
|
- device-fault labels need direct fault evidence;
|
|
- behavior/safety/emergency labels may rely on coherent temporal and cross-device evidence.
|
|
- If the evidence is still ambiguous after this round, do not automatically convert that into a normal verdict.
|
|
- Use `recommended_action=refine_investigation` when another round could realistically help.
|
|
- Use `recommended_action=abstain` only when the current anomaly story is weak, underspecified, or mostly speculative after review.
|
|
- Mark false-alarm risk `high` if the anomaly story depends mainly on:
|
|
- one transient `None` or brief data dropout,
|
|
- missing logs from a device,
|
|
- one isolated suspicious reading without consequence,
|
|
- or a broad safety conclusion built from a single local device issue.
|
|
- For `composite-safety` and `emergency-response`, only mark `evidence_sufficient=true` if the analysis covers both the local trigger and the wider human/safety context.
|
|
- If the conclusion is effectively "a device was not logged, therefore the home is unsafe", treat that as weak evidence unless there is corroboration.
|
|
|
|
Return JSON only:
|
|
{{
|
|
"on_topic": true/false,
|
|
"evidence_sufficient": true/false,
|
|
"risk_of_false_alarm": "low | medium | high",
|
|
"recommended_action": "allow_final_verdict | refine_investigation | abstain",
|
|
"needs_more_chunks": ["C03", "C05"],
|
|
"missing_checks": ["..."],
|
|
"supervisor_notes": ["..."]
|
|
}}"""
|
|
return {"system": system, "user": user}
|
|
|
|
|
|
def build_verifier_prompt(
|
|
material: Dict,
|
|
triage_text: str,
|
|
investigator_text: str,
|
|
supervisor_text: str,
|
|
focused_chunks: str,
|
|
) -> Dict[str, str]:
|
|
system = (
|
|
"You are the final verifier. Make a precise final decision from the query, Matter notes, evidence, and competing hypotheses. "
|
|
"You must obey the supervisor gate."
|
|
)
|
|
user = f"""## Query
|
|
{material['query']}
|
|
|
|
## Matter Notes
|
|
{_protocol_notes_block(material)}
|
|
|
|
## Triage
|
|
{triage_text}
|
|
|
|
## Investigator
|
|
{investigator_text}
|
|
|
|
## Supervisor
|
|
{supervisor_text}
|
|
|
|
## Focused Chunks
|
|
{focused_chunks}
|
|
|
|
Rules:
|
|
- If the supervisor recommends `abstain`, you must return `is_anomaly=false`, `threat_type=none`, and `confidence=low`.
|
|
- Do not equate `evidence_sufficient=false` with `no anomaly`. It means the case may still require a cautious low/medium-confidence decision.
|
|
- Device-fault labels require direct device-fault evidence; scaled Matter temperature values alone are insufficient.
|
|
- Behavior, intrusion, safety, and emergency anomalies may be concluded from coherent temporal, causal, or cross-device evidence even when no explicit fault code exists.
|
|
- If the anomaly pattern is strong but type selection is uncertain, prefer the best-supported non-device-fault label over collapsing to `none`.
|
|
- Do not dismiss an anomaly just because the system later recovered, if the logged sequence itself reflects a real unsafe or abnormal event.
|
|
- If the investigator presents a plausible anomaly hypothesis and the supervisor does not abstain, you must explicitly refute that anomaly in your reasoning before returning `none`.
|
|
- Use `none` when the evidence supports a normal explanation better than an anomaly explanation.
|
|
- Never escalate to `sensor_malfunction`, `safety_device_failure`, or `sensor_stuck` from a single transient dropout or from missing logs alone.
|
|
- For `composite-safety` and `emergency-response`, check scope before finalizing:
|
|
- Did you assess the broader human/safety context?
|
|
- Or did you overfit to one device/chunk?
|
|
If you overfit to one local signal, lower confidence or reject the anomaly claim.
|
|
- For `unattended_cooking` / `fire_risk`, require a hazardous sequence, not merely cooking plus incomplete telemetry.
|
|
- For `intrusion` / `tailgating`, require access-path evidence, not just unusual occupancy or periodic motion.
|
|
|
|
Return JSON only:
|
|
{{
|
|
"is_anomaly": true/false,
|
|
"confidence": "high/medium/low",
|
|
"threat_type": "{THREAT_TYPES}",
|
|
"threat_description": "one-sentence conclusion",
|
|
"reasoning": ["step 1", "step 2", "step 3"],
|
|
"key_evidence": ["evidence 1", "evidence 2"],
|
|
"recommended_actions": ["action 1", "action 2"]
|
|
}}"""
|
|
return {"system": system, "user": user}
|