Files
jarvis/agent/jarvis-agent-mac.py
myron 0469b31829 Agent version tracking — workers tab shows current vs latest version
- Add version column to registered_agents table
- Agents send version on registration (Linux 3.1, Windows 3.0, macOS 3.0)
- workers_list API returns latest_versions per platform
- Workers tab: VERSION column with green check (up-to-date) or red (outdated)
- Outdated agents highlight row and show blue UPDATE button
- Up-to-date agents show dimmed UPDATE button
- Update button dispatches update command immediately

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-12 01:44:59 +00:00

577 lines
22 KiB
Python

#!/usr/bin/env python3
"""
JARVIS Agent for macOS — system monitor that reports metrics to JARVIS HUD.
Install: bash <(curl -sSL https://jarvis.orbishosting.com/agent/install-mac.sh) --key YOUR_KEY
Config: ~/.jarvis-agent/config.json (or $JARVIS_CONFIG)
Logs: ~/.jarvis-agent/jarvis-agent.log (each line is also printed to stdout, which launchd can capture)
"""
import json
import os
import platform
import re
import socket
import subprocess
import sys
import time
import urllib.request
import urllib.error
import hashlib
from datetime import datetime
from pathlib import Path
# Version string reported to the server in the registration payload.
AGENT_VERSION = "3.0"
# Config/state paths — env vars let the launchd plist override them
# Default directory for the agent's config, state and log files.
_default_dir = Path.home() / ".jarvis-agent"
# Agent configuration file; overridden by $JARVIS_CONFIG when set.
CONFIG_PATH = Path(os.environ.get("JARVIS_CONFIG", str(_default_dir / "config.json")))
# Persisted registration state (api_key, agent_id); overridden by $JARVIS_STATE when set.
STATE_PATH = Path(os.environ.get("JARVIS_STATE", str(_default_dir / "state.json")))
# Log file; always under the default directory (there is no env override for it).
LOG_PATH = _default_dir / "jarvis-agent.log"
# ── Logging ────────────────────────────────────────────────────────────────────
def log(msg: str):
    """Print a timestamped message to stdout and append it to LOG_PATH.

    The stdout line is flushed immediately. Writing to the log file is
    best-effort: any failure there is ignored so that logging can never
    take the agent down.

    Args:
        msg: Text to log; it is prefixed with ``[YYYY-MM-DD HH:MM:SS]``.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line, flush=True)
    try:
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: some messages in this file contain non-ASCII
        # characters ("…", "—"), which a locale-dependent default encoding
        # may be unable to write.
        with open(LOG_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        # Deliberately swallowed — the message already went to stdout.
        pass
# ── Config ─────────────────────────────────────────────────────────────────────
def load_config() -> dict:
    """Read the agent configuration from CONFIG_PATH and return the parsed JSON.

    When the file does not exist, an error is printed and the process exits
    with status 1.
    """
    if CONFIG_PATH.exists():
        with open(CONFIG_PATH) as handle:
            parsed = json.load(handle)
        return parsed
    print(f"[ERROR] Config not found at {CONFIG_PATH}. Run the installer first.")
    sys.exit(1)
def load_state() -> dict:
    """Return the persisted agent state from STATE_PATH, or ``{}`` if unusable.

    A missing file yields ``{}``. A file that cannot be read, does not
    contain valid JSON, or does not contain a JSON object also yields ``{}``
    (with a warning printed) instead of raising, so a damaged state file
    cannot stop the agent from starting; the caller then registers again.
    """
    try:
        with open(STATE_PATH, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[WARN] Ignoring unreadable state file {STATE_PATH}: {e}")
        return {}
    if not isinstance(state, dict):
        print(f"[WARN] Ignoring state file {STATE_PATH}: expected a JSON object")
        return {}
    return state
def save_state(state: dict):
    """Persist ``state`` as indented JSON at STATE_PATH.

    The data is written to a sibling ``.tmp`` file and then moved into place
    with ``os.replace``, so an interrupted write cannot leave a truncated
    state file behind. The file is created with mode 0600 because the state
    holds the agent's API key.

    Args:
        state: JSON-serialisable mapping to store.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If ``state`` is not JSON-serialisable.
    """
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = STATE_PATH.with_name(STATE_PATH.name + ".tmp")
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, STATE_PATH)
    except BaseException:
        # Leave the previous state file untouched and drop the partial copy.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
# ── HTTP ───────────────────────────────────────────────────────────────────────
import ssl as _ssl
def _make_ssl_ctx(verify: bool):
    """Build the ``context`` argument for ``urlopen``.

    Returns ``None`` when verification is requested (urllib then uses its
    default behaviour). Otherwise returns an SSL context with hostname
    checking and certificate verification both switched off.
    """
    if verify:
        return None
    insecure = _ssl.create_default_context()
    # check_hostname must be cleared before verify_mode can be set to CERT_NONE.
    insecure.check_hostname = False
    insecure.verify_mode = _ssl.CERT_NONE
    return insecure
# Optional Host header override; when non-empty it is added to every outgoing
# request. main() sets it from the "host_header" config key.
_host_header: str = ""
def api_post(url: str, payload: dict, headers: "dict | None" = None, timeout: int = 15,
             ssl_verify: bool = True) -> dict:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply.

    This function never raises: every failure (malformed URL, payload that
    cannot be serialised, network error, HTTP error status, non-JSON reply)
    is reported as ``{"error": "<description>"}``.

    Args:
        url: Absolute request URL.
        payload: JSON-serialisable request body.
        headers: Extra request headers; ``None`` means no extra headers.
        timeout: Socket timeout in seconds.
        ssl_verify: When False, TLS certificate and hostname checks are disabled.

    Returns:
        The parsed JSON response, or a dict with a single ``"error"`` key.
    """
    try:
        # Building the request is inside the try so that a bad URL or payload
        # is reported through the same {"error": ...} channel as network errors.
        body = json.dumps(payload).encode()
        req = urllib.request.Request(url, data=body, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("User-Agent", f"JARVIS-Agent-macOS/{AGENT_VERSION}")
        if _host_header:
            req.add_header("Host", _host_header)
        for k, v in (headers or {}).items():
            req.add_header(k, v)
        ctx = _make_ssl_ctx(ssl_verify)
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # The error body is only used for diagnostics; never fail on its encoding.
        return {"error": f"HTTP {e.code}: {e.read().decode(errors='replace')[:200]}"}
    except Exception as e:
        return {"error": str(e)}
def api_get(url: str, headers: "dict | None" = None, timeout: int = 10,
            ssl_verify: bool = True) -> dict:
    """GET ``url`` and return the decoded JSON reply.

    This function never raises: every failure (malformed URL, network error,
    HTTP error status, non-JSON reply) is reported as
    ``{"error": "<description>"}``, in the same format ``api_post`` uses.

    Args:
        url: Absolute request URL.
        headers: Extra request headers; ``None`` means no extra headers.
        timeout: Socket timeout in seconds.
        ssl_verify: When False, TLS certificate and hostname checks are disabled.

    Returns:
        The parsed JSON response, or a dict with a single ``"error"`` key.
    """
    try:
        # Request construction can raise on a malformed URL, so it is guarded too.
        req = urllib.request.Request(url)
        req.add_header("User-Agent", f"JARVIS-Agent-macOS/{AGENT_VERSION}")
        if _host_header:
            req.add_header("Host", _host_header)
        for k, v in (headers or {}).items():
            req.add_header(k, v)
        ctx = _make_ssl_ctx(ssl_verify)
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        return {"error": f"HTTP {e.code}: {e.read().decode(errors='replace')[:200]}"}
    except Exception as e:
        return {"error": str(e)}
# ── Metrics ────────────────────────────────────────────────────────────────────
def get_local_ip() -> str:
    """Return the local IPv4 address used for outbound traffic, or "unknown".

    A UDP socket is connected to 8.8.8.8:80 and the address the OS bound
    locally is read back. The socket is managed by a ``with`` block so it is
    closed even when ``connect`` fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        return "unknown"
# Module-level slot kept from the original; get_cpu_percent neither reads nor
# assigns it.
_last_cpu_times = None


def get_cpu_percent() -> float:
    """Return overall CPU utilisation in percent, or 0.0 if it cannot be measured.

    Runs ``top -l 2 -s 0 -n 0`` and takes the idle figure from the last
    "CPU usage:" line of its output; the result is ``100 - idle`` rounded to
    one decimal. Any failure (command missing, timeout, unparsable output)
    yields 0.0.
    """
    idle_pattern = r"([\d.]+)%\s+idle"
    try:
        sample = subprocess.run(
            ["top", "-l", "2", "-s", "0", "-n", "0"],
            capture_output=True, text=True, timeout=12,
        )
        idle_values = []
        for row in sample.stdout.splitlines():
            if "CPU usage:" not in row:
                continue
            hit = re.search(idle_pattern, row)
            if hit:
                idle_values.append(float(hit.group(1)))
        if idle_values:
            # The last snapshot is the one the original comment calls the real delta.
            return round(100.0 - idle_values[-1], 1)
    except Exception:
        pass
    return 0.0
def get_memory() -> dict:
    """Return memory usage in MiB, or ``{}`` if it cannot be determined.

    Total memory comes from ``sysctl hw.memsize``; page counts come from
    ``vm_stat``. Free, speculative and inactive pages are treated as
    available, and everything else as used.

    Returns:
        A dict with ``total_mb``, ``used_mb``, ``free_mb`` and ``percent``,
        or an empty dict on any failure.
    """
    mib = 1024 * 1024
    try:
        memsize = subprocess.run(["sysctl", "-n", "hw.memsize"], capture_output=True, text=True, timeout=3)
        total_bytes = int(memsize.stdout.strip())

        # 4096 is only a fallback for when hw.pagesize cannot be read.
        page_size = 4096
        try:
            pagesize = subprocess.run(["sysctl", "-n", "hw.pagesize"], capture_output=True, text=True, timeout=3)
            page_size = int(pagesize.stdout.strip())
        except Exception:
            pass

        vm_stat = subprocess.run(["vm_stat"], capture_output=True, text=True, timeout=5)
        matches = (re.match(r"Pages\s+(.+?):\s+([\d]+)", row) for row in vm_stat.stdout.splitlines())
        pages = {hit.group(1).strip().lower(): int(hit.group(2)) for hit in matches if hit}

        free_pages = pages.get("free", 0) + pages.get("speculative", 0)
        # Inactive pages can be reclaimed, so they count as available.
        avail_pages = free_pages + pages.get("inactive", 0)
        used_pages = (total_bytes // page_size) - avail_pages

        total_mb = round(total_bytes / mib, 1)
        used_mb = round(used_pages * page_size / mib, 1)
        avail_mb = round(avail_pages * page_size / mib, 1)
        # Keep the used figure within [0, total].
        used_mb = max(0, min(used_mb, total_mb))
        percent = round(used_mb / total_mb * 100, 1) if total_mb else 0
        return {
            "total_mb": total_mb,
            "used_mb": used_mb,
            "free_mb": avail_mb,
            "percent": percent,
        }
    except Exception:
        return {}
def _parse_df_output(output: str) -> list:
    """Convert the text printed by ``df -H`` into a list of per-mount dicts."""
    skip_prefixes = ("/dev", "/private/var/vm", "/System/Volumes/VM")
    rows = output.splitlines()
    if not rows:
        return []
    # Locate the mount column from the header instead of assuming index 5:
    # BSD/macOS df can print inode columns (iused, ifree, %iused) before
    # "Mounted on", which shifts the mount point to a later column.
    header = rows[0].split()
    mount_col = header.index("Mounted") if "Mounted" in header else 5
    disks = []
    for row in rows[1:]:
        # maxsplit keeps a mount point containing spaces in one piece.
        parts = row.strip().split(None, mount_col)
        if len(parts) <= mount_col:
            continue
        mount = parts[mount_col]
        if mount.startswith(skip_prefixes):
            continue
        disks.append({
            "mount": mount,
            "size": parts[1],
            "used": parts[2],
            "avail": parts[3],
            "percent": parts[4].rstrip("%"),
        })
    return disks


def get_disk() -> list:
    """Return usage information for local filesystems.

    Runs ``df -H -l`` and returns one dict per mount point with the keys
    ``mount``, ``size``, ``used``, ``avail`` and ``percent`` (all strings,
    ``percent`` without the trailing "%"). Mount points under ``/dev``,
    ``/private/var/vm`` and ``/System/Volumes/VM`` are omitted. Returns an
    empty list when ``df`` cannot be run.
    """
    try:
        r = subprocess.run(["df", "-H", "-l"], capture_output=True, text=True, timeout=5)
    except (OSError, subprocess.SubprocessError, ValueError):
        return []
    return _parse_df_output(r.stdout)
def get_uptime() -> dict:
    """Return how long the machine has been up, or ``{}`` if unknown.

    The boot timestamp is parsed from the ``sec = <n>`` field printed by
    ``sysctl -n kern.boottime`` and subtracted from the current time.

    Returns:
        A dict with ``seconds``, ``days``, ``hours``, ``minutes`` and a
        ``human`` string such as ``"3d 4h 12m"``; empty on any failure.
    """
    try:
        proc = subprocess.run(["sysctl", "-n", "kern.boottime"], capture_output=True, text=True, timeout=3)
        found = re.search(r"sec\s*=\s*(\d+)", proc.stdout)
        if not found:
            return {}
        elapsed = time.time() - int(found.group(1))
        days = int(elapsed // 86400)
        hours = int((elapsed % 86400) // 3600)
        minutes = int((elapsed % 3600) // 60)
        return {
            "seconds": int(elapsed),
            "days": days,
            "hours": hours,
            "minutes": minutes,
            "human": f"{days}d {hours}h {minutes}m",
        }
    except Exception:
        return {}
def get_load() -> list:
    """Return the 1, 5 and 15 minute load averages as a three-element list.

    Uses ``os.getloadavg()`` instead of spawning ``sysctl`` and parsing its
    text with a regex. Values are rounded to two decimals. Returns
    ``[0, 0, 0]`` when the load average is unavailable.
    """
    try:
        return [round(value, 2) for value in os.getloadavg()]
    except (OSError, AttributeError):
        # OSError: load average unobtainable; AttributeError: platform lacks getloadavg.
        return [0, 0, 0]
def _running_launchd_labels(listing: str) -> set:
    """Return the labels of jobs that have a PID in ``launchctl list`` output."""
    labels = set()
    for row in listing.splitlines():
        parts = row.split(None, 2)
        # Columns are PID, last exit status, label. The PID column is "-" for
        # a job that is loaded but not running, and "PID" on the header row;
        # neither may count as running.
        if len(parts) == 3 and parts[0].isdigit():
            labels.add(parts[2].strip())
    return labels


def get_services(cfg: dict) -> list:
    """Report whether each watched launchd service is currently running.

    Args:
        cfg: Agent configuration; ``watch_services`` lists the names to check.

    Returns:
        One ``{"service": name, "status": "active" | "inactive"}`` dict per
        watched name, in config order. A service is "active" when its name is
        a substring of the label of a job that has a PID in ``launchctl
        list``. Returns ``[]`` when nothing is watched; if ``launchctl``
        cannot be run, every service is reported "inactive".
    """
    watch = cfg.get("watch_services", [])
    if not watch:
        return []
    try:
        r = subprocess.run(["launchctl", "list"], capture_output=True, text=True, timeout=5)
        running = _running_launchd_labels(r.stdout)
    except (OSError, subprocess.SubprocessError, ValueError):
        running = set()
    return [
        {"service": svc,
         "status": "active" if any(svc in label for label in running) else "inactive"}
        for svc in watch
    ]
def detect_capabilities(cfg: dict) -> list:
    """Return the capability names this agent advertises.

    "metrics" and "commands" always come first; "docker" and "ollama" are
    added when the matching executable is found on PATH; "screenshot" and
    "sysinfo" are always appended last. ``cfg`` is accepted but not used.
    """
    import shutil

    optional_tools = ("docker", "ollama")
    caps = ["metrics", "commands"]
    caps.extend(tool for tool in optional_tools if shutil.which(tool))
    # screencapture is always available on macOS
    caps += ["screenshot", "sysinfo"]
    return caps
def collect_metrics(cfg: dict) -> dict:
    """Gather one full metrics sample for upload.

    Args:
        cfg: Agent configuration; ``hostname`` overrides the system hostname
            and the whole dict is passed on to ``get_services``.

    Returns:
        A dict with hostname, CPU, memory, disk, uptime, load and service
        data, the platform name, the macOS version, and a UTC ``timestamp``
        in ISO-8601 form with a trailing "Z".
    """
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # time and drop tzinfo so the string keeps the "...Z" shape (no "+00:00").
    from datetime import timezone

    return {
        "hostname": cfg.get("hostname", socket.gethostname()),
        "cpu_percent": get_cpu_percent(),
        "memory": get_memory(),
        "disk": get_disk(),
        "uptime": get_uptime(),
        "load": get_load(),
        "services": get_services(cfg),
        "platform": "macOS",
        "os_version": platform.mac_ver()[0],
        # Evaluated last, after the collectors above have run.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
    }
# ── Registration ───────────────────────────────────────────────────────────────
def register(cfg: dict, state: dict) -> str:
    """Register this agent with the server and store the issued API key.

    Args:
        cfg: Agent configuration. ``jarvis_url`` and ``registration_key`` are
            required; ``hostname``, ``agent_type``, ``agent_id`` and
            ``ssl_verify`` are optional.
        state: Mutable state dict; on success ``api_key`` and ``agent_id``
            are written into it and it is saved via ``save_state``.

    Returns:
        The API key from the server, or ``""`` when registration failed or
        the reply carried no key.
    """
    hostname = cfg.get("hostname", socket.gethostname())
    agent_type = cfg.get("agent_type", "macos")
    ip = get_local_ip()
    capabilities = detect_capabilities(cfg)
    agent_id = cfg.get("agent_id", f"{hostname}_mac")
    ssl_verify = bool(cfg.get("ssl_verify", True))
    # Strip a trailing slash, as main() and self_update() do, so a configured
    # "https://host/" does not produce "https://host//api/...".
    base_url = cfg["jarvis_url"].rstrip("/")
    log(f"Registering as '{agent_id}' ({agent_type}) from {ip}...")
    result = api_post(
        f"{base_url}/api/agent/register",
        {"hostname": hostname, "version": AGENT_VERSION, "agent_type": agent_type, "ip_address": ip,
         "capabilities": capabilities, "agent_id": agent_id},
        headers={"X-Registration-Key": cfg["registration_key"]},
        ssl_verify=ssl_verify,
    )
    if not isinstance(result, dict):
        log("[ERROR] Registration failed: unexpected response from server")
        return ""
    if "error" in result:
        log(f"[ERROR] Registration failed: {result['error']}")
        return ""
    api_key = result.get("api_key", "")
    if not api_key:
        log("[ERROR] Registration failed: response did not include an api_key")
        return ""
    state["api_key"] = api_key
    state["agent_id"] = result.get("agent_id", agent_id)
    try:
        save_state(state)
    except OSError as e:
        # The key is still valid for this run; only persistence failed.
        log(f"[WARN] Could not save agent state: {e}")
    log(f"Registered. agent_id={state['agent_id']}")
    return api_key
# ── Screenshot ─────────────────────────────────────────────────────────────────
def _take_screenshot(cmd_data: dict) -> dict:
    """Capture the screen and return it as a base64-encoded PNG.

    ``screencapture`` is tried first and Pillow's ``ImageGrab`` second. If
    neither produces an image, the text snapshot from ``_sysinfo_snapshot``
    is returned with ``method`` set to ``"text_only"``.

    The image is written inside a private temporary directory that is always
    removed, replacing the race-prone ``tempfile.mktemp`` and ensuring no
    file is left behind on any path.

    Args:
        cmd_data: Command payload; accepted for interface parity, not used.

    Returns:
        On success a dict with ``success``, ``method``, ``image_b64``,
        ``image_mime``, ``file_size`` and ``hostname``; otherwise the text
        snapshot, or an error dict if the captured file could not be read.
    """
    import base64
    import tempfile

    with tempfile.TemporaryDirectory(prefix="jarvis-shot-") as tmp_dir:
        tmp = os.path.join(tmp_dir, "screenshot.png")
        method = "unknown"
        # screencapture -x = no sound
        try:
            r = subprocess.run(["screencapture", "-x", "-t", "png", tmp],
                               capture_output=True, timeout=15)
            if r.returncode == 0 and os.path.exists(tmp):
                method = "screencapture"
        except (OSError, subprocess.SubprocessError):
            pass
        # Fallback: PIL
        if method == "unknown":
            try:
                from PIL import ImageGrab
                ImageGrab.grab().save(tmp, "PNG")
                method = "pil"
            except Exception:
                # Pillow may be missing or unable to grab; fall through to text.
                pass
        if method != "unknown" and os.path.exists(tmp):
            try:
                with open(tmp, "rb") as f:
                    raw = f.read()
            except OSError as e:
                return {"success": False, "error": str(e), "method": method}
            return {
                "success": True,
                "method": method,
                "image_b64": base64.b64encode(raw).decode(),
                "image_mime": "image/png",
                "file_size": len(raw),
                "hostname": socket.gethostname(),
            }
    snap = _sysinfo_snapshot()
    snap["screenshot_available"] = False
    snap["method"] = "text_only"
    return snap
# ── Sysinfo ────────────────────────────────────────────────────────────────────
def _sysinfo_snapshot() -> dict:
    """Build a text-only system snapshot.

    Each section (memory, load, root disk line, top processes, listening TCP
    ports) is collected independently; a section that fails is left out
    rather than failing the whole snapshot.

    Returns:
        A dict that always contains ``success``, ``hostname``,
        ``snapshot_type``, ``screenshot_available`` and ``platform``, plus
        whichever sections could be collected.
    """
    snapshot = {
        "success": True,
        "hostname": socket.gethostname(),
        "snapshot_type": "text",
        "screenshot_available": False,
        "platform": "macOS",
    }

    def run_text(argv):
        # Shared subprocess call: capture stdout as text with a 5 s limit.
        return subprocess.run(argv, capture_output=True, text=True, timeout=5).stdout

    try:
        memory = get_memory()
        snapshot["mem_total_mb"] = int(memory.get("total_mb", 0))
        snapshot["mem_used_mb"] = int(memory.get("used_mb", 0))
        snapshot["mem_avail_mb"] = int(memory.get("free_mb", 0))
    except Exception:
        pass

    try:
        averages = get_load()
        one, five, fifteen = averages[0], averages[1], averages[2]
        snapshot["load_1m"] = one
        snapshot["load_5m"] = five
        snapshot["load_15m"] = fifteen
    except Exception:
        pass

    try:
        df_text = run_text(["df", "-H", "/"])
        # Second line is the data row for "/"; the first is the header.
        snapshot["disk"] = df_text.splitlines()[1] if df_text else ""
    except Exception:
        pass

    try:
        snapshot["top_procs"] = run_text(["ps", "aux", "-r"]).splitlines()[1:8]
    except Exception:
        pass

    try:
        netstat_rows = run_text(["netstat", "-an", "-p", "tcp"]).splitlines()
        listeners = [row for row in netstat_rows if "LISTEN" in row]
        snapshot["listening_ports"] = "\n".join(listeners[:20])
    except Exception:
        pass

    return snapshot
# ── Command execution ──────────────────────────────────────────────────────────
def execute_command(cmd: dict) -> dict:
    """Run one command received from the server and return its result.

    Supported ``command_type`` values: ``ping``, ``update``, ``shell`` (only
    when ``allow_shell_commands`` is true in the config), ``restart_service``,
    ``get_logs``, ``screenshot`` and ``sysinfo``.

    Values taken from ``command_data`` are checked before they are placed on
    a command line: a host or service name starting with "-" is rejected so
    it cannot be read as an option, and a ``get_logs`` service name
    containing a double quote or backslash is rejected so it cannot alter the
    log predicate.

    Args:
        cmd: Command dict with ``command_type`` and optional ``command_data``.

    Returns:
        A dict that always has ``success``; failures also carry ``error``.
        This function does not raise: timeouts and other exceptions are
        turned into error results.
    """
    cmd_type = cmd.get("command_type", "")
    # A JSON null for command_data would otherwise break every .get() below.
    cmd_data = cmd.get("command_data") or {}
    try:
        if cmd_type == "ping":
            host = cmd_data.get("host", "8.8.8.8")
            if not isinstance(host, str) or not host or host.startswith("-"):
                return {"success": False, "error": "Invalid host"}
            r = subprocess.run(["ping", "-c", "3", "-W", "2000", host],
                               capture_output=True, text=True, timeout=15)
            return {"success": r.returncode == 0, "output": r.stdout}
        elif cmd_type == "update":
            _cfg = load_config()
            updated = self_update(_cfg)
            return {"success": True, "updated": updated}
        elif cmd_type == "shell":
            _cfg = load_config()
            if not _cfg.get("allow_shell_commands", False):
                return {"success": False, "error": "Shell commands not enabled in agent config"}
            cmd_str = cmd_data.get("command", "")
            # NOTE(review): shell=True is intentional here; this branch is the
            # opt-in remote shell and is gated by allow_shell_commands.
            r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30)
            return {"success": True, "stdout": r.stdout[:2000], "stderr": r.stderr[:500]}
        elif cmd_type == "restart_service":
            svc = cmd_data.get("service", "")
            if not isinstance(svc, str) or not svc or "/" in svc or svc.startswith("-"):
                return {"success": False, "error": "Invalid service name"}
            r = subprocess.run(["launchctl", "kickstart", "-k", f"system/{svc}"],
                               capture_output=True, text=True, timeout=15)
            if r.returncode != 0:
                # Fall back to Homebrew-managed services.
                r = subprocess.run(["brew", "services", "restart", svc],
                                   capture_output=True, text=True, timeout=15)
            return {"success": r.returncode == 0, "stdout": r.stdout, "stderr": r.stderr}
        elif cmd_type == "get_logs":
            svc = cmd_data.get("service", "")
            if not isinstance(svc, str) or '"' in svc or "\\" in svc:
                return {"success": False, "error": "Invalid service name"}
            # Clamp to 1..200: a value of 0 would make the [-lines:] slice
            # return every line, and a negative value would slice from the front.
            lines = max(1, min(int(cmd_data.get("lines", 50)), 200))
            r = subprocess.run(["log", "show", "--predicate", f'process == "{svc}"',
                                "--last", "1h", "--style", "compact"],
                               capture_output=True, text=True, timeout=15)
            output = "\n".join(r.stdout.splitlines()[-lines:])
            return {"success": True, "output": output}
        elif cmd_type == "screenshot":
            return _take_screenshot(cmd_data)
        elif cmd_type == "sysinfo":
            return _sysinfo_snapshot()
        else:
            return {"success": False, "error": f"Unknown command type: {cmd_type}"}
    except subprocess.TimeoutExpired:
        return {"success": False, "error": "Command timed out"}
    except Exception as e:
        return {"success": False, "error": str(e)}
# ── Self-update ────────────────────────────────────────────────────────────────
def self_update(cfg: dict) -> bool:
    """Download the latest agent script and, if it differs, install and restart.

    The script is fetched from ``cfg["update_url"]`` (default:
    ``<jarvis_url>/agent/jarvis-agent-mac.py``). If ``<update_url>.sha256``
    can be fetched, the download must match that hash. Before anything is
    replaced the download must be non-empty and compile as Python, so an
    error page served with status 200 cannot overwrite the agent. The new
    script is written to a sibling file and moved into place with
    ``os.replace``, so an interrupted write cannot leave a half-written agent.

    Args:
        cfg: Agent configuration (``jarvis_url``, ``update_url``, ``ssl_verify``).

    Returns:
        False when no update was applied (no URL, identical content, failed
        check, or any error). When an update is applied the process is
        replaced via ``os.execv`` and this function does not return.
    """
    jarvis_url = cfg.get("jarvis_url", "").rstrip("/")
    default_update_url = f"{jarvis_url}/agent/jarvis-agent-mac.py" if jarvis_url else ""
    update_url = cfg.get("update_url", default_update_url)
    if not update_url:
        return False
    script_path = os.path.abspath(__file__)
    ssl_verify = bool(cfg.get("ssl_verify", True))
    user_agent = f"JARVIS-Agent-macOS/{AGENT_VERSION}"

    def fetch(url: str, timeout: int) -> bytes:
        # One place for the request headers shared by both downloads.
        req = urllib.request.Request(url)
        req.add_header("User-Agent", user_agent)
        if _host_header:
            req.add_header("Host", _host_header)
        ctx = _make_ssl_ctx(ssl_verify)
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return resp.read()

    try:
        # Download expected hash
        expected_hash = None
        try:
            # Hex digests are case-insensitive; normalise before comparing.
            expected_hash = fetch(update_url + ".sha256", 10).decode().strip().split()[0].lower()
        except Exception as e:
            # NOTE(review): an update without a published hash is still
            # applied, as before; it is now logged instead of passing silently.
            log(f"[WARN] Could not fetch update hash ({e}); update will not be integrity-checked")
        # Download new script
        new_content = fetch(update_url, 30)
        if not new_content:
            log("Update download was empty — aborting")
            return False
        if expected_hash:
            actual_hash = hashlib.sha256(new_content).hexdigest()
            if actual_hash != expected_hash:
                log(f"Update hash mismatch (expected {expected_hash[:16]}… got {actual_hash[:16]}…) — aborting")
                return False
        with open(script_path, "rb") as f:
            current = f.read()
        if new_content == current:
            return False
        try:
            # Syntax check only; nothing from the download is executed here.
            compile(new_content, script_path, "exec")
        except (SyntaxError, ValueError) as e:
            log(f"Update is not valid Python ({e}) — aborting")
            return False
        if expected_hash:
            log(f"Update verified — replacing {script_path} and restarting...")
        else:
            log(f"Update downloaded (no hash available) — replacing {script_path} and restarting...")
        tmp_path = script_path + ".new"
        try:
            with open(tmp_path, "wb") as f:
                f.write(new_content)
            # Carry over the current mode so an executable script stays executable.
            os.chmod(tmp_path, os.stat(script_path).st_mode)
            os.replace(tmp_path, script_path)
        except OSError:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise
        os.execv(sys.executable, [sys.executable] + sys.argv)
        return True  # only reached if execv were to return
    except Exception as e:
        log(f"Self-update check failed: {e}")
        return False
# ── Main loop ──────────────────────────────────────────────────────────────────
def main():
    """Agent entry point: register, then heartbeat, run commands and push metrics forever.

    Startup always attempts a fresh registration. If it fails and no API key
    is stored, registration is retried (60 s pause after each failure) until
    a key is obtained. Each loop iteration sends a heartbeat, executes any
    commands it returned and reports their results, runs the self-update
    check when due, and uploads metrics when due. Errors inside an iteration
    are logged and the loop continues.

    NOTE(review): block nesting was reconstructed from source without
    indentation; the update and metrics steps are assumed to run whether or
    not the heartbeat succeeded — confirm against the original file.
    """
    global _host_header
    cfg = load_config()
    state = load_state()
    base_url = cfg["jarvis_url"].rstrip("/")
    verify_tls = bool(cfg.get("ssl_verify", True))
    _host_header = cfg.get("host_header", "")
    metrics_period = int(cfg.get("poll_interval", 30))
    heartbeat_period = int(cfg.get("heartbeat_every", 10))
    update_period = int(cfg.get("update_check_hours", 24)) * 3600

    # Always re-register on startup to refresh capabilities, version, IP;
    # fall back to the stored key when that attempt fails.
    stored_key = state.get("api_key", "")
    api_key = register(cfg, state) or stored_key
    while not api_key:
        api_key = register(cfg, state)
        if not api_key:
            log("[ERROR] Could not register with JARVIS. Retrying in 60s...")
            time.sleep(60)

    auth_headers = {"X-Agent-Key": api_key}
    metrics_sent_at = 0
    update_checked_at = 0
    log(f"Agent v{AGENT_VERSION} (macOS) running. Polling {base_url} every {heartbeat_period}s.")

    while True:
        now = time.time()
        try:
            heartbeat = api_post(f"{base_url}/api/agent/heartbeat", {}, auth_headers, ssl_verify=verify_tls)
            if "error" not in heartbeat:
                for cmd in heartbeat.get("commands", []):
                    log(f"[CMD] Executing: {cmd['command_type']}")
                    outcome = execute_command(cmd)
                    report = {
                        "command_id": cmd["id"],
                        "success": outcome.get("success", False),
                        "result": outcome,
                    }
                    api_post(f"{base_url}/api/agent/command_result", report,
                             auth_headers, ssl_verify=verify_tls)
            else:
                log(f"[WARN] Heartbeat failed: {heartbeat['error']}")

            # Self-update check
            if now - update_checked_at >= update_period:
                update_checked_at = now
                self_update(cfg)

            # Push metrics
            if now - metrics_sent_at >= metrics_period:
                sample = collect_metrics(cfg)
                api_post(f"{base_url}/api/agent/metrics",
                         {"type": "system", "data": sample}, auth_headers, ssl_verify=verify_tls)
                metrics_sent_at = now
        except Exception as e:
            log(f"[ERROR] Loop error: {e}")
        time.sleep(heartbeat_period)
# Run the agent only when executed as a script, not when imported.
if __name__ == "__main__":
    main()