Files
myron 7f6397b514 perf: route Guardian and Vision text analysis to Groq instead of Claude
Guardian anomaly alerts and SITREP are pure text reasoning — Groq's
llama-3.3-70b-versatile handles them at near-zero cost with lower
latency. Vision Protocol image analysis stays on Claude (claude-opus-
4-8) because Groq has no vision models. Text-only sysinfo snapshots
(no image captured) also move to Groq.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 17:06:15 +00:00

2774 lines
123 KiB
Python

#!/usr/bin/env python3
"""
JARVIS Arc Reactor — Core Daemon v9.0
Phase 1: Job queue, ping/echo/shell
Phase 2: Intel Protocol (research), Iron Protocol (tool loop), LLM router
Phase 3: Gmail Triage (Comms Protocol), Remote Exec (Field Protocol)
Phase 4: Vision Protocol (screenshot, vision, sysinfo)
Phase 5: Guardian Mode (continuous awareness, proactive alerts, SITREP)
Phase 6: Comms v2 — send email, compose, schedule event, meeting prep
Phase 7: Mission Ops — multi-step automated workflows with trigger engine
Phase 8: Mission Directives — OKR/goal tracking with AI progress review
Phase 9: Clearance Protocol — approval gating for high-risk operations
"""
import asyncio
import email as _email_lib
import imaplib
import json
import logging
import os
import re
import sys
import traceback
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from email.header import decode_header as _decode_header
from email.utils import parsedate_to_datetime
from typing import Any, Optional
import aiomysql
import aiohttp
import uvicorn
from fastapi import FastAPI, HTTPException, Request
# ── CONFIG ────────────────────────────────────────────────────────────────────
HOST = "127.0.0.1"
PORT = 7474
VERSION = "9.0.0"
DB_HOST = "localhost"
DB_PORT = 3306
DB_USER = "jarvis_user"
DB_PASS = "J4rv1s_Pr0t0c0l_2026!"
DB_NAME = "jarvis_db"
LOG_FILE = "/home/jarvis.orbishosting.com/logs/arc_reactor.log"
POLL_INTERVAL = 3
HEARTBEAT_INTERVAL = 30
CLAUDE_API_KEY = "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"
CLAUDE_MODEL = "claude-sonnet-4-6"
GROQ_API_KEY = "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"
GROQ_MODEL = "llama-3.3-70b-versatile"
OLLAMA_HOST = "http://10.48.200.95:11434"
OLLAMA_MODEL = "llama3.2:1b"
GMAIL_USER = "myronblair@gmail.com"
GMAIL_PASS = "demsvdylwweacbcx"
ICLOUD_USER = "myronblair@icloud.com"
ICLOUD_PASS = "yxfi-yvzu-geqk-japr"
# ── LOGGING ───────────────────────────────────────────────────────────────────
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(sys.stdout),
],
)
log = logging.getLogger("arc_reactor")
# ── DB POOL ───────────────────────────────────────────────────────────────────
_pool: Optional[aiomysql.Pool] = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None or _pool.closed:
_pool = await aiomysql.create_pool(
host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS,
db=DB_NAME, autocommit=True, minsize=2, maxsize=10, charset="utf8mb4",
)
return _pool
async def db_execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, args)
return cur.lastrowid
async def db_fetchone(sql: str, args: tuple = ()) -> Optional[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
async def db_fetchall(sql: str, args: tuple = ()) -> list:
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 1 HANDLERS
# ═══════════════════════════════════════════════════════════════════════════════
async def handle_ping(payload: dict) -> dict:
return {"pong": True, "timestamp": datetime.utcnow().isoformat(), "version": VERSION}
async def handle_echo(payload: dict) -> dict:
return {"echo": payload.get("message", ""), "timestamp": datetime.utcnow().isoformat()}
async def handle_shell(payload: dict) -> dict:
cmd = payload.get("command", "").strip()
allowed = ("df ", "free ", "uptime", "hostname", "uname", "ps ", "cat /proc/")
if not any(cmd.startswith(p) for p in allowed):
raise ValueError(f"Command not in whitelist: {cmd[:40]}")
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15)
return {"stdout": stdout.decode().strip(), "stderr": stderr.decode().strip(), "exit_code": proc.returncode}
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 2: LLM ROUTER
# ═══════════════════════════════════════════════════════════════════════════════
async def llm_call(messages: list, provider: str = "claude", system: str = "") -> str:
if provider == "claude" and CLAUDE_API_KEY:
return await _claude_call(messages, system)
elif provider == "groq" and GROQ_API_KEY:
return await _groq_call(messages, system)
elif provider == "ollama":
return await _ollama_call(messages, system)
for p in ["groq", "ollama"]:
try:
return await llm_call(messages, p, system)
except Exception:
continue
raise RuntimeError("All LLM providers failed")
async def _claude_call(messages: list, system: str = "") -> str:
payload = {"model": CLAUDE_MODEL, "max_tokens": 4096, "messages": messages}
if system:
payload["system"] = system
headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"}
async with aiohttp.ClientSession() as session:
async with session.post("https://api.anthropic.com/v1/messages", json=payload, headers=headers,
timeout=aiohttp.ClientTimeout(total=60)) as resp:
data = await resp.json()
if resp.status != 200:
raise RuntimeError(f"Claude API error {resp.status}: {data.get('error',{}).get('message','')}")
return data["content"][0]["text"]
async def _groq_call(messages: list, system: str = "") -> str:
all_msgs = ([{"role": "system", "content": system}] if system else []) + messages
payload = {"model": GROQ_MODEL, "messages": all_msgs, "max_tokens": 4096}
headers = {"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"}
async with aiohttp.ClientSession() as session:
async with session.post("https://api.groq.com/openai/v1/chat/completions", json=payload,
headers=headers, timeout=aiohttp.ClientTimeout(total=45)) as resp:
data = await resp.json()
if resp.status != 200:
raise RuntimeError(f"Groq error {resp.status}")
return data["choices"][0]["message"]["content"]
async def _ollama_call(messages: list, system: str = "") -> str:
prompt = (system + "\n\n" if system else "") + "\n".join(f"{m['role'].upper()}: {m['content']}" for m in messages)
async with aiohttp.ClientSession() as session:
async with session.post(f"{OLLAMA_HOST}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False},
timeout=aiohttp.ClientTimeout(total=30)) as resp:
data = await resp.json()
return data.get("response", "")
async def handle_llm(payload: dict) -> dict:
message = payload.get("message", "")
system = payload.get("system", "You are JARVIS, an Iron Man-style AI assistant.")
provider = payload.get("provider", "claude")
if not message:
raise ValueError("Missing message")
result = await llm_call([{"role": "user", "content": message}], provider, system)
return {"response": result, "provider": provider}
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 2: INTEL PROTOCOL — RESEARCH ENGINE
# ═══════════════════════════════════════════════════════════════════════════════
async def _web_search(query: str, max_results: int = 6) -> list:
try:
from duckduckgo_search import DDGS
results = []
with DDGS() as ddgs:
for r in ddgs.text(query, max_results=max_results):
results.append({"url": r.get("href",""), "title": r.get("title",""), "snippet": r.get("body","")})
return results
except Exception as e:
log.warning(f"DDG search failed: {e}")
return []
async def _fetch_url_content(url: str, timeout: int = 12) -> str:
try:
import trafilatura
headers = {"User-Agent": "Mozilla/5.0 (compatible; JARVIS-Research/3.0)"}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout),
allow_redirects=True, ssl=False) as resp:
if resp.status != 200:
return ""
html = await resp.text(errors="replace")
text = trafilatura.extract(html, include_links=False, include_images=False, no_fallback=False, favor_precision=True)
return (text or "")[:4000]
except Exception as e:
log.debug(f"Fetch failed {url[:60]}: {e}")
return ""
async def handle_research(payload: dict) -> dict:
query = payload.get("query", "").strip()
depth = payload.get("depth", "standard")
provider = payload.get("provider", "claude")
if not query:
raise ValueError("Missing research query")
max_sources = {"quick": 3, "standard": 5, "deep": 8}.get(depth, 5)
log.info(f"[INTEL] Research: '{query}' depth={depth} sources={max_sources}")
search_results = await _web_search(query, max_results=max_sources + 2)
if not search_results:
search_results = [{"url": "", "title": query, "snippet": f"No search results for: {query}"}]
fetch_tasks = [_fetch_url_content(r["url"]) for r in search_results if r.get("url")]
page_texts = await asyncio.gather(*fetch_tasks, return_exceptions=True)
sources = []
for i, r in enumerate(search_results[:max_sources]):
text = page_texts[i] if i < len(page_texts) and isinstance(page_texts[i], str) else ""
sources.append({"url": r["url"], "title": r["title"], "snippet": r["snippet"], "content": text})
context_parts = []
for i, s in enumerate(sources, 1):
body = s["content"] or s["snippet"]
if body:
context_parts.append(f"SOURCE {i}: {s['title']}\nURL: {s['url']}\n{body[:3000]}\n")
context_text = "\n---\n".join(context_parts) if context_parts else "No page content available."
synthesis_prompt = (
f'You are JARVIS, an advanced AI research assistant. The user asked: "{query}"\n\n'
f"You have retrieved the following source material:\n\n{context_text}\n\n"
f"Provide a comprehensive research briefing with:\n"
f"1. **EXECUTIVE SUMMARY** (2-3 sentences)\n"
f"2. **KEY FINDINGS** (5-8 bullet points)\n"
f"3. **DETAILS** (thorough synthesis, 2-4 paragraphs)\n"
f"4. **CONFIDENCE** (High/Medium/Low based on source quality)\n\n"
f"Be precise, factual, and cite source numbers where relevant."
)
try:
synthesis = await llm_call([{"role": "user", "content": synthesis_prompt}], provider=provider)
except Exception as e:
log.warning(f"[INTEL] Synthesis failed, falling back: {e}")
synthesis = f"**RESEARCH BRIEFING: {query}**\n\n" + "\n\n".join(f"**{s['title']}**\n{s['snippet']}" for s in sources)
key_points = []
for line in synthesis.split("\n"):
line = line.strip()
if line.startswith(("- ", "• ", "* ")) and len(line) > 10:
key_points.append(re.sub(r'^[-•*]\s*', '', line))
elif re.match(r'^\d+\.\s+', line) and len(line) > 10:
key_points.append(re.sub(r'^\d+\.\s+', '', line))
clean_sources = [{"url": s["url"], "title": s["title"] or s["url"]} for s in sources if s.get("url")]
log.info(f"[INTEL] Research complete: '{query}' — {len(sources)} sources")
return {"query": query, "depth": depth, "synthesis": synthesis, "key_points": key_points[:10],
"sources": clean_sources, "source_count": len(clean_sources), "provider": provider}
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 2: IRON PROTOCOL — MULTI-STEP TOOL LOOP
# ═══════════════════════════════════════════════════════════════════════════════
AGENT_TOOLS = [
{"name": "web_search", "description": "Search the web for current information.",
"input_schema": {"type": "object", "properties": {"query": {"type": "string"}, "max_results": {"type": "integer", "default": 5}}, "required": ["query"]}},
{"name": "fetch_url", "description": "Fetch a web page and extract its main text content.",
"input_schema": {"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}},
{"name": "jarvis_agents", "description": "Get status of all JARVIS agents.",
"input_schema": {"type": "object", "properties": {}}},
{"name": "jarvis_alerts", "description": "Get current active JARVIS alerts.",
"input_schema": {"type": "object", "properties": {}}},
{"name": "current_time", "description": "Get current date and time.",
"input_schema": {"type": "object", "properties": {}}},
]
async def execute_agent_tool(tool_name: str, tool_input: dict) -> str:
try:
if tool_name == "web_search":
results = await _web_search(tool_input.get("query", ""), tool_input.get("max_results", 5))
if not results:
return "No search results found."
return "\n".join(f"{i+1}. {r['title']}\n URL: {r['url']}\n {r['snippet']}" for i, r in enumerate(results))
elif tool_name == "fetch_url":
url = tool_input.get("url", "")
if not url:
return "No URL provided."
return await _fetch_url_content(url) or "Could not extract content."
elif tool_name == "jarvis_agents":
agents = await db_fetchall("SELECT hostname, agent_type, ip_address, status, last_seen FROM registered_agents ORDER BY status='online' DESC, hostname")
if not agents:
return "No agents registered."
return "\n".join(f"- {a['hostname']} ({a['agent_type']}) @ {a['ip_address']}{a['status'].upper()}" for a in agents)
elif tool_name == "jarvis_alerts":
alerts = await db_fetchall("SELECT title, severity, message FROM alerts WHERE resolved=0 ORDER BY created_at DESC LIMIT 10")
return "\n".join(f"- [{a['severity'].upper()}] {a['title']}: {a['message']}" for a in alerts) or "No active alerts."
elif tool_name == "current_time":
return datetime.utcnow().strftime("UTC: %Y-%m-%d %H:%M:%S")
else:
return f"Unknown tool: {tool_name}"
except Exception as e:
return f"Tool error ({tool_name}): {str(e)[:200]}"
async def handle_tool_loop(payload: dict) -> dict:
task = payload.get("task", "").strip()
max_iterations = min(int(payload.get("max_iterations", 15)), 200)
system_prompt = payload.get("system", (
"You are JARVIS, an advanced autonomous AI assistant with access to tools. "
"Use tools methodically to complete the task. Be thorough but concise. "
"When you have enough information, provide a clear final answer."
))
if not task:
raise ValueError("Missing task")
log.info(f"[IRON] Tool loop: '{task[:80]}' max_iter={max_iterations}")
import anthropic
client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY)
messages = [{"role": "user", "content": task}]
iteration = 0
final_text = ""
while iteration < max_iterations:
iteration += 1
response = await client.messages.create(
model=CLAUDE_MODEL, max_tokens=4096, system=system_prompt,
tools=AGENT_TOOLS, messages=messages,
)
assistant_content = []
text_parts = []
tool_uses = []
for block in response.content:
if block.type == "text":
assistant_content.append({"type": "text", "text": block.text})
text_parts.append(block.text)
elif block.type == "tool_use":
assistant_content.append({"type": "tool_use", "id": block.id, "name": block.name, "input": block.input})
tool_uses.append(block)
messages.append({"role": "assistant", "content": assistant_content})
if text_parts:
final_text = "\n".join(text_parts)
if response.stop_reason == "end_turn" or not tool_uses:
log.info(f"[IRON] Complete after {iteration} iterations")
break
tool_results = []
for tu in tool_uses:
log.info(f"[IRON] Tool: {tu.name}")
result = await execute_agent_tool(tu.name, tu.input)
tool_results.append({"type": "tool_result", "tool_use_id": tu.id, "content": result[:3000]})
messages.append({"role": "user", "content": tool_results})
tools_used = list({b["name"] for m in messages if isinstance(m["content"], list)
for b in m["content"] if isinstance(b, dict) and b.get("type") == "tool_use"})
return {"task": task, "result": final_text, "iterations": iteration, "tools_used": tools_used}
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 3: COMMS PROTOCOL — GMAIL TRIAGE
# ═══════════════════════════════════════════════════════════════════════════════
def _imap_fetch_sync(account: str, max_msgs: int) -> list:
"""Synchronous IMAP fetch — called via run_in_executor."""
if account == "icloud":
host, port, user, passwd = "imap.mail.me.com", 993, ICLOUD_USER, ICLOUD_PASS
else:
host, port, user, passwd = "imap.gmail.com", 993, GMAIL_USER, GMAIL_PASS
if not user or not passwd:
return []
try:
mbox = imaplib.IMAP4_SSL(host, port)
mbox.login(user, passwd)
mbox.select("INBOX")
_, data = mbox.search(None, "ALL")
ids = data[0].split()[-max_msgs:]
msgs = []
for mid in reversed(ids):
_, raw = mbox.fetch(mid, "(RFC822)")
if not raw or not raw[0]:
continue
msg = _email_lib.message_from_bytes(raw[0][1])
# Subject
subj_raw = msg.get("Subject", "")
subject = ""
for part, enc in _decode_header(subj_raw):
if isinstance(part, bytes):
subject += part.decode(enc or "utf-8", errors="replace")
else:
subject += str(part)
# From
from_raw = msg.get("From", "")
from_name = ""
for part, enc in _decode_header(from_raw):
if isinstance(part, bytes):
from_name += part.decode(enc or "utf-8", errors="replace")
else:
from_name += str(part)
from_name = from_name.strip().strip('"')
em = re.search(r"<([^>]+)>", from_raw)
from_email = em.group(1) if em else from_raw
# Date
date_str = msg.get("Date", "")
# Body preview
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
if payload:
body = payload.decode(part.get_content_charset() or "utf-8", errors="replace")
break
else:
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(msg.get_content_charset() or "utf-8", errors="replace")
body = " ".join(body.split())[:500]
msg_uid = (msg.get("Message-ID", "") or f"{from_email}:{subject}:{date_str}").strip("<>")
msgs.append({
"msg_id": msg_uid[:255],
"from_name": from_name[:100],
"from_email": from_email[:100],
"subject": subject[:255],
"date_raw": date_str,
"body": body,
})
mbox.logout()
return msgs
except Exception as e:
log.warning(f"IMAP fetch failed ({account}): {e}")
return []
async def handle_gmail_triage(payload: dict) -> dict:
account = payload.get("account", "gmail")
max_emails = min(int(payload.get("max_emails", 20)), 40)
provider = payload.get("provider", "claude")
log.info(f"[COMMS] Gmail triage: account={account} max={max_emails}")
loop = asyncio.get_event_loop()
emails = await loop.run_in_executor(None, lambda: _imap_fetch_sync(account, max_emails))
if not emails:
return {"account": account, "triaged": 0, "urgent": 0, "action": 0,
"meeting": 0, "items": [], "error": "No emails fetched or IMAP unavailable"}
email_list = ""
for i, e in enumerate(emails, 1):
email_list += (
f"EMAIL {i}:\n"
f"From: {e['from_name']} <{e['from_email']}>\n"
f"Subject: {e['subject']}\n"
f"Date: {e['date_raw']}\n"
f"Body: {e['body'][:400]}\n\n"
)
triage_prompt = (
f"You are JARVIS, triaging {len(emails)} emails for Myron Blair.\n\n"
"For each email assign:\n"
"- category: urgent | action | reply | meeting | info | promo | spam\n"
"- priority: 1-10 (10 = drop everything)\n"
"- summary: one sentence\n"
"- draft_reply: for urgent/action/reply/meeting ONLY — brief professional reply. Empty string otherwise.\n\n"
"Return ONLY a valid JSON array. Example:\n"
'[{"index":1,"category":"urgent","priority":9,"summary":"Contract needs signing today","draft_reply":"Hi, I will review and sign today."}]\n\n'
f"EMAILS TO TRIAGE:\n{email_list}"
)
try:
raw = await llm_call([{"role": "user", "content": triage_prompt}], provider)
raw = raw.strip()
if raw.startswith("```"):
raw = "\n".join(raw.split("\n")[1:])
if raw.endswith("```"):
raw = raw[:-3]
triage_items = json.loads(raw.strip())
except Exception as e:
log.warning(f"[COMMS] Triage parse error: {e}")
triage_items = [{"index": i+1, "category": "info", "priority": 3,
"summary": f"Subject: {e2['subject']}", "draft_reply": ""}
for i, e2 in enumerate(emails)]
# Save to email_triage table
saved = 0
for item in triage_items:
idx = int(item.get("index", 0)) - 1
if idx < 0 or idx >= len(emails):
continue
e = emails[idx]
try:
date_parsed = None
if e.get("date_raw"):
try:
date_parsed = parsedate_to_datetime(e["date_raw"]).strftime("%Y-%m-%d %H:%M:%S")
except Exception:
pass
await db_execute(
"""INSERT INTO email_triage
(msg_id, account, from_name, from_email, subject, date_received,
category, priority, summary, draft_reply, action_taken)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'none')
ON DUPLICATE KEY UPDATE
category=VALUES(category), priority=VALUES(priority),
summary=VALUES(summary), draft_reply=VALUES(draft_reply)""",
(e["msg_id"], account, e["from_name"], e["from_email"], e["subject"],
date_parsed, item.get("category", "info"), int(item.get("priority", 3)),
item.get("summary", "")[:500], item.get("draft_reply", "")[:3000])
)
saved += 1
except Exception as ex:
log.debug(f"[COMMS] Save triage error: {ex}")
counts = {}
for item in triage_items:
c = item.get("category", "info")
counts[c] = counts.get(c, 0) + 1
urgent_items = []
for it in triage_items:
idx = int(it.get("index", 0)) - 1
if 0 <= idx < len(emails) and it.get("category") in ("urgent", "action", "reply", "meeting"):
urgent_items.append({
"from": emails[idx]["from_name"],
"from_email": emails[idx]["from_email"],
"subject": emails[idx]["subject"][:80],
"summary": it.get("summary", ""),
"priority": it.get("priority", 0),
"draft_reply": it.get("draft_reply", ""),
"category": it.get("category", ""),
})
urgent_items.sort(key=lambda x: -x["priority"])
log.info(f"[COMMS] Triage complete: {len(emails)} emails, {saved} saved, counts={counts}")
return {
"account": account,
"triaged": len(emails),
"saved": saved,
"counts": counts,
"urgent": counts.get("urgent", 0),
"action": counts.get("action", 0),
"meeting": counts.get("meeting", 0),
"reply": counts.get("reply", 0),
"items": urgent_items[:10],
}
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 3: FIELD PROTOCOL — REMOTE EXEC
# ═══════════════════════════════════════════════════════════════════════════════
async def handle_remote_exec(payload: dict) -> dict:
"""
Queue a command to a JARVIS Field Station agent and wait for the result.
payload: { agent, command_type, command_data, timeout }
"""
agent_name = payload.get("agent", "").strip()
cmd_type = payload.get("command_type", "ping")
cmd_data = payload.get("command_data", {})
timeout_secs = min(int(payload.get("timeout", 35)), 120)
if not agent_name:
raise ValueError("Missing agent name")
log.info(f"[FIELD] remote_exec {cmd_type} on {agent_name}")
# NOTE: _dispatch_agent_command is defined in Phase 4 block below.
# Forward declaration — called at runtime, not at import.
dispatch = await _dispatch_agent_command(agent_name, cmd_type, cmd_data, timeout_secs)
return {
"agent": dispatch["agent"],
"agent_id": dispatch["agent_id"],
"command_type": cmd_type,
"command_data": cmd_data,
"cmd_id": dispatch["cmd_id"],
"status": dispatch["status"],
"result": dispatch["result"],
}
# ── PHASE 4: VISION PROTOCOL ──────────────────────────────────────────────────
async def _dispatch_agent_command(agent_name: str, cmd_type: str, cmd_data: dict,
timeout_secs: int = 45) -> dict:
"""Shared helper: find agent, queue command, poll for result."""
agent = await db_fetchone(
"""SELECT agent_id, hostname, status FROM registered_agents
WHERE (hostname LIKE %s OR agent_id LIKE %s) AND status='online' LIMIT 1""",
(f"%{agent_name}%", f"%{agent_name}%")
)
if not agent:
all_agents = await db_fetchall("SELECT hostname FROM registered_agents WHERE status='online'")
names = [a["hostname"] for a in all_agents]
raise ValueError(f"No online agent matching '{agent_name}'. Online: {', '.join(names) or 'none'}")
cmd_id = await db_execute(
"INSERT INTO agent_commands (agent_id, command_type, command_data) VALUES (%s, %s, %s)",
(agent["agent_id"], cmd_type, json.dumps(cmd_data))
)
elapsed = 0
while elapsed < timeout_secs:
await asyncio.sleep(2)
elapsed += 2
row = await db_fetchone("SELECT status, result FROM agent_commands WHERE id=%s", (cmd_id,))
if row and row["status"] in ("executed", "failed"):
result = row.get("result")
if result and isinstance(result, str):
try:
result = json.loads(result)
except Exception:
pass
return {"agent": agent["hostname"], "agent_id": agent["agent_id"],
"cmd_id": cmd_id, "status": row["status"], "result": result or {}}
await db_execute("UPDATE agent_commands SET status='failed' WHERE id=%s AND status='pending'", (cmd_id,))
raise TimeoutError(f"No response from {agent['hostname']} within {timeout_secs}s")
async def handle_screenshot(payload: dict) -> dict:
"""
Capture a screenshot (or system snapshot) from a field agent, then optionally
run vision analysis via Claude.
payload: { agent, analyze: true|false, analyze_prompt: "...", timeout: 45 }
"""
agent_name = payload.get("agent", "").strip()
do_analyze = bool(payload.get("analyze", True))
analyze_prompt = payload.get("analyze_prompt", "Describe what you see on this screen in detail. Note any important status indicators, errors, running processes, or interesting information.")
timeout_secs = min(int(payload.get("timeout", 45)), 120)
if not agent_name:
raise ValueError("Missing agent name")
log.info(f"[VISION] Screenshot request: agent={agent_name} analyze={do_analyze}")
# Dispatch screenshot command to field agent
dispatch = await _dispatch_agent_command(agent_name, "screenshot", {}, timeout_secs)
result = dispatch.get("result", {})
hostname = dispatch.get("agent", agent_name)
if dispatch.get("status") == "failed" or not result.get("success"):
err = result.get("error", "Agent returned failure") if isinstance(result, dict) else str(result)
return {"agent": hostname, "success": False, "error": err}
image_b64 = result.get("image_b64", "")
method = result.get("method", "unknown")
width = result.get("width", 0)
height = result.get("height", 0)
file_size = result.get("file_size", 0)
# Run Claude vision analysis if we have an image
analysis = ""
provider_used = ""
if do_analyze and image_b64:
try:
import anthropic
client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY)
msg = await client.messages.create(
model="claude-opus-4-8-20251101",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64",
"media_type": "image/png", "data": image_b64}},
{"type": "text", "text": analyze_prompt},
],
}],
)
analysis = msg.content[0].text if msg.content else ""
provider_used = "claude"
log.info(f"[VISION] Claude analysis complete ({len(analysis)} chars)")
except Exception as e:
log.warning(f"[VISION] Claude vision failed: {e}")
analysis = f"Vision analysis unavailable: {e}"
elif do_analyze and not image_b64 and result.get("snapshot_type") == "text":
# Text-only sysinfo snapshot — summarize with LLM
try:
snap_text = json.dumps(result, indent=2)[:3000]
prompt = f"Summarize this server system snapshot for JARVIS. Highlight any concerns:\n\n{snap_text}"
analysis = await llm_call([{"role": "user", "content": prompt}], "groq")
provider_used = "groq"
except Exception as e:
analysis = f"Analysis unavailable: {e}"
# Store screenshot
screenshot_id = await db_execute(
"""INSERT INTO agent_screenshots
(agent_id, hostname, method, image_b64, width, height, file_size,
vision_analysis, vision_provider)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
(dispatch.get("agent_id", ""), hostname, method,
image_b64, width, height, file_size, analysis, provider_used)
)
log.info(f"[VISION] Screenshot saved: id={screenshot_id} agent={hostname} method={method}")
return {
"agent": hostname,
"screenshot_id": screenshot_id,
"method": method,
"width": width,
"height": height,
"file_size": file_size,
"has_image": bool(image_b64),
"analysis": analysis,
"provider": provider_used,
}
async def handle_vision(payload: dict) -> dict:
"""
Run Claude vision on an already-stored screenshot or a provided base64 image.
payload: { screenshot_id OR image_b64, prompt, provider }
"""
screenshot_id = payload.get("screenshot_id")
image_b64 = payload.get("image_b64", "")
prompt = payload.get("prompt", "Describe this image in detail.")
provider = payload.get("provider", "claude")
if screenshot_id:
row = await db_fetchone(
"SELECT image_b64, hostname, method FROM agent_screenshots WHERE id=%s",
(int(screenshot_id),)
)
if not row or not row["image_b64"]:
raise ValueError(f"Screenshot {screenshot_id} not found or has no image data")
image_b64 = row["image_b64"]
hostname = row.get("hostname", "unknown")
else:
hostname = "direct"
if not image_b64:
raise ValueError("No image data provided")
log.info(f"[VISION] Analysis: screenshot_id={screenshot_id} agent={hostname}")
try:
import anthropic
client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY)
msg = await client.messages.create(
model="claude-opus-4-8-20251101",
max_tokens=2048,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64",
"media_type": "image/png", "data": image_b64}},
{"type": "text", "text": prompt},
],
}],
)
analysis = msg.content[0].text if msg.content else ""
except Exception as e:
raise RuntimeError(f"Vision analysis failed: {e}")
# Update stored screenshot if we have an ID
if screenshot_id:
await db_execute(
"UPDATE agent_screenshots SET vision_analysis=%s, vision_provider=%s WHERE id=%s",
(analysis, "claude", int(screenshot_id))
)
return {
"agent": hostname,
"screenshot_id": screenshot_id,
"prompt": prompt,
"analysis": analysis,
"provider": "claude",
}
async def handle_sysinfo(payload: dict) -> dict:
"""
Get a structured system snapshot from a field agent (no image).
Optionally ask Claude to summarize/assess the health of the machine.
payload: { agent, analyze: true|false, timeout: 30 }
"""
agent_name = payload.get("agent", "").strip()
do_analyze = bool(payload.get("analyze", True))
timeout_secs = min(int(payload.get("timeout", 30)), 60)
if not agent_name:
raise ValueError("Missing agent name")
dispatch = await _dispatch_agent_command(agent_name, "sysinfo", {}, timeout_secs)
result = dispatch.get("result", {})
hostname = dispatch.get("agent", agent_name)
if dispatch.get("status") == "failed":
return {"agent": hostname, "success": False, "error": result.get("error", "Command failed")}
analysis = ""
if do_analyze:
try:
snap = json.dumps(result, indent=2)[:3000]
summary_prompt = (
f"You are JARVIS analyzing a field station sysinfo snapshot from '{hostname}'.\n\n"
f"Snapshot:\n{snap}\n\n"
"Provide a concise health assessment: status (healthy/warning/critical), "
"key metrics, any concerns, and recommended actions if needed. "
"Keep it under 150 words, JARVIS style."
)
analysis = await llm_call([{"role": "user", "content": summary_prompt}], "claude")
except Exception as e:
analysis = f"Analysis unavailable: {e}"
return {
"agent": hostname,
"success": True,
"snapshot": result,
"analysis": analysis,
}
# ═══════════════════════════════════════════════════════════════════════════════
# JOB HANDLER REGISTRY
# ═══════════════════════════════════════════════════════════════════════════════
JOB_HANDLERS = {
"ping": handle_ping,
"echo": handle_echo,
"shell": handle_shell,
"llm": handle_llm,
"research": handle_research,
"tool_loop": handle_tool_loop,
"gmail_triage": handle_gmail_triage,
"remote_exec": handle_remote_exec,
# Phase 4
"screenshot": handle_screenshot,
"vision": handle_vision,
"sysinfo": handle_sysinfo,
# Phase 5
"sitrep": None, # registered after guardian functions are defined
"guardian_config": None,
}
# ── PHASE 5: GUARDIAN MODE ────────────────────────────────────────────────────
# In-memory state: tracks last alert time per (agent_id, metric) to debounce
_guardian_state: dict = {
"enabled": True,
"last_scan": None,
"last_sitrep": None,
"alert_cooldown": {}, # key: "agent_id:metric" → last_alert_epoch
"online_snapshot": {}, # agent_id → was_online bool
}
GUARDIAN_COOLDOWN = 600 # seconds between repeat alerts for same metric
async def _guardian_get_config() -> dict:
rows = await db_fetchall("SELECT key_name, value FROM guardian_config")
return {r["key_name"]: r["value"] for r in rows} if rows else {}
async def _guardian_emit(event_type: str, severity: str, agent_id: str,
hostname: str, metric: str, value: float,
threshold: float, message: str, ai_analysis: str = "") -> int:
return await db_execute(
"""INSERT INTO guardian_events
(event_type, severity, agent_id, hostname, metric, value, threshold,
message, ai_analysis)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
(event_type, severity, agent_id, hostname, metric, value, threshold,
message, ai_analysis)
)
async def _guardian_cooldown_ok(agent_id: str, metric: str) -> bool:
"""Return True if enough time has passed since last alert for this agent+metric."""
import time
key = f"{agent_id}:{metric}"
last = _guardian_state["alert_cooldown"].get(key, 0)
if (time.time() - last) >= GUARDIAN_COOLDOWN:
_guardian_state["alert_cooldown"][key] = time.time()
return True
return False
async def guardian_loop() -> None:
"""
Background task — runs every SCAN_INTERVAL seconds.
Checks all agents for anomalies, emits guardian_events, optionally
generates AI analysis for critical findings.
"""
import time
log.info("[GUARDIAN] Guardian Mode activated")
while True:
try:
cfg = await _guardian_get_config()
if cfg.get("enabled", "1") == "0":
await asyncio.sleep(60)
continue
scan_interval = int(cfg.get("scan_interval", 120))
cpu_thresh = float(cfg.get("cpu_threshold", 85))
mem_thresh = float(cfg.get("mem_threshold", 88))
disk_thresh = float(cfg.get("disk_threshold", 88))
offline_mins = int(cfg.get("offline_minutes", 3))
ai_on = cfg.get("ai_analysis", "1") == "1"
proactive_chat = cfg.get("proactive_chat", "1") == "1"
_guardian_state["last_scan"] = datetime.utcnow().isoformat()
critical_findings = []
# ── 1. Agent online/offline transitions ───────────────────────────
agents = await db_fetchall(
"SELECT agent_id, hostname, status, last_seen FROM registered_agents"
)
current_snapshot = {}
for ag in agents:
aid = ag["agent_id"]
hostname = ag["hostname"]
is_online = ag["status"] == "online"
current_snapshot[aid] = is_online
was_online = _guardian_state["online_snapshot"].get(aid)
if was_online is True and not is_online:
# Agent went offline
if await _guardian_cooldown_ok(aid, "offline"):
eid = await _guardian_emit("agent_offline", "critical", aid, hostname,
"status", 0, 1, f"Field station '{hostname}' went OFFLINE")
critical_findings.append(f"⚠ {hostname} OFFLINE")
log.warning(f"[GUARDIAN] {hostname} went offline → event #{eid}")
elif was_online is False and is_online:
# Agent came back
if await _guardian_cooldown_ok(aid, "online"):
await _guardian_emit("agent_online", "info", aid, hostname,
"status", 1, 0, f"Field station '{hostname}' back ONLINE")
log.info(f"[GUARDIAN] {hostname} came back online")
_guardian_state["online_snapshot"] = current_snapshot
# ── 2. Metrics analysis ───────────────────────────────────────────
# Get latest metric per online agent
metrics_rows = await db_fetchall(
"""SELECT m.agent_id, r.hostname, m.metric_data
FROM agent_metrics m
JOIN registered_agents r ON r.agent_id = m.agent_id
WHERE r.status = 'online'
AND m.recorded_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
ORDER BY m.recorded_at DESC"""
)
seen_agents: set = set()
for row in metrics_rows:
aid = row["agent_id"]
if aid in seen_agents:
continue
seen_agents.add(aid)
hostname = row["hostname"]
try:
m = json.loads(row["metric_data"]) if isinstance(row["metric_data"], str) else row["metric_data"]
sys_data = m
cpu = sys_data.get("cpu_percent")
if cpu is not None and float(cpu) >= cpu_thresh:
sev = "critical" if float(cpu) >= 95 else "warning"
if await _guardian_cooldown_ok(aid, "cpu"):
msg = f"{hostname}: CPU at {cpu:.1f}% (threshold: {cpu_thresh}%)"
await _guardian_emit("cpu_high", sev, aid, hostname,
"cpu_percent", float(cpu), cpu_thresh, msg)
critical_findings.append(f"CPU {hostname} {cpu:.0f}%")
mem = sys_data.get("memory", {})
mem_pct = mem.get("percent") if isinstance(mem, dict) else None
if mem_pct is not None and float(mem_pct) >= mem_thresh:
sev = "critical" if float(mem_pct) >= 95 else "warning"
if await _guardian_cooldown_ok(aid, "memory"):
msg = f"{hostname}: Memory at {mem_pct:.1f}% (threshold: {mem_thresh}%)"
await _guardian_emit("mem_high", sev, aid, hostname,
"mem_percent", float(mem_pct), mem_thresh, msg)
critical_findings.append(f"MEM {hostname} {mem_pct:.0f}%")
disks = sys_data.get("disk", [])
if isinstance(disks, list):
for disk in disks:
dpct = disk.get("percent", 0)
if dpct and float(str(dpct).rstrip("%")) >= disk_thresh:
mount = disk.get("mountpoint", "/")
key = f"disk_{mount}"
if await _guardian_cooldown_ok(aid, key):
msg = f"{hostname}: Disk {mount} at {dpct}% (threshold: {disk_thresh}%)"
sev = "critical" if float(str(dpct).rstrip("%")) >= 95 else "warning"
await _guardian_emit("disk_high", sev, aid, hostname,
key, float(str(dpct).rstrip("%")), disk_thresh, msg)
critical_findings.append(f"DISK {hostname}{mount} {dpct}%")
# Service anomalies
services = sys_data.get("services", [])
for svc in services:
if svc.get("status") == "failed":
svc_name = svc.get("service", "unknown")
if await _guardian_cooldown_ok(aid, f"svc_{svc_name}"):
msg = f"{hostname}: Service '{svc_name}' is FAILED"
await _guardian_emit("service_down", "critical", aid, hostname,
f"service:{svc_name}", 0, 1, msg)
critical_findings.append(f"SVC {hostname}/{svc_name} FAILED")
except Exception as e:
log.debug(f"[GUARDIAN] Metrics parse error for {row['hostname']}: {e}")
# ── 3. AI analysis for critical findings ──────────────────────────
if critical_findings and ai_on:
try:
findings_text = "\n".join(f"- {f}" for f in critical_findings)
ai_prompt = (
f"You are JARVIS. The following anomalies were just detected:\n\n"
f"{findings_text}\n\n"
"Provide a brief (2-3 sentences), Iron Man-style alert message "
"for Myron. Be direct about severity and what action to take. "
"No markdown, no headers."
)
ai_msg = await llm_call([{"role": "user", "content": ai_prompt}], "groq")
# Update the most recent guardian event with AI analysis
await db_execute(
"""UPDATE guardian_events SET ai_analysis=%s
WHERE ai_analysis='' AND created_at > DATE_SUB(NOW(), INTERVAL 1 MINUTE)
ORDER BY id DESC LIMIT 1""",
(ai_msg,)
)
# Inject proactive chat message if configured
if proactive_chat:
await _guardian_inject_chat(f"◈ GUARDIAN ALERT — {ai_msg}")
except Exception as e:
log.warning(f"[GUARDIAN] AI analysis error: {e}")
if critical_findings:
log.warning(f"[GUARDIAN] Scan complete — {len(critical_findings)} findings: {', '.join(critical_findings)}")
else:
log.debug(f"[GUARDIAN] Scan complete — all clear")
except Exception as exc:
log.error(f"[GUARDIAN] Loop error: {exc}")
await asyncio.sleep(scan_interval)
async def _guardian_inject_chat(message: str) -> None:
"""Write a proactive JARVIS message into the conversations table so the HUD picks it up."""
try:
await db_execute(
"""INSERT INTO conversations (session_id, role, message, created_at)
VALUES ('guardian', 'assistant', %s, NOW())""",
(message,)
)
except Exception as e:
log.debug(f"[GUARDIAN] Chat inject error: {e}")
async def handle_sitrep(payload: dict) -> dict:
"""
Situation Report — comprehensive health briefing across all field stations.
payload: { detail: brief|full, provider: groq }
"""
detail = payload.get("detail", "full")
provider = payload.get("provider", "groq")
log.info(f"[GUARDIAN] SITREP requested (detail={detail})")
# 1. Gather agent status
agents = await db_fetchall(
"""SELECT agent_id, hostname, status, ip_address, agent_type,
capabilities, last_seen
FROM registered_agents ORDER BY status DESC, hostname ASC"""
)
# 2. Get latest metrics per agent
metrics_map = {}
metrics_rows = await db_fetchall(
"""SELECT m.agent_id, m.metric_data, m.recorded_at
FROM agent_metrics m
WHERE m.recorded_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE)
ORDER BY m.recorded_at DESC"""
)
for row in metrics_rows:
if row["agent_id"] not in metrics_map:
try:
m = json.loads(row["metric_data"]) if isinstance(row["metric_data"], str) else row["metric_data"]
metrics_map[row["agent_id"]] = m
except Exception:
pass
# 3. Recent guardian events (last 24h)
recent_events = await db_fetchall(
"""SELECT event_type, severity, hostname, metric, value, threshold, message, created_at
FROM guardian_events
WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)
ORDER BY created_at DESC LIMIT 20"""
)
# 4. Arc Reactor job stats
arc_stats = await db_fetchone(
"""SELECT
SUM(status='queued') AS queued,
SUM(status='running') AS running,
SUM(status='done') AS done,
SUM(status='failed') AS failed
FROM arc_jobs WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)"""
)
# 5. Build context for Claude
agent_lines = []
for ag in agents:
m = metrics_map.get(ag["agent_id"], {})
sys = m if m else {}
cpu = sys.get("cpu_percent", "?")
mem = sys.get("memory", {}).get("percent", "?") if isinstance(sys.get("memory"), dict) else "?"
disk = next((d.get("percent","?") for d in (sys.get("disk") or []) if d.get("mount",d.get("mountpoint","")) == "/"), "?")
line = (f" {ag['hostname']} ({ag['status'].upper()}) — "
f"CPU:{cpu}% MEM:{mem}% DISK:{disk}%")
agent_lines.append(line)
event_lines = [
f" [{e['severity'].upper()}] {e['hostname']}: {e['message']} ({e['created_at']})"
for e in recent_events
] or [" No events in last 24h"]
sitrep_context = (
f"JARVIS SITREP — {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}\n\n"
f"FIELD STATIONS ({len(agents)} total):\n" + "\n".join(agent_lines) + "\n\n"
f"ARC REACTOR (last 24h): queued={arc_stats.get('queued',0)} "
f"running={arc_stats.get('running',0)} done={arc_stats.get('done',0)} "
f"failed={arc_stats.get('failed',0)}\n\n"
f"GUARDIAN EVENTS (last 24h):\n" + "\n".join(event_lines)
)
prompt = (
f"{sitrep_context}\n\n"
f"You are JARVIS. Deliver a {'brief 3-sentence' if detail == 'brief' else 'comprehensive'} "
f"situation report for Myron Blair. Iron Man style — direct, confident, actionable. "
f"Highlight: overall health status, any critical issues requiring immediate attention, "
f"and key metrics. "
+ ("Keep it under 80 words." if detail == "brief" else "Structure: Overall Status → Issues → Metrics → Recommendation.")
)
analysis = await llm_call([{"role": "user", "content": prompt}], provider)
_guardian_state["last_sitrep"] = datetime.utcnow().isoformat()
# Log SITREP as guardian event
await _guardian_emit("sitrep", "info", "system", "system", "sitrep", 0, 0,
f"SITREP generated ({detail})", analysis)
online_count = sum(1 for a in agents if a["status"] == "online")
offline_count = len(agents) - online_count
critical_events = sum(1 for e in recent_events if e["severity"] == "critical")
return {
"sitrep": analysis,
"agents_online": online_count,
"agents_offline": offline_count,
"agents_total": len(agents),
"events_24h": len(recent_events),
"critical_24h": critical_events,
"arc_stats": dict(arc_stats) if arc_stats else {},
"timestamp": datetime.utcnow().isoformat(),
"detail": detail,
}
async def handle_guardian_config(payload: dict) -> dict:
"""
Get or set Guardian Mode configuration.
payload: { action: get|set, key: ..., value: ... } or { action: set, config: {...} }
"""
action = payload.get("action", "get")
if action == "get":
cfg = await _guardian_get_config()
return {"config": cfg, "guardian_state": {
"enabled": _guardian_state.get("enabled"),
"last_scan": _guardian_state.get("last_scan"),
"last_sitrep": _guardian_state.get("last_sitrep"),
}}
elif action == "set":
updates = payload.get("config", {})
if payload.get("key") and payload.get("value") is not None:
updates[payload["key"]] = str(payload["value"])
for k, v in updates.items():
await db_execute(
"INSERT INTO guardian_config (key_name, value) VALUES (%s, %s) "
"ON DUPLICATE KEY UPDATE value=%s, updated_at=NOW()",
(k, str(v), str(v))
)
if k == "enabled":
_guardian_state["enabled"] = v not in ("0", "false", "off")
return {"ok": True, "updated": list(updates.keys())}
raise ValueError(f"Unknown guardian_config action: {action}")
# Register Phase 5 handlers
JOB_HANDLERS["sitrep"] = handle_sitrep
JOB_HANDLERS["guardian_config"] = handle_guardian_config
# ── PHASE 6: COMMS v2 — SEND EMAIL + SCHEDULE + MEETING PREP ─────────────────
import smtplib
import ssl as _ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr, make_msgid
SMTP_SETTINGS = {
"gmail": ("smtp.gmail.com", 587, GMAIL_USER, GMAIL_PASS),
"icloud": ("smtp.mail.me.com", 587, ICLOUD_USER, ICLOUD_PASS),
}
def _smtp_send_sync(account: str, to_email: str, to_name: str,
subject: str, body: str,
reply_to_msg_id: str = "") -> dict:
"""Synchronous SMTP send — run in executor."""
host, port, user, passwd = SMTP_SETTINGS.get(account, SMTP_SETTINGS["gmail"])
if not user or not passwd:
return {"success": False, "error": "No credentials configured for account"}
try:
msg = MIMEMultipart("alternative")
from_addr = formataddr(("JARVIS / Myron Blair", user))
msg["From"] = from_addr
msg["To"] = formataddr((to_name, to_email)) if to_name else to_email
msg["Subject"] = subject
msg_id = make_msgid(domain=user.split("@")[1])
msg["Message-ID"] = msg_id
if reply_to_msg_id:
msg["In-Reply-To"] = f"<{reply_to_msg_id.strip('<>')}>"
msg["References"] = f"<{reply_to_msg_id.strip('<>')}>"
msg.attach(MIMEText(body, "plain", "utf-8"))
ctx = _ssl.create_default_context()
with smtplib.SMTP(host, port, timeout=20) as smtp:
smtp.ehlo()
smtp.starttls(context=ctx)
smtp.login(user, passwd)
smtp.sendmail(user, [to_email], msg.as_bytes())
return {"success": True, "message_id": msg_id}
except Exception as e:
return {"success": False, "error": str(e)}
async def handle_send_email(payload: dict) -> dict:
"""
Send an email from Gmail or iCloud.
payload: {
account: gmail|icloud,
to_email: recipient address,
to_name: recipient display name (optional),
subject: subject line,
body: plain-text body,
triage_id: int (optional — if replying to a triage item),
reply_to_msg_id: original Message-ID for threading (optional),
compose: true — ask Claude to draft the body from a prompt first
}
"""
account = payload.get("account", "gmail")
to_email = payload.get("to_email", "").strip()
to_name = payload.get("to_name", "").strip()
subject = payload.get("subject", "").strip()
body = payload.get("body", "").strip()
triage_id = payload.get("triage_id")
reply_msg_id = payload.get("reply_to_msg_id", "")
compose_prompt = payload.get("compose_prompt", "")
# If triage_id is given, pull recipient + subject + draft from email_triage
if triage_id and not to_email:
row = await db_fetchone(
"SELECT from_email, from_name, subject, draft_reply, msg_id FROM email_triage WHERE id=%s",
(int(triage_id),)
)
if row:
to_email = to_email or row["from_email"]
to_name = to_name or row["from_name"]
subject = subject or ("Re: " + (row["subject"] or ""))
body = body or row.get("draft_reply", "")
reply_msg_id = reply_msg_id or row.get("msg_id", "")
if not to_email:
raise ValueError("Missing to_email")
# If compose requested, use Claude to write the body
if compose_prompt and not body:
draft_prompt = (
f"You are drafting an email on behalf of Myron Blair.\n"
f"To: {to_name or to_email}\n"
f"Subject: {subject}\n\n"
f"Instructions: {compose_prompt}\n\n"
"Write a professional, concise email body. No subject line, no salutation header. "
"Start with 'Hi [name],' or similar. Sign off as 'Myron Blair'."
)
body = await llm_call([{"role": "user", "content": draft_prompt}], "claude")
if not subject:
raise ValueError("Missing subject")
if not body:
raise ValueError("Missing email body — provide body or compose_prompt")
log.info(f"[COMMS] Sending email: {account}{to_email} | {subject[:60]}")
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda: _smtp_send_sync(account, to_email, to_name, subject, body, reply_msg_id)
)
# Record in email_sent
status = "sent" if result["success"] else "failed"
await db_execute(
"""INSERT INTO email_sent
(account, to_email, to_name, subject, body, triage_id, status, error, message_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(account, to_email, to_name, subject, body,
triage_id or None, status,
result.get("error", ""), result.get("message_id", ""))
)
# Update triage item if this was a reply
if triage_id and result["success"]:
await db_execute(
"UPDATE email_triage SET action_taken='replied' WHERE id=%s", (int(triage_id),)
)
log.info(f"[COMMS] Send result: {status}{to_email}")
return {
"success": result["success"],
"account": account,
"to_email": to_email,
"subject": subject,
"status": status,
"message_id": result.get("message_id", ""),
"error": result.get("error", ""),
"body": body,
}
async def handle_compose_email(payload: dict) -> dict:
"""
Compose an email from natural-language instructions — draft only (no auto-send).
payload: { to_email, to_name, subject_hint, instructions, account, send: false }
"""
to_email = payload.get("to_email", "").strip()
to_name = payload.get("to_name", "").strip()
subject_hint = payload.get("subject_hint", "").strip()
instructions = payload.get("instructions", "").strip()
account = payload.get("account", "gmail")
auto_send = bool(payload.get("send", False))
if not instructions:
raise ValueError("Missing instructions for compose")
# Generate subject if not given
if not subject_hint:
subj_prompt = f"Write a concise email subject line (max 8 words) for an email about: {instructions}"
subject_hint = (await llm_call([{"role": "user", "content": subj_prompt}], "claude")).strip().strip('"')
# Draft full body
draft_prompt = (
f"You are drafting an email for Myron Blair.\n"
f"To: {to_name or to_email or 'the recipient'}\n"
f"Subject: {subject_hint}\n\n"
f"Instructions: {instructions}\n\n"
"Write a professional, concise email. Start with 'Hi [name],' or 'Hello,' "
"and sign off as 'Myron Blair'. Return only the email body text."
)
body = await llm_call([{"role": "user", "content": draft_prompt}], "claude")
if auto_send and to_email:
return await handle_send_email({
"account": account,
"to_email": to_email,
"to_name": to_name,
"subject": subject_hint,
"body": body,
})
# Draft-only — save to email_sent with status='queued' for review
draft_id = await db_execute(
"""INSERT INTO email_sent
(account, to_email, to_name, subject, body, status)
VALUES (%s,%s,%s,%s,%s,'queued')""",
(account, to_email, to_name, subject_hint, body)
)
log.info(f"[COMMS] Composed draft #{draft_id}: {to_email} | {subject_hint[:60]}")
return {
"draft_id": draft_id,
"to_email": to_email,
"to_name": to_name,
"subject": subject_hint,
"body": body,
"account": account,
"sent": False,
"status": "queued",
}
async def handle_schedule_event(payload: dict) -> dict:
"""
Create or find an appointment in the JARVIS planner from natural language.
payload: { title, description, start_at, end_at, location, category,
natural_language, all_day, reminder_min, generate_ics }
"""
nl = payload.get("natural_language", "").strip()
title = payload.get("title", "").strip()
start_at = payload.get("start_at", "")
end_at = payload.get("end_at", "")
location = payload.get("location", "")
category = payload.get("category", "work")
all_day = bool(payload.get("all_day", False))
reminder = int(payload.get("reminder_min", 30))
description = payload.get("description", "")
gen_ics = bool(payload.get("generate_ics", True))
# If natural language given, parse with Claude
if nl and (not title or not start_at):
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
parse_prompt = (
f"Current datetime: {now_str}\n\n"
f"Parse this scheduling request into JSON:\n\"{nl}\"\n\n"
"Return ONLY valid JSON with these fields (no markdown):\n"
'{"title":"...","start_at":"YYYY-MM-DD HH:MM:SS","end_at":"YYYY-MM-DD HH:MM:SS or null",'
'"location":"...","category":"personal|work|medical|other","all_day":false,'
'"description":"..."}\n'
"Use 24h time. Default duration 1 hour if not specified. "
"If only a date (no time) is given, set all_day=true."
)
raw = await llm_call([{"role": "user", "content": parse_prompt}], "claude")
raw = raw.strip().strip("```json").strip("```").strip()
try:
parsed = json.loads(raw)
title = title or parsed.get("title", nl[:100])
start_at = start_at or parsed.get("start_at", "")
end_at = end_at or parsed.get("end_at") or ""
location = location or parsed.get("location", "")
category = parsed.get("category", category)
all_day = parsed.get("all_day", all_day)
description = description or parsed.get("description", "")
except Exception as e:
log.warning(f"[COMMS] Schedule parse failed: {e} — raw: {raw[:100]}")
if not title:
title = nl[:100]
if not title:
raise ValueError("Could not determine event title from input")
if not start_at:
raise ValueError("Could not determine start time from input")
# Insert into appointments table
appt_id = await db_execute(
"""INSERT INTO appointments
(title, description, category, start_at, end_at, location,
all_day, reminder_min, external_source)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,'manual')""",
(title, description, category, start_at,
end_at or None, location, int(all_day), reminder)
)
# Generate ICS if requested
ics_content = ""
if gen_ics and start_at:
try:
from datetime import datetime as dt
uid = f"jarvis-{appt_id}@orbishosting.com"
dtstart = start_at.replace(" ", "T").replace("-", "").replace(":", "")
dtend = (end_at or start_at).replace(" ", "T").replace("-", "").replace(":", "")
dtstamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
ics_content = (
"BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//JARVIS//EN\r\n"
"BEGIN:VEVENT\r\n"
f"UID:{uid}\r\n"
f"DTSTAMP:{dtstamp}\r\n"
f"DTSTART:{dtstart}\r\n"
f"DTEND:{dtend}\r\n"
f"SUMMARY:{title}\r\n"
f"DESCRIPTION:{description}\r\n"
f"LOCATION:{location}\r\n"
"END:VEVENT\r\nEND:VCALENDAR\r\n"
)
except Exception:
pass
log.info(f"[COMMS] Event scheduled: #{appt_id} '{title}' @ {start_at}")
return {
"appointment_id": appt_id,
"title": title,
"start_at": start_at,
"end_at": end_at,
"location": location,
"category": category,
"all_day": all_day,
"ics": ics_content,
"description": description,
}
async def handle_meeting_prep(payload: dict) -> dict:
"""
Prepare a briefing for an upcoming meeting.
payload: { appointment_id OR title OR timeframe, research: true }
"""
appt_id = payload.get("appointment_id")
title = payload.get("title", "").strip()
timeframe = payload.get("timeframe", "today")
do_research = bool(payload.get("research", True))
# Find the appointment
appt = None
if appt_id:
appt = await db_fetchone("SELECT * FROM appointments WHERE id=%s", (int(appt_id),))
elif title:
appt = await db_fetchone(
"SELECT * FROM appointments WHERE title LIKE %s AND start_at >= NOW() ORDER BY start_at LIMIT 1",
(f"%{title}%",)
)
else:
appt = await db_fetchone(
"""SELECT * FROM appointments
WHERE start_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 24 HOUR)
ORDER BY start_at LIMIT 1"""
)
if not appt:
# Check tasks too
task = await db_fetchone(
"SELECT * FROM tasks WHERE title LIKE %s AND status != 'done' ORDER BY due_date LIMIT 1",
(f"%{title}%",)
)
if task:
appt = {"title": task["title"], "description": task.get("notes",""),
"start_at": str(task.get("due_date") or ""), "location": ""}
if not appt:
return {"success": False, "error": f"No upcoming appointment found matching '{title or timeframe}'"}
appt_title = appt.get("title", "")
appt_start = str(appt.get("start_at", ""))
appt_desc = appt.get("description", "")
appt_loc = appt.get("location", "")
# Optional: kick off a research job on the meeting topic/attendees
research_summary = ""
if do_research and appt_title:
try:
research_result = await handle_research({
"query": f"background information on: {appt_title}",
"depth": "quick",
"provider": "claude",
})
research_summary = research_result.get("synthesis", "")[:1500]
except Exception:
pass
# Build meeting briefing
context = (
f"Meeting: {appt_title}\n"
f"Time: {appt_start}\n"
f"Location: {appt_loc or 'Not specified'}\n"
f"Notes: {appt_desc or 'None'}\n"
)
if research_summary:
context += f"\nBackground Research:\n{research_summary}"
prompt = (
f"You are JARVIS. Prepare a concise meeting briefing for Myron Blair.\n\n"
f"{context}\n\n"
"Format: Meeting summary → Key context/background → Suggested talking points → "
"Action items to prepare. Keep it sharp and actionable. Iron Man style."
)
briefing = await llm_call([{"role": "user", "content": prompt}], "claude")
log.info(f"[COMMS] Meeting prep complete: '{appt_title}'")
return {
"appointment_id": appt.get("id"),
"title": appt_title,
"start_at": appt_start,
"location": appt_loc,
"briefing": briefing,
"has_research": bool(research_summary),
}
# Register Phase 6 handlers
JOB_HANDLERS["send_email"] = handle_send_email
JOB_HANDLERS["compose_email"] = handle_compose_email
JOB_HANDLERS["schedule_event"] = handle_schedule_event
JOB_HANDLERS["meeting_prep"] = handle_meeting_prep
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 7: MISSION OPS — multi-step automated workflows
# ═══════════════════════════════════════════════════════════════════════════════
import re as _re
def _render_template(text: str, context: dict) -> str:
"""Replace {{key.subkey}} tokens in a string/JSON with values from context."""
if not isinstance(text, str):
return text
def replacer(m):
path = m.group(1).strip().split(".")
val = context
for p in path:
if isinstance(val, dict):
val = val.get(p, m.group(0))
else:
return m.group(0)
return str(val) if not isinstance(val, (dict, list)) else json.dumps(val)
return _re.sub(r'\{\{([^}]+)\}\}', replacer, text)
def _apply_templates(payload: Any, context: dict) -> Any:
"""Recursively apply template substitution to a payload dict/list/str."""
if isinstance(payload, str):
return _render_template(payload, context)
if isinstance(payload, dict):
return {k: _apply_templates(v, context) for k, v in payload.items()}
if isinstance(payload, list):
return [_apply_templates(v, context) for v in payload]
return payload
async def _execute_mission(mission_id: int, trigger_source: str = "manual") -> dict:
"""Run all steps of a mission sequentially, log results, return summary."""
mission = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,))
if not mission:
return {"error": f"Mission {mission_id} not found"}
steps = await db_fetchall(
"SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC",
(mission_id,)
)
if not steps:
return {"error": "Mission has no steps"}
# Create run record
run_id = await db_execute(
"INSERT INTO mission_runs (mission_id, status, trigger_source, steps_log) VALUES (%s,'running',%s,'[]')",
(mission_id, trigger_source)
)
await db_execute(
"UPDATE missions SET last_run_at=NOW(), run_count=run_count+1 WHERE id=%s",
(mission_id,)
)
context = {"trigger": {"source": trigger_source, "mission_id": mission_id}}
steps_log = []
overall_status = "done"
for i, step in enumerate(steps):
step_label = step.get("label") or step["job_type"]
raw_payload = step.get("job_payload") or {}
if isinstance(raw_payload, str):
try:
raw_payload = json.loads(raw_payload)
except Exception:
raw_payload = {}
payload = _apply_templates(raw_payload, context)
step_entry = {
"step": i,
"label": step_label,
"job_type": step["job_type"],
"status": "running",
"result": None,
"error": None,
}
try:
handler = JOB_HANDLERS.get(step["job_type"])
if not handler:
raise ValueError(f"Unknown job type: {step['job_type']}")
result = await handler(payload)
step_entry["status"] = "done"
step_entry["result"] = result
context[f"step_{i}"] = result
log.info(f"[MISSION {mission_id} RUN {run_id}] Step {i} ({step_label}) done")
except Exception as exc:
step_entry["status"] = "failed"
step_entry["error"] = str(exc)[:500]
log.warning(f"[MISSION {mission_id} RUN {run_id}] Step {i} ({step_label}) failed: {exc}")
if not step.get("continue_on_failure"):
overall_status = "failed"
steps_log.append(step_entry)
break
steps_log.append(step_entry)
await db_execute(
"UPDATE mission_runs SET status=%s, steps_log=%s, completed_at=NOW() WHERE id=%s",
(overall_status, json.dumps(steps_log, default=str), run_id)
)
return {
"run_id": run_id,
"status": overall_status,
"steps": len(steps_log),
"steps_log": steps_log,
}
async def handle_run_mission(payload: dict) -> dict:
mission_id = int(payload.get("mission_id") or 0)
if not mission_id:
return {"error": "Missing mission_id"}
source = payload.get("trigger_source", "manual")
return await _execute_mission(mission_id, source)
JOB_HANDLERS["run_mission"] = handle_run_mission
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 8: MISSION DIRECTIVES — OKR / goal tracking with AI review
# ═══════════════════════════════════════════════════════════════════════════════
async def handle_directive_review(payload: dict) -> dict:
"""AI-powered review of active directives: progress analysis + recommendations."""
directive_id = payload.get("directive_id")
provider = payload.get("provider", "claude")
# Fetch directives
if directive_id:
dirs = await db_fetchall(
"SELECT * FROM directives WHERE id=%s", (directive_id,)
)
else:
dirs = await db_fetchall(
"SELECT * FROM directives WHERE status='active' ORDER BY priority DESC LIMIT 10"
)
if not dirs:
return {"error": "No active directives found"}
dir_ids = [d["id"] for d in dirs]
placeholders = ",".join(["%s"] * len(dir_ids))
key_results = await db_fetchall(
f"SELECT * FROM directive_key_results WHERE directive_id IN ({placeholders}) ORDER BY directive_id,id",
dir_ids
) or []
links = await db_fetchall(
f"""SELECT dl.directive_id, dl.link_type, dl.note,
COALESCE(t.title, a.title) AS linked_title,
COALESCE(t.status, 'scheduled') AS linked_status
FROM directive_links dl
LEFT JOIN tasks t ON dl.link_type='task' AND t.id=dl.link_id
LEFT JOIN appointments a ON dl.link_type='appointment' AND a.id=dl.link_id
WHERE dl.directive_id IN ({placeholders})""",
dir_ids
) or []
# Build context block
context_lines = []
for d in dirs:
d_krs = [kr for kr in key_results if kr["directive_id"] == d["id"]]
d_links = [lk for lk in links if lk["directive_id"] == d["id"]]
kr_sum_cur = sum(float(kr["current_value"]) for kr in d_krs)
kr_sum_tgt = sum(float(kr["target_value"]) for kr in d_krs) or 1
pct = round(kr_sum_cur / kr_sum_tgt * 100, 1)
context_lines.append(f"\n## DIRECTIVE: {d['title']} ({d['category'].upper()}) — {pct}% complete")
context_lines.append(f" Status: {d['status']} | Priority: {d['priority']}/10 | Target: {d.get('target_date') or 'no date'}")
if d.get("description"):
context_lines.append(f" Description: {d['description']}")
for kr in d_krs:
context_lines.append(f" KR: {kr['title']}{kr['current_value']}/{kr['target_value']} {kr['unit']}")
for lk in d_links:
status = lk.get("linked_status") or ""
context_lines.append(f" Linked {lk['link_type']}: {lk.get('linked_title') or lk.get('note') or '—'} [{status}]")
context = "\n".join(context_lines)
today = datetime.utcnow().strftime("%Y-%m-%d")
prompt = f"""You are JARVIS, an AI assistant conducting a Mission Directives review for your principal.
Today is {today}.
{context}
Provide a concise executive briefing covering:
1. Overall progress summary (2-3 sentences)
2. For each directive: current status, what's on track, what needs attention
3. Top 3 recommended next actions to move the highest-priority directives forward
4. Any directives at risk of missing their target date
Keep the tone confident and action-oriented. Format with clear sections. Under 400 words."""
review_text = ""
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.anthropic.com/v1/messages",
headers={"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"},
json={"model": CLAUDE_MODEL, "max_tokens": 600, "messages": [{"role": "user", "content": prompt}]},
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
rdata = await resp.json()
review_text = rdata.get("content", [{}])[0].get("text", "")
except Exception as e:
review_text = f"[Review generation failed: {e}]"
# Store in conversations so HUD can surface it
await db_execute(
"INSERT INTO conversations (session_id, role, message, created_at) VALUES ('guardian','assistant',%s,NOW())",
(review_text,)
)
return {
"review": review_text,
"directives": len(dirs),
"provider": provider,
}
JOB_HANDLERS["directive_review"] = handle_directive_review
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 10: MEMORY CORE — auto-extraction knowledge graph
# ═══════════════════════════════════════════════════════════════════════════════
_MEMORY_EXTRACT_PROMPT = """Extract factual information about the user from this conversation exchange. Focus on:
- Preferences (likes/dislikes, preferred ways of working)
- People they mention (names, relationships, roles)
- Places (home, work, locations)
- Routines (schedules, habits)
- Goals (things they want to achieve)
- Instructions to JARVIS (always/never rules)
- Key facts about their life/work/projects
Return a JSON array. Each element: {{"category": "<preference|person|place|routine|goal|fact|instruction>", "subject": "<short label>", "predicate": "<short verb/attribute>", "object": "<the value>", "confidence": <0.0-1.0>}}
Rules:
- Only extract clearly stated facts, not speculation
- subject/predicate should be concise (under 50 chars each)
- confidence: 0.95 for explicit statements, 0.75-0.90 for clear implications
- Skip ephemeral info (what they asked just now, current queries)
- Max 8 facts per exchange
- Return [] if nothing durable worth remembering
Exchange:
USER: {user_msg}
JARVIS: {asst_msg}
Return ONLY valid JSON array, nothing else."""
async def handle_memory_extract(payload: dict) -> dict:
user_msg = str(payload.get("user_message", "")).strip()
asst_msg = str(payload.get("assistant_message", "")).strip()
conv_id = payload.get("conversation_id")
if not user_msg and not asst_msg:
return {"ok": False, "reason": "empty exchange"}
prompt = _MEMORY_EXTRACT_PROMPT.format(
user_msg=user_msg[:800],
asst_msg=asst_msg[:800]
)
raw = ""
try:
# Use Haiku for speed/cost; fall back to Groq
headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"}
body = {
"model": "claude-haiku-4-5-20251001",
"max_tokens": 800,
"messages": [{"role": "user", "content": prompt}]
}
async with aiohttp.ClientSession() as session:
async with session.post("https://api.anthropic.com/v1/messages", json=body,
headers=headers, timeout=aiohttp.ClientTimeout(total=20)) as resp:
if resp.status == 200:
data = await resp.json()
raw = data["content"][0]["text"].strip()
else:
raise RuntimeError(f"Claude {resp.status}")
except Exception as e:
log.warning(f"[MEMORY] Claude extract failed ({e}), trying Groq")
try:
raw = await llm_call([{"role": "user", "content": prompt}], provider="groq")
except Exception as e2:
log.error(f"[MEMORY] All providers failed: {e2}")
return {"ok": False, "reason": str(e2)}
# Parse JSON — strip code fences if present
if "```" in raw:
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
raw = raw.strip()
try:
facts = json.loads(raw)
if not isinstance(facts, list):
facts = []
except Exception:
log.warning(f"[MEMORY] JSON parse failed. Raw: {raw[:200]}")
return {"ok": False, "reason": "json_parse_failed"}
valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"}
inserted = 0
for f in facts:
if not isinstance(f, dict):
continue
subj = str(f.get("subject", "")).strip()[:255]
pred = str(f.get("predicate", "is")).strip()[:255]
obj = str(f.get("object", "")).strip()
cat = f.get("category", "fact")
conf = min(1.0, max(0.0, float(f.get("confidence", 0.85))))
if not subj or not obj or cat not in valid_cats:
continue
try:
await db_execute(
"""INSERT INTO memory_facts (category, subject, predicate, object, confidence, source, conversation_id)
VALUES (%s, %s, %s, %s, %s, 'conversation', %s)
ON DUPLICATE KEY UPDATE
object=VALUES(object),
confidence=GREATEST(confidence, VALUES(confidence)),
confirmed_count=confirmed_count+1,
last_confirmed_at=NOW(),
active=1""",
(cat, subj, pred, obj, conf, conv_id)
)
inserted += 1
except Exception as e:
log.warning(f"[MEMORY] Insert ({subj},{pred}) failed: {e}")
log.info(f"[MEMORY] Stored {inserted}/{len(facts)} facts from conv {conv_id}")
return {"ok": True, "extracted": inserted, "candidates": len(facts)}
async def handle_memory_store(payload: dict) -> dict:
"""Explicit memory insertion — from 'remember that X' voice commands."""
subject = str(payload.get("subject", "user")).strip()[:255]
predicate = str(payload.get("predicate", "note")).strip()[:255]
obj = str(payload.get("object", "")).strip()
category = payload.get("category", "fact")
if not obj:
return {"ok": False, "reason": "empty object"}
valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"}
if category not in valid_cats:
category = "fact"
fid = await db_execute(
"""INSERT INTO memory_facts (category, subject, predicate, object, confidence, source)
VALUES (%s, %s, %s, %s, 1.0, 'explicit')
ON DUPLICATE KEY UPDATE
object=VALUES(object), confidence=1.0,
confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""",
(category, subject, predicate, obj)
)
log.info(f"[MEMORY] Explicit store: [{category}] {subject} {predicate}: {obj}")
return {"ok": True, "id": fid}
JOB_HANDLERS["memory_extract"] = handle_memory_extract
JOB_HANDLERS["memory_store"] = handle_memory_store
# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 9: CLEARANCE PROTOCOL — approval gating for high-risk operations
# ═══════════════════════════════════════════════════════════════════════════════
def _clearance_describe(job_type: str, payload: dict) -> str:
"""Human-readable description of what the job would do."""
if job_type == "shell":
cmd = payload.get("command", payload.get("cmd", "?"))
agent = payload.get("agent_id", payload.get("agent", "unknown"))
return f"Execute shell command on agent '{agent}': {str(cmd)[:120]}"
if job_type == "send_email":
to = payload.get("to_email") or payload.get("target", "?")
subj = payload.get("subject", "")
tid = payload.get("triage_id")
if tid:
return f"Send SMTP reply to triage item #{tid} (to: {to})"
return f"Send email to {to}" + (f" — {subj}" if subj else "")
if job_type == "remote_exec":
cmd_type = payload.get("command_type", payload.get("command", "?"))
agent = payload.get("agent_id", payload.get("agent", "?"))
return f"Remote execute '{cmd_type}' on agent '{agent}'"
if job_type == "purge":
return "Purge all completed/failed jobs from the Arc Reactor queue"
return f"Execute {job_type} job"
async def _check_clearance(job_type: str, payload: dict, created_by: str) -> Optional[dict]:
"""
Returns a clearance-pending response dict if approval is required,
or None if the job can proceed immediately.
"""
rule = await db_fetchone(
"SELECT * FROM clearance_rules WHERE job_type=%s AND enabled=1 AND require_approval=1",
(job_type,)
)
if not rule:
return None
desc = _clearance_describe(job_type, payload)
expires = None
if rule.get("auto_approve_after_min") is not None:
expires = datetime.utcnow() + timedelta(minutes=int(rule["auto_approve_after_min"]))
cr_id = await db_execute(
"INSERT INTO clearance_requests (job_type,job_payload,risk_level,description,requested_by,status,expires_at) "
"VALUES (%s,%s,%s,%s,%s,'pending',%s)",
(job_type, json.dumps(payload), rule["risk_level"], desc, created_by, expires)
)
log.warning(f"[CLEARANCE] Request #{cr_id}{rule['risk_level'].upper()}{desc}")
return {
"status": "pending_clearance",
"clearance_id": cr_id,
"risk_level": rule["risk_level"],
"description": desc,
"message": f"◈ CLEARANCE REQUIRED — {rule['risk_level'].upper()} risk operation intercepted. Approval needed before execution.",
}
async def _dispatch_cleared_job(cr_id: int, job_type: str, payload: dict, priority: int = 7, decided_by: str = "admin") -> dict:
"""Approve a clearance request and dispatch the job into the queue."""
jid = await db_execute(
"INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)",
(job_type, json.dumps(payload), priority, f"clearance:{cr_id}")
)
await db_execute(
"UPDATE clearance_requests SET status='approved', decided_by=%s, arc_job_id=%s, decided_at=NOW() WHERE id=%s",
(decided_by, jid, cr_id)
)
log.info(f"[CLEARANCE] Request #{cr_id} approved by {decided_by} → Job #{jid}")
return {"job_id": jid, "clearance_id": cr_id, "status": "approved"}
# ── Clearance watchdog ─────────────────────────────────────────────────────────
async def clearance_watchdog() -> None:
"""Expire timed-out pending requests; auto-approve those with auto_approve_after_min set."""
log.info("Clearance watchdog started")
await asyncio.sleep(20)
while True:
try:
now = datetime.utcnow()
# Expire requests past their expiry with no auto-approve (expires_at IS NULL means never expires)
expired = await db_fetchall(
"SELECT cr.*, cru.auto_approve_after_min FROM clearance_requests cr "
"JOIN clearance_rules cru ON cru.job_type=cr.job_type "
"WHERE cr.status='pending' AND cr.expires_at IS NOT NULL AND cr.expires_at <= %s",
(now,)
) or []
for req in expired:
if req.get("auto_approve_after_min") is not None:
# Auto-approve: dispatch the job
payload = req.get("job_payload") or {}
if isinstance(payload, str):
try: payload = json.loads(payload)
except Exception: payload = {}
await _dispatch_cleared_job(req["id"], req["job_type"], payload, decided_by="auto_approve")
log.info(f"[CLEARANCE] Auto-approved request #{req['id']} ({req['job_type']})")
else:
await db_execute(
"UPDATE clearance_requests SET status='expired', decided_at=NOW() WHERE id=%s",
(req["id"],)
)
log.info(f"[CLEARANCE] Expired request #{req['id']} ({req['job_type']})")
except Exception as exc:
log.error(f"Clearance watchdog error: {exc}")
await asyncio.sleep(60)
# ── Mission trigger loop ───────────────────────────────────────────────────────
_mission_trigger_state: dict = {} # mission_id → last_triggered ISO
async def mission_trigger_loop() -> None:
"""Check scheduled and event-based mission triggers every 30 seconds."""
log.info("Mission trigger loop started")
await asyncio.sleep(15) # stagger startup
while True:
try:
missions = await db_fetchall(
"SELECT * FROM missions WHERE enabled=1 AND trigger_type != 'manual'"
)
for m in missions:
mid = m["id"]
ttype = m["trigger_type"]
cfg = m.get("trigger_config") or {}
if isinstance(cfg, str):
try: cfg = json.loads(cfg)
except Exception: cfg = {}
should_run = False
source = ttype
if ttype == "schedule":
interval_min = int(cfg.get("interval_minutes", 60))
last_run = m.get("last_run_at")
if last_run is None:
should_run = True
else:
last_dt = last_run if isinstance(last_run, datetime) else datetime.fromisoformat(str(last_run))
elapsed = (datetime.utcnow() - last_dt).total_seconds() / 60
should_run = elapsed >= interval_min
elif ttype == "guardian_event":
severity = cfg.get("severity", "")
etype = cfg.get("event_type", "")
last_key = f"ge_{mid}"
last_seen = _mission_trigger_state.get(last_key, "1970-01-01")
wheres = ["acknowledged=0", "created_at > %s"]
params: list = [last_seen]
if severity: wheres.append("severity=%s"); params.append(severity)
if etype: wheres.append("event_type=%s"); params.append(etype)
row = await db_fetchone(
f"SELECT id, created_at FROM guardian_events WHERE {' AND '.join(wheres)} ORDER BY created_at DESC LIMIT 1",
params
)
if row:
should_run = True
_mission_trigger_state[last_key] = str(row["created_at"])
source = f"guardian_event:{row['id']}"
elif ttype == "email_keyword":
keywords = cfg.get("keywords", [])
category = cfg.get("category", "")
last_key = f"ek_{mid}"
last_seen = _mission_trigger_state.get(last_key, "1970-01-01")
if keywords:
kw_conds = " OR ".join(["subject LIKE %s OR summary LIKE %s"] * len(keywords))
kw_params = []
for kw in keywords:
kw_params += [f"%{kw}%", f"%{kw}%"]
where = f"created_at > %s AND ({kw_conds})"
params = [last_seen] + kw_params
if category:
where += " AND category=%s"
params.append(category)
row = await db_fetchone(
f"SELECT id, created_at FROM email_triage WHERE {where} ORDER BY created_at DESC LIMIT 1",
params
)
if row:
should_run = True
_mission_trigger_state[last_key] = str(row["created_at"])
source = f"email_triage:{row['id']}"
if should_run:
log.info(f"[MISSION TRIGGER] Mission {mid} ({m['name']}) triggered by {source}")
asyncio.create_task(_execute_mission(mid, source))
except Exception as exc:
log.error(f"Mission trigger loop error: {exc}")
await asyncio.sleep(30)
# ═══════════════════════════════════════════════════════════════════════════════
# JOB RUNNER + BACKGROUND TASKS
# ═══════════════════════════════════════════════════════════════════════════════
_running_jobs: set = set()
_stats = {"done": 0, "failed": 0}
async def run_job(job: dict) -> None:
jid = job["id"]
jtype = job["job_type"]
_running_jobs.add(jid)
try:
payload = job.get("payload") or {}
if isinstance(payload, str):
payload = json.loads(payload)
await db_execute("UPDATE arc_jobs SET status='running', started_at=NOW() WHERE id=%s", (jid,))
log.info(f"[JOB {jid}] Starting {jtype}")
handler = JOB_HANDLERS.get(jtype)
if not handler:
raise ValueError(f"Unknown job type: {jtype}")
result = await handler(payload)
result_str = json.dumps(result, default=str)
await db_execute("UPDATE arc_jobs SET status='done', result=%s, completed_at=NOW() WHERE id=%s", (result_str, jid))
_stats["done"] += 1
log.info(f"[JOB {jid}] Done: {jtype}")
except Exception as exc:
err = str(exc)
await db_execute("UPDATE arc_jobs SET status='failed', error=%s, completed_at=NOW() WHERE id=%s", (err[:2000], jid))
_stats["failed"] += 1
log.warning(f"[JOB {jid}] Failed: {err[:120]}")
finally:
_running_jobs.discard(jid)
async def job_poller() -> None:
log.info("Arc Reactor job poller started")
while True:
try:
jobs = await db_fetchall("SELECT * FROM arc_jobs WHERE status='queued' ORDER BY priority DESC, id ASC LIMIT 5")
for job in jobs:
if job["id"] not in _running_jobs:
asyncio.create_task(run_job(job))
except Exception as exc:
log.error(f"Poller error: {exc}")
await asyncio.sleep(POLL_INTERVAL)
async def heartbeat_loop() -> None:
while True:
try:
await db_execute(
"UPDATE arc_status SET last_heartbeat=NOW(), active_jobs=%s, jobs_done=%s, jobs_failed=%s WHERE id=1",
(len(_running_jobs), _stats["done"], _stats["failed"])
)
except Exception as exc:
log.error(f"Heartbeat error: {exc}")
await asyncio.sleep(HEARTBEAT_INTERVAL)
# ═══════════════════════════════════════════════════════════════════════════════
# FASTAPI APP
# ═══════════════════════════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info(f"◈ JARVIS Arc Reactor v{VERSION} starting on {HOST}:{PORT}")
await get_pool()
await db_execute("UPDATE arc_status SET started_at=NOW(), last_heartbeat=NOW(), version=%s WHERE id=1", (VERSION,))
asyncio.create_task(job_poller())
asyncio.create_task(heartbeat_loop())
asyncio.create_task(guardian_loop())
asyncio.create_task(mission_trigger_loop())
asyncio.create_task(clearance_watchdog())
log.info(f"◈ Arc Reactor online — {len(JOB_HANDLERS)} handlers: {', '.join(JOB_HANDLERS)}")
yield
log.info("◈ Arc Reactor shutting down")
if _pool and not _pool.closed:
_pool.close()
await _pool.wait_closed()
app = FastAPI(title="JARVIS Arc Reactor", version=VERSION, lifespan=lifespan)
# ── Mission Ops endpoints ──────────────────────────────────────────────────────
@app.get("/missions")
async def missions_list(enabled: Optional[int] = None):
if enabled is not None:
rows = await db_fetchall("SELECT * FROM missions WHERE enabled=%s ORDER BY id DESC", (enabled,))
else:
rows = await db_fetchall("SELECT * FROM missions ORDER BY id DESC")
return rows or []
@app.get("/missions/{mission_id}")
async def mission_get(mission_id: int):
m = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,))
if not m:
raise HTTPException(status_code=404, detail="Not found")
steps = await db_fetchall(
"SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC",
(mission_id,)
)
runs = await db_fetchall(
"SELECT id, status, trigger_source, started_at, completed_at FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT 10",
(mission_id,)
)
return {**m, "steps": steps or [], "recent_runs": runs or []}
@app.post("/missions")
async def mission_create(req: Request):
body = await req.json()
name = body.get("name", "").strip()
if not name:
raise HTTPException(status_code=400, detail="name required")
desc = body.get("description", "")
ttype = body.get("trigger_type", "manual")
tcfg = json.dumps(body.get("trigger_config") or {})
enabled = int(body.get("enabled", 1))
mid = await db_execute(
"INSERT INTO missions (name,description,trigger_type,trigger_config,enabled) VALUES (%s,%s,%s,%s,%s)",
(name, desc, ttype, tcfg, enabled)
)
steps = body.get("steps") or []
for i, s in enumerate(steps):
await db_execute(
"INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)",
(mid, i, s.get("label",""), s["job_type"], json.dumps(s.get("payload",{})), int(s.get("continue_on_failure",0)))
)
return {"id": mid, "ok": True}
@app.put("/missions/{mission_id}")
async def mission_update(mission_id: int, req: Request):
body = await req.json()
fields: list = []
params: list = []
for col in ("name", "description", "trigger_type", "enabled"):
if col in body:
fields.append(f"{col}=%s")
params.append(body[col])
if "trigger_config" in body:
fields.append("trigger_config=%s")
params.append(json.dumps(body["trigger_config"]))
if fields:
params.append(mission_id)
await db_execute(f"UPDATE missions SET {','.join(fields)} WHERE id=%s", params)
if "steps" in body:
await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,))
for i, s in enumerate(body["steps"]):
await db_execute(
"INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)",
(mission_id, i, s.get("label",""), s["job_type"], json.dumps(s.get("payload",{})), int(s.get("continue_on_failure",0)))
)
return {"ok": True}
@app.delete("/missions/{mission_id}")
async def mission_delete(mission_id: int):
await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,))
await db_execute("DELETE FROM mission_runs WHERE mission_id=%s", (mission_id,))
await db_execute("DELETE FROM missions WHERE id=%s", (mission_id,))
return {"ok": True}
@app.post("/missions/{mission_id}/run")
async def mission_run_endpoint(mission_id: int, req: Request):
try:
body = await req.json()
except Exception:
body = {}
source = body.get("trigger_source", "manual") if isinstance(body, dict) else "manual"
return await _execute_mission(mission_id, source)
@app.get("/missions/{mission_id}/runs")
async def mission_runs_list(mission_id: int, limit: int = 20):
rows = await db_fetchall(
"SELECT * FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT %s",
(mission_id, limit)
)
return rows or []
# ── Clearance FastAPI endpoints ────────────────────────────────────────────────
@app.get("/clearance/pending")
async def clearance_pending():
rows = await db_fetchall(
"SELECT * FROM clearance_requests WHERE status='pending' ORDER BY created_at DESC"
)
return rows or []
@app.get("/clearance/history")
async def clearance_history(limit: int = 50):
rows = await db_fetchall(
"SELECT * FROM clearance_requests ORDER BY created_at DESC LIMIT %s", (limit,)
)
return rows or []
@app.post("/clearance/{cr_id}/approve")
async def clearance_approve(cr_id: int, req: Request):
try: body = await req.json()
except Exception: body = {}
decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin"
cr = await db_fetchone("SELECT * FROM clearance_requests WHERE id=%s AND status='pending'", (cr_id,))
if not cr:
raise HTTPException(status_code=404, detail="Clearance request not found or not pending")
payload = cr.get("job_payload") or {}
if isinstance(payload, str):
try: payload = json.loads(payload)
except Exception: payload = {}
return await _dispatch_cleared_job(cr_id, cr["job_type"], payload, decided_by=decided_by)
@app.post("/clearance/{cr_id}/deny")
async def clearance_deny(cr_id: int, req: Request):
try: body = await req.json()
except Exception: body = {}
decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin"
note = body.get("note", "") if isinstance(body, dict) else ""
await db_execute(
"UPDATE clearance_requests SET status='denied', decided_by=%s, decision_note=%s, decided_at=NOW() WHERE id=%s",
(decided_by, note, cr_id)
)
log.info(f"[CLEARANCE] Request #{cr_id} denied by {decided_by}")
return {"ok": True, "clearance_id": cr_id, "status": "denied"}
@app.get("/clearance/rules")
async def clearance_rules_list():
rows = await db_fetchall("SELECT * FROM clearance_rules ORDER BY risk_level DESC, job_type ASC")
return rows or []
@app.put("/clearance/rules/{rule_id}")
async def clearance_rule_update(rule_id: int, req: Request):
body = await req.json()
fields: list = []
params: list = []
for col in ("risk_level", "require_approval", "auto_approve_after_min", "enabled", "description"):
if col in body:
fields.append(f"{col}=%s")
params.append(body[col])
if not fields:
raise HTTPException(status_code=400, detail="Nothing to update")
params.append(rule_id)
await db_execute(f"UPDATE clearance_rules SET {','.join(fields)} WHERE id=%s", params)
return {"ok": True}
@app.post("/clearance/rules")
async def clearance_rule_create(req: Request):
body = await req.json()
job_type = body.get("job_type", "").strip()
if not job_type:
raise HTTPException(status_code=400, detail="job_type required")
rid = await db_execute(
"INSERT INTO clearance_rules (job_type,risk_level,require_approval,auto_approve_after_min,description,enabled) "
"VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE "
"risk_level=%s,require_approval=%s,auto_approve_after_min=%s,description=%s,enabled=%s",
(job_type,
body.get("risk_level","high"), int(body.get("require_approval",1)),
body.get("auto_approve_after_min"), body.get("description",""), int(body.get("enabled",1)),
body.get("risk_level","high"), int(body.get("require_approval",1)),
body.get("auto_approve_after_min"), body.get("description",""), int(body.get("enabled",1)))
)
return {"ok": True, "id": rid}
# ── Memory Core FastAPI endpoints ──────────────────────────────────────────────
@app.get("/memory/facts")
async def memory_facts_list(limit: int = 100, category: str = "", search: str = ""):
where = "WHERE active=1"
params: list = []
if category:
where += " AND category=%s"
params.append(category)
if search:
where += " AND (subject LIKE %s OR predicate LIKE %s OR object LIKE %s)"
params += [f"%{search}%"] * 3
rows = await db_fetchall(
f"SELECT * FROM memory_facts {where} ORDER BY confirmed_count DESC, last_confirmed_at DESC LIMIT %s",
params + [limit]
)
return rows or []
@app.post("/memory/facts")
async def memory_fact_create(req: Request):
body = await req.json()
subj = str(body.get("subject", "")).strip()[:255]
pred = str(body.get("predicate", "is")).strip()[:255]
obj = str(body.get("object", "")).strip()
cat = body.get("category", "fact")
if not subj or not obj:
raise HTTPException(status_code=400, detail="subject and object required")
valid = {"preference","person","place","routine","goal","fact","instruction"}
if cat not in valid: cat = "fact"
fid = await db_execute(
"""INSERT INTO memory_facts (category, subject, predicate, object, confidence, source)
VALUES (%s,%s,%s,%s,1.0,'explicit')
ON DUPLICATE KEY UPDATE
object=VALUES(object), confidence=1.0,
confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""",
(cat, subj, pred, obj)
)
return {"ok": True, "id": fid}
@app.delete("/memory/facts/{fact_id}")
async def memory_fact_delete(fact_id: int):
await db_execute("UPDATE memory_facts SET active=0 WHERE id=%s", (fact_id,))
return {"ok": True}
@app.delete("/memory/facts")
async def memory_facts_clear(category: str = ""):
if category:
await db_execute("UPDATE memory_facts SET active=0 WHERE category=%s", (category,))
else:
await db_execute("UPDATE memory_facts SET active=0 WHERE active=1", ())
return {"ok": True}
@app.get("/memory/context")
async def memory_context(message: str = "", limit: int = 15):
"""Return relevant memory facts for a given message (for prompt injection)."""
params: list = []
if message.strip():
words = [w for w in message.lower().split() if len(w) > 3][:10]
if words:
like_clauses = " OR ".join(["subject LIKE %s OR object LIKE %s"] * len(words))
for w in words:
params += [f"%{w}%", f"%{w}%"]
rows = await db_fetchall(
f"SELECT * FROM memory_facts WHERE active=1 AND ({like_clauses}) "
f"ORDER BY confirmed_count DESC, confidence DESC LIMIT %s",
params + [limit]
) or []
# Pad with top general facts if sparse
if len(rows) < 5:
existing = [r["id"] for r in rows]
excl = ("AND id NOT IN (" + ",".join(["%s"]*len(existing)) + ")") if existing else ""
extra = await db_fetchall(
f"SELECT * FROM memory_facts WHERE active=1 {excl} "
f"ORDER BY confirmed_count DESC LIMIT %s",
existing + [max(1, limit - len(rows))]
) or []
rows = rows + extra
else:
rows = await db_fetchall(
"SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,)
) or []
else:
rows = await db_fetchall(
"SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,)
) or []
lines = [f"[{r['category']}] {r['subject']} {r['predicate']}: {r['object']}" for r in rows]
return {"facts": rows, "context": "\n".join(lines) if lines else ""}
@app.get("/memory/stats")
async def memory_stats():
total = await db_fetchone("SELECT COUNT(*) cnt FROM memory_facts WHERE active=1")
by_cat = await db_fetchall(
"SELECT category, COUNT(*) cnt FROM memory_facts WHERE active=1 GROUP BY category ORDER BY cnt DESC"
)
recent = await db_fetchall(
"SELECT * FROM memory_facts WHERE active=1 ORDER BY last_confirmed_at DESC LIMIT 5"
)
return {
"total": total["cnt"] if total else 0,
"by_category": by_cat or [],
"recent": recent or [],
}
@app.get("/status")
async def status():
row = await db_fetchone("SELECT * FROM arc_status WHERE id=1")
queued = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='queued'")
running = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='running'")
return {
"online": True, "version": VERSION,
"last_heartbeat": row["last_heartbeat"].isoformat() if row and row["last_heartbeat"] else None,
"started_at": row["started_at"].isoformat() if row and row["started_at"] else None,
"jobs_done": row["jobs_done"] if row else 0,
"jobs_failed": row["jobs_failed"] if row else 0,
"active_jobs": len(_running_jobs),
"queued_jobs": queued["cnt"] if queued else 0,
"running_jobs": running["cnt"] if running else 0,
"handlers": list(JOB_HANDLERS.keys()),
}
@app.post("/job")
async def create_job(request: Request):
body = await request.json()
jtype = body.get("type", "")
payload = body.get("payload", {})
priority = int(body.get("priority", 5))
created_by = body.get("created_by", "jarvis")
if not jtype:
raise HTTPException(status_code=400, detail="Missing job type")
clearance = await _check_clearance(jtype, payload, created_by)
if clearance:
return clearance
jid = await db_execute(
"INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)",
(jtype, json.dumps(payload), priority, created_by)
)
return {"job_id": jid, "status": "queued"}
@app.get("/job/{job_id}")
async def get_job(job_id: int):
job = await db_fetchone("SELECT * FROM arc_jobs WHERE id=%s", (job_id,))
if not job:
raise HTTPException(status_code=404, detail="Job not found")
result = job.get("result")
if result and isinstance(result, str):
try:
result = json.loads(result)
except Exception:
pass
return {
"id": job["id"], "job_type": job["job_type"], "status": job["status"],
"result": result, "error": job.get("error"),
"created_at": job["created_at"].isoformat() if job["created_at"] else None,
"started_at": job["started_at"].isoformat() if job["started_at"] else None,
"completed_at": job["completed_at"].isoformat() if job["completed_at"] else None,
}
@app.delete("/job/{job_id}")
async def cancel_job(job_id: int):
await db_execute("UPDATE arc_jobs SET status='cancelled' WHERE id=%s AND status='queued'", (job_id,))
return {"ok": True}
@app.get("/jobs")
async def list_jobs(limit: int = 50, status: str = ""):
if status:
rows = await db_fetchall(
"SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs WHERE status=%s ORDER BY id DESC LIMIT %s",
(status, limit)
)
else:
rows = await db_fetchall(
"SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s",
(limit,)
)
for r in rows:
if r.get("created_at"): r["created_at"] = r["created_at"].isoformat()
if r.get("completed_at"): r["completed_at"] = r["completed_at"].isoformat()
return rows
@app.get("/jobs/recent")
async def recent_jobs(limit: int = 10, job_type: str = ""):
if job_type:
rows = await db_fetchall(
"SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs WHERE job_type=%s ORDER BY id DESC LIMIT %s",
(job_type, limit)
)
else:
rows = await db_fetchall(
"SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s",
(limit,)
)
for r in rows:
if r.get("created_at"): r["created_at"] = r["created_at"].isoformat()
if r.get("completed_at"): r["completed_at"] = r["completed_at"].isoformat()
if r.get("result") and isinstance(r["result"], str):
try:
r["result"] = json.loads(r["result"])
except Exception:
pass
return rows
@app.get("/triage")
async def get_triage(limit: int = 30, account: str = "gmail"):
rows = await db_fetchall(
"""SELECT id, from_name, from_email, subject, date_received, category, priority,
summary, draft_reply, action_taken, created_at
FROM email_triage WHERE account=%s
ORDER BY priority DESC, created_at DESC LIMIT %s""",
(account, limit)
)
for r in rows:
if r.get("date_received"): r["date_received"] = str(r["date_received"])
if r.get("created_at"): r["created_at"] = str(r["created_at"])
return rows
@app.post("/triage/{triage_id}/action")
async def triage_action(triage_id: int, request: Request):
body = await request.json()
action = body.get("action", "dismissed")
await db_execute("UPDATE email_triage SET action_taken=%s WHERE id=%s", (action, triage_id))
return {"ok": True}
@app.delete("/jobs/purge")
async def purge_jobs():
await db_execute("DELETE FROM arc_jobs WHERE status IN ('done','failed','cancelled') AND completed_at < DATE_SUB(NOW(), INTERVAL 7 DAY)")
return {"ok": True}
# ── VISION PROTOCOL endpoints ─────────────────────────────────────────────────
@app.get("/screenshots")
async def list_screenshots(limit: int = 20, agent: str = ""):
if agent:
rows = await db_fetchall(
"SELECT id, agent_id, hostname, method, width, height, file_size, "
" vision_analysis, vision_provider, created_at "
"FROM agent_screenshots WHERE hostname LIKE %s "
"ORDER BY created_at DESC LIMIT %s",
(f"%{agent}%", limit)
)
else:
rows = await db_fetchall(
"SELECT id, agent_id, hostname, method, width, height, file_size, "
" vision_analysis, vision_provider, created_at "
"FROM agent_screenshots ORDER BY created_at DESC LIMIT %s",
(limit,)
)
return rows or []
@app.get("/screenshots/{screenshot_id}")
async def get_screenshot(screenshot_id: int):
row = await db_fetchone(
"SELECT * FROM agent_screenshots WHERE id=%s", (screenshot_id,)
)
if not row:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Screenshot not found")
return row
@app.delete("/screenshots/{screenshot_id}")
async def delete_screenshot(screenshot_id: int):
await db_execute("DELETE FROM agent_screenshots WHERE id=%s", (screenshot_id,))
return {"ok": True}
@app.delete("/screenshots/purge")
async def purge_screenshots():
await db_execute("DELETE FROM agent_screenshots WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY)")
return {"ok": True}
# ── GUARDIAN MODE endpoints ───────────────────────────────────────────────────
@app.get("/guardian/status")
async def guardian_status():
cfg = await _guardian_get_config()
counts = await db_fetchone(
"""SELECT
SUM(acknowledged=0) AS unread,
SUM(severity='critical' AND acknowledged=0) AS critical_unread,
SUM(severity='warning' AND acknowledged=0) AS warning_unread,
SUM(created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)) AS events_24h
FROM guardian_events"""
)
return {
"enabled": cfg.get("enabled", "1") == "1",
"scan_interval": int(cfg.get("scan_interval", 120)),
"last_scan": _guardian_state.get("last_scan"),
"last_sitrep": _guardian_state.get("last_sitrep"),
"thresholds": {
"cpu": float(cfg.get("cpu_threshold", 85)),
"memory": float(cfg.get("mem_threshold", 88)),
"disk": float(cfg.get("disk_threshold", 88)),
"offline_minutes": int(cfg.get("offline_minutes", 3)),
},
"counts": dict(counts) if counts else {},
}
@app.get("/guardian/events")
async def guardian_events_list(limit: int = 30, unread: bool = False,
severity: str = "", since: str = ""):
wheres = []
params: list = []
if unread:
wheres.append("acknowledged = 0")
if severity:
wheres.append("severity = %s"); params.append(severity)
if since:
wheres.append("created_at > %s"); params.append(since)
where_sql = ("WHERE " + " AND ".join(wheres)) if wheres else ""
params.append(limit)
rows = await db_fetchall(
f"SELECT * FROM guardian_events {where_sql} ORDER BY created_at DESC LIMIT %s",
params
)
return rows or []
@app.post("/guardian/events/{event_id}/ack")
async def guardian_ack(event_id: int):
await db_execute("UPDATE guardian_events SET acknowledged=1 WHERE id=%s", (event_id,))
return {"ok": True}
@app.post("/guardian/events/ack_all")
async def guardian_ack_all():
await db_execute("UPDATE guardian_events SET acknowledged=1 WHERE acknowledged=0")
return {"ok": True}
@app.get("/guardian/chat")
async def guardian_chat_events(since: str = ""):
"""Return proactive guardian messages injected into conversations."""
if since:
rows = await db_fetchall(
"SELECT id, message, created_at FROM conversations "
"WHERE session_id='guardian' AND created_at > %s ORDER BY created_at ASC",
(since,)
)
else:
rows = await db_fetchall(
"SELECT id, message, created_at FROM conversations "
"WHERE session_id='guardian' ORDER BY created_at DESC LIMIT 10"
)
return rows or []
# ── COMMS v2 endpoints ───────────────────────────────────────────────────────
@app.get("/comms/sent")
async def comms_sent(limit: int = 50, status: str = ""):
if status:
rows = await db_fetchall(
"SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id "
"FROM email_sent WHERE status=%s ORDER BY sent_at DESC LIMIT %s",
(status, limit)
)
else:
rows = await db_fetchall(
"SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id "
"FROM email_sent ORDER BY sent_at DESC LIMIT %s",
(limit,)
)
return rows or []
@app.get("/comms/sent/{sent_id}")
async def comms_sent_get(sent_id: int):
row = await db_fetchone("SELECT * FROM email_sent WHERE id=%s", (sent_id,))
if not row:
raise HTTPException(status_code=404, detail="Not found")
return row
@app.delete("/comms/sent/{sent_id}")
async def comms_sent_delete(sent_id: int):
await db_execute("DELETE FROM email_sent WHERE id=%s", (sent_id,))
return {"ok": True}
if __name__ == "__main__":
uvicorn.run("reactor:app", host=HOST, port=PORT, log_level="info", access_log=False)