87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
import json
|
|
from typing import Dict
|
|
|
|
|
|
# Slash-separated menu of threat labels. It is interpolated verbatim into the
# stage-2 prompt as the placeholder value of the "threat_type" field, so the
# exact " / " separators are part of the text the model sees.
THREAT_TYPES = (
    "intrusion / tailgating / credential_theft / fire_risk / unattended_cooking / "
    "carbon_monoxide / sensor_stuck / sensor_drift / sensor_malfunction / actuator_stuck / "
    "lock_malfunction / safety_device_failure / water_leak / possible_fall / "
    "abnormal_inactivity / health_concern / child_safety / behavioral_anomaly / none"
)
|
|
|
|
|
|
# System prompt for stage 1: the model acts as an evidence analyst that
# produces hypotheses and is told not to make the final decision.
STAGE1_SYSTEM = """You are an evidence analyst for smart-home anomaly reasoning.
You do not make the final decision yet. You compress long IoT logs into competing hypotheses,
supporting evidence, and contradictions."""
|
|
|
|
|
|
# System prompt for stage 2: the model acts as the final verifier and is
# steered toward conservative decisions that avoid false alarms.
STAGE2_SYSTEM = """You are the final verifier for smart-home anomaly reasoning.
Use the evidence packet and the hypothesis analysis to make a conservative but precise decision.
Avoid false alarms when the evidence also supports a normal explanation."""
|
|
|
|
|
|
def build_stage1_prompt(packet: Dict) -> Dict[str, str]:
    """Build the stage-1 (evidence analysis) prompt pair.

    Args:
        packet: Evidence packet. It must provide ``query`` and
            ``layout_summary`` plus every field named in ``evidence_keys``
            below; those evidence fields must be JSON-serializable.

    Returns:
        A dict with two entries: ``"system"`` (the ``STAGE1_SYSTEM`` prompt)
        and ``"user"`` (the rendered user message).

    Raises:
        KeyError: If ``packet`` lacks one of the required keys.
        TypeError: If an evidence field cannot be serialized by ``json.dumps``.
    """
    # Only these fields are forwarded to the model. The tuple order is the key
    # order of the serialized JSON object.
    evidence_keys = (
        "sq_type",
        "event_count",
        "salient_event_count",
        "focus_event_count",
        "room_activity",
        "temperature_patterns",
        "suspicious_signals",
    )
    # Serialize outside the f-string so the template below stays readable.
    # ensure_ascii=False keeps non-ASCII text readable in the prompt.
    evidence_json = json.dumps(
        {key: packet[key] for key in evidence_keys},
        ensure_ascii=False,
        indent=2,
    )

    # Doubled braces ({{ / }}) render as literal braces of the JSON template.
    user = f"""## Task
Analyze the following evidence packet for a smart-home anomaly query.

## Query
{packet['query']}

## Layout
{packet['layout_summary']}

## Evidence Packet
{evidence_json}

Return JSON only:
{{
"candidate_hypotheses": [
{{"id": "H1", "type": "normal|anomaly", "description": "...", "supported_by": ["..."], "contradicted_by": ["..."]}}
],
"most_concerning_signals": ["..."],
"what_to_verify_in_raw_log": ["..."],
"provisional_risk": "none|low|medium|high|critical"
}}"""
    return {"system": STAGE1_SYSTEM, "user": user}
|
|
|
|
|
|
def build_stage2_prompt(packet: Dict, stage1_text: str) -> Dict[str, str]:
    """Build the stage-2 (final verification) prompt pair.

    Args:
        packet: Evidence packet. It must provide ``query`` and
            ``focus_log_text``.
        stage1_text: Text placed verbatim under the "Stage-1 Analysis"
            heading of the user message.

    Returns:
        A dict with two entries: ``"system"`` (the ``STAGE2_SYSTEM`` prompt)
        and ``"user"`` (the rendered user message).

    Raises:
        KeyError: If ``packet`` lacks ``query`` or ``focus_log_text``.
    """
    # Doubled braces ({{ / }}) render as literal braces of the JSON template;
    # {THREAT_TYPES} expands to the slash-separated list of allowed labels.
    user = f"""## Task
Make the final anomaly decision from compressed evidence plus a focused raw-log excerpt.

## Query
{packet['query']}

## Stage-1 Analysis
{stage1_text}

## Focused Raw Log
{packet['focus_log_text']}

## Output Format
Return JSON only:
{{
"is_anomaly": true/false,
"confidence": "high/medium/low",
"threat_type": "{THREAT_TYPES}",
"threat_description": "one-sentence conclusion",
"reasoning": ["short step 1", "short step 2", "short step 3"],
"key_evidence": ["evidence 1", "evidence 2"],
"recommended_actions": ["action 1", "action 2"]
}}

Rules:
- Prefer precise evidence over speculation.
- If the evidence is ambiguous between a near-miss false positive and a real anomaly, be explicit.
- Do not mention unavailable metadata or hidden labels."""
    return {"system": STAGE2_SYSTEM, "user": user}
|
|
|