diff --git a/pve2/scripts/jarvis-agent.py b/pve2/scripts/jarvis-agent.py new file mode 100755 index 0000000..8dcfe1e --- /dev/null +++ b/pve2/scripts/jarvis-agent.py @@ -0,0 +1,688 @@ +#!/usr/bin/env python3 +""" +JARVIS Agent — lightweight system monitor for Linux machines. +Registers with JARVIS, reports metrics, and executes commands. + +Install: sudo bash /opt/jarvis-agent/install.sh +Config: /etc/jarvis-agent/config.json +Logs: journalctl -u jarvis-agent -f +""" + +import json +import os +import platform +import socket +import subprocess +import sys +import time +import urllib.request +import urllib.error +import uuid +from datetime import datetime +from pathlib import Path + +CONFIG_PATH = "/etc/jarvis-agent/config.json" +STATE_PATH = "/var/lib/jarvis-agent/state.json" +AGENT_VERSION = "3.0" # Phase 4: screenshot + sysinfo commands + +# ── Config helpers ──────────────────────────────────────────────────────────── + +def load_config() -> dict: + if not os.path.exists(CONFIG_PATH): + print(f"[ERROR] Config not found at {CONFIG_PATH}. Run the installer first.", flush=True) + sys.exit(1) + with open(CONFIG_PATH) as f: + return json.load(f) + +def load_state() -> dict: + if os.path.exists(STATE_PATH): + with open(STATE_PATH) as f: + return json.load(f) + return {} + +def save_state(state: dict): + Path(STATE_PATH).parent.mkdir(parents=True, exist_ok=True) + with open(STATE_PATH, "w") as f: + json.dump(state, f, indent=2) + +# ── HTTP helpers ────────────────────────────────────────────────────────────── + +import ssl as _ssl + +def _make_ssl_ctx(verify: bool) -> _ssl.SSLContext | None: + if not verify: + ctx = _ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = _ssl.CERT_NONE + return ctx + return None + +_host_header: str = "" # set from config at startup + +def api_post(url: str, payload: dict, headers: dict = {}, timeout: int = 15, + ssl_verify: bool = True) -> dict: + body = json.dumps(payload).encode() + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("Content-Type", "application/json") + req.add_header("User-Agent", "JARVIS-Agent/1.0") + if _host_header: + req.add_header("Host", _host_header) + for k, v in headers.items(): + req.add_header(k, v) + try: + ctx = _make_ssl_ctx(ssl_verify) + with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + return {"error": f"HTTP {e.code}: {e.read().decode()[:200]}"} + except Exception as e: + return {"error": str(e)} + +def api_get(url: str, headers: dict = {}, timeout: int = 10, + ssl_verify: bool = True) -> dict: + req = urllib.request.Request(url) + req.add_header("User-Agent", "JARVIS-Agent/1.0") + if _host_header: + req.add_header("Host", _host_header) + for k, v in headers.items(): + req.add_header(k, v) + try: + ctx = _make_ssl_ctx(ssl_verify) + with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: + return json.loads(resp.read().decode()) + except Exception as e: + return {"error": str(e)} + +# ── Registration ────────────────────────────────────────────────────────────── + +def get_local_ip() -> str: + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + s.close() + return ip + except Exception: + return "unknown" + +def detect_capabilities(cfg: dict) -> list: + caps = ["metrics", "commands"] + # Check for Proxmox + if os.path.exists("/usr/bin/pvesh") or os.path.exists("/usr/sbin/pveversion"): + caps.append("proxmox") + # Check for Docker + if os.path.exists("/usr/bin/docker") or os.path.exists("/usr/local/bin/docker"): + caps.append("docker") + # Check for Ollama + if os.path.exists("/usr/local/bin/ollama") or os.path.exists("/usr/bin/ollama"): + caps.append("ollama") + # Check for Home Assistant + if os.path.exists("/etc/homeassistant") or os.path.exists("/config/configuration.yaml"): + caps.append("homeassistant") + # Phase 4: screenshot capability + import shutil as _shutil + if (_shutil.which("scrot") or _shutil.which("import") or + _shutil.which("fbcat") or _shutil.which("convert")): + caps.append("screenshot") + caps.append("sysinfo") + return caps + +def register(cfg: dict, state: dict) -> str: + """Register with JARVIS. Returns api_key.""" + hostname = cfg.get("hostname", socket.gethostname()) + agent_type = cfg.get("agent_type", "linux") + ip = get_local_ip() + capabilities = detect_capabilities(cfg) + agent_id = cfg.get("agent_id", f"{hostname}_{socket.gethostname()[:8]}") + ssl_verify = bool(cfg.get("ssl_verify", True)) + + print(f"[JARVIS] Registering as '{agent_id}' ({agent_type}) from {ip}...", flush=True) + + result = api_post( + f"{cfg['jarvis_url']}/api/agent/register", + { + "hostname": hostname, + "agent_type": agent_type, + "ip_address": ip, + "capabilities": capabilities, + "agent_id": agent_id, + }, + headers={"X-Registration-Key": cfg["registration_key"]}, + ssl_verify=ssl_verify, + ) + + if "error" in result: + print(f"[ERROR] Registration failed: {result['error']}", flush=True) + return "" + + api_key = result.get("api_key", "") + if api_key: + state["api_key"] = api_key + state["agent_id"] = result.get("agent_id", agent_id) + save_state(state) + print(f"[JARVIS] Registered. agent_id={state['agent_id']}", flush=True) + return api_key + +# ── Metrics collection ──────────────────────────────────────────────────────── + +def read_cpu_percent() -> float: + try: + with open("/proc/stat") as f: + line = f.readline() + fields = list(map(int, line.split()[1:])) + idle = fields[3] + total = sum(fields) + return round((1 - idle / total) * 100, 1) if total else 0.0 + except Exception: + return 0.0 + +_last_cpu = None + +def get_cpu_percent() -> float: + global _last_cpu + try: + with open("/proc/stat") as f: + line = f.readline() + fields = list(map(int, line.split()[1:])) + idle = fields[3] + fields[4] # idle + iowait + total = sum(fields) + if _last_cpu: + d_idle = idle - _last_cpu[0] + d_total = total - _last_cpu[1] + result = round((1 - d_idle / d_total) * 100, 1) if d_total else 0.0 + else: + result = 0.0 + _last_cpu = (idle, total) + return result + except Exception: + return 0.0 + +def get_memory() -> dict: + mem = {} + try: + with open("/proc/meminfo") as f: + for line in f: + parts = line.split() + if parts[0] in ("MemTotal:", "MemAvailable:", "MemFree:", "Buffers:", "Cached:"): + mem[parts[0].rstrip(":")] = int(parts[1]) + total = mem.get("MemTotal", 0) + available = mem.get("MemAvailable", 0) + used = total - available + return { + "total_mb": round(total / 1024, 1), + "used_mb": round(used / 1024, 1), + "free_mb": round(available / 1024, 1), + "percent": round(used / total * 100, 1) if total else 0, + } + except Exception: + return {} + +def get_disk() -> list: + disks = [] + try: + result = subprocess.run(["df", "-h", "--output=source,fstype,size,used,avail,pcent,target"], + capture_output=True, text=True, timeout=5) + lines = result.stdout.strip().split("\n")[1:] + for line in lines: + parts = line.split() + if len(parts) >= 7: + mount = parts[6] + if not any(mount.startswith(x) for x in ["/sys", "/proc", "/dev/pts", "/run", "/snap"]): + disks.append({ + "mount": mount, + "size": parts[2], + "used": parts[3], + "avail": parts[4], + "percent": parts[5].rstrip("%"), + }) + except Exception: + pass + return disks + +def get_uptime() -> dict: + try: + with open("/proc/uptime") as f: + secs = float(f.read().split()[0]) + days = int(secs // 86400) + hours = int((secs % 86400) // 3600) + minutes = int((secs % 3600) // 60) + return {"seconds": int(secs), "days": days, "hours": hours, "minutes": minutes, + "human": f"{days}d {hours}h {minutes}m"} + except Exception: + return {} + +def get_services(cfg: dict) -> list: + watch = cfg.get("watch_services", ["ollama", "homeassistant", "mysql", "nginx", "apache2"]) + statuses = [] + for svc in watch: + try: + r = subprocess.run(["systemctl", "is-active", svc], capture_output=True, text=True, timeout=3) + statuses.append({"service": svc, "status": r.stdout.strip()}) + except Exception: + statuses.append({"service": svc, "status": "unknown"}) + return statuses + +def get_load() -> list: + try: + with open("/proc/loadavg") as f: + parts = f.read().split() + return [float(parts[0]), float(parts[1]), float(parts[2])] + except Exception: + return [0, 0, 0] + +def collect_metrics(cfg: dict) -> dict: + # First reading for CPU delta + get_cpu_percent() + time.sleep(1) + return { + "hostname": cfg.get("hostname", socket.gethostname()), + "cpu_percent": get_cpu_percent(), + "memory": get_memory(), + "disk": get_disk(), + "uptime": get_uptime(), + "load": get_load(), + "services": get_services(cfg), + "platform": platform.system(), + "timestamp": datetime.utcnow().isoformat() + "Z", + } + +# ── Proxmox metrics ─────────────────────────────────────────────────────────── + +def collect_proxmox_metrics(cfg: dict) -> dict | None: + try: + result = subprocess.run( + ["pvesh", "get", "/nodes/pve/status", "--output-format", "json"], + capture_output=True, text=True, timeout=10 + ) + node_status = json.loads(result.stdout) + vms_result = subprocess.run( + ["pvesh", "get", "/nodes/pve/qemu", "--output-format", "json"], + capture_output=True, text=True, timeout=10 + ) + vms = json.loads(vms_result.stdout) + return {"node": node_status, "vms": vms} + except Exception as e: + return {"error": str(e)} + +# ── Screenshot / Vision helpers ─────────────────────────────────────────────── + +def _take_screenshot(cmd_data: dict) -> dict: + """ + Attempts to capture a screenshot using available tools. + For headless servers, falls back to a rich text system snapshot. + Returns base64-encoded PNG and metadata. + """ + import base64, tempfile, shutil + + tmp = tempfile.mktemp(suffix=".png") + width = height = 0 + method = "unknown" + + # 1. Try scrot (X11 desktop) + if shutil.which("scrot") and os.environ.get("DISPLAY"): + try: + r = subprocess.run(["scrot", "-z", tmp], capture_output=True, timeout=10) + if r.returncode == 0 and os.path.exists(tmp): + method = "scrot" + except Exception: + pass + + # 2. Try import (ImageMagick X11) + if method == "unknown" and shutil.which("import") and os.environ.get("DISPLAY"): + try: + r = subprocess.run(["import", "-window", "root", tmp], capture_output=True, timeout=10) + if r.returncode == 0 and os.path.exists(tmp): + method = "import" + except Exception: + pass + + # 3. Try fbcat (Linux framebuffer — headless VMs with framebuffer) + if method == "unknown" and shutil.which("fbcat") and os.path.exists("/dev/fb0"): + try: + ppm = tempfile.mktemp(suffix=".ppm") + r = subprocess.run(["fbcat", "-s", "/dev/fb0"], stdout=open(ppm, "wb"), + stderr=subprocess.PIPE, timeout=10) + if r.returncode == 0 and shutil.which("convert"): + subprocess.run(["convert", ppm, tmp], capture_output=True, timeout=10) + os.unlink(ppm) + if os.path.exists(tmp): + method = "framebuffer" + except Exception: + pass + + # 4. Headless fallback: build a PNG system dashboard from text stats + if method == "unknown": + try: + result = _render_sysinfo_png(tmp) + if result: + method = "sysinfo_render" + except Exception: + pass + + if method == "unknown" or not os.path.exists(tmp): + # Last resort: return text snapshot only + snap = _sysinfo_snapshot() + snap["screenshot_available"] = False + snap["method"] = "text_only" + return snap + + # Read image + try: + with open(tmp, "rb") as f: + raw = f.read() + b64 = base64.b64encode(raw).decode() + fsize = len(raw) + os.unlink(tmp) + + # Try to get dimensions via file command + try: + r = subprocess.run(["identify", "-format", "%wx%h", tmp], + capture_output=True, text=True, timeout=5) + if "x" in r.stdout: + w, h = r.stdout.strip().split("x", 1) + width, height = int(w), int(h) + except Exception: + pass + + return { + "success": True, + "method": method, + "image_b64": b64, + "image_mime": "image/png", + "file_size": fsize, + "width": width, + "height": height, + "hostname": socket.gethostname(), + } + except Exception as e: + return {"success": False, "error": str(e), "method": method} + + +def _render_sysinfo_png(out_path: str) -> bool: + """Render a system info text snapshot as a PNG using ansi2image or ImageMagick.""" + import shutil + snap = _build_sysinfo_text() + # Try convert (ImageMagick) to render text → PNG + if shutil.which("convert"): + try: + r = subprocess.run([ + "convert", + "-size", "900x600", "xc:#0a0f14", + "-font", "Courier-New", + "-pointsize", "13", + "-fill", "#00d4ff", + "-annotate", "+20+30", snap[:3000], + out_path, + ], capture_output=True, timeout=15) + return r.returncode == 0 and os.path.exists(out_path) + except Exception: + pass + return False + + +def _build_sysinfo_text() -> str: + """Build a rich text system snapshot for headless machines.""" + lines = [f"JARVIS FIELD STATION — {socket.gethostname()}", + f"Timestamp: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}", + "─" * 60] + try: + # CPU / mem / disk + with open("/proc/loadavg") as f: + load = f.read().split()[:3] + lines.append(f"Load avg: {' '.join(load)}") + except Exception: + pass + try: + with open("/proc/meminfo") as f: + minfo = {l.split(":")[0].strip(): int(l.split()[1]) for l in f if ":" in l} + total = minfo.get("MemTotal", 0) + avail = minfo.get("MemAvailable", 0) + used = total - avail + lines.append(f"Memory: {used//1024}MB used / {total//1024}MB total") + except Exception: + pass + try: + r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True, timeout=5) + lines.append("Disk:\n" + r.stdout.strip()) + except Exception: + pass + try: + r = subprocess.run(["ps", "aux", "--sort=-%cpu"], capture_output=True, text=True, timeout=5) + lines.append("Top processes:\n" + "\n".join(r.stdout.splitlines()[1:8])) + except Exception: + pass + try: + r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) + lines.append("Listening ports:\n" + r.stdout.strip()[:500]) + except Exception: + pass + return "\n".join(lines) + + +def _sysinfo_snapshot() -> dict: + """Return structured system snapshot (no image) for text-based analysis.""" + data = {"success": True, "hostname": socket.gethostname(), + "snapshot_type": "text", "screenshot_available": False} + try: + with open("/proc/loadavg") as f: + parts = f.read().split() + data["load_1m"], data["load_5m"], data["load_15m"] = parts[0], parts[1], parts[2] + except Exception: + pass + try: + with open("/proc/meminfo") as f: + m = {l.split(":")[0].strip(): int(l.split()[1]) + for l in f if ":" in l and len(l.split()) >= 2} + data["mem_total_mb"] = m.get("MemTotal", 0) // 1024 + data["mem_avail_mb"] = m.get("MemAvailable", 0) // 1024 + data["mem_used_mb"] = data["mem_total_mb"] - data["mem_avail_mb"] + except Exception: + pass + try: + r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True, timeout=5) + data["disk"] = r.stdout.splitlines()[1] if r.stdout else "" + except Exception: + pass + try: + r = subprocess.run(["ps", "aux", "--sort=-%cpu"], capture_output=True, text=True, timeout=5) + data["top_procs"] = r.stdout.splitlines()[1:8] + except Exception: + pass + try: + r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) + data["listening_ports"] = r.stdout.strip()[:800] + except Exception: + pass + return data + + +# ── Command execution ───────────────────────────────────────────────────────── + +def execute_command(cmd: dict) -> dict: + cmd_type = cmd.get("command_type", "") + cmd_data = cmd.get("command_data", {}) + + try: + if cmd_type == "restart_service": + svc = cmd_data.get("service", "") + if not svc or "/" in svc: + return {"success": False, "error": "Invalid service name"} + r = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True, timeout=30) + return {"success": r.returncode == 0, "stdout": r.stdout, "stderr": r.stderr} + + elif cmd_type == "get_logs": + svc = cmd_data.get("service", "") + lines = min(int(cmd_data.get("lines", 50)), 200) + if not svc or "/" in svc: + return {"success": False, "error": "Invalid service name"} + r = subprocess.run(["journalctl", "-u", svc, "-n", str(lines), "--no-pager"], + capture_output=True, text=True, timeout=15) + return {"success": True, "output": r.stdout} + + elif cmd_type == "ping": + host = cmd_data.get("host", "8.8.8.8") + r = subprocess.run(["ping", "-c", "3", "-W", "2", host], capture_output=True, text=True, timeout=15) + return {"success": r.returncode == 0, "output": r.stdout} + + elif cmd_type == "update": + _cfg = load_config() + updated = self_update(_cfg) + return {"success": True, "updated": updated} + + elif cmd_type == "shell": + # Guard reads LOCAL config, not the server-supplied payload + _cfg = load_config() + if not _cfg.get("allow_shell_commands", False): + return {"success": False, "error": "Shell commands not enabled in agent config"} + cmd_str = cmd_data.get("command", "") + r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30) + return {"success": True, "stdout": r.stdout[:2000], "stderr": r.stderr[:500]} + + elif cmd_type == "screenshot": + return _take_screenshot(cmd_data) + + elif cmd_type == "sysinfo": + return _sysinfo_snapshot() + + else: + return {"success": False, "error": f"Unknown command type: {cmd_type}"} + + except subprocess.TimeoutExpired: + return {"success": False, "error": "Command timed out"} + except Exception as e: + return {"success": False, "error": str(e)} + +# ── Main loop ───────────────────────────────────────────────────────────────── + +def main(): + global _host_header + cfg = load_config() + state = load_state() + + jarvis_url = cfg["jarvis_url"].rstrip("/") + ssl_verify = bool(cfg.get("ssl_verify", True)) + _host_header = cfg.get("host_header", "") + poll_interval = int(cfg.get("poll_interval", 30)) + heartbeat_every = int(cfg.get("heartbeat_every", 10)) + + # Always re-register on startup to refresh capabilities, version, and IP. + # Server does an UPDATE when agent_id already exists, so api_key is preserved. + api_key = state.get("api_key", "") + registered_key = register(cfg, state) + if registered_key: + api_key = registered_key + elif not api_key: + while not api_key: + api_key = register(cfg, state) + if not api_key: + print("[ERROR] Could not register with JARVIS. Retrying in 60s...", flush=True) + time.sleep(60) + + headers = {"X-Agent-Key": api_key} + last_metrics = 0 + last_update_chk = 0 + update_interval = int(cfg.get("update_check_hours", 24)) * 3600 + tick = 0 + + print(f"[JARVIS] Agent v{AGENT_VERSION} running. Polling {jarvis_url} every {heartbeat_every}s.", flush=True) + + while True: + tick += 1 + now = time.time() + + try: + # Heartbeat + get commands + hb = api_post(f"{jarvis_url}/api/agent/heartbeat", {}, headers, ssl_verify=ssl_verify) + if "error" in hb: + print(f"[WARN] Heartbeat failed: {hb['error']}", flush=True) + else: + commands = hb.get("commands", []) + for cmd in commands: + print(f"[CMD] Executing: {cmd['command_type']}", flush=True) + result = execute_command(cmd) + api_post(f"{jarvis_url}/api/agent/command_result", + {"command_id": cmd["id"], "success": result.get("success", False), "result": result}, + headers, ssl_verify=ssl_verify) + + # Self-update check (every update_interval seconds, default 24h) + if now - last_update_chk >= update_interval: + last_update_chk = now + self_update(cfg) # restarts process if update found + + # Push metrics every poll_interval seconds + if now - last_metrics >= poll_interval: + metrics = collect_metrics(cfg) + api_post(f"{jarvis_url}/api/agent/metrics", + {"type": "system", "data": metrics}, headers, ssl_verify=ssl_verify) + + # Proxmox metrics if available + if "proxmox" in detect_capabilities(cfg): + px = collect_proxmox_metrics(cfg) + if px: + api_post(f"{jarvis_url}/api/agent/metrics", + {"type": "proxmox", "data": px}, headers, ssl_verify=ssl_verify) + + last_metrics = now + + except Exception as e: + print(f"[ERROR] Loop error: {e}", flush=True) + + time.sleep(heartbeat_every) + + +# ── Self-update ──────────────────────────────────────────────────────────────── + +def self_update(cfg: dict) -> bool: + """Check JARVIS server for a newer version of this script. + Verifies SHA-256 hash from .sha256 before replacing.""" + import hashlib + jarvis_url = cfg.get("jarvis_url", "").rstrip("/") + default_update_url = f"{jarvis_url}/agent/jarvis-agent.py" if jarvis_url else "" + update_url = cfg.get("update_url", default_update_url) + if not update_url: + return False + script_path = os.path.abspath(__file__) + try: + # Download expected hash first + hash_url = update_url + ".sha256" + req_hash = urllib.request.Request(hash_url) + req_hash.add_header("User-Agent", "JARVIS-Agent/1.0") + if _host_header: + req_hash.add_header("Host", _host_header) + try: + with urllib.request.urlopen(req_hash, timeout=10) as resp: + expected_hash = resp.read().decode().strip().split()[0] + except Exception: + expected_hash = None + + # Download new script + req = urllib.request.Request(update_url) + req.add_header("User-Agent", "JARVIS-Agent/1.0") + if _host_header: + req.add_header("Host", _host_header) + with urllib.request.urlopen(req, timeout=30) as resp: + new_content = resp.read() + + # Verify hash if available — abort if mismatch + if expected_hash: + actual_hash = hashlib.sha256(new_content).hexdigest() + if actual_hash != expected_hash: + print(f"[JARVIS] Update hash mismatch (expected {expected_hash[:16]}… got {actual_hash[:16]}…) — aborting", flush=True) + return False + + with open(script_path, "rb") as f: + current = f.read() + if new_content != current: + print(f"[JARVIS] Update verified — replacing {script_path} and restarting...", flush=True) + with open(script_path, "wb") as f: + f.write(new_content) + os.execv(sys.executable, [sys.executable] + sys.argv) + return True + return False + except Exception as e: + print(f"[JARVIS] Self-update check failed: {e}", flush=True) + return False + + +if __name__ == "__main__": + main()