#!/usr/bin/env python3
"""
JARVIS Agent for macOS — system monitor that reports metrics to JARVIS HUD.

Install:  bash <(curl -sSL https://jarvis.orbishosting.com/agent/install-mac.sh) --key YOUR_KEY
Config:   ~/.jarvis-agent/config.json  (or $JARVIS_CONFIG)
State:    ~/.jarvis-agent/state.json   (or $JARVIS_STATE)
Logs:     ~/.jarvis-agent/jarvis-agent.log  (every line is also echoed to stdout)
"""

import base64
import hashlib
import json
import os
import platform
import re
import shutil
import socket
import ssl as _ssl
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

AGENT_VERSION = "3.0"
# Single source of truth for the User-Agent so it tracks AGENT_VERSION.
USER_AGENT = f"JARVIS-Agent-macOS/{AGENT_VERSION}"

# Config/state paths — env vars let the launchd plist override them
_default_dir = Path.home() / ".jarvis-agent"
CONFIG_PATH = Path(os.environ.get("JARVIS_CONFIG", str(_default_dir / "config.json")))
STATE_PATH = Path(os.environ.get("JARVIS_STATE", str(_default_dir / "state.json")))
LOG_PATH = _default_dir / "jarvis-agent.log"

# Failures expected from best-effort probes (missing binary, timeout,
# unparsable output).  Probes swallow these and return an empty/default value.
_PROBE_ERRORS = (OSError, subprocess.SubprocessError, ValueError, IndexError)

_CPU_IDLE_RE = re.compile(r"([\d.]+)%\s+idle")
_VM_STAT_RE = re.compile(r"Pages\s+(.+?):\s+([\d]+)")
_BOOT_SEC_RE = re.compile(r"sec\s*=\s*(\d+)")
_NUMBER_RE = re.compile(r"[\d.]+")

# Mount points that are never reported as disks.
_DF_SKIP_PREFIXES = ("/dev", "/private/var/vm", "/System/Volumes/VM")


# ── Logging ────────────────────────────────────────────────────────────────────

def log(msg: str) -> None:
    """Print a timestamped line to stdout and append it to LOG_PATH.

    Writing the log file is best-effort: a failure there must never take the
    agent down, so it is ignored.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line, flush=True)
    try:
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        with open(LOG_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass


# ── Config ─────────────────────────────────────────────────────────────────────

def load_config() -> dict:
    """Load the JSON config from CONFIG_PATH; exit with status 1 if it is missing."""
    if not CONFIG_PATH.exists():
        print(f"[ERROR] Config not found at {CONFIG_PATH}. Run the installer first.")
        sys.exit(1)
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return json.load(f)


def load_state() -> dict:
    """Load persisted agent state, or return ``{}`` when it is missing or unreadable.

    A corrupt state file only costs a re-registration, so it is logged and
    treated as empty instead of crashing the agent at startup.
    """
    try:
        with open(STATE_PATH, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        log(f"[WARN] Could not read state file {STATE_PATH}: {e} — starting with empty state")
        return {}
    return state if isinstance(state, dict) else {}


def save_state(state: dict) -> None:
    """Write *state* as JSON to STATE_PATH, readable by the owner only.

    The state holds the agent API key, so the file is created with mode 0600
    and a pre-existing file is tightened to the same mode.
    """
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(STATE_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    try:
        # os.open's mode only applies when the file is newly created.
        os.chmod(STATE_PATH, 0o600)
    except OSError:
        pass


# ── HTTP ───────────────────────────────────────────────────────────────────────

def _make_ssl_ctx(verify: bool):
    """Return ``None`` (urllib's default verification) or a non-verifying context."""
    if verify:
        return None
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    return ctx


# Optional Host header override; set from the "host_header" config key in main().
_host_header: str = ""


def _build_request(url: str, headers=None, data=None, method=None) -> urllib.request.Request:
    """Build a request carrying the agent's User-Agent, Host override and *headers*."""
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("User-Agent", USER_AGENT)
    if _host_header:
        req.add_header("Host", _host_header)
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    return req


def _send_json(req: urllib.request.Request, timeout: int, ssl_verify: bool) -> dict:
    """Send *req* and decode the JSON response; failures become ``{"error": ...}``."""
    try:
        ctx = _make_ssl_ctx(ssl_verify)
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Reading/decoding the error body must not raise out of this handler.
        try:
            detail = e.read().decode(errors="replace")[:200]
        except Exception:
            detail = ""
        return {"error": f"HTTP {e.code}: {detail}"}
    except Exception as e:
        return {"error": str(e)}


def api_post(url: str, payload: dict, headers: dict = None, timeout: int = 15,
             ssl_verify: bool = True) -> dict:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Never raises for transport or HTTP errors: those are returned as
    ``{"error": "<description>"}``.
    """
    req = _build_request(url, headers, data=json.dumps(payload).encode(), method="POST")
    req.add_header("Content-Type", "application/json")
    return _send_json(req, timeout, ssl_verify)


def api_get(url: str, headers: dict = None, timeout: int = 10,
            ssl_verify: bool = True) -> dict:
    """GET *url* and return the decoded JSON response, or ``{"error": ...}`` on failure."""
    return _send_json(_build_request(url, headers), timeout, ssl_verify)


# ── Metrics ────────────────────────────────────────────────────────────────────

def _run(cmd: list, timeout: float) -> subprocess.CompletedProcess:
    """Run *cmd* (no shell) capturing stdout/stderr as text."""
    return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)


def get_local_ip() -> str:
    """Return the local address of the default outbound route, or ``"unknown"``.

    Connecting a UDP socket sends no packet; it only makes the OS choose the
    source address it would use to reach 8.8.8.8.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        return "unknown"


def get_cpu_percent() -> float:
    """Return CPU usage in percent from ``top``, or ``0.0`` if it cannot be read.

    ``top -l 2`` prints two samples; the last "CPU usage:" line is the one
    kept, so the second sample wins.
    """
    try:
        r = _run(["top", "-l", "2", "-s", "0", "-n", "0"], timeout=12)
        idle = None
        for line in r.stdout.splitlines():
            if "CPU usage:" in line:
                m = _CPU_IDLE_RE.search(line)
                if m:
                    idle = float(m.group(1))
        if idle is not None:
            return round(100.0 - idle, 1)
    except _PROBE_ERRORS:
        pass
    return 0.0


def get_memory() -> dict:
    """Return memory totals in MiB plus a usage percentage, or ``{}`` on failure.

    Keys: ``total_mb``, ``used_mb``, ``free_mb`` (free + speculative +
    inactive pages, i.e. memory that can be reclaimed) and ``percent``.
    """
    try:
        total_bytes = int(_run(["sysctl", "-n", "hw.memsize"], timeout=3).stdout.strip())

        # Page size comes from sysctl; 4096 is only the fallback when that fails.
        page_size = 4096
        try:
            page_size = int(_run(["sysctl", "-n", "hw.pagesize"], timeout=3).stdout.strip())
        except _PROBE_ERRORS:
            pass
        if page_size <= 0:
            page_size = 4096

        pages = {}
        for line in _run(["vm_stat"], timeout=5).stdout.splitlines():
            m = _VM_STAT_RE.match(line)
            if m:
                pages[m.group(1).strip().lower()] = int(m.group(2))

        free_pages = pages.get("free", 0) + pages.get("speculative", 0)
        # "available" = free + inactive (can be reclaimed)
        avail_pages = free_pages + pages.get("inactive", 0)
        used_pages = (total_bytes // page_size) - avail_pages

        mib = 1024 * 1024
        total_mb = round(total_bytes / mib, 1)
        used_mb = round(used_pages * page_size / mib, 1)
        avail_mb = round(avail_pages * page_size / mib, 1)
        used_mb = max(0, min(used_mb, total_mb))
        return {
            "total_mb": total_mb,
            "used_mb": used_mb,
            "free_mb": avail_mb,
            "percent": round(used_mb / total_mb * 100, 1) if total_mb else 0,
        }
    except _PROBE_ERRORS:
        return {}


def parse_df_output(text: str) -> list:
    """Parse ``df -H`` output into a list of disk dicts.

    macOS ``df`` prints ``iused ifree %iused`` between the capacity column and
    the mount point, so the mount point is not at a fixed index.  Each row is
    therefore anchored on the capacity column (first ``N%`` token at index 4
    or later); size/used/avail are the three tokens before it and the mount
    point is everything from the first following token that starts with "/".
    Rows whose mount starts with one of ``_DF_SKIP_PREFIXES`` are dropped.
    """
    disks = []
    for line in text.splitlines()[1:]:  # first line is the column header
        parts = line.split()
        cap_idx = next(
            (i for i, tok in enumerate(parts) if i >= 4 and tok.endswith("%")), None
        )
        if cap_idx is None:
            continue
        mount_idx = next(
            (i for i in range(cap_idx + 1, len(parts)) if parts[i].startswith("/")), None
        )
        if mount_idx is None:
            continue
        # Re-join so mount points containing spaces survive the split.
        mount = " ".join(parts[mount_idx:])
        if mount.startswith(_DF_SKIP_PREFIXES):
            continue
        size, used, avail = parts[cap_idx - 3:cap_idx]
        disks.append({
            "mount": mount,
            "size": size,
            "used": used,
            "avail": avail,
            "percent": parts[cap_idx].rstrip("%"),
        })
    return disks


def get_disk() -> list:
    """Return usage of local filesystems from ``df -H -l`` (empty list on failure)."""
    try:
        return parse_df_output(_run(["df", "-H", "-l"], timeout=5).stdout)
    except _PROBE_ERRORS:
        return []


def get_uptime() -> dict:
    """Return uptime derived from ``kern.boottime``, or ``{}`` if unavailable."""
    try:
        m = _BOOT_SEC_RE.search(_run(["sysctl", "-n", "kern.boottime"], timeout=3).stdout)
        if m:
            secs = time.time() - int(m.group(1))
            days = int(secs // 86400)
            hours = int((secs % 86400) // 3600)
            minutes = int((secs % 3600) // 60)
            return {"seconds": int(secs), "days": days, "hours": hours,
                    "minutes": minutes, "human": f"{days}d {hours}h {minutes}m"}
    except _PROBE_ERRORS:
        pass
    return {}


def get_load() -> list:
    """Return the 1/5/15-minute load averages, or ``[0, 0, 0]`` if unavailable."""
    try:
        # format: "{ 0.12 0.34 0.56 }"
        nums = _NUMBER_RE.findall(_run(["sysctl", "-n", "vm.loadavg"], timeout=3).stdout)
        if len(nums) >= 3:
            return [float(n) for n in nums[:3]]
    except _PROBE_ERRORS:
        pass
    return [0, 0, 0]


def get_services(cfg: dict) -> list:
    """Report each name in ``cfg["watch_services"]`` as active or inactive.

    A service is "active" when its name is a substring of any label printed
    by ``launchctl list``.  Returns ``[]`` when nothing is being watched.
    """
    watch = cfg.get("watch_services", [])
    if not watch:
        return []
    labels = set()
    try:
        for line in _run(["launchctl", "list"], timeout=5).stdout.splitlines():
            parts = line.split()
            # Skip the "PID Status Label" header so "Label" is not a service.
            if len(parts) >= 3 and parts[:3] != ["PID", "Status", "Label"]:
                labels.add(parts[2])
    except _PROBE_ERRORS:
        labels = set()
    return [
        {"service": svc,
         "status": "active" if any(svc in label for label in labels) else "inactive"}
        for svc in watch
    ]


def detect_capabilities(cfg: dict) -> list:
    """Return the capability names advertised at registration.

    ``docker`` and ``ollama`` are included only when the binary is on PATH.
    *cfg* is accepted for interface compatibility and is not read.
    """
    caps = ["metrics", "commands"]
    if shutil.which("docker"):
        caps.append("docker")
    if shutil.which("ollama"):
        caps.append("ollama")
    # screencapture is always available on macOS
    caps.append("screenshot")
    caps.append("sysinfo")
    return caps


def collect_metrics(cfg: dict) -> dict:
    """Gather one metrics sample for the ``/api/agent/metrics`` endpoint."""
    # Same "...Z" format datetime.utcnow() produced, without the deprecated call.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    return {
        "hostname": cfg.get("hostname", socket.gethostname()),
        "cpu_percent": get_cpu_percent(),
        "memory": get_memory(),
        "disk": get_disk(),
        "uptime": get_uptime(),
        "load": get_load(),
        "services": get_services(cfg),
        "platform": "macOS",
        "os_version": platform.mac_ver()[0],
        "timestamp": timestamp,
    }


# ── Registration ───────────────────────────────────────────────────────────────

def register(cfg: dict, state: dict) -> str:
    """Register with JARVIS and return the issued API key (``""`` on failure).

    On success ``api_key`` and ``agent_id`` are stored in *state* and persisted
    with save_state().  Raises KeyError if ``jarvis_url`` or
    ``registration_key`` is missing from *cfg*.
    """
    hostname = cfg.get("hostname", socket.gethostname())
    agent_type = cfg.get("agent_type", "macos")
    ip = get_local_ip()
    agent_id = cfg.get("agent_id", f"{hostname}_mac")
    ssl_verify = bool(cfg.get("ssl_verify", True))
    # Strip the trailing slash like main() does, to avoid "//api/..." URLs.
    base_url = cfg["jarvis_url"].rstrip("/")

    log(f"Registering as '{agent_id}' ({agent_type}) from {ip}...")
    result = api_post(
        f"{base_url}/api/agent/register",
        {"hostname": hostname, "version": AGENT_VERSION, "agent_type": agent_type,
         "ip_address": ip, "capabilities": detect_capabilities(cfg), "agent_id": agent_id},
        headers={"X-Registration-Key": cfg["registration_key"]},
        ssl_verify=ssl_verify,
    )
    if "error" in result:
        log(f"[ERROR] Registration failed: {result['error']}")
        return ""

    api_key = result.get("api_key", "")
    if api_key:
        state["api_key"] = api_key
        state["agent_id"] = result.get("agent_id", agent_id)
        save_state(state)
        log(f"Registered. agent_id={state['agent_id']}")
    return api_key


# ── Screenshot ─────────────────────────────────────────────────────────────────

def _has_content(path: str) -> bool:
    """Return True when *path* exists and is not empty."""
    try:
        return os.path.getsize(path) > 0
    except OSError:
        return False


def _take_screenshot(cmd_data: dict) -> dict:
    """Capture the screen and return it base64-encoded.

    Tries ``screencapture`` first, then PIL's ImageGrab if it is installed.
    When neither yields an image, a text sysinfo snapshot is returned with
    ``screenshot_available`` False and ``method`` "text_only".  *cmd_data* is
    accepted for interface compatibility and is not read.
    """
    # mkstemp instead of the race-prone mktemp; only the path is needed.
    fd, tmp = tempfile.mkstemp(suffix=".png")
    os.close(fd)
    method = "unknown"
    try:
        # screencapture -x = no sound, always available on macOS
        try:
            r = subprocess.run(["screencapture", "-x", "-t", "png", tmp],
                               capture_output=True, timeout=15)
            # mkstemp pre-creates the file, so test for content, not existence.
            if r.returncode == 0 and _has_content(tmp):
                method = "screencapture"
        except (OSError, subprocess.SubprocessError):
            pass

        # Fallback: PIL
        if method == "unknown":
            try:
                from PIL import ImageGrab
                ImageGrab.grab().save(tmp, "PNG")
                method = "pil"
            except Exception:
                # PIL is optional; any failure drops through to text-only.
                pass

        if method == "unknown" or not _has_content(tmp):
            snap = _sysinfo_snapshot()
            snap["screenshot_available"] = False
            snap["method"] = "text_only"
            return snap

        try:
            with open(tmp, "rb") as f:
                raw = f.read()
        except OSError as e:
            return {"success": False, "error": str(e), "method": method}
        return {
            "success": True,
            "method": method,
            "image_b64": base64.b64encode(raw).decode(),
            "image_mime": "image/png",
            "file_size": len(raw),
            "hostname": socket.gethostname(),
        }
    finally:
        # Remove the temp file on every path, including the fallbacks.
        try:
            os.unlink(tmp)
        except OSError:
            pass


# ── Sysinfo ────────────────────────────────────────────────────────────────────

def _sysinfo_snapshot() -> dict:
    """Return a text-only system snapshot; each probe is independent and best-effort."""
    data = {"success": True, "hostname": socket.gethostname(),
            "snapshot_type": "text", "screenshot_available": False, "platform": "macOS"}

    mem = get_memory()
    data["mem_total_mb"] = int(mem.get("total_mb", 0))
    data["mem_used_mb"] = int(mem.get("used_mb", 0))
    data["mem_avail_mb"] = int(mem.get("free_mb", 0))

    data["load_1m"], data["load_5m"], data["load_15m"] = get_load()

    try:
        out = _run(["df", "-H", "/"], timeout=5).stdout
        data["disk"] = out.splitlines()[1] if out else ""
    except _PROBE_ERRORS:
        pass
    try:
        data["top_procs"] = _run(["ps", "aux", "-r"], timeout=5).stdout.splitlines()[1:8]
    except _PROBE_ERRORS:
        pass
    try:
        out = _run(["netstat", "-an", "-p", "tcp"], timeout=5).stdout
        listening = [row for row in out.splitlines() if "LISTEN" in row]
        data["listening_ports"] = "\n".join(listening[:20])
    except _PROBE_ERRORS:
        pass
    return data


# ── Command execution ──────────────────────────────────────────────────────────

def execute_command(cmd: dict) -> dict:
    """Execute one server-issued command and return a result dict.

    *cmd* carries ``command_type`` and optional ``command_data``.  The result
    always has a ``success`` key; failures also carry ``error``.  Never
    raises: timeouts and unexpected errors are reported in the result.
    """
    cmd_type = cmd.get("command_type", "")
    cmd_data = cmd.get("command_data") or {}  # tolerate an explicit null
    try:
        if cmd_type == "ping":
            host = str(cmd_data.get("host", "8.8.8.8"))
            # A leading "-" would be parsed by ping as an option.
            if not host or host.startswith("-"):
                return {"success": False, "error": "Invalid host"}
            r = _run(["ping", "-c", "3", "-W", "2000", host], timeout=15)
            return {"success": r.returncode == 0, "output": r.stdout}

        if cmd_type == "update":
            return {"success": True, "updated": self_update(load_config())}

        if cmd_type == "shell":
            if not load_config().get("allow_shell_commands", False):
                return {"success": False, "error": "Shell commands not enabled in agent config"}
            cmd_str = cmd_data.get("command", "")
            # SECURITY: deliberately runs a server-supplied string through the
            # shell; only reachable when allow_shell_commands is enabled.
            r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30)
            return {"success": True, "returncode": r.returncode,
                    "stdout": r.stdout[:2000], "stderr": r.stderr[:500]}

        if cmd_type == "restart_service":
            svc = str(cmd_data.get("service", ""))
            if not svc or "/" in svc or svc.startswith("-"):
                return {"success": False, "error": "Invalid service name"}
            r = _run(["launchctl", "kickstart", "-k", f"system/{svc}"], timeout=15)
            if r.returncode != 0:
                r = _run(["brew", "services", "restart", svc], timeout=15)
            return {"success": r.returncode == 0, "stdout": r.stdout, "stderr": r.stderr}

        if cmd_type == "get_logs":
            svc = str(cmd_data.get("service", ""))
            # The name is embedded in a quoted predicate string; refuse
            # characters that could terminate or escape that string.
            if not svc or '"' in svc or "\\" in svc:
                return {"success": False, "error": "Invalid service name"}
            # Clamp to 1..200: 0 would make [-0:] return the whole log.
            lines = max(1, min(int(cmd_data.get("lines", 50)), 200))
            r = _run(["log", "show", "--predicate", f'process == "{svc}"',
                      "--last", "1h", "--style", "compact"], timeout=15)
            return {"success": True, "output": "\n".join(r.stdout.splitlines()[-lines:])}

        if cmd_type == "screenshot":
            return _take_screenshot(cmd_data)

        if cmd_type == "sysinfo":
            return _sysinfo_snapshot()

        return {"success": False, "error": f"Unknown command type: {cmd_type}"}

    except subprocess.TimeoutExpired:
        return {"success": False, "error": "Command timed out"}
    except Exception as e:
        return {"success": False, "error": str(e)}


# ── Self-update ────────────────────────────────────────────────────────────────

def self_update(cfg: dict) -> bool:
    """Download the agent script and, if it changed, replace this file and re-exec.

    The URL is ``cfg["update_url"]`` or ``<jarvis_url>/agent/jarvis-agent-mac.py``.
    A SHA-256 published at ``<update_url>.sha256`` is checked when it can be
    fetched; with ``require_update_hash`` set in *cfg* the update is refused
    when no hash is available.  On a successful replacement the process is
    re-executed, so this function does not return in that case.  Returns
    False when there is nothing to do or on any failure.
    """
    jarvis_url = cfg.get("jarvis_url", "").rstrip("/")
    default_update_url = f"{jarvis_url}/agent/jarvis-agent-mac.py" if jarvis_url else ""
    update_url = cfg.get("update_url", default_update_url)
    if not update_url:
        return False

    script_path = os.path.abspath(__file__)
    ctx = _make_ssl_ctx(bool(cfg.get("ssl_verify", True)))

    try:
        # Download expected hash (optional unless require_update_hash is set)
        expected_hash = None
        try:
            req_hash = _build_request(update_url + ".sha256")
            with urllib.request.urlopen(req_hash, timeout=10, context=ctx) as resp:
                expected_hash = resp.read().decode().strip().split()[0].lower()
        except Exception as e:
            log(f"[WARN] Could not fetch update hash: {e}")

        if not expected_hash and cfg.get("require_update_hash", False):
            log("Update hash unavailable and require_update_hash is set — aborting")
            return False

        # Download new script
        with urllib.request.urlopen(_build_request(update_url), timeout=30,
                                    context=ctx) as resp:
            new_content = resp.read()
        if not new_content:
            # Never overwrite the running script with an empty download.
            log("Update download was empty — aborting")
            return False

        if expected_hash:
            actual_hash = hashlib.sha256(new_content).hexdigest()
            if actual_hash != expected_hash:
                log(f"Update hash mismatch (expected {expected_hash[:16]}… got {actual_hash[:16]}…) — aborting")
                return False

        with open(script_path, "rb") as f:
            current = f.read()
        if new_content == current:
            return False

        if expected_hash:
            log(f"Update verified — replacing {script_path} and restarting...")
        else:
            log(f"[WARN] Update NOT hash-verified — replacing {script_path} and restarting...")

        # Write beside the script and rename over it, so an interrupted write
        # cannot leave a truncated agent behind.
        tmp_path = script_path + ".new"
        try:
            with open(tmp_path, "wb") as f:
                f.write(new_content)
            shutil.copymode(script_path, tmp_path)  # keep the executable bit
            os.replace(tmp_path, script_path)
        except OSError:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

        os.execv(sys.executable, [sys.executable] + sys.argv)
        return True  # not reached when execv succeeds
    except Exception as e:
        log(f"Self-update check failed: {e}")
        return False


# ── Main loop ──────────────────────────────────────────────────────────────────

def main() -> None:
    """Register, then loop forever: heartbeat, run commands, update, push metrics."""
    global _host_header
    cfg = load_config()
    state = load_state()

    jarvis_url = cfg["jarvis_url"].rstrip("/")
    ssl_verify = bool(cfg.get("ssl_verify", True))
    _host_header = cfg.get("host_header", "")
    # Clamp to at least 1s so a bad config cannot turn the loop into a busy spin.
    poll_interval = max(1, int(cfg.get("poll_interval", 30)))
    heartbeat_every = max(1, int(cfg.get("heartbeat_every", 10)))
    update_interval = int(cfg.get("update_check_hours", 24)) * 3600

    # Always re-register on startup to refresh capabilities, version, IP.
    # If that fails, fall back to the stored key; with no key at all, retry
    # (sleeping between attempts) until registration succeeds.
    api_key = register(cfg, state) or state.get("api_key", "")
    while not api_key:
        log("[ERROR] Could not register with JARVIS. Retrying in 60s...")
        time.sleep(60)
        api_key = register(cfg, state)

    headers = {"X-Agent-Key": api_key}
    last_metrics = 0
    last_update_chk = 0

    log(f"Agent v{AGENT_VERSION} (macOS) running. Polling {jarvis_url} every {heartbeat_every}s.")

    while True:
        now = time.time()
        try:
            hb = api_post(f"{jarvis_url}/api/agent/heartbeat", {}, headers,
                          ssl_verify=ssl_verify)
            if "error" in hb:
                log(f"[WARN] Heartbeat failed: {hb['error']}")
            else:
                for cmd in hb.get("commands", []):
                    # .get so one malformed command cannot abort the rest.
                    log(f"[CMD] Executing: {cmd.get('command_type', '')}")
                    result = execute_command(cmd)
                    api_post(f"{jarvis_url}/api/agent/command_result",
                             {"command_id": cmd.get("id"),
                              "success": result.get("success", False),
                              "result": result},
                             headers, ssl_verify=ssl_verify)

            # Self-update check
            if now - last_update_chk >= update_interval:
                last_update_chk = now
                self_update(cfg)

            # Push metrics
            if now - last_metrics >= poll_interval:
                api_post(f"{jarvis_url}/api/agent/metrics",
                         {"type": "system", "data": collect_metrics(cfg)},
                         headers, ssl_verify=ssl_verify)
                last_metrics = now

        except Exception as e:
            log(f"[ERROR] Loop error: {e}")

        time.sleep(heartbeat_every)


if __name__ == "__main__":
    main()