Files
do-server-config/scripts/jarvis-agent.py
T

506 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""
JARVIS Agent — lightweight system monitor for Linux machines.
Registers with JARVIS, reports metrics, and executes commands.
Install: sudo bash /opt/jarvis-agent/install.sh
Config: /etc/jarvis-agent/config.json
Logs: journalctl -u jarvis-agent -f
"""
import json
import os
import platform
import socket
import subprocess
import sys
import time
import urllib.request
import urllib.error
import uuid
from datetime import datetime
from pathlib import Path
# Primary configuration file read by load_config().
CONFIG_PATH = "/etc/jarvis-agent/config.json"
# Persistent agent state (api_key and agent_id) read/written by load_state()/save_state().
STATE_PATH = "/var/lib/jarvis-agent/state.json"
# Version string sent in the registration and heartbeat payloads.
AGENT_VERSION = "3.1"
# ── Config helpers ────────────────────────────────────────────────────────────
def load_config() -> dict:
    """Load the agent configuration, migrating old locations and key names.

    The config is read from CONFIG_PATH, falling back to the legacy
    /opt/jarvis-agent/config.json location. Old key names (``server_url``,
    ``api_key``) are renamed, and missing ``hostname`` / ``ssl_verify`` values
    are filled in. Whenever anything was migrated the result is written back
    to CONFIG_PATH.

    Returns:
        The (possibly migrated) configuration dictionary.

    Exits the process with status 1 when no config file exists at either
    location.
    """
    import re as _re

    legacy_path = "/opt/jarvis-agent/config.json"
    changed = False

    if os.path.exists(CONFIG_PATH):
        with open(CONFIG_PATH) as f:
            cfg = json.load(f)
    elif os.path.exists(legacy_path):
        print(f"[JARVIS] Config found at legacy path {legacy_path} - migrating...", flush=True)
        Path(CONFIG_PATH).parent.mkdir(parents=True, exist_ok=True)
        with open(legacy_path) as f:
            cfg = json.load(f)
        # Force a save so the config really lands at CONFIG_PATH; otherwise a
        # legacy file that needs no key changes would be "migrated" on every start.
        changed = True
    else:
        print(f"[ERROR] Config not found at {CONFIG_PATH}. Run the installer first.", flush=True)
        sys.exit(1)

    # Migrate old key names so the agent self-heals instead of crash-looping
    if "server_url" in cfg and "jarvis_url" not in cfg:
        cfg["jarvis_url"] = cfg.pop("server_url")
        print("[JARVIS] Config migrated: server_url -> jarvis_url", flush=True)
        changed = True
    if "api_key" in cfg and "registration_key" not in cfg:
        cfg["registration_key"] = cfg.pop("api_key")
        print("[JARVIS] Config migrated: api_key -> registration_key", flush=True)
        changed = True
    if "hostname" not in cfg:
        cfg["hostname"] = socket.gethostname()
        changed = True
    if "ssl_verify" not in cfg:
        # Default to no certificate verification when the server is addressed
        # by a dotted-quad IP, and to verification otherwise.
        url_is_ip = _re.match(r"https?://\d+\.\d+\.\d+\.\d+", cfg.get("jarvis_url", ""))
        cfg["ssl_verify"] = not bool(url_is_ip)
        changed = True

    if changed:
        try:
            with open(CONFIG_PATH, "w") as f:
                json.dump(cfg, f, indent=2)
            print("[JARVIS] Config saved after migration.", flush=True)
        except OSError as e:
            # The in-memory config is still usable; failing to persist it
            # should not take the agent down.
            print(f"[WARN] Could not save migrated config to {CONFIG_PATH}: {e}", flush=True)
    return cfg
def load_state() -> dict:
    """Load the persisted agent state from STATE_PATH.

    Returns:
        The stored state dictionary, or an empty dict when the file does not
        exist, does not contain valid JSON, or does not contain a JSON object.
        An empty state makes the caller register again, which lets the agent
        recover from a damaged state file instead of crashing on startup.
    """
    try:
        with open(STATE_PATH) as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"[WARN] State file {STATE_PATH} is unreadable ({e}) - ignoring it.", flush=True)
        return {}
    return state if isinstance(state, dict) else {}
def save_state(state: dict):
    """Persist *state* to STATE_PATH as indented JSON.

    The data is written to a temporary sibling file and then renamed over
    STATE_PATH, so an interrupted write cannot leave a truncated state file
    behind. The file is created readable by its owner only, because the state
    holds the agent's API key.

    Args:
        state: JSON-serialisable state dictionary.
    """
    Path(STATE_PATH).parent.mkdir(parents=True, exist_ok=True)
    tmp_path = f"{STATE_PATH}.tmp"
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        json.dump(state, f, indent=2)
    # The mode passed to os.open() is ignored when the temp file already existed.
    os.chmod(tmp_path, 0o600)
    os.replace(tmp_path, STATE_PATH)
# ── HTTP helpers ──────────────────────────────────────────────────────────────
import ssl as _ssl
def _make_ssl_ctx(verify: bool) -> _ssl.SSLContext | None:
if not verify:
ctx = _ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = _ssl.CERT_NONE
return ctx
return None
# Optional "Host" header value added to every api_post()/api_get() request.
# main() sets it from the "host_header" config key at startup; an empty
# string means no Host header is added.
_host_header: str = ""
def api_post(url: str, payload: dict, headers: dict | None = None, timeout: int = 15,
             ssl_verify: bool = True) -> dict:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Args:
        url: Full request URL.
        payload: JSON-serialisable request body.
        headers: Extra request headers; ``None`` means none.
        timeout: Socket timeout in seconds.
        ssl_verify: When False, certificate and hostname checks are disabled.

    Returns:
        The parsed response, or ``{"error": "<message>"}`` when the request or
        response decoding fails. This function does not raise on failure.
    """
    body = json.dumps(payload).encode()
    req = urllib.request.Request(url, data=body, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("User-Agent", "JARVIS-Agent/1.0")
    if _host_header:
        req.add_header("Host", _host_header)
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    try:
        ctx = _make_ssl_ctx(ssl_verify)
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # errors="replace" keeps a non-UTF-8 error body from raising out of
        # this handler and breaking the "never raises" contract.
        detail = e.read().decode(errors="replace")[:200]
        return {"error": f"HTTP {e.code}: {detail}"}
    except Exception as e:
        return {"error": str(e)}
def api_get(url: str, headers: dict | None = None, timeout: int = 10,
            ssl_verify: bool = True) -> dict:
    """GET *url* and return the decoded JSON response.

    Args:
        url: Full request URL.
        headers: Extra request headers; ``None`` means none.
        timeout: Socket timeout in seconds.
        ssl_verify: When False, certificate and hostname checks are disabled.

    Returns:
        The parsed response, or ``{"error": "<message>"}`` when the request or
        response decoding fails. This function does not raise on failure.
    """
    req = urllib.request.Request(url)
    req.add_header("User-Agent", "JARVIS-Agent/1.0")
    if _host_header:
        req.add_header("Host", _host_header)
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    try:
        ctx = _make_ssl_ctx(ssl_verify)
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Same error format as api_post: status code plus the start of the body.
        detail = e.read().decode(errors="replace")[:200]
        return {"error": f"HTTP {e.code}: {detail}"}
    except Exception as e:
        return {"error": str(e)}
# ── Registration ──────────────────────────────────────────────────────────────
def get_local_ip() -> str:
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 and its own address is read back;
    connecting a UDP socket only selects a route and sends no packets.

    Returns:
        The local IP address as a string, or ``"unknown"`` on any failure.
    """
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "unknown"
def detect_capabilities(cfg: dict) -> list:
    """List the capabilities of this host.

    ``metrics`` and ``commands`` are always present. Each optional capability
    is added when at least one of its marker paths exists on disk.

    Args:
        cfg: Agent configuration (not consulted).

    Returns:
        Capability names, base capabilities first, then detected ones in a
        fixed order.
    """
    marker_paths = (
        ("proxmox", ("/usr/bin/pvesh", "/usr/sbin/pveversion")),
        ("docker", ("/usr/bin/docker", "/usr/local/bin/docker")),
        ("ollama", ("/usr/local/bin/ollama", "/usr/bin/ollama")),
        ("homeassistant", ("/etc/homeassistant", "/config/configuration.yaml")),
    )
    detected = [
        capability
        for capability, candidates in marker_paths
        if any(os.path.exists(candidate) for candidate in candidates)
    ]
    return ["metrics", "commands"] + detected
def register(cfg: dict, state: dict) -> str:
    """Register this agent with JARVIS and persist the issued credentials.

    Args:
        cfg: Agent configuration; must contain ``jarvis_url`` and
            ``registration_key``.
        state: State dictionary, updated in place with ``api_key`` and
            ``agent_id`` and saved on success.

    Returns:
        The API key issued by the server, or an empty string when
        registration failed.
    """
    hostname = cfg.get("hostname", socket.gethostname())
    agent_type = cfg.get("agent_type", "linux")
    ip = get_local_ip()
    capabilities = detect_capabilities(cfg)
    agent_id = cfg.get("agent_id", f"{hostname}_{socket.gethostname()[:8]}")
    ssl_verify = bool(cfg.get("ssl_verify", True))
    # Strip a trailing slash, as main() and self_update() do, so a configured
    # "https://host/" does not produce "https://host//api/...".
    base_url = cfg["jarvis_url"].rstrip("/")
    print(f"[JARVIS] Registering as '{agent_id}' ({agent_type}) from {ip}...", flush=True)
    result = api_post(
        f"{base_url}/api/agent/register",
        {
            "hostname": hostname,
            "version": AGENT_VERSION,
            "agent_type": agent_type,
            "ip_address": ip,
            "capabilities": capabilities,
            "agent_id": agent_id,
        },
        headers={"X-Registration-Key": cfg["registration_key"]},
        ssl_verify=ssl_verify,
    )
    if "error" in result:
        print(f"[ERROR] Registration failed: {result['error']}", flush=True)
        return ""
    api_key = result.get("api_key", "")
    if not api_key:
        # Make an otherwise silent failure visible in the logs.
        print("[ERROR] Registration failed: response did not contain an api_key", flush=True)
        return ""
    state["api_key"] = api_key
    state["agent_id"] = result.get("agent_id", agent_id)
    save_state(state)
    print(f"[JARVIS] Registered. agent_id={state['agent_id']}", flush=True)
    return api_key
# ── Metrics collection ────────────────────────────────────────────────────────
def read_cpu_percent() -> float:
    """Return the non-idle CPU share computed from the first line of /proc/stat.

    The value is derived from the counters as they stand at the moment of the
    call, not from a delta between two samples.

    Returns:
        Percentage rounded to one decimal, or 0.0 when the file cannot be read
        or parsed.
    """
    try:
        with open("/proc/stat") as stat_file:
            first_line = stat_file.readline()
        counters = [int(token) for token in first_line.split()[1:]]
        idle_ticks = counters[3]
        total_ticks = sum(counters)
        if not total_ticks:
            return 0.0
        busy_fraction = 1 - idle_ticks / total_ticks
        return round(busy_fraction * 100, 1)
    except Exception:
        return 0.0
_last_cpu = None


def get_cpu_percent() -> float:
    """Return CPU utilisation since the previous call.

    Each call samples the first line of /proc/stat and remembers the
    (idle + iowait, total) counters in the module-level ``_last_cpu``. The
    result is the busy share of the ticks elapsed between the previous sample
    and this one.

    Returns:
        Percentage rounded to one decimal. 0.0 on the first call, when no
        ticks elapsed, or when /proc/stat cannot be read or parsed.
    """
    global _last_cpu
    try:
        with open("/proc/stat") as stat_file:
            counters = [int(token) for token in stat_file.readline().split()[1:]]
        idle_now = counters[3] + counters[4]  # idle + iowait
        total_now = sum(counters)

        previous = _last_cpu
        _last_cpu = (idle_now, total_now)
        if not previous:
            return 0.0

        idle_delta = idle_now - previous[0]
        total_delta = total_now - previous[1]
        if not total_delta:
            return 0.0
        return round((1 - idle_delta / total_delta) * 100, 1)
    except Exception:
        return 0.0
def get_memory() -> dict:
    """Summarise memory usage from /proc/meminfo.

    "Used" is MemTotal minus MemAvailable. The /proc/meminfo values are
    divided by 1024 to produce the ``*_mb`` fields.

    Returns:
        A dict with ``total_mb``, ``used_mb``, ``free_mb`` and ``percent``, or
        an empty dict when the file cannot be read or parsed.
    """
    tracked_labels = ("MemTotal:", "MemAvailable:", "MemFree:", "Buffers:", "Cached:")
    try:
        readings = {}
        with open("/proc/meminfo") as meminfo:
            for row in meminfo:
                columns = row.split()
                label = columns[0]
                if label in tracked_labels:
                    readings[label.rstrip(":")] = int(columns[1])

        total_kb = readings.get("MemTotal", 0)
        available_kb = readings.get("MemAvailable", 0)
        used_kb = total_kb - available_kb
        used_percent = round(used_kb / total_kb * 100, 1) if total_kb else 0
        return {
            "total_mb": round(total_kb / 1024, 1),
            "used_mb": round(used_kb / 1024, 1),
            "free_mb": round(available_kb / 1024, 1),
            "percent": used_percent,
        }
    except Exception:
        return {}
def get_disk() -> list:
    """Report filesystem usage as printed by ``df -h``.

    Mount points under /sys, /proc, /dev/pts, /run and /snap are left out, as
    are rows with fewer than seven columns.

    Returns:
        One dict per filesystem with ``mount``, ``size``, ``used``, ``avail``
        and ``percent`` (all strings as printed by df, percent without "%").
        Whatever was collected before a failure is returned; the list is empty
        when df could not be run.
    """
    ignored_prefixes = ("/sys", "/proc", "/dev/pts", "/run", "/snap")
    entries = []
    try:
        df = subprocess.run(
            ["df", "-h", "--output=source,fstype,size,used,avail,pcent,target"],
            capture_output=True, text=True, timeout=5,
        )
        rows = df.stdout.strip().split("\n")[1:]  # drop the header row
        for row in rows:
            columns = row.split()
            if len(columns) < 7:
                continue
            mount_point = columns[6]
            if mount_point.startswith(ignored_prefixes):
                continue
            entries.append({
                "mount": mount_point,
                "size": columns[2],
                "used": columns[3],
                "avail": columns[4],
                "percent": columns[5].rstrip("%"),
            })
    except Exception:
        pass
    return entries
def get_uptime() -> dict:
    """Read the system uptime from /proc/uptime.

    Returns:
        A dict with ``seconds``, ``days``, ``hours``, ``minutes`` and a
        ``human`` string such as ``"3d 4h 12m"``, or an empty dict when the
        file cannot be read or parsed.
    """
    try:
        with open("/proc/uptime") as uptime_file:
            raw = uptime_file.read()
        secs = float(raw.split()[0])

        whole_days, within_day = divmod(secs, 86400)
        days = int(whole_days)
        hours = int(within_day // 3600)
        minutes = int(secs % 3600 // 60)

        summary = {"seconds": int(secs), "days": days, "hours": hours, "minutes": minutes}
        summary["human"] = f"{days}d {hours}h {minutes}m"
        return summary
    except Exception:
        return {}
def get_services(cfg: dict) -> list:
    """Query ``systemctl is-active`` for every watched service.

    Args:
        cfg: Agent configuration; ``watch_services`` overrides the built-in
            list of service names.

    Returns:
        One ``{"service": name, "status": text}`` dict per service, where the
        status is systemctl's output or ``"unknown"`` when the query failed.
    """
    default_watch = ["ollama", "homeassistant", "mysql", "nginx", "apache2"]
    report = []
    for name in cfg.get("watch_services", default_watch):
        try:
            probe = subprocess.run(
                ["systemctl", "is-active", name],
                capture_output=True, text=True, timeout=3,
            )
            status = probe.stdout.strip()
        except Exception:
            status = "unknown"
        report.append({"service": name, "status": status})
    return report
def get_load() -> list:
    """Return the first three values of /proc/loadavg as floats.

    Returns:
        A three-element list of floats, or ``[0, 0, 0]`` when the file cannot
        be read or parsed.
    """
    try:
        with open("/proc/loadavg") as loadavg:
            fields = loadavg.read().split()
        first, second, third = (float(value) for value in fields[:3])
        return [first, second, third]
    except Exception:
        return [0, 0, 0]
def get_nordvpn_status() -> dict | None:
"""Check nordlynx WireGuard interface. Returns None if nordlynx not present on this host."""
try:
r = subprocess.run(["ip", "link", "show", "nordlynx"],
capture_output=True, text=True, timeout=3)
if r.returncode != 0:
return None
active = "UP,LOWER_UP" in r.stdout or "state UP" in r.stdout
return {"active": active, "interface": "nordlynx"}
except Exception:
return None
def collect_metrics(cfg: dict) -> dict:
    """Gather one snapshot of system metrics.

    The CPU figure covers a one-second window: a first sample is taken, the
    function sleeps for a second, and the second sample supplies the value.

    Args:
        cfg: Agent configuration; supplies ``hostname`` and is passed on to
            get_services().

    Returns:
        A dict with hostname, cpu_percent, memory, disk, uptime, load,
        services, platform and a UTC ISO-8601 timestamp, plus ``nordvpn`` when
        get_nordvpn_status() returns a value.
    """
    # Prime the CPU sampler, then wait so the next reading has a delta to use.
    get_cpu_percent()
    time.sleep(1)

    snapshot = {"hostname": cfg.get("hostname", socket.gethostname())}
    snapshot["cpu_percent"] = get_cpu_percent()
    snapshot["memory"] = get_memory()
    snapshot["disk"] = get_disk()
    snapshot["uptime"] = get_uptime()
    snapshot["load"] = get_load()
    snapshot["services"] = get_services(cfg)
    snapshot["platform"] = platform.system()
    snapshot["timestamp"] = f"{datetime.utcnow().isoformat()}Z"

    vpn_status = get_nordvpn_status()
    if vpn_status is not None:
        snapshot["nordvpn"] = vpn_status
    return snapshot
# ── Proxmox metrics ───────────────────────────────────────────────────────────
def collect_proxmox_metrics(cfg: dict) -> dict | None:
try:
result = subprocess.run(
["pvesh", "get", "/nodes/pve/status", "--output-format", "json"],
capture_output=True, text=True, timeout=10
)
node_status = json.loads(result.stdout)
vms_result = subprocess.run(
["pvesh", "get", "/nodes/pve/qemu", "--output-format", "json"],
capture_output=True, text=True, timeout=10
)
vms = json.loads(vms_result.stdout)
return {"node": node_status, "vms": vms}
except Exception as e:
return {"error": str(e)}
# ── Command execution ─────────────────────────────────────────────────────────
def execute_command(cmd: dict) -> dict:
cmd_type = cmd.get("command_type", "")
cmd_data = cmd.get("command_data", {})
try:
if cmd_type == "restart_service":
svc = cmd_data.get("service", "")
if not svc or "/" in svc:
return {"success": False, "error": "Invalid service name"}
r = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True, timeout=30)
return {"success": r.returncode == 0, "stdout": r.stdout, "stderr": r.stderr}
elif cmd_type == "get_logs":
svc = cmd_data.get("service", "")
lines = min(int(cmd_data.get("lines", 50)), 200)
if not svc or "/" in svc:
return {"success": False, "error": "Invalid service name"}
r = subprocess.run(["journalctl", "-u", svc, "-n", str(lines), "--no-pager"],
capture_output=True, text=True, timeout=15)
return {"success": True, "output": r.stdout}
elif cmd_type == "ping":
host = cmd_data.get("host", "8.8.8.8")
r = subprocess.run(["ping", "-c", "3", "-W", "2", host], capture_output=True, text=True, timeout=15)
return {"success": r.returncode == 0, "output": r.stdout}
elif cmd_type == "update":
updated = self_update(cfg)
return {"success": True, "updated": updated}
elif cmd_type == "shell":
# Only allow if explicitly enabled in config
if not cmd_data.get("allowed", False):
return {"success": False, "error": "Shell commands not enabled"}
cmd_str = cmd_data.get("command", "")
r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30)
return {"success": True, "stdout": r.stdout[:2000], "stderr": r.stderr[:500]}
else:
return {"success": False, "error": f"Unknown command type: {cmd_type}"}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Command timed out"}
except Exception as e:
return {"success": False, "error": str(e)}
# ── Main loop ─────────────────────────────────────────────────────────────────
def main():
    """Run the agent: register if needed, then heartbeat and report forever.

    Each loop iteration sends a heartbeat, executes any commands returned with
    it and posts their results, runs the periodic self-update check, and
    pushes system (and, when detected, Proxmox) metrics once per
    ``poll_interval``. Errors inside an iteration are logged and the loop
    continues after ``heartbeat_every`` seconds.
    """
    global _host_header

    # Retry registration in a loop: calling main() recursively here would add
    # a stack frame every 60s and eventually hit the recursion limit. Config
    # and state are reloaded on every attempt so edits are picked up.
    while True:
        cfg = load_config()
        state = load_state()
        jarvis_url = cfg["jarvis_url"].rstrip("/")
        ssl_verify = bool(cfg.get("ssl_verify", True))
        _host_header = cfg.get("host_header", "")
        poll_interval = int(cfg.get("poll_interval", 30))
        heartbeat_every = int(cfg.get("heartbeat_every", 10))
        # Register if no API key yet
        api_key = state.get("api_key", "") or register(cfg, state)
        if api_key:
            break
        print("[ERROR] Could not register with JARVIS. Retrying in 60s...", flush=True)
        time.sleep(60)

    headers = {"X-Agent-Key": api_key}
    last_metrics = 0
    last_update_chk = 0
    update_interval = int(cfg.get("update_check_hours", 24)) * 3600
    tick = 0
    print(f"[JARVIS] Agent v{AGENT_VERSION} running. Polling {jarvis_url} every {heartbeat_every}s.", flush=True)
    while True:
        tick += 1
        now = time.time()
        try:
            # Heartbeat + get commands
            hb = api_post(f"{jarvis_url}/api/agent/heartbeat", {"version": AGENT_VERSION}, headers,
                          ssl_verify=ssl_verify)
            if "error" in hb:
                print(f"[WARN] Heartbeat failed: {hb['error']}", flush=True)
            else:
                for cmd in hb.get("commands", []):
                    # .get() keeps one malformed command from aborting the rest.
                    print(f"[CMD] Executing: {cmd.get('command_type', '')}", flush=True)
                    result = execute_command(cmd)
                    api_post(f"{jarvis_url}/api/agent/command_result",
                             {"command_id": cmd.get("id"), "success": result.get("success", False),
                              "result": result},
                             headers, ssl_verify=ssl_verify)
            # Self-update check (every update_interval seconds, default 24h)
            if now - last_update_chk >= update_interval:
                last_update_chk = now
                self_update(cfg)  # restarts process if update found
            # Push metrics every poll_interval seconds
            if now - last_metrics >= poll_interval:
                metrics = collect_metrics(cfg)
                api_post(f"{jarvis_url}/api/agent/metrics",
                         {"type": "system", "data": metrics}, headers, ssl_verify=ssl_verify)
                # Proxmox metrics if available
                if "proxmox" in detect_capabilities(cfg):
                    px = collect_proxmox_metrics(cfg)
                    if px:
                        api_post(f"{jarvis_url}/api/agent/metrics",
                                 {"type": "proxmox", "data": px}, headers, ssl_verify=ssl_verify)
                last_metrics = now
        except Exception as e:
            print(f"[ERROR] Loop error: {e}", flush=True)
        time.sleep(heartbeat_every)
# ── Self-update ────────────────────────────────────────────────────────────────
def self_update(cfg: dict) -> bool:
    """Download the agent script from the server and restart if it changed.

    The update URL is ``cfg["update_url"]`` or, by default,
    ``<jarvis_url>/agent/jarvis-agent.py``. A download that differs from the
    running script is installed only if it is non-empty and compiles as
    Python. It is written to a temporary file and renamed over the script, so
    an error page or interrupted write cannot destroy the agent.

    NOTE(review): the download is not authenticated or signed; whoever
    controls the update URL controls the code this agent runs.

    Args:
        cfg: Agent configuration (``jarvis_url``, ``update_url``,
            ``ssl_verify``).

    Returns:
        False when no update URL is configured, nothing changed, the download
        was rejected, or the check failed. On a successful update the process
        is replaced via ``os.execv``, which does not return.
    """
    jarvis_url = cfg.get("jarvis_url", "").rstrip("/")
    default_update_url = f"{jarvis_url}/agent/jarvis-agent.py" if jarvis_url else ""
    update_url = cfg.get("update_url", default_update_url)
    if not update_url:
        return False
    script_path = os.path.abspath(__file__)
    try:
        req = urllib.request.Request(update_url)
        req.add_header("User-Agent", "JARVIS-Agent/1.0")
        # Apply the Host override only to requests aimed at the JARVIS server.
        if _host_header and jarvis_url and update_url.startswith(jarvis_url):
            req.add_header("Host", _host_header)
        # Honour ssl_verify like the API helpers do; without this, agents
        # configured for a self-signed server could never update.
        ctx = _make_ssl_ctx(bool(cfg.get("ssl_verify", True)))
        with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
            new_content = resp.read()
        with open(script_path, "rb") as f:
            current = f.read()
        if new_content == current:
            return False

        if not new_content.strip():
            print("[JARVIS] Self-update skipped: server returned an empty file.", flush=True)
            return False
        try:
            compile(new_content, script_path, "exec")
        except (SyntaxError, ValueError) as e:
            print(f"[JARVIS] Self-update skipped: downloaded file is not valid Python ({e}).", flush=True)
            return False

        print(f"[JARVIS] Update available — replacing {script_path} and restarting...", flush=True)
        tmp_path = f"{script_path}.new"
        with open(tmp_path, "wb") as f:
            f.write(new_content)
        # Keep the original mode (e.g. the executable bit) on the new file.
        os.chmod(tmp_path, os.stat(script_path).st_mode)
        os.replace(tmp_path, script_path)
        os.execv(sys.executable, [sys.executable] + sys.argv)
        return True  # not reached: execv replaces the process or raises
    except Exception as e:
        print(f"[JARVIS] Self-update check failed: {e}", flush=True)
        return False
# Start the agent loop only when this file is executed as a script.
if __name__ == "__main__":
    main()