#!/usr/bin/env python3 """ JARVIS HA Poller — pulls entity states from Home Assistant REST API and pushes them to JARVIS as a homeassistant-type agent. Runs on VM211 as a systemd service (jarvis-ha-poller). Config: /etc/jarvis-agent/ha-poller.json """ import json import os import socket import sys import time import urllib.request import urllib.error import ssl from datetime import datetime, timezone from pathlib import Path CONFIG_PATH = "/etc/jarvis-agent/ha-poller.json" STATE_PATH = "/var/lib/jarvis-agent/ha-poller-state.json" AGENT_VERSION = "1.0" AGENT_ID = "homeassistant_ha" HOSTNAME = "homeassistant" # Domains to skip — don't send to JARVIS (saves DB space, keeps UI clean) SKIP_DOMAINS = { 'sensor', 'binary_sensor', 'button', 'update', 'select', 'number', 'device_tracker', 'event', 'image', 'person', 'zone', 'tts', 'conversation', 'assist_satellite', 'input_button', 'media_player', 'scene', 'water_heater', 'alarm_control_panel', 'automation', 'script', 'calendar', 'notify', 'weather', 'sun', 'persistent_notification', 'tag', 'system_health', 'timer', 'counter', 'camera', 'siren', 'remote', 'todo', 'lawn_mower', } def log(msg: str): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{ts}] {msg}", flush=True) def load_config() -> dict: if not os.path.exists(CONFIG_PATH): print(f"[ERROR] Config not found at {CONFIG_PATH}", flush=True) sys.exit(1) with open(CONFIG_PATH) as f: return json.load(f) def load_state() -> dict: if os.path.exists(STATE_PATH): with open(STATE_PATH) as f: return json.load(f) return {} def save_state(state: dict): Path(STATE_PATH).parent.mkdir(parents=True, exist_ok=True) with open(STATE_PATH, "w") as f: json.dump(state, f, indent=2) def _ssl_ctx(verify: bool): if not verify: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx return None def jarvis_post(url: str, payload: dict, headers: dict, ssl_verify: bool, timeout: int = 15) -> dict: body = json.dumps(payload).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Content-Type", "application/json") for k, v in headers.items(): req.add_header(k, v) try: ctx = _ssl_ctx(ssl_verify) with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: return {"error": f"HTTP {e.code}: {e.read().decode()[:200]}"} except Exception as e: return {"error": str(e)} def ha_get(url: str, token: str, timeout: int = 15) -> dict | list | None: req = urllib.request.Request(url) req.add_header("Authorization", f"Bearer {token}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode()) except Exception as e: log(f"HA API error: {e}") return None def register(cfg: dict, state: dict) -> str: jarvis_url = cfg["jarvis_url"].rstrip("/") ssl_verify = bool(cfg.get("ssl_verify", False)) reg_key = cfg["registration_key"] log(f"Registering HA poller with JARVIS at {jarvis_url}...") result = jarvis_post( f"{jarvis_url}/api/agent/register", { "hostname": HOSTNAME, "version": AGENT_VERSION, "agent_type": "homeassistant", "ip_address": cfg.get("ha_url", "").split("//")[-1].split(":")[0], "capabilities": ["ha_entities", "ha_state"], "agent_id": AGENT_ID, }, {"X-Registration-Key": reg_key}, ssl_verify, ) if "error" in result: log(f"Registration failed: {result['error']}") return "" api_key = result.get("api_key", "") if api_key: state["api_key"] = api_key state["agent_id"] = AGENT_ID save_state(state) log(f"Registered. agent_id={AGENT_ID}") return api_key def push_entities(cfg: dict, api_key: str, entities: list) -> bool: jarvis_url = cfg["jarvis_url"].rstrip("/") ssl_verify = bool(cfg.get("ssl_verify", False)) headers = {"X-Agent-Key": api_key} # Send in batches of 200 batch_size = 200 total = len(entities) ok = True for i in range(0, total, batch_size): batch = entities[i:i+batch_size] result = jarvis_post( f"{jarvis_url}/api/agent/ha_state", {"entities": batch}, headers, ssl_verify, ) if "error" in result: log(f"Push batch {i//batch_size+1} failed: {result['error']}") ok = False return ok def heartbeat(cfg: dict, api_key: str) -> bool: jarvis_url = cfg["jarvis_url"].rstrip("/") ssl_verify = bool(cfg.get("ssl_verify", False)) result = jarvis_post( f"{jarvis_url}/api/agent/heartbeat", {"version": AGENT_VERSION}, {"X-Agent-Key": api_key}, ssl_verify, timeout=10, ) return "error" not in result def fetch_ha_states(cfg: dict) -> list: ha_url = cfg["ha_url"].rstrip("/") token = cfg["ha_token"] states = ha_get(f"{ha_url}/api/states", token) if not states or not isinstance(states, list): return [] entities = [] for s in states: entity_id = s.get("entity_id", "") domain = entity_id.split(".")[0] if "." in entity_id else "" if domain in SKIP_DOMAINS: continue attrs = s.get("attributes", {}) # Convert ISO 8601 (e.g. "2026-06-28T21:26:01.922366+00:00") to MySQL datetime lc = s.get("last_changed", "") try: dt = datetime.fromisoformat(lc.replace("Z", "+00:00")) lc = dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") except Exception: lc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") entities.append({ "entity_id": entity_id, "name": attrs.get("friendly_name") or entity_id, "state": s.get("state", ""), "attributes": attrs, "last_changed": lc, }) return entities def main(): cfg = load_config() state = load_state() poll_interval = int(cfg.get("poll_interval", 30)) heartbeat_every = int(cfg.get("heartbeat_every", 10)) api_key = state.get("api_key", "") while not api_key: api_key = register(cfg, state) if not api_key: log("Could not register. Retrying in 60s...") time.sleep(60) headers = {"X-Agent-Key": api_key} last_push = 0 log(f"HA Poller v{AGENT_VERSION} running. Polling HA every {poll_interval}s, heartbeat every {heartbeat_every}s.") while True: now = time.time() # Heartbeat if not heartbeat(cfg, api_key): log("Heartbeat failed (401?) — re-registering...") state.clear() save_state(state) api_key = register(cfg, state) if not api_key: time.sleep(60) continue # Push entity states every poll_interval if now - last_push >= poll_interval: entities = fetch_ha_states(cfg) if entities: ok = push_entities(cfg, api_key, entities) if ok: log(f"Pushed {len(entities)} HA entities to JARVIS.") last_push = now else: log("No HA entities fetched (HA down or token invalid?)") last_push = now time.sleep(heartbeat_every) if __name__ == "__main__": main()