#!/usr/bin/env python3 """ JARVIS Agent — lightweight system monitor for Linux machines. Registers with JARVIS, reports metrics, and executes commands. Install: sudo bash /opt/jarvis-agent/install.sh Config: /etc/jarvis-agent/config.json Logs: journalctl -u jarvis-agent -f """ import json import os import platform import socket import subprocess import sys import time import urllib.request import urllib.error import uuid from datetime import datetime from pathlib import Path CONFIG_PATH = "/etc/jarvis-agent/config.json" STATE_PATH = "/var/lib/jarvis-agent/state.json" AGENT_VERSION = "3.0" # Phase 4: screenshot + sysinfo commands # ── Config helpers ──────────────────────────────────────────────────────────── def load_config() -> dict: if not os.path.exists(CONFIG_PATH): print(f"[ERROR] Config not found at {CONFIG_PATH}. Run the installer first.", flush=True) sys.exit(1) with open(CONFIG_PATH) as f: return json.load(f) def load_state() -> dict: if os.path.exists(STATE_PATH): with open(STATE_PATH) as f: return json.load(f) return {} def save_state(state: dict): Path(STATE_PATH).parent.mkdir(parents=True, exist_ok=True) with open(STATE_PATH, "w") as f: json.dump(state, f, indent=2) # ── HTTP helpers ────────────────────────────────────────────────────────────── import ssl as _ssl def _make_ssl_ctx(verify: bool) -> _ssl.SSLContext | None: if not verify: ctx = _ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = _ssl.CERT_NONE return ctx return None _host_header: str = "" # set from config at startup def api_post(url: str, payload: dict, headers: dict = {}, timeout: int = 15, ssl_verify: bool = True) -> dict: body = json.dumps(payload).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Content-Type", "application/json") req.add_header("User-Agent", "JARVIS-Agent/1.0") if _host_header: req.add_header("Host", _host_header) for k, v in headers.items(): req.add_header(k, v) try: ctx = _make_ssl_ctx(ssl_verify) with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: return {"error": f"HTTP {e.code}: {e.read().decode()[:200]}"} except Exception as e: return {"error": str(e)} def api_get(url: str, headers: dict = {}, timeout: int = 10, ssl_verify: bool = True) -> dict: req = urllib.request.Request(url) req.add_header("User-Agent", "JARVIS-Agent/1.0") if _host_header: req.add_header("Host", _host_header) for k, v in headers.items(): req.add_header(k, v) try: ctx = _make_ssl_ctx(ssl_verify) with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: return json.loads(resp.read().decode()) except Exception as e: return {"error": str(e)} # ── Registration ────────────────────────────────────────────────────────────── def get_local_ip() -> str: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "unknown" def detect_capabilities(cfg: dict) -> list: caps = ["metrics", "commands"] # Check for Proxmox if os.path.exists("/usr/bin/pvesh") or os.path.exists("/usr/sbin/pveversion"): caps.append("proxmox") # Check for Docker if os.path.exists("/usr/bin/docker") or os.path.exists("/usr/local/bin/docker"): caps.append("docker") # Check for Ollama if os.path.exists("/usr/local/bin/ollama") or os.path.exists("/usr/bin/ollama"): caps.append("ollama") # Check for Home Assistant if os.path.exists("/etc/homeassistant") or os.path.exists("/config/configuration.yaml"): caps.append("homeassistant") # Phase 4: screenshot capability import shutil as _shutil if (_shutil.which("scrot") or _shutil.which("import") or _shutil.which("fbcat") or _shutil.which("convert")): caps.append("screenshot") caps.append("sysinfo") return caps def register(cfg: dict, state: dict) -> str: """Register with JARVIS. Returns api_key.""" hostname = cfg.get("hostname", socket.gethostname()) agent_type = cfg.get("agent_type", "linux") ip = get_local_ip() capabilities = detect_capabilities(cfg) agent_id = cfg.get("agent_id", f"{hostname}_{socket.gethostname()[:8]}") ssl_verify = bool(cfg.get("ssl_verify", True)) print(f"[JARVIS] Registering as '{agent_id}' ({agent_type}) from {ip}...", flush=True) result = api_post( f"{cfg['jarvis_url']}/api/agent/register", { "hostname": hostname, "agent_type": agent_type, "ip_address": ip, "capabilities": capabilities, "agent_id": agent_id, }, headers={"X-Registration-Key": cfg["registration_key"]}, ssl_verify=ssl_verify, ) if "error" in result: print(f"[ERROR] Registration failed: {result['error']}", flush=True) return "" api_key = result.get("api_key", "") if api_key: state["api_key"] = api_key state["agent_id"] = result.get("agent_id", agent_id) save_state(state) print(f"[JARVIS] Registered. agent_id={state['agent_id']}", flush=True) return api_key # ── Metrics collection ──────────────────────────────────────────────────────── def read_cpu_percent() -> float: try: with open("/proc/stat") as f: line = f.readline() fields = list(map(int, line.split()[1:])) idle = fields[3] total = sum(fields) return round((1 - idle / total) * 100, 1) if total else 0.0 except Exception: return 0.0 _last_cpu = None def get_cpu_percent() -> float: global _last_cpu try: with open("/proc/stat") as f: line = f.readline() fields = list(map(int, line.split()[1:])) idle = fields[3] + fields[4] # idle + iowait total = sum(fields) if _last_cpu: d_idle = idle - _last_cpu[0] d_total = total - _last_cpu[1] result = round((1 - d_idle / d_total) * 100, 1) if d_total else 0.0 else: result = 0.0 _last_cpu = (idle, total) return result except Exception: return 0.0 def get_memory() -> dict: mem = {} try: with open("/proc/meminfo") as f: for line in f: parts = line.split() if parts[0] in ("MemTotal:", "MemAvailable:", "MemFree:", "Buffers:", "Cached:"): mem[parts[0].rstrip(":")] = int(parts[1]) total = mem.get("MemTotal", 0) available = mem.get("MemAvailable", 0) used = total - available return { "total_mb": round(total / 1024, 1), "used_mb": round(used / 1024, 1), "free_mb": round(available / 1024, 1), "percent": round(used / total * 100, 1) if total else 0, } except Exception: return {} def get_disk() -> list: disks = [] try: result = subprocess.run(["df", "-h", "--output=source,fstype,size,used,avail,pcent,target"], capture_output=True, text=True, timeout=5) lines = result.stdout.strip().split("\n")[1:] for line in lines: parts = line.split() if len(parts) >= 7: mount = parts[6] if not any(mount.startswith(x) for x in ["/sys", "/proc", "/dev/pts", "/run", "/snap"]): disks.append({ "mount": mount, "size": parts[2], "used": parts[3], "avail": parts[4], "percent": parts[5].rstrip("%"), }) except Exception: pass return disks def get_uptime() -> dict: try: with open("/proc/uptime") as f: secs = float(f.read().split()[0]) days = int(secs // 86400) hours = int((secs % 86400) // 3600) minutes = int((secs % 3600) // 60) return {"seconds": int(secs), "days": days, "hours": hours, "minutes": minutes, "human": f"{days}d {hours}h {minutes}m"} except Exception: return {} def get_services(cfg: dict) -> list: watch = cfg.get("watch_services", ["ollama", "homeassistant", "mysql", "nginx", "apache2"]) statuses = [] for svc in watch: try: r = subprocess.run(["systemctl", "is-active", svc], capture_output=True, text=True, timeout=3) statuses.append({"service": svc, "status": r.stdout.strip()}) except Exception: statuses.append({"service": svc, "status": "unknown"}) return statuses def get_load() -> list: try: with open("/proc/loadavg") as f: parts = f.read().split() return [float(parts[0]), float(parts[1]), float(parts[2])] except Exception: return [0, 0, 0] def collect_metrics(cfg: dict) -> dict: # First reading for CPU delta get_cpu_percent() time.sleep(1) return { "hostname": cfg.get("hostname", socket.gethostname()), "cpu_percent": get_cpu_percent(), "memory": get_memory(), "disk": get_disk(), "uptime": get_uptime(), "load": get_load(), "services": get_services(cfg), "platform": platform.system(), "timestamp": datetime.utcnow().isoformat() + "Z", } # ── Proxmox metrics ─────────────────────────────────────────────────────────── def collect_proxmox_metrics(cfg: dict) -> dict | None: try: result = subprocess.run( ["pvesh", "get", "/nodes/pve/status", "--output-format", "json"], capture_output=True, text=True, timeout=10 ) node_status = json.loads(result.stdout) vms_result = subprocess.run( ["pvesh", "get", "/nodes/pve/qemu", "--output-format", "json"], capture_output=True, text=True, timeout=10 ) vms = json.loads(vms_result.stdout) return {"node": node_status, "vms": vms} except Exception as e: return {"error": str(e)} # ── Screenshot / Vision helpers ─────────────────────────────────────────────── def _take_screenshot(cmd_data: dict) -> dict: """ Attempts to capture a screenshot using available tools. For headless servers, falls back to a rich text system snapshot. Returns base64-encoded PNG and metadata. """ import base64, tempfile, shutil tmp = tempfile.mktemp(suffix=".png") width = height = 0 method = "unknown" # 1. Try scrot (X11 desktop) if shutil.which("scrot") and os.environ.get("DISPLAY"): try: r = subprocess.run(["scrot", "-z", tmp], capture_output=True, timeout=10) if r.returncode == 0 and os.path.exists(tmp): method = "scrot" except Exception: pass # 2. Try import (ImageMagick X11) if method == "unknown" and shutil.which("import") and os.environ.get("DISPLAY"): try: r = subprocess.run(["import", "-window", "root", tmp], capture_output=True, timeout=10) if r.returncode == 0 and os.path.exists(tmp): method = "import" except Exception: pass # 3. Try fbcat (Linux framebuffer — headless VMs with framebuffer) if method == "unknown" and shutil.which("fbcat") and os.path.exists("/dev/fb0"): try: ppm = tempfile.mktemp(suffix=".ppm") r = subprocess.run(["fbcat", "-s", "/dev/fb0"], stdout=open(ppm, "wb"), stderr=subprocess.PIPE, timeout=10) if r.returncode == 0 and shutil.which("convert"): subprocess.run(["convert", ppm, tmp], capture_output=True, timeout=10) os.unlink(ppm) if os.path.exists(tmp): method = "framebuffer" except Exception: pass # 4. Headless fallback: build a PNG system dashboard from text stats if method == "unknown": try: result = _render_sysinfo_png(tmp) if result: method = "sysinfo_render" except Exception: pass if method == "unknown" or not os.path.exists(tmp): # Last resort: return text snapshot only snap = _sysinfo_snapshot() snap["screenshot_available"] = False snap["method"] = "text_only" return snap # Read image try: with open(tmp, "rb") as f: raw = f.read() b64 = base64.b64encode(raw).decode() fsize = len(raw) os.unlink(tmp) # Try to get dimensions via file command try: r = subprocess.run(["identify", "-format", "%wx%h", tmp], capture_output=True, text=True, timeout=5) if "x" in r.stdout: w, h = r.stdout.strip().split("x", 1) width, height = int(w), int(h) except Exception: pass return { "success": True, "method": method, "image_b64": b64, "image_mime": "image/png", "file_size": fsize, "width": width, "height": height, "hostname": socket.gethostname(), } except Exception as e: return {"success": False, "error": str(e), "method": method} def _render_sysinfo_png(out_path: str) -> bool: """Render a system info text snapshot as a PNG using ansi2image or ImageMagick.""" import shutil snap = _build_sysinfo_text() # Try convert (ImageMagick) to render text → PNG if shutil.which("convert"): try: r = subprocess.run([ "convert", "-size", "900x600", "xc:#0a0f14", "-font", "Courier-New", "-pointsize", "13", "-fill", "#00d4ff", "-annotate", "+20+30", snap[:3000], out_path, ], capture_output=True, timeout=15) return r.returncode == 0 and os.path.exists(out_path) except Exception: pass return False def _build_sysinfo_text() -> str: """Build a rich text system snapshot for headless machines.""" lines = [f"JARVIS FIELD STATION — {socket.gethostname()}", f"Timestamp: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}", "─" * 60] try: # CPU / mem / disk with open("/proc/loadavg") as f: load = f.read().split()[:3] lines.append(f"Load avg: {' '.join(load)}") except Exception: pass try: with open("/proc/meminfo") as f: minfo = {l.split(":")[0].strip(): int(l.split()[1]) for l in f if ":" in l} total = minfo.get("MemTotal", 0) avail = minfo.get("MemAvailable", 0) used = total - avail lines.append(f"Memory: {used//1024}MB used / {total//1024}MB total") except Exception: pass try: r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True, timeout=5) lines.append("Disk:\n" + r.stdout.strip()) except Exception: pass try: r = subprocess.run(["ps", "aux", "--sort=-%cpu"], capture_output=True, text=True, timeout=5) lines.append("Top processes:\n" + "\n".join(r.stdout.splitlines()[1:8])) except Exception: pass try: r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) lines.append("Listening ports:\n" + r.stdout.strip()[:500]) except Exception: pass return "\n".join(lines) def _sysinfo_snapshot() -> dict: """Return structured system snapshot (no image) for text-based analysis.""" data = {"success": True, "hostname": socket.gethostname(), "snapshot_type": "text", "screenshot_available": False} try: with open("/proc/loadavg") as f: parts = f.read().split() data["load_1m"], data["load_5m"], data["load_15m"] = parts[0], parts[1], parts[2] except Exception: pass try: with open("/proc/meminfo") as f: m = {l.split(":")[0].strip(): int(l.split()[1]) for l in f if ":" in l and len(l.split()) >= 2} data["mem_total_mb"] = m.get("MemTotal", 0) // 1024 data["mem_avail_mb"] = m.get("MemAvailable", 0) // 1024 data["mem_used_mb"] = data["mem_total_mb"] - data["mem_avail_mb"] except Exception: pass try: r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True, timeout=5) data["disk"] = r.stdout.splitlines()[1] if r.stdout else "" except Exception: pass try: r = subprocess.run(["ps", "aux", "--sort=-%cpu"], capture_output=True, text=True, timeout=5) data["top_procs"] = r.stdout.splitlines()[1:8] except Exception: pass try: r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) data["listening_ports"] = r.stdout.strip()[:800] except Exception: pass return data # ── Command execution ───────────────────────────────────────────────────────── def execute_command(cmd: dict) -> dict: cmd_type = cmd.get("command_type", "") cmd_data = cmd.get("command_data", {}) try: if cmd_type == "restart_service": svc = cmd_data.get("service", "") if not svc or "/" in svc: return {"success": False, "error": "Invalid service name"} r = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True, timeout=30) return {"success": r.returncode == 0, "stdout": r.stdout, "stderr": r.stderr} elif cmd_type == "get_logs": svc = cmd_data.get("service", "") lines = min(int(cmd_data.get("lines", 50)), 200) if not svc or "/" in svc: return {"success": False, "error": "Invalid service name"} r = subprocess.run(["journalctl", "-u", svc, "-n", str(lines), "--no-pager"], capture_output=True, text=True, timeout=15) return {"success": True, "output": r.stdout} elif cmd_type == "ping": host = cmd_data.get("host", "8.8.8.8") r = subprocess.run(["ping", "-c", "3", "-W", "2", host], capture_output=True, text=True, timeout=15) return {"success": r.returncode == 0, "output": r.stdout} elif cmd_type == "update": _cfg = load_config() updated = self_update(_cfg) return {"success": True, "updated": updated} elif cmd_type == "shell": # Guard reads LOCAL config, not the server-supplied payload _cfg = load_config() if not _cfg.get("allow_shell_commands", False): return {"success": False, "error": "Shell commands not enabled in agent config"} cmd_str = cmd_data.get("command", "") r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30) return {"success": True, "stdout": r.stdout[:2000], "stderr": r.stderr[:500]} elif cmd_type == "screenshot": return _take_screenshot(cmd_data) elif cmd_type == "sysinfo": return _sysinfo_snapshot() else: return {"success": False, "error": f"Unknown command type: {cmd_type}"} except subprocess.TimeoutExpired: return {"success": False, "error": "Command timed out"} except Exception as e: return {"success": False, "error": str(e)} # ── Main loop ───────────────────────────────────────────────────────────────── def main(): global _host_header cfg = load_config() state = load_state() jarvis_url = cfg["jarvis_url"].rstrip("/") ssl_verify = bool(cfg.get("ssl_verify", True)) _host_header = cfg.get("host_header", "") poll_interval = int(cfg.get("poll_interval", 30)) heartbeat_every = int(cfg.get("heartbeat_every", 10)) # Always re-register on startup to refresh capabilities, version, and IP. # Server does an UPDATE when agent_id already exists, so api_key is preserved. api_key = state.get("api_key", "") registered_key = register(cfg, state) if registered_key: api_key = registered_key elif not api_key: while not api_key: api_key = register(cfg, state) if not api_key: print("[ERROR] Could not register with JARVIS. Retrying in 60s...", flush=True) time.sleep(60) headers = {"X-Agent-Key": api_key} last_metrics = 0 last_update_chk = 0 update_interval = int(cfg.get("update_check_hours", 24)) * 3600 tick = 0 print(f"[JARVIS] Agent v{AGENT_VERSION} running. Polling {jarvis_url} every {heartbeat_every}s.", flush=True) while True: tick += 1 now = time.time() try: # Heartbeat + get commands hb = api_post(f"{jarvis_url}/api/agent/heartbeat", {}, headers, ssl_verify=ssl_verify) if "error" in hb: print(f"[WARN] Heartbeat failed: {hb['error']}", flush=True) else: commands = hb.get("commands", []) for cmd in commands: print(f"[CMD] Executing: {cmd['command_type']}", flush=True) result = execute_command(cmd) api_post(f"{jarvis_url}/api/agent/command_result", {"command_id": cmd["id"], "success": result.get("success", False), "result": result}, headers, ssl_verify=ssl_verify) # Self-update check (every update_interval seconds, default 24h) if now - last_update_chk >= update_interval: last_update_chk = now self_update(cfg) # restarts process if update found # Push metrics every poll_interval seconds if now - last_metrics >= poll_interval: metrics = collect_metrics(cfg) api_post(f"{jarvis_url}/api/agent/metrics", {"type": "system", "data": metrics}, headers, ssl_verify=ssl_verify) # Proxmox metrics if available if "proxmox" in detect_capabilities(cfg): px = collect_proxmox_metrics(cfg) if px: api_post(f"{jarvis_url}/api/agent/metrics", {"type": "proxmox", "data": px}, headers, ssl_verify=ssl_verify) last_metrics = now except Exception as e: print(f"[ERROR] Loop error: {e}", flush=True) time.sleep(heartbeat_every) # ── Self-update ──────────────────────────────────────────────────────────────── def self_update(cfg: dict) -> bool: """Check JARVIS server for a newer version of this script. Verifies SHA-256 hash from .sha256 before replacing.""" import hashlib jarvis_url = cfg.get("jarvis_url", "").rstrip("/") default_update_url = f"{jarvis_url}/agent/jarvis-agent.py" if jarvis_url else "" update_url = cfg.get("update_url", default_update_url) if not update_url: return False script_path = os.path.abspath(__file__) try: # Download expected hash first hash_url = update_url + ".sha256" req_hash = urllib.request.Request(hash_url) req_hash.add_header("User-Agent", "JARVIS-Agent/1.0") if _host_header: req_hash.add_header("Host", _host_header) try: with urllib.request.urlopen(req_hash, timeout=10) as resp: expected_hash = resp.read().decode().strip().split()[0] except Exception: expected_hash = None # Download new script req = urllib.request.Request(update_url) req.add_header("User-Agent", "JARVIS-Agent/1.0") if _host_header: req.add_header("Host", _host_header) with urllib.request.urlopen(req, timeout=30) as resp: new_content = resp.read() # Verify hash if available — abort if mismatch if expected_hash: actual_hash = hashlib.sha256(new_content).hexdigest() if actual_hash != expected_hash: print(f"[JARVIS] Update hash mismatch (expected {expected_hash[:16]}… got {actual_hash[:16]}…) — aborting", flush=True) return False with open(script_path, "rb") as f: current = f.read() if new_content != current: print(f"[JARVIS] Update verified — replacing {script_path} and restarting...", flush=True) with open(script_path, "wb") as f: f.write(new_content) os.execv(sys.executable, [sys.executable] + sys.argv) return True return False except Exception as e: print(f"[JARVIS] Self-update check failed: {e}", flush=True) return False if __name__ == "__main__": main()