agent: v3.0 — fix cfg scope bug in execute_command (update + shell commands)

self_update(cfg) and shell allow_shell_commands check both referenced cfg
from run() scope, but execute_command() is a standalone function. Fixed by
calling load_config() locally in each branch that needs it.
This commit is contained in:
2026-06-11 21:10:08 +00:00
parent 2767a858dd
commit 8b7f597e76
3 changed files with 214 additions and 6 deletions
+209 -3
View File
@@ -23,7 +23,7 @@ from pathlib import Path
CONFIG_PATH = "/etc/jarvis-agent/config.json"
STATE_PATH = "/var/lib/jarvis-agent/state.json"
AGENT_VERSION = "2.3" # bumped on each release
AGENT_VERSION = "3.0" # Phase 4: screenshot + sysinfo commands
# ── Config helpers ────────────────────────────────────────────────────────────
@@ -119,6 +119,12 @@ def detect_capabilities(cfg: dict) -> list:
# Check for Home Assistant
if os.path.exists("/etc/homeassistant") or os.path.exists("/config/configuration.yaml"):
caps.append("homeassistant")
# Phase 4: screenshot capability
import shutil as _shutil
if (_shutil.which("scrot") or _shutil.which("import") or
_shutil.which("fbcat") or _shutil.which("convert")):
caps.append("screenshot")
caps.append("sysinfo")
return caps
def register(cfg: dict, state: dict) -> str:
@@ -298,6 +304,198 @@ def collect_proxmox_metrics(cfg: dict) -> dict | None:
except Exception as e:
return {"error": str(e)}
# ── Screenshot / Vision helpers ───────────────────────────────────────────────
def _take_screenshot(cmd_data: dict) -> dict:
"""
Attempts to capture a screenshot using available tools.
For headless servers, falls back to a rich text system snapshot.
Returns base64-encoded PNG and metadata.
"""
import base64, tempfile, shutil
tmp = tempfile.mktemp(suffix=".png")
width = height = 0
method = "unknown"
# 1. Try scrot (X11 desktop)
if shutil.which("scrot") and os.environ.get("DISPLAY"):
try:
r = subprocess.run(["scrot", "-z", tmp], capture_output=True, timeout=10)
if r.returncode == 0 and os.path.exists(tmp):
method = "scrot"
except Exception:
pass
# 2. Try import (ImageMagick X11)
if method == "unknown" and shutil.which("import") and os.environ.get("DISPLAY"):
try:
r = subprocess.run(["import", "-window", "root", tmp], capture_output=True, timeout=10)
if r.returncode == 0 and os.path.exists(tmp):
method = "import"
except Exception:
pass
# 3. Try fbcat (Linux framebuffer — headless VMs with framebuffer)
if method == "unknown" and shutil.which("fbcat") and os.path.exists("/dev/fb0"):
try:
ppm = tempfile.mktemp(suffix=".ppm")
r = subprocess.run(["fbcat", "-s", "/dev/fb0"], stdout=open(ppm, "wb"),
stderr=subprocess.PIPE, timeout=10)
if r.returncode == 0 and shutil.which("convert"):
subprocess.run(["convert", ppm, tmp], capture_output=True, timeout=10)
os.unlink(ppm)
if os.path.exists(tmp):
method = "framebuffer"
except Exception:
pass
# 4. Headless fallback: build a PNG system dashboard from text stats
if method == "unknown":
try:
result = _render_sysinfo_png(tmp)
if result:
method = "sysinfo_render"
except Exception:
pass
if method == "unknown" or not os.path.exists(tmp):
# Last resort: return text snapshot only
snap = _sysinfo_snapshot()
snap["screenshot_available"] = False
snap["method"] = "text_only"
return snap
# Read image
try:
with open(tmp, "rb") as f:
raw = f.read()
b64 = base64.b64encode(raw).decode()
fsize = len(raw)
os.unlink(tmp)
# Try to get dimensions via file command
try:
r = subprocess.run(["identify", "-format", "%wx%h", tmp],
capture_output=True, text=True, timeout=5)
if "x" in r.stdout:
w, h = r.stdout.strip().split("x", 1)
width, height = int(w), int(h)
except Exception:
pass
return {
"success": True,
"method": method,
"image_b64": b64,
"image_mime": "image/png",
"file_size": fsize,
"width": width,
"height": height,
"hostname": socket.gethostname(),
}
except Exception as e:
return {"success": False, "error": str(e), "method": method}
def _render_sysinfo_png(out_path: str) -> bool:
"""Render a system info text snapshot as a PNG using ansi2image or ImageMagick."""
import shutil
snap = _build_sysinfo_text()
# Try convert (ImageMagick) to render text → PNG
if shutil.which("convert"):
try:
r = subprocess.run([
"convert",
"-size", "900x600", "xc:#0a0f14",
"-font", "Courier-New",
"-pointsize", "13",
"-fill", "#00d4ff",
"-annotate", "+20+30", snap[:3000],
out_path,
], capture_output=True, timeout=15)
return r.returncode == 0 and os.path.exists(out_path)
except Exception:
pass
return False
def _build_sysinfo_text() -> str:
"""Build a rich text system snapshot for headless machines."""
lines = [f"JARVIS FIELD STATION — {socket.gethostname()}",
f"Timestamp: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}",
"" * 60]
try:
# CPU / mem / disk
with open("/proc/loadavg") as f:
load = f.read().split()[:3]
lines.append(f"Load avg: {' '.join(load)}")
except Exception:
pass
try:
with open("/proc/meminfo") as f:
minfo = {l.split(":")[0].strip(): int(l.split()[1]) for l in f if ":" in l}
total = minfo.get("MemTotal", 0)
avail = minfo.get("MemAvailable", 0)
used = total - avail
lines.append(f"Memory: {used//1024}MB used / {total//1024}MB total")
except Exception:
pass
try:
r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True, timeout=5)
lines.append("Disk:\n" + r.stdout.strip())
except Exception:
pass
try:
r = subprocess.run(["ps", "aux", "--sort=-%cpu"], capture_output=True, text=True, timeout=5)
lines.append("Top processes:\n" + "\n".join(r.stdout.splitlines()[1:8]))
except Exception:
pass
try:
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
lines.append("Listening ports:\n" + r.stdout.strip()[:500])
except Exception:
pass
return "\n".join(lines)
def _sysinfo_snapshot() -> dict:
"""Return structured system snapshot (no image) for text-based analysis."""
data = {"success": True, "hostname": socket.gethostname(),
"snapshot_type": "text", "screenshot_available": False}
try:
with open("/proc/loadavg") as f:
parts = f.read().split()
data["load_1m"], data["load_5m"], data["load_15m"] = parts[0], parts[1], parts[2]
except Exception:
pass
try:
with open("/proc/meminfo") as f:
m = {l.split(":")[0].strip(): int(l.split()[1])
for l in f if ":" in l and len(l.split()) >= 2}
data["mem_total_mb"] = m.get("MemTotal", 0) // 1024
data["mem_avail_mb"] = m.get("MemAvailable", 0) // 1024
data["mem_used_mb"] = data["mem_total_mb"] - data["mem_avail_mb"]
except Exception:
pass
try:
r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True, timeout=5)
data["disk"] = r.stdout.splitlines()[1] if r.stdout else ""
except Exception:
pass
try:
r = subprocess.run(["ps", "aux", "--sort=-%cpu"], capture_output=True, text=True, timeout=5)
data["top_procs"] = r.stdout.splitlines()[1:8]
except Exception:
pass
try:
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
data["listening_ports"] = r.stdout.strip()[:800]
except Exception:
pass
return data
# ── Command execution ─────────────────────────────────────────────────────────
def execute_command(cmd: dict) -> dict:
@@ -327,17 +525,25 @@ def execute_command(cmd: dict) -> dict:
return {"success": r.returncode == 0, "output": r.stdout}
elif cmd_type == "update":
updated = self_update(cfg)
_cfg = load_config()
updated = self_update(_cfg)
return {"success": True, "updated": updated}
elif cmd_type == "shell":
# Guard reads LOCAL config, not the server-supplied payload
if not cfg.get("allow_shell_commands", False):
_cfg = load_config()
if not _cfg.get("allow_shell_commands", False):
return {"success": False, "error": "Shell commands not enabled in agent config"}
cmd_str = cmd_data.get("command", "")
r = subprocess.run(cmd_str, shell=True, capture_output=True, text=True, timeout=30)
return {"success": True, "stdout": r.stdout[:2000], "stderr": r.stderr[:500]}
elif cmd_type == "screenshot":
return _take_screenshot(cmd_data)
elif cmd_type == "sysinfo":
return _sysinfo_snapshot()
else:
return {"success": False, "error": f"Unknown command type: {cmd_type}"}