#!/usr/bin/env python3 """ JARVIS Agent — lightweight system monitor for Linux machines. Registers with JARVIS, reports metrics, and executes commands. Install: sudo bash /opt/jarvis-agent/install.sh Config: /etc/jarvis-agent/config.json Logs: journalctl -u jarvis-agent -f """ import json import os import platform import socket import subprocess import sys import time import urllib.request import urllib.error import uuid from datetime import datetime from pathlib import Path CONFIG_PATH = "/etc/jarvis-agent/config.json" STATE_PATH = "/var/lib/jarvis-agent/state.json" AGENT_VERSION = "3.1" # ── Config helpers ──────────────────────────────────────────────────────────── def load_config() -> dict: if not os.path.exists(CONFIG_PATH): print(f"[ERROR] Config not found at {CONFIG_PATH}. Run the installer first.", flush=True) sys.exit(1) with open(CONFIG_PATH) as f: return json.load(f) def load_state() -> dict: if os.path.exists(STATE_PATH): with open(STATE_PATH) as f: return json.load(f) return {} def save_state(state: dict): Path(STATE_PATH).parent.mkdir(parents=True, exist_ok=True) with open(STATE_PATH, "w") as f: json.dump(state, f, indent=2) # ── HTTP helpers ────────────────────────────────────────────────────────────── import ssl as _ssl def _make_ssl_ctx(verify: bool) -> _ssl.SSLContext | None: if not verify: ctx = _ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = _ssl.CERT_NONE return ctx return None _host_header: str = "" # set from config at startup def api_post(url: str, payload: dict, headers: dict = {}, timeout: int = 15, ssl_verify: bool = True) -> dict: body = json.dumps(payload).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Content-Type", "application/json") req.add_header("User-Agent", "JARVIS-Agent/1.0") if _host_header: req.add_header("Host", _host_header) for k, v in headers.items(): req.add_header(k, v) try: ctx = _make_ssl_ctx(ssl_verify) with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: return {"error": f"HTTP {e.code}: {e.read().decode()[:200]}"} except Exception as e: return {"error": str(e)} def api_get(url: str, headers: dict = {}, timeout: int = 10, ssl_verify: bool = True) -> dict: req = urllib.request.Request(url) req.add_header("User-Agent", "JARVIS-Agent/1.0") if _host_header: req.add_header("Host", _host_header) for k, v in headers.items(): req.add_header(k, v) try: ctx = _make_ssl_ctx(ssl_verify) with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: return json.loads(resp.read().decode()) except Exception as e: return {"error": str(e)} # ── Registration ────────────────────────────────────────────────────────────── def get_local_ip() -> str: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "unknown" def detect_capabilities(cfg: dict) -> list: caps = ["metrics", "commands"] # Check for Proxmox if os.path.exists("/usr/bin/pvesh") or os.path.exists("/usr/sbin/pveversion"): caps.append("proxmox") # Check for Docker if os.path.exists("/usr/bin/docker") or os.path.exists("/usr/local/bin/docker"): caps.append("docker") # Check for Ollama if os.path.exists("/usr/local/bin/ollama") or os.path.exists("/usr/bin/ollama"): caps.append("ollama") # Check for Home Assistant if os.path.exists("/etc/homeassistant") or os.path.exists("/config/configuration.yaml"): caps.append("homeassistant") return caps def register(cfg: dict, state: dict) -> str: """Register with JARVIS. Returns api_key.""" hostname = cfg.get("hostname", socket.gethostname()) agent_type = cfg.get("agent_type", "linux") ip = get_local_ip() capabilities = detect_capabilities(cfg) agent_id = cfg.get("agent_id", f"{hostname}_{socket.gethostname()[:8]}") ssl_verify = bool(cfg.get("ssl_verify", True)) print(f"[JARVIS] Registering as '{agent_id}' ({agent_type}) from {ip}...", flush=True) result = api_post( f"{cfg['jarvis_url']}/api/agent/register", { "hostname": hostname, "version": AGENT_VERSION, "agent_type": agent_type, "ip_address": ip, "capabilities": capabilities, "agent_id": agent_id, }, headers={"X-Registration-Key": cfg["registration_key"]}, ssl_verify=ssl_verify, ) if "error" in result: print(f"[ERROR] Registration failed: {result['error']}", flush=True) return "" api_key = result.get("api_key", "") if api_key: state["api_key"] = api_key state["agent_id"] = result.get("agent_id", agent_id) save_state(state) print(f"[JARVIS] Registered. agent_id={state['agent_id']}", flush=True) return api_key # ── Metrics collection ──────────────────────────────────────────────────────── def read_cpu_percent() -> float: try: with open("/proc/stat") as f: line = f.readline() fields = list(map(int, line.split()[1:])) idle = fields[3] total = sum(fields) return round((1 - idle / total) * 100, 1) if total else 0.0 except Exception: return 0.0 _last_cpu = None def get_cpu_percent() -> float: global _last_cpu try: with open("/proc/stat") as f: line = f.readline() fields = list(map(int, line.split()[1:])) idle = fields[3] + fields[4] # idle + iowait total = sum(fields) if _last_cpu: d_idle = idle - _last_cpu[0] d_total = total - _last_cpu[1] result = round((1 - d_idle / d_total) * 100, 1) if d_total else 0.0 else: result = 0.0 _last_cpu = (idle, total) return result except Exception: return 0.0 def get_memory() -> dict: mem = {} try: with open("/proc/meminfo") as f: for line in f: parts = line.split() if parts[0] in ("MemTotal:", "MemAvailable:", "MemFree:", "Buffers:", "Cached:"): mem[parts[0].rstrip(":")] = int(parts[1]) total = mem.get("MemTotal", 0) available = mem.get("MemAvailable", 0) used = total - available return { "total_mb": round(total / 1024, 1), "used_mb": round(used / 1024, 1), "free_mb": round(available / 1024, 1), "percent": round(used / total * 100, 1) if total else 0, } except Exception: return {} def get_disk() -> list: disks = [] try: result = subprocess.run(["df", "-h", "--output=source,fstype,size,used,avail,pcent,target"], capture_output=True, text=True, timeout=5) lines = result.stdout.strip().split("\n")[1:] for line in lines: parts = line.split() if len(parts) >= 7: mount = parts[6] if not any(mount.startswith(x) for x in ["/sys", "/proc", "/dev/pts", "/run", "/snap"]): disks.append({ "mount": mount, "size": parts[2], "used": parts[3], "avail": parts[4], "percent": parts[5].rstrip("%"), }) except Exception: pass return disks def get_uptime() -> dict: try: with open("/proc/uptime") as f: secs = float(f.read().split()[0]) days = int(secs // 86400) hours = int((secs % 86400) // 3600) minutes = int((secs % 3600) // 60) return {"seconds": int(secs), "days": days, "hours": hours, "minutes": minutes, "human": f"{days}d {hours}h {minutes}m"} except Exception: return {} def get_services(cfg: dict) -> list: watch = cfg.get("watch_services", ["ollama", "homeassistant", "mysql", "nginx", "apache2"]) statuses = [] for svc in watch: try: r = subprocess.run(["systemctl", "is-active", svc], capture_output=True, text=True, timeout=3) statuses.append({"service": svc, "status": r.stdout.strip()}) except Exception: statuses.append({"service": svc, "status": "unknown"}) return statuses def get_load() -> list: try: with open("/proc/loadavg") as f: parts = f.read().split() return [float(parts[0]), float(parts[1]), float(parts[2])] except Exception: return [0, 0, 0] def get_nordvpn_status() -> dict | None: """Check nordlynx WireGuard interface. Returns None if nordlynx not present on this host.""" try: r = subprocess.run(["ip", "link", "show", "nordlynx"], capture_output=True, text=True, timeout=3) if r.returncode != 0: return None active = "UP,LOWER_UP" in r.stdout or "state UP" in r.stdout return {"active": active, "interface": "nordlynx"} except Exception: return None def collect_metrics(cfg: dict) -> dict: # First reading for CPU delta get_cpu_percent() time.sleep(1) metrics = { "hostname": cfg.get("hostname", socket.gethostname()), "cpu_percent": get_cpu_percent(), "memory": get_memory(), "disk": get_disk(), "uptime": get_uptime(), "load": get_load(), "services": get_services(cfg), "platform": platform.system(), "timestamp": datetime.utcnow().isoformat() + "Z", } nordvpn = get_nordvpn_status() if nordvpn is not None: metrics["nordvpn"] = nordvpn return metrics # ── Proxmox metrics ─────────────────────────────────────────────────────────── def collect_proxmox_metrics(cfg: dict) -> dict | None: try: result = subprocess.run( ["pvesh", "get", "/nodes/pve/status", "--output-format", "json"], capture_output=True, text=True, timeout=10 ) node_status = json.loads(result.stdout) vms_result = subprocess.run( ["pvesh", "get", "/nodes/pve/qemu", "--output-format", "json"], capture_output=True, text=True, timeout=10 ) vms = json.loads(vms_result.stdout) return {"node": node_status, "vms": vms} except Exception as e: return {"error": str(e)} # ── Command execution ───────────────────────────────────────────────────────── def execute_command(cmd: dict) -> dict: cmd_type = cmd.get("command_type", "") cmd_data = cmd.get("command_data", {}) try: if cmd_type == "restart_service": svc = cmd_data.get("service", "") if not svc or "/" in svc: return {"success": False, "error": "Invalid service name"} r = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True, timeout=30) return {"success": r.returncode == 0, "stdout": r.stdout, "stderr": r.stderr} elif cmd_type == "get_logs": svc = cmd_data.get("service", "") lines = min(int(cmd_data.get("lines", 50)), 200) if not svc or "/" in svc: return {"success": False, "error": "Invalid service name"} r = subprocess.run(["journalctl", "-u", svc, "-n", str(lines), "--no-pager"], capture_output=True, text=True, timeout=15) return {"success": True, "output": r.stdout} elif cmd_type == "ping": host = cmd_data.get("host", "8.8.8.8") r = subprocess.run(["ping", "-c", "3", "-W", "2", host], capture_output=True, text=True, timeout=15) return {"success": r.returncode == 0, "output": r.stdout} elif cmd_type == "update": updated = self_update(cfg) return {"success": True, "updated": updated} elif cmd_type == "shell": # Only allow if explicitly enabled in config if not cmd_data.get("allowed", False): return {"success": False, "error": "Shell commands not enabled"} cmd_str = cmd_data.get("command", "") r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30) return {"success": True, "stdout": r.stdout[:2000], "stderr": r.stderr[:500]} else: return {"success": False, "error": f"Unknown command type: {cmd_type}"} except subprocess.TimeoutExpired: return {"success": False, "error": "Command timed out"} except Exception as e: return {"success": False, "error": str(e)} # ── Main loop ───────────────────────────────────────────────────────────────── def main(): global _host_header cfg = load_config() state = load_state() jarvis_url = cfg["jarvis_url"].rstrip("/") ssl_verify = bool(cfg.get("ssl_verify", True)) _host_header = cfg.get("host_header", "") poll_interval = int(cfg.get("poll_interval", 30)) heartbeat_every = int(cfg.get("heartbeat_every", 10)) # Register if no API key yet api_key = state.get("api_key", "") if not api_key: api_key = register(cfg, state) if not api_key: print("[ERROR] Could not register with JARVIS. Retrying in 60s...", flush=True) time.sleep(60) main() return headers = {"X-Agent-Key": api_key} last_metrics = 0 last_update_chk = 0 update_interval = int(cfg.get("update_check_hours", 24)) * 3600 tick = 0 print(f"[JARVIS] Agent v{AGENT_VERSION} running. Polling {jarvis_url} every {heartbeat_every}s.", flush=True) while True: tick += 1 now = time.time() try: # Heartbeat + get commands hb = api_post(f"{jarvis_url}/api/agent/heartbeat", {}, headers, ssl_verify=ssl_verify) if "error" in hb: print(f"[WARN] Heartbeat failed: {hb['error']}", flush=True) else: commands = hb.get("commands", []) for cmd in commands: print(f"[CMD] Executing: {cmd['command_type']}", flush=True) result = execute_command(cmd) api_post(f"{jarvis_url}/api/agent/command_result", {"command_id": cmd["id"], "success": result.get("success", False), "result": result}, headers, ssl_verify=ssl_verify) # Self-update check (every update_interval seconds, default 24h) if now - last_update_chk >= update_interval: last_update_chk = now self_update(cfg) # restarts process if update found # Push metrics every poll_interval seconds if now - last_metrics >= poll_interval: metrics = collect_metrics(cfg) api_post(f"{jarvis_url}/api/agent/metrics", {"type": "system", "data": metrics}, headers, ssl_verify=ssl_verify) # Proxmox metrics if available if "proxmox" in detect_capabilities(cfg): px = collect_proxmox_metrics(cfg) if px: api_post(f"{jarvis_url}/api/agent/metrics", {"type": "proxmox", "data": px}, headers, ssl_verify=ssl_verify) last_metrics = now except Exception as e: print(f"[ERROR] Loop error: {e}", flush=True) time.sleep(heartbeat_every) # ── Self-update ──────────────────────────────────────────────────────────────── def self_update(cfg: dict) -> bool: """Check JARVIS server for a newer version of this script. If different, replace and restart.""" jarvis_url = cfg.get("jarvis_url", "").rstrip("/") default_update_url = f"{jarvis_url}/agent/jarvis-agent.py" if jarvis_url else "" update_url = cfg.get("update_url", default_update_url) if not update_url: return False script_path = os.path.abspath(__file__) try: req = urllib.request.Request(update_url) req.add_header("User-Agent", "JARVIS-Agent/1.0") with urllib.request.urlopen(req, timeout=30) as resp: new_content = resp.read() with open(script_path, "rb") as f: current = f.read() if new_content != current: print(f"[JARVIS] Update available — replacing {script_path} and restarting...", flush=True) with open(script_path, "wb") as f: f.write(new_content) os.execv(sys.executable, [sys.executable] + sys.argv) return True return False except Exception as e: print(f"[JARVIS] Self-update check failed: {e}", flush=True) return False if __name__ == "__main__": main()