Files
proxmox-config/pve/scripts/jarvis-ping-probe.py
T

101 lines
2.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
JARVIS Ping Probe — runs on PVE1 (10.48.200.90), which is on the LAN.
Pings devices that can't run the full agent, then calls JARVIS heartbeat
on their behalf so the dashboard shows live status.
"""
import json
import subprocess
import urllib.request
import urllib.error
import ssl
JARVIS_URL = "https://165.22.1.228"
HOST_HEADER = "jarvis.orbishosting.com"
# Devices to probe: agent_id → api_key
DEVICES = {
"fortigate_gw": "00103aea6fcbf837bc55e11b445a3620",
"yealink_t48s": "2bf8bd7ca8dd31c28fd16aa956e15f88",
"homeassistant_ha": "6f8077dee7a7b4af202bc80886f1223d",
}
# Map agent_id → IP (for ping)
IPS = {
"fortigate_gw": "10.48.200.1",
"yealink_t48s": "10.48.200.43",
"homeassistant_ha": "10.48.200.97",
}
def ping(ip: str) -> bool:
result = subprocess.run(
["ping", "-c", "1", "-W", "2", ip],
capture_output=True, timeout=5
)
return result.returncode == 0
def heartbeat(agent_id: str, api_key: str, alive: bool):
# If device is down we still send heartbeat so JARVIS updates last_seen
# and sets status based on the alive flag via the metric payload
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
payload = json.dumps({}).encode()
req = urllib.request.Request(
f"{JARVIS_URL}/api/agent/heartbeat",
data=payload, method="POST"
)
req.add_header("Content-Type", "application/json")
req.add_header("X-Agent-Key", api_key)
req.add_header("Host", HOST_HEADER)
try:
with urllib.request.urlopen(req, timeout=10, context=ctx):
pass
except Exception:
pass
def update_status(agent_id: str, api_key: str, status: str):
"""Push a minimal metric so JARVIS knows if device is up or down."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
payload = json.dumps({
"type": "system",
"data": {
"hostname": agent_id,
"cpu_percent": 0,
"ping_only": True,
"ping_status": status,
}
}).encode()
req = urllib.request.Request(
f"{JARVIS_URL}/api/agent/metrics",
data=payload, method="POST"
)
req.add_header("Content-Type", "application/json")
req.add_header("X-Agent-Key", api_key)
req.add_header("Host", HOST_HEADER)
try:
with urllib.request.urlopen(req, timeout=10, context=ctx):
pass
except Exception:
pass
def main():
for agent_id, api_key in DEVICES.items():
ip = IPS.get(agent_id, "")
alive = ping(ip) if ip else False
status = "online" if alive else "offline"
print(f"{agent_id} ({ip}): {status}", flush=True)
heartbeat(agent_id, api_key, alive)
if alive:
update_status(agent_id, api_key, status)
if __name__ == "__main__":
main()