mirror of
https://github.com/myronblair/jarvis
synced 2026-06-30 17:50:23 -05:00
7f6397b514
Guardian anomaly alerts and SITREP are pure text reasoning — Groq's llama-3.3-70b-versatile handles them at near-zero cost and with lower latency. Vision Protocol image analysis stays on Claude (claude-opus-4-8) because Groq has no vision models. Text-only sysinfo snapshots (no image captured) also move to Groq. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2774 lines
123 KiB
Python
2774 lines
123 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
JARVIS Arc Reactor — Core Daemon v9.0
|
|
Phase 1: Job queue, ping/echo/shell
|
|
Phase 2: Intel Protocol (research), Iron Protocol (tool loop), LLM router
|
|
Phase 3: Gmail Triage (Comms Protocol), Remote Exec (Field Protocol)
|
|
Phase 4: Vision Protocol (screenshot, vision, sysinfo)
|
|
Phase 5: Guardian Mode (continuous awareness, proactive alerts, SITREP)
|
|
Phase 6: Comms v2 — send email, compose, schedule event, meeting prep
|
|
Phase 7: Mission Ops — multi-step automated workflows with trigger engine
|
|
Phase 8: Mission Directives — OKR/goal tracking with AI progress review
|
|
Phase 9: Clearance Protocol — approval gating for high-risk operations
|
|
"""
|
|
|
|
import asyncio
|
|
import email as _email_lib
|
|
import imaplib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta
|
|
from email.header import decode_header as _decode_header
|
|
from email.utils import parsedate_to_datetime
|
|
from typing import Any, Optional
|
|
|
|
import aiomysql
|
|
import aiohttp
|
|
import uvicorn
|
|
from fastapi import FastAPI, HTTPException, Request
|
|
|
|
# ── CONFIG ────────────────────────────────────────────────────────────────────
|
|
# Service binding and identity.
HOST = "127.0.0.1"
PORT = 7474
VERSION = "9.0.0"

# MySQL connection settings.
DB_HOST = "localhost"
DB_PORT = 3306
DB_USER = "jarvis_user"
# SECURITY: credentials are read from the environment instead of being
# committed to source control. Any value that was previously hard-coded in
# this file must be treated as compromised and rotated.
DB_PASS = os.environ.get("JARVIS_DB_PASS", "")
DB_NAME = "jarvis_db"

LOG_FILE = "/home/jarvis.orbishosting.com/logs/arc_reactor.log"
POLL_INTERVAL = 3
HEARTBEAT_INTERVAL = 30

# LLM providers. An empty API key makes llm_call() skip that provider.
CLAUDE_API_KEY = os.environ.get("JARVIS_CLAUDE_API_KEY", "")
CLAUDE_MODEL = "claude-sonnet-4-6"
GROQ_API_KEY = os.environ.get("JARVIS_GROQ_API_KEY", "")
GROQ_MODEL = "llama-3.3-70b-versatile"
OLLAMA_HOST = "http://10.48.200.95:11434"
OLLAMA_MODEL = "llama3.2:1b"

# Mail accounts. An empty password makes _imap_fetch_sync() return no mail.
GMAIL_USER = "myronblair@gmail.com"
GMAIL_PASS = os.environ.get("JARVIS_GMAIL_PASS", "")
ICLOUD_USER = "myronblair@icloud.com"
ICLOUD_PASS = os.environ.get("JARVIS_ICLOUD_PASS", "")
|
|
|
|
# ── LOGGING ───────────────────────────────────────────────────────────────────
|
|
# The log directory must exist before FileHandler tries to open the file.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Every record goes both to the log file and to stdout.
logging.basicConfig(
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)],
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="[%(asctime)s] %(levelname)s %(message)s",
)

# Module-wide logger used by all handlers below.
log = logging.getLogger("arc_reactor")
|
|
|
|
# ── DB POOL ───────────────────────────────────────────────────────────────────
|
|
# Lazily created connection pool shared by the db_* helpers.
_pool: Optional[aiomysql.Pool] = None


async def get_pool() -> aiomysql.Pool:
    """Return the shared aiomysql pool, creating it on first use.

    A new pool is also created when the cached one has been closed.
    """
    global _pool
    if _pool is not None and not _pool.closed:
        return _pool
    _pool = await aiomysql.create_pool(
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASS,
        db=DB_NAME,
        autocommit=True,
        minsize=2,
        maxsize=10,
        charset="utf8mb4",
    )
    return _pool
|
|
|
|
async def db_execute(sql: str, args: tuple = ()) -> int:
    """Execute one statement on a pooled connection.

    Returns the cursor's ``lastrowid`` after execution.
    """
    pool = await get_pool()
    async with pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(sql, args)
        last_id = cursor.lastrowid
    return last_id
|
|
|
|
async def db_fetchone(sql: str, args: tuple = ()) -> Optional[dict]:
    """Run a query with a DictCursor and return the first row (or None)."""
    pool = await get_pool()
    async with pool.acquire() as connection, connection.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute(sql, args)
        first_row = await cursor.fetchone()
    return first_row
|
|
|
|
async def db_fetchall(sql: str, args: tuple = ()) -> list:
    """Run a query with a DictCursor and return every resulting row."""
    pool = await get_pool()
    async with pool.acquire() as connection, connection.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute(sql, args)
        rows = await cursor.fetchall()
    return rows
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 1 HANDLERS
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def handle_ping(payload: dict) -> dict:
    """Liveness probe: report a pong, the current UTC time and the daemon version.

    ``payload`` is accepted for handler-signature uniformity and is not read.
    """
    stamp = datetime.utcnow().isoformat()
    return dict(pong=True, timestamp=stamp, version=VERSION)
|
|
|
|
async def handle_echo(payload: dict) -> dict:
    """Echo back ``payload["message"]`` (empty string if absent) with a UTC timestamp."""
    text = payload.get("message", "")
    stamp = datetime.utcnow().isoformat()
    return dict(echo=text, timestamp=stamp)
|
|
|
|
async def handle_shell(payload: dict) -> dict:
    """Run one whitelisted, read-only diagnostic command and return its output.

    The command line is tokenised with ``shlex`` and executed directly (no
    shell), so shell metacharacters such as ``;``, ``|``, ``&&`` or ``$()``
    are never interpreted. The previous implementation only checked a string
    prefix and then handed the whole string to a shell, which allowed
    arbitrary command injection after an allowed prefix.

    Allowed programs:
      * ``df``, ``free``, ``ps``, ``uname``, ``uptime`` with any arguments
      * ``hostname`` with flag arguments only (a positional argument would
        try to *set* the hostname)
      * ``cat`` where every argument resolves to a path under ``/proc/``

    Args:
        payload: ``{"command": "<command line>"}``.

    Returns:
        ``{"stdout": str, "stderr": str, "exit_code": int}``.

    Raises:
        ValueError: the command is empty, malformed or not whitelisted.
        asyncio.TimeoutError: the command ran for more than 15 seconds (the
            child process is killed before the error propagates).
    """
    import shlex

    cmd = payload.get("command", "").strip()
    rejection = ValueError(f"Command not in whitelist: {cmd[:40]}")

    try:
        argv = shlex.split(cmd)
    except ValueError:
        # Unbalanced quotes and similar parse errors.
        raise rejection from None
    if not argv:
        raise rejection

    program, args = argv[0], argv[1:]
    if program in ("df", "free", "ps", "uname", "uptime"):
        permitted = True
    elif program == "hostname":
        permitted = all(a.startswith("-") for a in args)
    elif program == "cat":
        # realpath() collapses ".." and resolves symlinks, so neither
        # "/proc/../etc/shadow" nor "/proc/self/root/etc/shadow" gets through.
        permitted = bool(args) and all(
            os.path.realpath(a).startswith("/proc/") for a in args
        )
    else:
        permitted = False
    if not permitted:
        raise rejection

    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15)
    except asyncio.TimeoutError:
        # Do not leave a runaway child behind.
        proc.kill()
        await proc.wait()
        raise
    return {
        # /proc files may contain non-UTF-8 bytes; never fail on decoding.
        "stdout": stdout.decode(errors="replace").strip(),
        "stderr": stderr.decode(errors="replace").strip(),
        "exit_code": proc.returncode,
    }
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 2: LLM ROUTER
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def llm_call(messages: list, provider: str = "claude", system: str = "") -> str:
    """Route a chat request to the requested LLM provider.

    If the requested provider is usable (``claude``/``groq`` with an API key
    configured, or ``ollama``), it is called directly and any error it raises
    propagates to the caller. Otherwise (unknown provider or missing key) the
    configured fallbacks are tried in order: Groq, then Ollama.

    The fallback chain calls the provider functions directly. The previous
    version re-entered ``llm_call``, which recursed without bound when
    ``provider="groq"`` was requested and no Groq key was configured.

    Args:
        messages: chat messages as ``{"role", "content"}`` dicts.
        provider: ``"claude"``, ``"groq"`` or ``"ollama"``.
        system: optional system prompt.

    Returns:
        The text returned by the provider that answered.

    Raises:
        RuntimeError: no fallback provider produced an answer.
    """
    if provider == "claude" and CLAUDE_API_KEY:
        return await _claude_call(messages, system)
    if provider == "groq" and GROQ_API_KEY:
        return await _groq_call(messages, system)
    if provider == "ollama":
        return await _ollama_call(messages, system)

    # Requested provider is unknown or unconfigured: try what is available.
    fallbacks = []
    if GROQ_API_KEY:
        fallbacks.append(("groq", _groq_call))
    fallbacks.append(("ollama", _ollama_call))

    for name, call in fallbacks:
        try:
            return await call(messages, system)
        except Exception as exc:
            # Keep going, but leave a trace of why this provider was skipped.
            log.warning(f"LLM fallback provider '{name}' failed: {exc}")
    raise RuntimeError("All LLM providers failed")
|
|
|
|
async def _claude_call(messages: list, system: str = "") -> str:
    """POST ``messages`` to the Anthropic Messages API and return the first text block.

    Raises RuntimeError (with the API's error message) on any non-200 status.
    """
    body = {"model": CLAUDE_MODEL, "max_tokens": 4096, "messages": messages}
    if system:
        body["system"] = system
    request_headers = {
        "x-api-key": CLAUDE_API_KEY,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    endpoint = "https://api.anthropic.com/v1/messages"

    async with aiohttp.ClientSession() as session:
        async with session.post(
            endpoint,
            json=body,
            headers=request_headers,
            timeout=aiohttp.ClientTimeout(total=60),
        ) as resp:
            data = await resp.json()
            if resp.status == 200:
                return data["content"][0]["text"]
            raise RuntimeError(f"Claude API error {resp.status}: {data.get('error',{}).get('message','')}")
|
|
|
|
async def _groq_call(messages: list, system: str = "") -> str:
    """POST ``messages`` to Groq's OpenAI-compatible chat endpoint.

    A non-empty ``system`` prompt is sent as a leading system message.
    Raises RuntimeError on any non-200 status.
    """
    conversation = list(messages)
    if system:
        conversation = [{"role": "system", "content": system}] + conversation
    body = {"model": GROQ_MODEL, "messages": conversation, "max_tokens": 4096}
    request_headers = {
        "Authorization": f"Bearer {GROQ_API_KEY}",
        "Content-Type": "application/json",
    }
    endpoint = "https://api.groq.com/openai/v1/chat/completions"

    async with aiohttp.ClientSession() as session:
        async with session.post(
            endpoint,
            json=body,
            headers=request_headers,
            timeout=aiohttp.ClientTimeout(total=45),
        ) as resp:
            data = await resp.json()
            if resp.status == 200:
                return data["choices"][0]["message"]["content"]
            raise RuntimeError(f"Groq error {resp.status}")
|
|
|
|
async def _ollama_call(messages: list, system: str = "") -> str:
    """Flatten the conversation into one prompt and call Ollama's generate API.

    Each message becomes a ``ROLE: content`` line; a non-empty system prompt
    is placed first, followed by a blank line. Returns the ``response`` field
    of the reply, or an empty string when it is absent.
    """
    transcript = "\n".join(f"{m['role'].upper()}: {m['content']}" for m in messages)
    preamble = system + "\n\n" if system else ""
    prompt = preamble + transcript
    body = {"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}

    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{OLLAMA_HOST}/api/generate",
            json=body,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            data = await resp.json()
            return data.get("response", "")
|
|
|
|
async def handle_llm(payload: dict) -> dict:
    """Job handler: send one user message through the LLM router.

    Reads ``message`` (required), ``system`` and ``provider`` from the
    payload and returns the reply together with the requested provider.
    Raises ValueError when ``message`` is missing or empty.
    """
    user_text = payload.get("message", "")
    system = payload.get("system", "You are JARVIS, an Iron Man-style AI assistant.")
    provider = payload.get("provider", "claude")
    if not user_text:
        raise ValueError("Missing message")

    conversation = [{"role": "user", "content": user_text}]
    reply = await llm_call(conversation, provider, system)
    return {"response": reply, "provider": provider}
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 2: INTEL PROTOCOL — RESEARCH ENGINE
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def _web_search(query: str, max_results: int = 6) -> list:
    """Search DuckDuckGo and return a list of ``{"url", "title", "snippet"}`` dicts.

    The ``duckduckgo_search`` client is synchronous, so the query runs in the
    default thread-pool executor. Calling it directly from this coroutine, as
    before, blocked the event loop (and every other job) for the duration of
    the HTTP round-trips.

    Any failure (missing package, network error, rate limiting) is logged
    and an empty list is returned.
    """
    def _search_blocking() -> list:
        # Imported lazily so the daemon still starts without the package.
        from duckduckgo_search import DDGS

        with DDGS() as ddgs:
            return [
                {
                    "url": hit.get("href", ""),
                    "title": hit.get("title", ""),
                    "snippet": hit.get("body", ""),
                }
                for hit in ddgs.text(query, max_results=max_results)
            ]

    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _search_blocking)
    except Exception as e:
        log.warning(f"DDG search failed: {e}")
        return []
|
|
|
|
async def _fetch_url_content(url: str, timeout: int = 12) -> str:
|
|
try:
|
|
import trafilatura
|
|
headers = {"User-Agent": "Mozilla/5.0 (compatible; JARVIS-Research/3.0)"}
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout),
|
|
allow_redirects=True, ssl=False) as resp:
|
|
if resp.status != 200:
|
|
return ""
|
|
html = await resp.text(errors="replace")
|
|
text = trafilatura.extract(html, include_links=False, include_images=False, no_fallback=False, favor_precision=True)
|
|
return (text or "")[:4000]
|
|
except Exception as e:
|
|
log.debug(f"Fetch failed {url[:60]}: {e}")
|
|
return ""
|
|
|
|
async def handle_research(payload: dict) -> dict:
    """Intel Protocol: search the web, fetch the top pages and synthesise a briefing.

    Payload keys:
        query:    research question (required).
        depth:    ``"quick"`` (3 sources), ``"standard"`` (5) or ``"deep"`` (8);
                  unknown values fall back to 5.
        provider: LLM provider passed to ``llm_call`` (default ``"claude"``).

    Returns:
        dict with ``query``, ``depth``, ``synthesis``, ``key_points`` (max 10),
        ``sources`` (url/title pairs), ``source_count`` and ``provider``.

    Raises:
        ValueError: ``query`` is missing or blank.
    """
    query = payload.get("query", "").strip()
    depth = payload.get("depth", "standard")
    provider = payload.get("provider", "claude")
    if not query:
        raise ValueError("Missing research query")

    max_sources = {"quick": 3, "standard": 5, "deep": 8}.get(depth, 5)
    log.info(f"[INTEL] Research: '{query}' depth={depth} sources={max_sources}")

    search_results = await _web_search(query, max_results=max_sources + 2)
    if not search_results:
        search_results = [{"url": "", "title": query, "snippet": f"No search results for: {query}"}]

    # Only the results that are actually used get fetched, and each fetched
    # text is keyed by the index of its result. Previously the fetch list was
    # filtered on URL but read back by position in the unfiltered list, so a
    # result without a URL shifted every later page onto the wrong source.
    selected = search_results[:max_sources]
    fetch_indexes = [i for i, r in enumerate(selected) if r.get("url")]
    fetched = await asyncio.gather(
        *(_fetch_url_content(selected[i]["url"]) for i in fetch_indexes),
        return_exceptions=True,
    )
    content_by_index = {
        i: text for i, text in zip(fetch_indexes, fetched) if isinstance(text, str)
    }

    sources = [
        {
            "url": r["url"],
            "title": r["title"],
            "snippet": r["snippet"],
            "content": content_by_index.get(i, ""),
        }
        for i, r in enumerate(selected)
    ]

    # Prefer the extracted page text; fall back to the search snippet.
    context_parts = []
    for i, s in enumerate(sources, 1):
        body = s["content"] or s["snippet"]
        if body:
            context_parts.append(f"SOURCE {i}: {s['title']}\nURL: {s['url']}\n{body[:3000]}\n")

    context_text = "\n---\n".join(context_parts) if context_parts else "No page content available."
    synthesis_prompt = (
        f'You are JARVIS, an advanced AI research assistant. The user asked: "{query}"\n\n'
        f"You have retrieved the following source material:\n\n{context_text}\n\n"
        f"Provide a comprehensive research briefing with:\n"
        f"1. **EXECUTIVE SUMMARY** (2-3 sentences)\n"
        f"2. **KEY FINDINGS** (5-8 bullet points)\n"
        f"3. **DETAILS** (thorough synthesis, 2-4 paragraphs)\n"
        f"4. **CONFIDENCE** (High/Medium/Low based on source quality)\n\n"
        f"Be precise, factual, and cite source numbers where relevant."
    )

    try:
        synthesis = await llm_call([{"role": "user", "content": synthesis_prompt}], provider=provider)
    except Exception as e:
        # Degrade to a plain list of titles and snippets rather than failing the job.
        log.warning(f"[INTEL] Synthesis failed, falling back: {e}")
        synthesis = f"**RESEARCH BRIEFING: {query}**\n\n" + "\n\n".join(
            f"**{s['title']}**\n{s['snippet']}" for s in sources
        )

    # Harvest bullet and numbered lines from the briefing as key points.
    key_points = []
    for line in synthesis.split("\n"):
        line = line.strip()
        if line.startswith(("- ", "• ", "* ")) and len(line) > 10:
            key_points.append(re.sub(r'^[-•*]\s*', '', line))
        elif re.match(r'^\d+\.\s+', line) and len(line) > 10:
            key_points.append(re.sub(r'^\d+\.\s+', '', line))

    clean_sources = [{"url": s["url"], "title": s["title"] or s["url"]} for s in sources if s.get("url")]
    log.info(f"[INTEL] Research complete: '{query}' — {len(sources)} sources")
    return {
        "query": query,
        "depth": depth,
        "synthesis": synthesis,
        "key_points": key_points[:10],
        "sources": clean_sources,
        "source_count": len(clean_sources),
        "provider": provider,
    }
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 2: IRON PROTOCOL — MULTI-STEP TOOL LOOP
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
# Tool definitions passed as ``tools=`` to the Claude Messages API by
# handle_tool_loop(); execute_agent_tool() implements each one by name.
AGENT_TOOLS = [
    {
        "name": "web_search",
        "description": "Search the web for current information.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer", "default": 5},
            },
            "required": ["query"],
        },
    },
    {
        "name": "fetch_url",
        "description": "Fetch a web page and extract its main text content.",
        "input_schema": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"],
        },
    },
    {
        "name": "jarvis_agents",
        "description": "Get status of all JARVIS agents.",
        "input_schema": {"type": "object", "properties": {}},
    },
    {
        "name": "jarvis_alerts",
        "description": "Get current active JARVIS alerts.",
        "input_schema": {"type": "object", "properties": {}},
    },
    {
        "name": "current_time",
        "description": "Get current date and time.",
        "input_schema": {"type": "object", "properties": {}},
    },
]
|
|
|
|
async def execute_agent_tool(tool_name: str, tool_input: dict) -> str:
    """Run one of the AGENT_TOOLS by name and return its result as text.

    Never raises: an unknown tool name yields ``"Unknown tool: ..."`` and any
    exception is reported as ``"Tool error (<name>): <message>"`` (message
    truncated to 200 characters) so the tool loop can relay it to the model.
    """
    try:
        if tool_name == "web_search":
            hits = await _web_search(tool_input.get("query", ""), tool_input.get("max_results", 5))
            if not hits:
                return "No search results found."
            lines = []
            for pos, hit in enumerate(hits):
                lines.append(f"{pos+1}. {hit['title']}\n URL: {hit['url']}\n {hit['snippet']}")
            return "\n".join(lines)

        if tool_name == "fetch_url":
            target = tool_input.get("url", "")
            if not target:
                return "No URL provided."
            page_text = await _fetch_url_content(target)
            return page_text or "Could not extract content."

        if tool_name == "jarvis_agents":
            rows = await db_fetchall("SELECT hostname, agent_type, ip_address, status, last_seen FROM registered_agents ORDER BY status='online' DESC, hostname")
            if not rows:
                return "No agents registered."
            return "\n".join(
                f"- {row['hostname']} ({row['agent_type']}) @ {row['ip_address']} — {row['status'].upper()}"
                for row in rows
            )

        if tool_name == "jarvis_alerts":
            rows = await db_fetchall("SELECT title, severity, message FROM alerts WHERE resolved=0 ORDER BY created_at DESC LIMIT 10")
            listing = "\n".join(
                f"- [{row['severity'].upper()}] {row['title']}: {row['message']}" for row in rows
            )
            return listing or "No active alerts."

        if tool_name == "current_time":
            return datetime.utcnow().strftime("UTC: %Y-%m-%d %H:%M:%S")

        return f"Unknown tool: {tool_name}"
    except Exception as e:
        return f"Tool error ({tool_name}): {str(e)[:200]}"
|
|
|
|
async def handle_tool_loop(payload: dict) -> dict:
    """Iron Protocol: let Claude work on a task with tools until it is done.

    Payload keys:
        task:           the instruction for the model (required).
        max_iterations: round-trip limit, default 15, capped at 200.
        system:         optional replacement for the default system prompt.

    Each round sends the conversation to Claude. Any ``tool_use`` blocks in
    the reply are executed via ``execute_agent_tool`` and their (truncated)
    results fed back as the next user turn. The loop stops when the model
    ends its turn, requests no tools, or the iteration limit is reached.

    Returns:
        ``{"task", "result", "iterations", "tools_used"}`` where ``result``
        is the text of the last reply that contained any text.

    Raises:
        ValueError: ``task`` is missing or blank.
    """
    task = payload.get("task", "").strip()
    max_iterations = min(int(payload.get("max_iterations", 15)), 200)
    default_system = (
        "You are JARVIS, an advanced autonomous AI assistant with access to tools. "
        "Use tools methodically to complete the task. Be thorough but concise. "
        "When you have enough information, provide a clear final answer."
    )
    system_prompt = payload.get("system", default_system)
    if not task:
        raise ValueError("Missing task")

    log.info(f"[IRON] Tool loop: '{task[:80]}' max_iter={max_iterations}")

    import anthropic

    client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY)
    messages = [{"role": "user", "content": task}]
    final_text = ""
    used_names = set()

    iteration = 0
    for iteration in range(1, max_iterations + 1):
        response = await client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=4096,
            system=system_prompt,
            tools=AGENT_TOOLS,
            messages=messages,
        )

        # Re-serialise the reply blocks as plain dicts for the next request.
        reply_blocks = []
        reply_texts = []
        requested_tools = []
        for block in response.content:
            if block.type == "text":
                reply_blocks.append({"type": "text", "text": block.text})
                reply_texts.append(block.text)
            elif block.type == "tool_use":
                reply_blocks.append(
                    {"type": "tool_use", "id": block.id, "name": block.name, "input": block.input}
                )
                requested_tools.append(block)
                used_names.add(block.name)
        messages.append({"role": "assistant", "content": reply_blocks})

        if reply_texts:
            final_text = "\n".join(reply_texts)

        if response.stop_reason == "end_turn" or not requested_tools:
            log.info(f"[IRON] Complete after {iteration} iterations")
            break

        # Run every requested tool and return the outputs in a single user turn.
        outputs = []
        for call in requested_tools:
            log.info(f"[IRON] Tool: {call.name}")
            output = await execute_agent_tool(call.name, call.input)
            outputs.append({"type": "tool_result", "tool_use_id": call.id, "content": output[:3000]})
        messages.append({"role": "user", "content": outputs})

    tools_used = list(used_names)
    return {"task": task, "result": final_text, "iterations": iteration, "tools_used": tools_used}
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 3: COMMS PROTOCOL — GMAIL TRIAGE
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _decode_bytes_lenient(data: bytes, charset: Optional[str]) -> str:
    """Decode ``data`` with ``charset``, falling back to UTF-8 for unknown charsets."""
    try:
        return data.decode(charset or "utf-8", errors="replace")
    except LookupError:
        # Headers in the wild carry labels such as "unknown-8bit".
        return data.decode("utf-8", errors="replace")


def _decode_mime_header(raw_value) -> str:
    """Decode an RFC 2047 encoded header value into a plain string."""
    pieces = []
    for part, enc in _decode_header(raw_value):
        if isinstance(part, bytes):
            pieces.append(_decode_bytes_lenient(part, enc))
        else:
            pieces.append(str(part))
    return "".join(pieces)


def _plain_text_body(msg) -> str:
    """Return the first non-empty text/plain payload of ``msg`` ("" if none)."""
    if not msg.is_multipart():
        data = msg.get_payload(decode=True)
        return _decode_bytes_lenient(data, msg.get_content_charset()) if data else ""
    for part in msg.walk():
        if part.get_content_type() != "text/plain":
            continue
        data = part.get_payload(decode=True)
        if data:
            return _decode_bytes_lenient(data, part.get_content_charset())
    return ""


def _imap_fetch_sync(account: str, max_msgs: int) -> list:
    """Synchronous IMAP fetch of the newest INBOX messages — called via run_in_executor.

    Args:
        account:  ``"icloud"`` selects the iCloud mailbox; anything else Gmail.
        max_msgs: maximum number of messages to return; ``<= 0`` returns none
                  (a bare ``ids[-max_msgs:]`` slice would return the wrong set).

    Returns:
        Newest-first list of dicts with ``msg_id``, ``from_name``,
        ``from_email``, ``subject``, ``date_raw`` and ``body`` (whitespace
        collapsed, max 500 characters). An empty list is returned when
        credentials are missing or the mailbox cannot be reached. A single
        malformed message is skipped instead of discarding the whole batch.
    """
    if account == "icloud":
        host, port, user, passwd = "imap.mail.me.com", 993, ICLOUD_USER, ICLOUD_PASS
    else:
        host, port, user, passwd = "imap.gmail.com", 993, GMAIL_USER, GMAIL_PASS
    if not user or not passwd or max_msgs <= 0:
        return []

    try:
        mbox = imaplib.IMAP4_SSL(host, port)
        try:
            mbox.login(user, passwd)
            mbox.select("INBOX")
            _, data = mbox.search(None, "ALL")
            ids = data[0].split()[-max_msgs:]
            msgs = []
            for mid in reversed(ids):
                # NOTE(review): fetching "(RFC822)" normally sets the \Seen
                # flag on the server; "(BODY.PEEK[])" would leave messages
                # unread — confirm which behaviour is wanted.
                _, raw = mbox.fetch(mid, "(RFC822)")
                if not raw or not isinstance(raw[0], tuple):
                    continue
                try:
                    msg = _email_lib.message_from_bytes(raw[0][1])

                    subject = _decode_mime_header(msg.get("Subject", ""))

                    from_raw = str(msg.get("From", ""))
                    from_name = _decode_mime_header(from_raw).strip().strip('"')
                    addr = re.search(r"<([^>]+)>", from_raw)
                    from_email = addr.group(1) if addr else from_raw

                    date_str = str(msg.get("Date", ""))
                    body = " ".join(_plain_text_body(msg).split())[:500]

                    # Fall back to a synthetic key when Message-ID is absent.
                    msg_uid = (
                        str(msg.get("Message-ID", "") or "")
                        or f"{from_email}:{subject}:{date_str}"
                    ).strip("<>")
                except Exception as e:
                    log.warning(f"IMAP message parse failed ({account}): {e}")
                    continue

                msgs.append({
                    "msg_id": msg_uid[:255],
                    "from_name": from_name[:100],
                    "from_email": from_email[:100],
                    "subject": subject[:255],
                    "date_raw": date_str,
                    "body": body,
                })
            return msgs
        finally:
            # Always release the connection, including on errors above.
            try:
                mbox.logout()
            except Exception:
                pass
    except Exception as e:
        log.warning(f"IMAP fetch failed ({account}): {e}")
        return []
|
|
|
|
async def handle_gmail_triage(payload: dict) -> dict:
    """Comms Protocol: fetch recent mail, have an LLM triage it and persist the result.

    Payload keys:
        account:    ``"gmail"`` (default) or ``"icloud"``.
        max_emails: number of messages to fetch, default 20, capped at 40.
        provider:   LLM provider for ``llm_call`` (default ``"claude"``).

    The LLM is asked for a JSON array with one object per email. Its output
    is untrusted: entries that are not objects, or whose ``index`` is not a
    valid 1-based email number, are dropped. A non-numeric ``priority``
    becomes 3 and a missing ``category`` becomes ``"info"``. If the reply
    cannot be parsed at all, every email is recorded as ``info``/priority 3.

    Returns:
        dict with ``account``, ``triaged``, ``saved``, ``counts`` (per
        category), ``urgent``/``action``/``meeting``/``reply`` counters and
        ``items`` (up to 10 actionable emails, highest priority first).
        When no mail could be fetched, a reduced dict with an ``error`` key.
    """
    account = payload.get("account", "gmail")
    max_emails = min(int(payload.get("max_emails", 20)), 40)
    provider = payload.get("provider", "claude")

    log.info(f"[COMMS] Gmail triage: account={account} max={max_emails}")

    # imaplib is blocking; keep it off the event loop.
    loop = asyncio.get_running_loop()
    emails = await loop.run_in_executor(None, _imap_fetch_sync, account, max_emails)

    if not emails:
        return {"account": account, "triaged": 0, "urgent": 0, "action": 0,
                "meeting": 0, "items": [], "error": "No emails fetched or IMAP unavailable"}

    email_list = "".join(
        f"EMAIL {i}:\n"
        f"From: {e['from_name']} <{e['from_email']}>\n"
        f"Subject: {e['subject']}\n"
        f"Date: {e['date_raw']}\n"
        f"Body: {e['body'][:400]}\n\n"
        for i, e in enumerate(emails, 1)
    )

    triage_prompt = (
        f"You are JARVIS, triaging {len(emails)} emails for Myron Blair.\n\n"
        "For each email assign:\n"
        "- category: urgent | action | reply | meeting | info | promo | spam\n"
        "- priority: 1-10 (10 = drop everything)\n"
        "- summary: one sentence\n"
        "- draft_reply: for urgent/action/reply/meeting ONLY — brief professional reply. Empty string otherwise.\n\n"
        "Return ONLY a valid JSON array. Example:\n"
        '[{"index":1,"category":"urgent","priority":9,"summary":"Contract needs signing today","draft_reply":"Hi, I will review and sign today."}]\n\n'
        f"EMAILS TO TRIAGE:\n{email_list}"
    )

    try:
        raw = await llm_call([{"role": "user", "content": triage_prompt}], provider)
        raw = raw.strip()
        # Strip a Markdown code fence if the model wrapped its JSON in one.
        if raw.startswith("```"):
            raw = "\n".join(raw.split("\n")[1:])
        if raw.endswith("```"):
            raw = raw[:-3]
        parsed = json.loads(raw.strip())
        if not isinstance(parsed, list):
            raise ValueError("LLM did not return a JSON array")
    except Exception as err:
        log.warning(f"[COMMS] Triage parse error: {err}")
        parsed = [{"index": i + 1, "category": "info", "priority": 3,
                   "summary": f"Subject: {mail['subject']}", "draft_reply": ""}
                  for i, mail in enumerate(emails)]

    # Normalise the model output once so the code below can trust its shape.
    triage_items = []
    for entry in parsed:
        if not isinstance(entry, dict):
            continue
        try:
            idx = int(entry.get("index", 0)) - 1
        except (TypeError, ValueError):
            continue
        if not 0 <= idx < len(emails):
            continue
        try:
            priority = int(entry.get("priority", 3))
        except (TypeError, ValueError):
            priority = 3
        triage_items.append({
            "idx": idx,
            "category": str(entry.get("category") or "info"),
            "priority": priority,
            "summary": str(entry.get("summary") or ""),
            "draft_reply": str(entry.get("draft_reply") or ""),
        })

    # Save to email_triage table
    saved = 0
    for item in triage_items:
        mail = emails[item["idx"]]
        date_parsed = None
        if mail.get("date_raw"):
            try:
                date_parsed = parsedate_to_datetime(mail["date_raw"]).strftime("%Y-%m-%d %H:%M:%S")
            except Exception:
                # Unparseable Date header: store NULL rather than skip the row.
                date_parsed = None
        try:
            await db_execute(
                """INSERT INTO email_triage
                   (msg_id, account, from_name, from_email, subject, date_received,
                    category, priority, summary, draft_reply, action_taken)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'none')
                   ON DUPLICATE KEY UPDATE
                   category=VALUES(category), priority=VALUES(priority),
                   summary=VALUES(summary), draft_reply=VALUES(draft_reply)""",
                (mail["msg_id"], account, mail["from_name"], mail["from_email"], mail["subject"],
                 date_parsed, item["category"], item["priority"],
                 item["summary"][:500], item["draft_reply"][:3000])
            )
            saved += 1
        except Exception as ex:
            log.debug(f"[COMMS] Save triage error: {ex}")

    counts = {}
    for item in triage_items:
        counts[item["category"]] = counts.get(item["category"], 0) + 1

    urgent_items = [
        {
            "from": emails[item["idx"]]["from_name"],
            "from_email": emails[item["idx"]]["from_email"],
            "subject": emails[item["idx"]]["subject"][:80],
            "summary": item["summary"],
            "priority": item["priority"],
            "draft_reply": item["draft_reply"],
            "category": item["category"],
        }
        for item in triage_items
        if item["category"] in ("urgent", "action", "reply", "meeting")
    ]
    urgent_items.sort(key=lambda x: -x["priority"])

    log.info(f"[COMMS] Triage complete: {len(emails)} emails, {saved} saved, counts={counts}")
    return {
        "account": account,
        "triaged": len(emails),
        "saved": saved,
        "counts": counts,
        "urgent": counts.get("urgent", 0),
        "action": counts.get("action", 0),
        "meeting": counts.get("meeting", 0),
        "reply": counts.get("reply", 0),
        "items": urgent_items[:10],
    }
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 3: FIELD PROTOCOL — REMOTE EXEC
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def handle_remote_exec(payload: dict) -> dict:
    """
    Field Protocol: send one command to a field-station agent and wait for its reply.

    payload: { agent, command_type, command_data, timeout }
      - agent         name fragment used to look up an online agent (required)
      - command_type  defaults to "ping"
      - command_data  defaults to {}
      - timeout       seconds to wait, default 35, capped at 120

    Raises ValueError when no agent name is given.
    """
    agent_name = payload.get("agent", "").strip()
    cmd_type = payload.get("command_type", "ping")
    cmd_data = payload.get("command_data", {})
    timeout_secs = min(int(payload.get("timeout", 35)), 120)

    if not agent_name:
        raise ValueError("Missing agent name")

    log.info(f"[FIELD] remote_exec {cmd_type} on {agent_name}")

    # _dispatch_agent_command lives further down the module; the name is
    # resolved when this coroutine runs, not when the module is imported.
    outcome = await _dispatch_agent_command(agent_name, cmd_type, cmd_data, timeout_secs)

    response = {key: outcome[key] for key in ("agent", "agent_id")}
    response["command_type"] = cmd_type
    response["command_data"] = cmd_data
    for key in ("cmd_id", "status", "result"):
        response[key] = outcome[key]
    return response
|
|
|
|
# ── PHASE 4: VISION PROTOCOL ──────────────────────────────────────────────────
|
|
|
|
async def _dispatch_agent_command(agent_name: str, cmd_type: str, cmd_data: dict,
                                  timeout_secs: int = 45) -> dict:
    """Shared helper: locate an online agent, queue a command, poll for its result.

    The agent is matched by substring against ``hostname`` or ``agent_id``.
    The command row is polled every 2 seconds until its status becomes
    ``executed`` or ``failed``. A string result is JSON-decoded when possible.

    Returns ``{"agent", "agent_id", "cmd_id", "status", "result"}``.

    Raises:
        ValueError:   no online agent matches ``agent_name``.
        TimeoutError: no result within ``timeout_secs`` (a still-pending
                      command is marked ``failed`` first).
    """
    like_pattern = f"%{agent_name}%"
    agent = await db_fetchone(
        """SELECT agent_id, hostname, status FROM registered_agents
           WHERE (hostname LIKE %s OR agent_id LIKE %s) AND status='online' LIMIT 1""",
        (like_pattern, like_pattern)
    )
    if not agent:
        online_rows = await db_fetchall("SELECT hostname FROM registered_agents WHERE status='online'")
        names = [row["hostname"] for row in online_rows]
        raise ValueError(f"No online agent matching '{agent_name}'. Online: {', '.join(names) or 'none'}")

    cmd_id = await db_execute(
        "INSERT INTO agent_commands (agent_id, command_type, command_data) VALUES (%s, %s, %s)",
        (agent["agent_id"], cmd_type, json.dumps(cmd_data))
    )

    waited = 0
    while waited < timeout_secs:
        await asyncio.sleep(2)
        waited += 2
        row = await db_fetchone("SELECT status, result FROM agent_commands WHERE id=%s", (cmd_id,))
        if not row or row["status"] not in ("executed", "failed"):
            continue

        outcome = row.get("result")
        if outcome and isinstance(outcome, str):
            # Results are usually JSON text; keep the raw string if they are not.
            try:
                outcome = json.loads(outcome)
            except Exception:
                pass
        return {
            "agent": agent["hostname"],
            "agent_id": agent["agent_id"],
            "cmd_id": cmd_id,
            "status": row["status"],
            "result": outcome or {},
        }

    await db_execute("UPDATE agent_commands SET status='failed' WHERE id=%s AND status='pending'", (cmd_id,))
    raise TimeoutError(f"No response from {agent['hostname']} within {timeout_secs}s")
|
|
|
|
|
|
async def handle_screenshot(payload: dict) -> dict:
    """
    Capture a screenshot (or system snapshot) from a field agent, then optionally
    analyse it: images go to Claude vision, text-only snapshots to Groq.

    payload: { agent, analyze: true|false, analyze_prompt: "...", timeout: 45 }
      - timeout is capped at 120 seconds.

    Returns a dict with agent, screenshot_id, method, width, height,
    file_size, has_image, analysis and provider. If the agent reports a
    failure, or returns something that is not a JSON object, the reply is
    {agent, success: False, error}.

    Raises ValueError when no agent name is given.
    """
    agent_name = payload.get("agent", "").strip()
    do_analyze = bool(payload.get("analyze", True))
    analyze_prompt = payload.get("analyze_prompt", "Describe what you see on this screen in detail. Note any important status indicators, errors, running processes, or interesting information.")
    timeout_secs = min(int(payload.get("timeout", 45)), 120)

    if not agent_name:
        raise ValueError("Missing agent name")

    log.info(f"[VISION] Screenshot request: agent={agent_name} analyze={do_analyze}")

    # Dispatch screenshot command to field agent
    dispatch = await _dispatch_agent_command(agent_name, "screenshot", {}, timeout_secs)
    result = dispatch.get("result", {})
    hostname = dispatch.get("agent", agent_name)

    # The dispatcher hands back the raw string when the agent's result is not
    # valid JSON, so the type must be checked BEFORE calling .get() on it.
    # Previously a non-dict result raised AttributeError here.
    if not isinstance(result, dict):
        return {"agent": hostname, "success": False, "error": str(result)}
    if dispatch.get("status") == "failed" or not result.get("success"):
        return {"agent": hostname, "success": False,
                "error": result.get("error", "Agent returned failure")}

    image_b64 = result.get("image_b64", "")
    method = result.get("method", "unknown")
    width = result.get("width", 0)
    height = result.get("height", 0)
    file_size = result.get("file_size", 0)

    # Run Claude vision analysis if we have an image
    analysis = ""
    provider_used = ""
    if do_analyze and image_b64:
        try:
            import anthropic

            client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY)
            msg = await client.messages.create(
                model="claude-opus-4-8-20251101",
                max_tokens=1024,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "image", "source": {"type": "base64",
                                                     "media_type": "image/png", "data": image_b64}},
                        {"type": "text", "text": analyze_prompt},
                    ],
                }],
            )
            analysis = msg.content[0].text if msg.content else ""
            provider_used = "claude"
            log.info(f"[VISION] Claude analysis complete ({len(analysis)} chars)")
        except Exception as e:
            # Analysis is best-effort: the screenshot is still stored below.
            log.warning(f"[VISION] Claude vision failed: {e}")
            analysis = f"Vision analysis unavailable: {e}"
    elif do_analyze and not image_b64 and result.get("snapshot_type") == "text":
        # Text-only sysinfo snapshot — summarize with LLM
        try:
            snap_text = json.dumps(result, indent=2)[:3000]
            prompt = f"Summarize this server system snapshot for JARVIS. Highlight any concerns:\n\n{snap_text}"
            analysis = await llm_call([{"role": "user", "content": prompt}], "groq")
            provider_used = "groq"
        except Exception as e:
            analysis = f"Analysis unavailable: {e}"

    # Store screenshot
    screenshot_id = await db_execute(
        """INSERT INTO agent_screenshots
           (agent_id, hostname, method, image_b64, width, height, file_size,
            vision_analysis, vision_provider)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
        (dispatch.get("agent_id", ""), hostname, method,
         image_b64, width, height, file_size, analysis, provider_used)
    )

    log.info(f"[VISION] Screenshot saved: id={screenshot_id} agent={hostname} method={method}")
    return {
        "agent": hostname,
        "screenshot_id": screenshot_id,
        "method": method,
        "width": width,
        "height": height,
        "file_size": file_size,
        "has_image": bool(image_b64),
        "analysis": analysis,
        "provider": provider_used,
    }
|
|
|
|
|
|
async def handle_vision(payload: dict) -> dict:
    """
    Run Claude vision on an already-stored screenshot or a provided base64 image.

    payload: { screenshot_id OR image_b64, prompt, provider }
      - screenshot_id  row id in agent_screenshots; takes precedence over image_b64
      - image_b64      base64 image data, sent to Claude as image/png
      - prompt         defaults to "Describe this image in detail."
      - provider       accepted for compatibility but ignored: the analysis
                       always runs on Claude

    When a screenshot_id is given, the analysis is written back to that row.

    Raises:
      ValueError    the screenshot does not exist / has no image, or no image
                    data was supplied at all.
      RuntimeError  the vision request failed (original error chained as cause).
    """
    screenshot_id = payload.get("screenshot_id")
    image_b64 = payload.get("image_b64", "")
    prompt = payload.get("prompt", "Describe this image in detail.")

    # Convert once; an id that is not numeric raises ValueError as before.
    row_id = int(screenshot_id) if screenshot_id else None

    if row_id is not None:
        row = await db_fetchone(
            "SELECT image_b64, hostname, method FROM agent_screenshots WHERE id=%s",
            (row_id,)
        )
        if not row or not row["image_b64"]:
            raise ValueError(f"Screenshot {screenshot_id} not found or has no image data")
        image_b64 = row["image_b64"]
        hostname = row.get("hostname", "unknown")
    else:
        hostname = "direct"

    if not image_b64:
        raise ValueError("No image data provided")

    log.info(f"[VISION] Analysis: screenshot_id={screenshot_id} agent={hostname}")

    try:
        import anthropic

        client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY)
        msg = await client.messages.create(
            model="claude-opus-4-8-20251101",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {"type": "base64",
                                                 "media_type": "image/png", "data": image_b64}},
                    {"type": "text", "text": prompt},
                ],
            }],
        )
        analysis = msg.content[0].text if msg.content else ""
    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise RuntimeError(f"Vision analysis failed: {e}") from e

    # Update stored screenshot if we have an ID
    if row_id is not None:
        await db_execute(
            "UPDATE agent_screenshots SET vision_analysis=%s, vision_provider=%s WHERE id=%s",
            (analysis, "claude", row_id)
        )

    return {
        "agent": hostname,
        "screenshot_id": screenshot_id,
        "prompt": prompt,
        "analysis": analysis,
        "provider": "claude",
    }
|
|
|
|
|
|
async def handle_sysinfo(payload: dict) -> dict:
    """
    Get a structured system snapshot from a field agent (no image).

    Optionally ask the LLM to summarize/assess the health of the machine.

    payload: { agent, analyze: true|false, timeout: 30 }

    ``timeout`` is clamped to 1..60 seconds; a missing or non-numeric value
    falls back to 30. Raises ValueError when no agent name is given.
    Returns {agent, success, snapshot, analysis} on success or
    {agent, success: False, error} when the agent reports a failed command.
    """
    agent_name = (payload.get("agent") or "").strip()
    do_analyze = bool(payload.get("analyze", True))

    try:
        requested_timeout = int(payload.get("timeout", 30))
    except (TypeError, ValueError):
        requested_timeout = 30
    # Bound on both sides: never zero/negative, never above one minute.
    timeout_secs = max(1, min(requested_timeout, 60))

    if not agent_name:
        raise ValueError("Missing agent name")

    dispatch = await _dispatch_agent_command(agent_name, "sysinfo", {}, timeout_secs)
    result = dispatch.get("result", {})
    hostname = dispatch.get("agent", agent_name)

    if dispatch.get("status") == "failed":
        # The result is normally a dict, but tolerate a bare message or None.
        if isinstance(result, dict):
            error = result.get("error", "Command failed")
        else:
            error = str(result) if result else "Command failed"
        return {"agent": hostname, "success": False, "error": error}

    analysis = ""
    if do_analyze:
        try:
            # Cap the snapshot so the prompt stays small.
            snap = json.dumps(result, indent=2)[:3000]
            summary_prompt = (
                f"You are JARVIS analyzing a field station sysinfo snapshot from '{hostname}'.\n\n"
                f"Snapshot:\n{snap}\n\n"
                "Provide a concise health assessment: status (healthy/warning/critical), "
                "key metrics, any concerns, and recommended actions if needed. "
                "Keep it under 150 words, JARVIS style."
            )
            analysis = await llm_call([{"role": "user", "content": summary_prompt}], "claude")
        except Exception as e:
            # Analysis is best-effort; the snapshot is still returned.
            analysis = f"Analysis unavailable: {e}"

    return {
        "agent": hostname,
        "success": True,
        "snapshot": result,
        "analysis": analysis,
    }
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# JOB HANDLER REGISTRY
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
# Maps a job type name to the coroutine that executes it.
JOB_HANDLERS = dict(
    # Phases 1-3
    ping=handle_ping,
    echo=handle_echo,
    shell=handle_shell,
    llm=handle_llm,
    research=handle_research,
    tool_loop=handle_tool_loop,
    gmail_triage=handle_gmail_triage,
    remote_exec=handle_remote_exec,
    # Phase 4
    screenshot=handle_screenshot,
    vision=handle_vision,
    sysinfo=handle_sysinfo,
    # Phase 5 — placeholders, filled in once the guardian functions are defined
    sitrep=None,
    guardian_config=None,
)
|
|
|
|
# ── PHASE 5: GUARDIAN MODE ────────────────────────────────────────────────────
|
|
|
|
# In-memory state: tracks last alert time per (agent_id, metric) to debounce
|
|
_guardian_state: dict = {
|
|
"enabled": True,
|
|
"last_scan": None,
|
|
"last_sitrep": None,
|
|
"alert_cooldown": {}, # key: "agent_id:metric" → last_alert_epoch
|
|
"online_snapshot": {}, # agent_id → was_online bool
|
|
}
|
|
GUARDIAN_COOLDOWN = 600 # seconds between repeat alerts for same metric
|
|
|
|
|
|
async def _guardian_get_config() -> dict:
    """Load every guardian_config row into a {key_name: value} dict."""
    rows = await db_fetchall("SELECT key_name, value FROM guardian_config")
    if not rows:
        return {}
    settings = {}
    for entry in rows:
        settings[entry["key_name"]] = entry["value"]
    return settings
|
|
|
|
|
|
async def _guardian_emit(event_type: str, severity: str, agent_id: str,
                         hostname: str, metric: str, value: float,
                         threshold: float, message: str, ai_analysis: str = "") -> int:
    """Insert one row into guardian_events and return whatever db_execute returns."""
    insert_sql = """INSERT INTO guardian_events
           (event_type, severity, agent_id, hostname, metric, value, threshold,
            message, ai_analysis)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    row_values = (
        event_type, severity, agent_id, hostname, metric,
        value, threshold, message, ai_analysis,
    )
    event_id = await db_execute(insert_sql, row_values)
    return event_id
|
|
|
|
|
|
async def _guardian_cooldown_ok(agent_id: str, metric: str) -> bool:
    """
    Decide whether an alert for this agent+metric may fire now.

    Returns True, and records the current time as the new last-alert time,
    when at least GUARDIAN_COOLDOWN seconds have passed since the previous
    alert. Returns False otherwise.
    """
    import time
    cooldowns = _guardian_state["alert_cooldown"]
    slot = f"{agent_id}:{metric}"
    elapsed = time.time() - cooldowns.get(slot, 0)
    if elapsed < GUARDIAN_COOLDOWN:
        return False
    cooldowns[slot] = time.time()
    return True
|
|
|
|
|
|
def _guardian_pct(raw) -> Optional[float]:
    """Parse a percentage reading (number or string like "91.5%") into a float, else None."""
    if raw is None or isinstance(raw, bool):
        return None
    try:
        return float(str(raw).strip().rstrip("%"))
    except ValueError:
        return None


async def guardian_loop() -> None:
    """
    Background task — runs every scan_interval seconds (guardian_config, default 120).

    Each pass:
      1. compares agent online status with the previous pass and emits
         agent_offline / agent_online events on transitions,
      2. checks the newest recent metric row of each online agent against the
         CPU / memory / disk thresholds and for failed services,
      3. optionally asks the LLM for an alert message covering the findings
         and injects it into the chat.

    Never returns. Errors inside a pass are logged and the loop continues.
    """
    log.info("[GUARDIAN] Guardian Mode activated")

    # Must exist before the first pass: if that pass fails before reading the
    # config, the sleep at the bottom still needs a value.
    scan_interval = 120

    while True:
        try:
            cfg = await _guardian_get_config()
            # Accept the same "disabled" spellings that handle_guardian_config does.
            if str(cfg.get("enabled", "1")).strip().lower() in ("0", "false", "off"):
                await asyncio.sleep(60)
                continue

            # Never allow a zero/negative interval, which would busy-loop.
            scan_interval = max(1, int(cfg.get("scan_interval", 120)))
            cpu_thresh = float(cfg.get("cpu_threshold", 85))
            mem_thresh = float(cfg.get("mem_threshold", 88))
            disk_thresh = float(cfg.get("disk_threshold", 88))
            ai_on = cfg.get("ai_analysis", "1") == "1"
            proactive_chat = cfg.get("proactive_chat", "1") == "1"

            _guardian_state["last_scan"] = datetime.utcnow().isoformat()
            # Collects warning- and critical-level findings from this pass.
            critical_findings = []

            # ── 1. Agent online/offline transitions ───────────────────────────
            agents = await db_fetchall(
                "SELECT agent_id, hostname, status, last_seen FROM registered_agents"
            )
            current_snapshot = {}
            for ag in agents or []:
                aid = ag["agent_id"]
                hostname = ag["hostname"]
                is_online = ag["status"] == "online"
                current_snapshot[aid] = is_online
                # None means the agent was not seen on the previous pass.
                was_online = _guardian_state["online_snapshot"].get(aid)

                if was_online is True and not is_online:
                    # Agent went offline
                    if await _guardian_cooldown_ok(aid, "offline"):
                        eid = await _guardian_emit("agent_offline", "critical", aid, hostname,
                                                   "status", 0, 1, f"Field station '{hostname}' went OFFLINE")
                        critical_findings.append(f"⚠ {hostname} OFFLINE")
                        log.warning(f"[GUARDIAN] {hostname} went offline → event #{eid}")

                elif was_online is False and is_online:
                    # Agent came back
                    if await _guardian_cooldown_ok(aid, "online"):
                        await _guardian_emit("agent_online", "info", aid, hostname,
                                             "status", 1, 0, f"Field station '{hostname}' back ONLINE")
                        log.info(f"[GUARDIAN] {hostname} came back online")

            _guardian_state["online_snapshot"] = current_snapshot

            # ── 2. Metrics analysis ───────────────────────────────────────────
            # Rows are newest-first; only the first row per agent is inspected.
            metrics_rows = await db_fetchall(
                """SELECT m.agent_id, r.hostname, m.metric_data
                   FROM agent_metrics m
                   JOIN registered_agents r ON r.agent_id = m.agent_id
                   WHERE r.status = 'online'
                     AND m.recorded_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
                   ORDER BY m.recorded_at DESC"""
            )
            seen_agents: set = set()
            for row in metrics_rows or []:
                aid = row["agent_id"]
                if aid in seen_agents:
                    continue
                seen_agents.add(aid)
                hostname = row["hostname"]

                try:
                    raw_metrics = row["metric_data"]
                    sys_data = json.loads(raw_metrics) if isinstance(raw_metrics, str) else raw_metrics
                    if not isinstance(sys_data, dict):
                        continue

                    # Readings are normalised to float once, so string values
                    # such as "93.2" can be both compared and formatted.
                    cpu = _guardian_pct(sys_data.get("cpu_percent"))
                    if cpu is not None and cpu >= cpu_thresh:
                        sev = "critical" if cpu >= 95 else "warning"
                        if await _guardian_cooldown_ok(aid, "cpu"):
                            msg = f"{hostname}: CPU at {cpu:.1f}% (threshold: {cpu_thresh}%)"
                            await _guardian_emit("cpu_high", sev, aid, hostname,
                                                 "cpu_percent", cpu, cpu_thresh, msg)
                            critical_findings.append(f"CPU {hostname} {cpu:.0f}%")

                    mem = sys_data.get("memory", {})
                    mem_pct = _guardian_pct(mem.get("percent")) if isinstance(mem, dict) else None
                    if mem_pct is not None and mem_pct >= mem_thresh:
                        sev = "critical" if mem_pct >= 95 else "warning"
                        if await _guardian_cooldown_ok(aid, "memory"):
                            msg = f"{hostname}: Memory at {mem_pct:.1f}% (threshold: {mem_thresh}%)"
                            await _guardian_emit("mem_high", sev, aid, hostname,
                                                 "mem_percent", mem_pct, mem_thresh, msg)
                            critical_findings.append(f"MEM {hostname} {mem_pct:.0f}%")

                    disks = sys_data.get("disk", [])
                    if isinstance(disks, list):
                        for disk in disks:
                            if not isinstance(disk, dict):
                                continue
                            dpct = _guardian_pct(disk.get("percent", 0))
                            if dpct and dpct >= disk_thresh:
                                mount = disk.get("mountpoint", "/")
                                key = f"disk_{mount}"
                                if await _guardian_cooldown_ok(aid, key):
                                    msg = f"{hostname}: Disk {mount} at {dpct:g}% (threshold: {disk_thresh}%)"
                                    sev = "critical" if dpct >= 95 else "warning"
                                    await _guardian_emit("disk_high", sev, aid, hostname,
                                                         key, dpct, disk_thresh, msg)
                                    critical_findings.append(f"DISK {hostname}{mount} {dpct:g}%")

                    # Service anomalies
                    for svc in sys_data.get("services") or []:
                        if isinstance(svc, dict) and svc.get("status") == "failed":
                            svc_name = svc.get("service", "unknown")
                            if await _guardian_cooldown_ok(aid, f"svc_{svc_name}"):
                                msg = f"{hostname}: Service '{svc_name}' is FAILED"
                                await _guardian_emit("service_down", "critical", aid, hostname,
                                                     f"service:{svc_name}", 0, 1, msg)
                                critical_findings.append(f"SVC {hostname}/{svc_name} FAILED")

                except Exception as e:
                    # One agent's bad payload must not stop the scan of the others.
                    log.warning(f"[GUARDIAN] Metrics parse error for {row['hostname']}: {e}")

            # ── 3. AI analysis for critical findings ──────────────────────────
            if critical_findings and ai_on:
                try:
                    findings_text = "\n".join(f"- {f}" for f in critical_findings)
                    ai_prompt = (
                        f"You are JARVIS. The following anomalies were just detected:\n\n"
                        f"{findings_text}\n\n"
                        "Provide a brief (2-3 sentences), Iron Man-style alert message "
                        "for Myron. Be direct about severity and what action to take. "
                        "No markdown, no headers."
                    )
                    ai_msg = await llm_call([{"role": "user", "content": ai_prompt}], "groq")
                    # Update the most recent guardian event with AI analysis
                    await db_execute(
                        """UPDATE guardian_events SET ai_analysis=%s
                           WHERE ai_analysis='' AND created_at > DATE_SUB(NOW(), INTERVAL 1 MINUTE)
                           ORDER BY id DESC LIMIT 1""",
                        (ai_msg,)
                    )
                    # Inject proactive chat message if configured
                    if proactive_chat:
                        await _guardian_inject_chat(f"◈ GUARDIAN ALERT — {ai_msg}")
                except Exception as e:
                    log.warning(f"[GUARDIAN] AI analysis error: {e}")

            if critical_findings:
                log.warning(f"[GUARDIAN] Scan complete — {len(critical_findings)} findings: {', '.join(critical_findings)}")
            else:
                log.debug("[GUARDIAN] Scan complete — all clear")

        except Exception as exc:
            log.error(f"[GUARDIAN] Loop error: {exc}")

        await asyncio.sleep(scan_interval)
|
|
|
|
|
|
async def _guardian_inject_chat(message: str) -> None:
    """
    Store a proactive JARVIS message in the conversations table.

    The row is written under session 'guardian' with role 'assistant'.
    Best-effort: a failed insert is logged at debug level, never raised.
    """
    insert_sql = """INSERT INTO conversations (session_id, role, message, created_at)
               VALUES ('guardian', 'assistant', %s, NOW())"""
    params = (message,)
    try:
        await db_execute(insert_sql, params)
    except Exception as e:
        log.debug(f"[GUARDIAN] Chat inject error: {e}")
|
|
|
|
|
|
async def handle_sitrep(payload: dict) -> dict:
    """
    Situation Report — comprehensive health briefing across all field stations.

    payload: { detail: brief|full, provider: groq }

    Gathers agent status, metrics from the last 10 minutes, guardian events
    and job counts from the last 24 hours, asks the chosen LLM provider for
    a narrative report, and logs the report as a guardian event.

    Returns the report text plus summary counters. ``arc_stats`` counters
    are plain ints (0 when there were no jobs).
    """
    detail = payload.get("detail", "full")
    provider = payload.get("provider", "groq")

    log.info(f"[GUARDIAN] SITREP requested (detail={detail})")

    # 1. Gather agent status
    agents = await db_fetchall(
        """SELECT agent_id, hostname, status, ip_address, agent_type,
                  capabilities, last_seen
           FROM registered_agents ORDER BY status DESC, hostname ASC"""
    ) or []

    # 2. Get latest metrics per agent (rows are newest-first; first one wins)
    metrics_map = {}
    metrics_rows = await db_fetchall(
        """SELECT m.agent_id, m.metric_data, m.recorded_at
           FROM agent_metrics m
           WHERE m.recorded_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE)
           ORDER BY m.recorded_at DESC"""
    ) or []
    for row in metrics_rows:
        if row["agent_id"] in metrics_map:
            continue
        raw_metrics = row["metric_data"]
        try:
            metrics_map[row["agent_id"]] = (
                json.loads(raw_metrics) if isinstance(raw_metrics, str) else raw_metrics
            )
        except ValueError:
            # Unparseable JSON: the agent is shown with "?" readings instead.
            pass

    # 3. Recent guardian events (last 24h)
    recent_events = await db_fetchall(
        """SELECT event_type, severity, hostname, metric, value, threshold, message, created_at
           FROM guardian_events
           WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)
           ORDER BY created_at DESC LIMIT 20"""
    ) or []

    # 4. Arc Reactor job stats
    arc_stats = await db_fetchone(
        """SELECT
             SUM(status='queued')  AS queued,
             SUM(status='running') AS running,
             SUM(status='done')    AS done,
             SUM(status='failed')  AS failed
           FROM arc_jobs WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)"""
    )
    # SUM() yields NULL when no rows match; normalise every counter to an int.
    # NOTE(review): the driver presumably returns Decimal for SUM — confirm.
    arc = {name: int(count or 0) for name, count in (arc_stats or {}).items()}

    # 5. Build context for the LLM
    agent_lines = []
    for ag in agents:
        snapshot = metrics_map.get(ag["agent_id"]) or {}
        if not isinstance(snapshot, dict):
            snapshot = {}
        cpu = snapshot.get("cpu_percent", "?")
        memory = snapshot.get("memory")
        mem = memory.get("percent", "?") if isinstance(memory, dict) else "?"
        # Usage of the root filesystem; agents report the key as "mount" or "mountpoint".
        disk = next(
            (d.get("percent", "?") for d in (snapshot.get("disk") or [])
             if isinstance(d, dict) and d.get("mount", d.get("mountpoint", "")) == "/"),
            "?",
        )
        status = str(ag["status"] or "unknown").upper()
        agent_lines.append(
            f"  {ag['hostname']} ({status}) — "
            f"CPU:{cpu}% MEM:{mem}% DISK:{disk}%"
        )

    event_lines = [
        f"  [{str(e['severity'] or '').upper()}] {e['hostname']}: {e['message']} ({e['created_at']})"
        for e in recent_events
    ] or ["  No events in last 24h"]

    sitrep_context = (
        f"JARVIS SITREP — {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}\n\n"
        f"FIELD STATIONS ({len(agents)} total):\n" + "\n".join(agent_lines) + "\n\n"
        f"ARC REACTOR (last 24h): queued={arc.get('queued', 0)} "
        f"running={arc.get('running', 0)} done={arc.get('done', 0)} "
        f"failed={arc.get('failed', 0)}\n\n"
        f"GUARDIAN EVENTS (last 24h):\n" + "\n".join(event_lines)
    )

    prompt = (
        f"{sitrep_context}\n\n"
        f"You are JARVIS. Deliver a {'brief 3-sentence' if detail == 'brief' else 'comprehensive'} "
        f"situation report for Myron Blair. Iron Man style — direct, confident, actionable. "
        f"Highlight: overall health status, any critical issues requiring immediate attention, "
        f"and key metrics. "
        + ("Keep it under 80 words." if detail == "brief" else "Structure: Overall Status → Issues → Metrics → Recommendation.")
    )

    analysis = await llm_call([{"role": "user", "content": prompt}], provider)
    _guardian_state["last_sitrep"] = datetime.utcnow().isoformat()

    # Log SITREP as guardian event
    await _guardian_emit("sitrep", "info", "system", "system", "sitrep", 0, 0,
                         f"SITREP generated ({detail})", analysis)

    online_count = sum(1 for a in agents if a["status"] == "online")
    offline_count = len(agents) - online_count
    critical_events = sum(1 for e in recent_events if e["severity"] == "critical")

    return {
        "sitrep": analysis,
        "agents_online": online_count,
        "agents_offline": offline_count,
        "agents_total": len(agents),
        "events_24h": len(recent_events),
        "critical_24h": critical_events,
        "arc_stats": arc,
        "timestamp": datetime.utcnow().isoformat(),
        "detail": detail,
    }
|
|
|
|
|
|
async def handle_guardian_config(payload: dict) -> dict:
    """
    Get or set Guardian Mode configuration.

    payload: { action: get|set, key: ..., value: ... } or { action: set, config: {...} }

    On "set", every value is stored as a string. The ``enabled`` key is
    normalised to "1"/"0" before storing: "0", "false" and "off" (any case,
    including the bool False and the int 0) mean disabled.

    Raises ValueError for an unknown action.
    """
    action = payload.get("action", "get")

    if action == "get":
        cfg = await _guardian_get_config()
        return {"config": cfg, "guardian_state": {
            "enabled": _guardian_state.get("enabled"),
            "last_scan": _guardian_state.get("last_scan"),
            "last_sitrep": _guardian_state.get("last_sitrep"),
        }}

    elif action == "set":
        # Copy so the caller's payload dict is not mutated.
        updates = dict(payload.get("config") or {})
        if payload.get("key") and payload.get("value") is not None:
            updates[payload["key"]] = str(payload["value"])
        for k, v in updates.items():
            stored = str(v)
            is_enabled = None
            if k == "enabled":
                # Compare the stringified, lower-cased value so that False,
                # 0 and "False" are all recognised as disabled.
                is_enabled = stored.strip().lower() not in ("0", "false", "off")
                stored = "1" if is_enabled else "0"
            await db_execute(
                "INSERT INTO guardian_config (key_name, value) VALUES (%s, %s) "
                "ON DUPLICATE KEY UPDATE value=%s, updated_at=NOW()",
                (k, stored, stored)
            )
            # Only update the in-memory flag once the write has succeeded.
            if is_enabled is not None:
                _guardian_state["enabled"] = is_enabled
        return {"ok": True, "updated": list(updates.keys())}

    raise ValueError(f"Unknown guardian_config action: {action}")
|
|
|
|
|
|
# Phase 5 handlers replace the None placeholders now that they are defined.
JOB_HANDLERS.update({
    "sitrep": handle_sitrep,
    "guardian_config": handle_guardian_config,
})
|
|
|
|
# ── PHASE 6: COMMS v2 — SEND EMAIL + SCHEDULE + MEETING PREP ─────────────────
|
|
|
|
import smtplib
|
|
import ssl as _ssl
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.utils import formataddr, make_msgid
|
|
|
|
# account name → (SMTP host, port, login user, password)
SMTP_SETTINGS = dict(
    gmail=("smtp.gmail.com", 587, GMAIL_USER, GMAIL_PASS),
    icloud=("smtp.mail.me.com", 587, ICLOUD_USER, ICLOUD_PASS),
)
|
|
|
|
def _smtp_send_sync(account: str, to_email: str, to_name: str,
                    subject: str, body: str,
                    reply_to_msg_id: str = "") -> dict:
    """
    Synchronous SMTP send — run in executor.

    Looks up the account in SMTP_SETTINGS (unknown accounts fall back to
    gmail), builds a UTF-8 plain-text message and sends it over STARTTLS.

    Never raises: returns {"success": True, "message_id": ...} or
    {"success": False, "error": ...}.
    """
    from email.utils import formatdate

    host, port, user, passwd = SMTP_SETTINGS.get(account, SMTP_SETTINGS["gmail"])
    if not user or not passwd:
        return {"success": False, "error": "No credentials configured for account"}

    to_email = (to_email or "").strip()
    # A line break in an address is never legitimate and could inject headers.
    if not to_email or "\r" in to_email or "\n" in to_email:
        return {"success": False, "error": "Invalid recipient address"}

    # Header values must be single-line.
    subject = " ".join((subject or "").splitlines())
    to_name = " ".join((to_name or "").splitlines())

    try:
        msg = MIMEMultipart("alternative")
        from_addr = formataddr(("JARVIS / Myron Blair", user))
        msg["From"] = from_addr
        msg["To"] = formataddr((to_name, to_email)) if to_name else to_email
        msg["Subject"] = subject
        # RFC 5322 requires an origination date; set one explicitly.
        msg["Date"] = formatdate(localtime=True)
        # Use the sender's domain for the Message-ID; if the login has no
        # "@", let make_msgid pick the local host name.
        sender_domain = user.rpartition("@")[2] if "@" in user else None
        msg_id = make_msgid(domain=sender_domain or None)
        msg["Message-ID"] = msg_id
        if reply_to_msg_id:
            # Normalise to exactly one pair of angle brackets for threading.
            parent_id = f"<{reply_to_msg_id.strip().strip('<>')}>"
            msg["In-Reply-To"] = parent_id
            msg["References"] = parent_id

        msg.attach(MIMEText(body, "plain", "utf-8"))

        ctx = _ssl.create_default_context()
        with smtplib.SMTP(host, port, timeout=20) as smtp:
            smtp.ehlo()
            smtp.starttls(context=ctx)
            smtp.login(user, passwd)
            smtp.sendmail(user, [to_email], msg.as_bytes())

        return {"success": True, "message_id": msg_id}
    except Exception as e:
        # Callers record the failure text; they do not expect an exception.
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
async def handle_send_email(payload: dict) -> dict:
    """
    Send an email from Gmail or iCloud.

    payload: {
        account:          gmail|icloud,
        to_email:         recipient address,
        to_name:          recipient display name (optional),
        subject:          subject line,
        body:             plain-text body,
        triage_id:        int (optional — if replying to a triage item),
        reply_to_msg_id:  original Message-ID for threading (optional),
        compose_prompt:   instructions — the LLM drafts the body when body is empty
    }

    When ``triage_id`` is given without ``to_email``, recipient, subject,
    draft and Message-ID are taken from the email_triage row.

    Raises ValueError when recipient, subject or body cannot be determined.
    Every attempt (sent or failed) is recorded in email_sent.
    """
    account = payload.get("account", "gmail")
    to_email = (payload.get("to_email") or "").strip()
    to_name = (payload.get("to_name") or "").strip()
    subject = (payload.get("subject") or "").strip()
    body = (payload.get("body") or "").strip()
    triage_id = payload.get("triage_id")
    reply_msg_id = payload.get("reply_to_msg_id", "")
    compose_prompt = payload.get("compose_prompt", "")

    # If triage_id is given, pull recipient + subject + draft from email_triage
    if triage_id and not to_email:
        row = await db_fetchone(
            "SELECT from_email, from_name, subject, draft_reply, msg_id FROM email_triage WHERE id=%s",
            (int(triage_id),)
        )
        if row:
            to_email = to_email or row["from_email"]
            to_name = to_name or row["from_name"]
            if not subject:
                original_subject = row["subject"] or ""
                # Don't stack a second "Re:" on an already-replied thread.
                if original_subject.lower().startswith("re:"):
                    subject = original_subject
                else:
                    subject = "Re: " + original_subject
            body = body or row.get("draft_reply", "")
            reply_msg_id = reply_msg_id or row.get("msg_id", "")

    if not to_email:
        raise ValueError("Missing to_email")

    # If compose requested, use the LLM to write the body
    if compose_prompt and not body:
        draft_prompt = (
            f"You are drafting an email on behalf of Myron Blair.\n"
            f"To: {to_name or to_email}\n"
            f"Subject: {subject}\n\n"
            f"Instructions: {compose_prompt}\n\n"
            "Write a professional, concise email body. No subject line, no salutation header. "
            "Start with 'Hi [name],' or similar. Sign off as 'Myron Blair'."
        )
        body = await llm_call([{"role": "user", "content": draft_prompt}], "claude")

    if not subject:
        raise ValueError("Missing subject")
    if not body:
        raise ValueError("Missing email body — provide body or compose_prompt")

    log.info(f"[COMMS] Sending email: {account} → {to_email} | {subject[:60]}")

    # SMTP is blocking; run it in the default executor. Inside a coroutine the
    # running loop is the correct one to use.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, lambda: _smtp_send_sync(account, to_email, to_name, subject, body, reply_msg_id)
    )

    # Record in email_sent
    status = "sent" if result["success"] else "failed"
    await db_execute(
        """INSERT INTO email_sent
           (account, to_email, to_name, subject, body, triage_id, status, error, message_id)
           VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
        (account, to_email, to_name, subject, body,
         triage_id or None, status,
         result.get("error", ""), result.get("message_id", ""))
    )

    # Update triage item if this was a reply
    if triage_id and result["success"]:
        await db_execute(
            "UPDATE email_triage SET action_taken='replied' WHERE id=%s", (int(triage_id),)
        )

    log.info(f"[COMMS] Send result: {status} → {to_email}")
    return {
        "success": result["success"],
        "account": account,
        "to_email": to_email,
        "subject": subject,
        "status": status,
        "message_id": result.get("message_id", ""),
        "error": result.get("error", ""),
        "body": body,
    }
|
|
|
|
|
|
async def handle_compose_email(payload: dict) -> dict:
    """
    Compose an email from natural-language instructions — draft only by default.

    payload: { to_email, to_name, subject_hint, instructions, account, send: false }

    With ``send`` true and a recipient present, the draft is handed to
    handle_send_email and that handler's result is returned. Otherwise the
    draft is stored in email_sent with status 'queued' for review.

    Raises ValueError when ``instructions`` is missing.
    """
    to_email = (payload.get("to_email") or "").strip()
    to_name = (payload.get("to_name") or "").strip()
    subject_hint = (payload.get("subject_hint") or "").strip()
    instructions = (payload.get("instructions") or "").strip()
    account = payload.get("account", "gmail")
    auto_send = bool(payload.get("send", False))

    if not instructions:
        raise ValueError("Missing instructions for compose")

    # Generate subject if not given
    if not subject_hint:
        subj_prompt = f"Write a concise email subject line (max 8 words) for an email about: {instructions}"
        raw_subject = await llm_call([{"role": "user", "content": subj_prompt}], "claude")
        # A subject must be one line: keep the first non-empty line of the reply.
        candidates = [ln.strip() for ln in (raw_subject or "").splitlines() if ln.strip()]
        subject_hint = candidates[0].strip('"').strip() if candidates else ""
        if not subject_hint:
            # The model returned nothing usable; derive a subject from the request.
            subject_hint = " ".join(instructions.split())[:60]

    # Draft full body
    draft_prompt = (
        f"You are drafting an email for Myron Blair.\n"
        f"To: {to_name or to_email or 'the recipient'}\n"
        f"Subject: {subject_hint}\n\n"
        f"Instructions: {instructions}\n\n"
        "Write a professional, concise email. Start with 'Hi [name],' or 'Hello,' "
        "and sign off as 'Myron Blair'. Return only the email body text."
    )
    body = await llm_call([{"role": "user", "content": draft_prompt}], "claude")

    if auto_send and to_email:
        return await handle_send_email({
            "account": account,
            "to_email": to_email,
            "to_name": to_name,
            "subject": subject_hint,
            "body": body,
        })

    # Draft-only — save to email_sent with status='queued' for review
    draft_id = await db_execute(
        """INSERT INTO email_sent
           (account, to_email, to_name, subject, body, status)
           VALUES (%s,%s,%s,%s,%s,'queued')""",
        (account, to_email, to_name, subject_hint, body)
    )

    log.info(f"[COMMS] Composed draft #{draft_id}: {to_email} | {subject_hint[:60]}")
    return {
        "draft_id": draft_id,
        "to_email": to_email,
        "to_name": to_name,
        "subject": subject_hint,
        "body": body,
        "account": account,
        "sent": False,
        "status": "queued",
    }
|
|
|
|
|
|
def _ics_escape(value) -> str:
    """Escape a value for use as an iCalendar TEXT property (RFC 5545 §3.3.11)."""
    text = str(value or "")
    # Backslash first, so the escapes added below are not escaped again.
    text = text.replace("\\", "\\\\").replace(";", "\\;").replace(",", "\\,")
    return text.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n")


def _extract_json_object(raw: str) -> str:
    """Return the outermost {...} span of an LLM reply, dropping code fences and chatter."""
    start = raw.find("{")
    end = raw.rfind("}")
    if start != -1 and end > start:
        return raw[start:end + 1]
    return raw.strip()


async def handle_schedule_event(payload: dict) -> dict:
    """
    Create an appointment in the JARVIS planner, optionally from natural language.

    payload: { title, description, start_at, end_at, location, category,
               natural_language, all_day, reminder_min, generate_ics }

    When ``natural_language`` is given and title or start time is missing,
    the LLM parses the request; explicit payload fields always win over
    parsed ones. ``category`` defaults to "work" and ``reminder_min`` to 30.

    Raises ValueError when no title or start time can be determined.
    Returns the stored appointment fields plus an ICS document ("" when
    ICS generation is disabled or fails).
    """
    nl = (payload.get("natural_language") or "").strip()
    title = (payload.get("title") or "").strip()
    start_at = payload.get("start_at", "")
    end_at = payload.get("end_at", "")
    location = payload.get("location", "")
    category = payload.get("category") or ""
    all_day = bool(payload.get("all_day", False))
    reminder_raw = payload.get("reminder_min")
    # Keep an explicit 0; only a missing value gets the default.
    reminder = 30 if reminder_raw is None else int(reminder_raw)
    description = payload.get("description", "")
    gen_ics = bool(payload.get("generate_ics", True))

    # If natural language given, parse with the LLM
    if nl and (not title or not start_at):
        now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
        parse_prompt = (
            f"Current datetime: {now_str}\n\n"
            f"Parse this scheduling request into JSON:\n\"{nl}\"\n\n"
            "Return ONLY valid JSON with these fields (no markdown):\n"
            '{"title":"...","start_at":"YYYY-MM-DD HH:MM:SS","end_at":"YYYY-MM-DD HH:MM:SS or null",'
            '"location":"...","category":"personal|work|medical|other","all_day":false,'
            '"description":"..."}\n'
            "Use 24h time. Default duration 1 hour if not specified. "
            "If only a date (no time) is given, set all_day=true."
        )
        raw = await llm_call([{"role": "user", "content": parse_prompt}], "claude")
        raw = (raw or "").strip()
        try:
            parsed = json.loads(_extract_json_object(raw))
            if not isinstance(parsed, dict):
                raise ValueError("expected a JSON object")
            title = title or parsed.get("title") or nl[:100]
            start_at = start_at or parsed.get("start_at") or ""
            end_at = end_at or parsed.get("end_at") or ""
            location = location or parsed.get("location") or ""
            category = category or parsed.get("category") or ""
            # Only a real JSON true counts; a string such as "false" must not.
            all_day = all_day or parsed.get("all_day") is True
            description = description or parsed.get("description") or ""
        except (ValueError, TypeError, AttributeError) as e:
            log.warning(f"[COMMS] Schedule parse failed: {e} — raw: {raw[:100]}")
            if not title:
                title = nl[:100]

    category = category or "work"

    if not title:
        raise ValueError("Could not determine event title from input")
    if not start_at:
        raise ValueError("Could not determine start time from input")

    # Insert into appointments table
    appt_id = await db_execute(
        """INSERT INTO appointments
           (title, description, category, start_at, end_at, location,
            all_day, reminder_min, external_source)
           VALUES (%s,%s,%s,%s,%s,%s,%s,%s,'manual')""",
        (title, description, category, start_at,
         end_at or None, location, int(all_day), reminder)
    )

    # Generate ICS if requested
    ics_content = ""
    if gen_ics and start_at:
        try:
            uid = f"jarvis-{appt_id}@orbishosting.com"
            # "YYYY-MM-DD HH:MM:SS" → "YYYYMMDDTHHMMSS"
            dtstart = str(start_at).replace(" ", "T").replace("-", "").replace(":", "")
            dtend = str(end_at or start_at).replace(" ", "T").replace("-", "").replace(":", "")
            dtstamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
            ics_content = (
                "BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//JARVIS//EN\r\n"
                "BEGIN:VEVENT\r\n"
                f"UID:{uid}\r\n"
                f"DTSTAMP:{dtstamp}\r\n"
                f"DTSTART:{dtstart}\r\n"
                f"DTEND:{dtend}\r\n"
                # Text values are escaped so commas, semicolons and newlines
                # cannot break the calendar file.
                f"SUMMARY:{_ics_escape(title)}\r\n"
                f"DESCRIPTION:{_ics_escape(description)}\r\n"
                f"LOCATION:{_ics_escape(location)}\r\n"
                "END:VEVENT\r\nEND:VCALENDAR\r\n"
            )
        except Exception as e:
            # The appointment is already saved; a missing ICS is not fatal.
            log.warning(f"[COMMS] ICS generation failed for #{appt_id}: {e}")
            ics_content = ""

    log.info(f"[COMMS] Event scheduled: #{appt_id} '{title}' @ {start_at}")
    return {
        "appointment_id": appt_id,
        "title": title,
        "start_at": start_at,
        "end_at": end_at,
        "location": location,
        "category": category,
        "all_day": all_day,
        "ics": ics_content,
        "description": description,
    }
|
|
|
|
|
|
async def handle_meeting_prep(payload: dict) -> dict:
    """
    Prepare a briefing for an upcoming meeting.

    payload: { appointment_id OR title OR timeframe, research: true }

    Lookup order: appointment by id, else the next upcoming appointment
    whose title contains ``title``, else the next appointment within 24
    hours. When a title was given and no appointment matches, an open task
    with a matching title is used instead. ``timeframe`` is only echoed in
    the not-found message.

    Returns {success: False, error} when nothing matches, otherwise the
    appointment summary and the generated briefing.
    """
    appt_id = payload.get("appointment_id")
    title = (payload.get("title") or "").strip()
    timeframe = payload.get("timeframe", "today")
    do_research = bool(payload.get("research", True))

    # Find the appointment
    appt = None
    if appt_id:
        appt = await db_fetchone("SELECT * FROM appointments WHERE id=%s", (int(appt_id),))
    elif title:
        appt = await db_fetchone(
            "SELECT * FROM appointments WHERE title LIKE %s AND start_at >= NOW() ORDER BY start_at LIMIT 1",
            (f"%{title}%",)
        )
    else:
        appt = await db_fetchone(
            """SELECT * FROM appointments
               WHERE start_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 24 HOUR)
               ORDER BY start_at LIMIT 1"""
        )

    # Check tasks too — but only when there is a title to match. With an
    # empty title the LIKE pattern is '%%', which matches every open task.
    if not appt and title:
        task = await db_fetchone(
            "SELECT * FROM tasks WHERE title LIKE %s AND status != 'done' ORDER BY due_date LIMIT 1",
            (f"%{title}%",)
        )
        if task:
            appt = {"title": task["title"], "description": task.get("notes", ""),
                    "start_at": str(task.get("due_date") or ""), "location": ""}

    if not appt:
        return {"success": False, "error": f"No upcoming appointment found matching '{title or timeframe}'"}

    appt_title = appt.get("title", "")
    appt_start = str(appt.get("start_at", ""))
    appt_desc = appt.get("description", "")
    appt_loc = appt.get("location", "")

    # Optional: run a quick research job on the meeting topic/attendees
    research_summary = ""
    if do_research and appt_title:
        try:
            research_result = await handle_research({
                "query": f"background information on: {appt_title}",
                "depth": "quick",
                "provider": "claude",
            })
            research_summary = (research_result.get("synthesis") or "")[:1500]
        except Exception as e:
            # Research is optional: brief without it, but leave a trace.
            log.warning(f"[COMMS] Meeting prep research failed for '{appt_title}': {e}")

    # Build meeting briefing
    context = (
        f"Meeting: {appt_title}\n"
        f"Time: {appt_start}\n"
        f"Location: {appt_loc or 'Not specified'}\n"
        f"Notes: {appt_desc or 'None'}\n"
    )
    if research_summary:
        context += f"\nBackground Research:\n{research_summary}"

    prompt = (
        f"You are JARVIS. Prepare a concise meeting briefing for Myron Blair.\n\n"
        f"{context}\n\n"
        "Format: Meeting summary → Key context/background → Suggested talking points → "
        "Action items to prepare. Keep it sharp and actionable. Iron Man style."
    )
    briefing = await llm_call([{"role": "user", "content": prompt}], "claude")

    log.info(f"[COMMS] Meeting prep complete: '{appt_title}'")
    return {
        # None when the briefing was built from a task rather than an appointment.
        "appointment_id": appt.get("id"),
        "title": appt_title,
        "start_at": appt_start,
        "location": appt_loc,
        "briefing": briefing,
        "has_research": bool(research_summary),
    }
|
|
|
|
|
|
# Phase 6 handlers are added to the registry now that they are defined.
JOB_HANDLERS.update({
    "send_email": handle_send_email,
    "compose_email": handle_compose_email,
    "schedule_event": handle_schedule_event,
    "meeting_prep": handle_meeting_prep,
})
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 7: MISSION OPS — multi-step automated workflows
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
import re as _re
|
|
|
|
def _render_template(text: str, context: dict) -> str:
|
|
"""Replace {{key.subkey}} tokens in a string/JSON with values from context."""
|
|
if not isinstance(text, str):
|
|
return text
|
|
def replacer(m):
|
|
path = m.group(1).strip().split(".")
|
|
val = context
|
|
for p in path:
|
|
if isinstance(val, dict):
|
|
val = val.get(p, m.group(0))
|
|
else:
|
|
return m.group(0)
|
|
return str(val) if not isinstance(val, (dict, list)) else json.dumps(val)
|
|
return _re.sub(r'\{\{([^}]+)\}\}', replacer, text)
|
|
|
|
def _apply_templates(payload: Any, context: dict) -> Any:
|
|
"""Recursively apply template substitution to a payload dict/list/str."""
|
|
if isinstance(payload, str):
|
|
return _render_template(payload, context)
|
|
if isinstance(payload, dict):
|
|
return {k: _apply_templates(v, context) for k, v in payload.items()}
|
|
if isinstance(payload, list):
|
|
return [_apply_templates(v, context) for v in payload]
|
|
return payload
|
|
|
|
async def _execute_mission(mission_id: int, trigger_source: str = "manual") -> dict:
    """Execute every step of a mission in order and record the run.

    A ``mission_runs`` row is created up front and finalised with the overall
    status and the per-step log. Each step's payload is templated against a
    context holding the trigger info plus the results of earlier successful
    steps (``step_<n>``). A failing step aborts the run unless the step has
    ``continue_on_failure`` set.

    Returns a summary dict (``run_id``, ``status``, ``steps``, ``steps_log``),
    or ``{"error": ...}`` when the mission is missing or has no steps.
    """
    mission = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,))
    if not mission:
        return {"error": f"Mission {mission_id} not found"}

    steps = await db_fetchall(
        "SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC",
        (mission_id,),
    )
    if not steps:
        return {"error": "Mission has no steps"}

    # Open the run record and bump the mission's bookkeeping counters.
    run_id = await db_execute(
        "INSERT INTO mission_runs (mission_id, status, trigger_source, steps_log) VALUES (%s,'running',%s,'[]')",
        (mission_id, trigger_source),
    )
    await db_execute(
        "UPDATE missions SET last_run_at=NOW(), run_count=run_count+1 WHERE id=%s",
        (mission_id,),
    )

    context = {"trigger": {"source": trigger_source, "mission_id": mission_id}}
    steps_log = []
    overall_status = "done"

    for index, step in enumerate(steps):
        job_type = step["job_type"]
        label = step.get("label") or job_type

        # The stored payload may be a JSON string or an already-decoded value.
        stored = step.get("job_payload") or {}
        if isinstance(stored, str):
            try:
                stored = json.loads(stored)
            except Exception:
                stored = {}
        payload = _apply_templates(stored, context)

        entry = {
            "step": index,
            "label": label,
            "job_type": job_type,
            "status": "running",
            "result": None,
            "error": None,
        }
        abort = False

        try:
            handler = JOB_HANDLERS.get(job_type)
            if not handler:
                raise ValueError(f"Unknown job type: {job_type}")
            outcome = await handler(payload)
            entry["status"] = "done"
            entry["result"] = outcome
            # Expose the result to later steps as {{step_<n>...}}.
            context[f"step_{index}"] = outcome
            log.info(f"[MISSION {mission_id} RUN {run_id}] Step {index} ({label}) done")
        except Exception as exc:
            entry["status"] = "failed"
            entry["error"] = str(exc)[:500]
            log.warning(f"[MISSION {mission_id} RUN {run_id}] Step {index} ({label}) failed: {exc}")
            if not step.get("continue_on_failure"):
                overall_status = "failed"
                abort = True

        steps_log.append(entry)
        if abort:
            break

    await db_execute(
        "UPDATE mission_runs SET status=%s, steps_log=%s, completed_at=NOW() WHERE id=%s",
        (overall_status, json.dumps(steps_log, default=str), run_id),
    )
    return {
        "run_id": run_id,
        "status": overall_status,
        "steps": len(steps_log),
        "steps_log": steps_log,
    }
|
|
|
|
async def handle_run_mission(payload: dict) -> dict:
    """Job handler: run the mission named by ``payload["mission_id"]``.

    ``payload["trigger_source"]`` (default ``"manual"``) is recorded on the run.
    Returns the summary from ``_execute_mission``, or ``{"error": ...}`` when
    the id is missing or not an integer.
    """
    raw_id = payload.get("mission_id")
    try:
        mission_id = int(raw_id or 0)
    except (TypeError, ValueError):
        # Report bad ids the same way as missing ones instead of raising.
        return {"error": f"Invalid mission_id: {raw_id!r}"}
    if not mission_id:
        return {"error": "Missing mission_id"}
    source = payload.get("trigger_source", "manual")
    return await _execute_mission(mission_id, source)
|
|
|
|
JOB_HANDLERS["run_mission"] = handle_run_mission
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 8: MISSION DIRECTIVES — OKR / goal tracking with AI review
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def handle_directive_review(payload: dict) -> dict:
    """AI-powered review of directives: progress analysis + recommendations.

    Reviews the directive given by ``payload["directive_id"]``, or the ten
    highest-priority active directives when no id is supplied. Key results and
    linked tasks/appointments are summarised into a prompt, the generated
    briefing is stored in ``conversations`` under the ``guardian`` session,
    and the briefing is returned.

    Returns ``{"review", "directives", "provider"}`` or ``{"error": ...}`` when
    no directive matches.

    NOTE: the request is always sent to the Claude messages API;
    ``payload["provider"]`` is only echoed back in the result.
    """
    directive_id = payload.get("directive_id")
    provider = payload.get("provider", "claude")

    # Fetch directives
    if directive_id:
        dirs = await db_fetchall(
            "SELECT * FROM directives WHERE id=%s", (directive_id,)
        )
    else:
        dirs = await db_fetchall(
            "SELECT * FROM directives WHERE status='active' ORDER BY priority DESC LIMIT 10"
        )

    if not dirs:
        return {"error": "No active directives found"}

    dir_ids = [d["id"] for d in dirs]
    placeholders = ",".join(["%s"] * len(dir_ids))

    key_results = await db_fetchall(
        f"SELECT * FROM directive_key_results WHERE directive_id IN ({placeholders}) ORDER BY directive_id,id",
        dir_ids
    ) or []

    links = await db_fetchall(
        f"""SELECT dl.directive_id, dl.link_type, dl.note,
                   COALESCE(t.title, a.title) AS linked_title,
                   COALESCE(t.status, 'scheduled') AS linked_status
            FROM directive_links dl
            LEFT JOIN tasks t ON dl.link_type='task' AND t.id=dl.link_id
            LEFT JOIN appointments a ON dl.link_type='appointment' AND a.id=dl.link_id
            WHERE dl.directive_id IN ({placeholders})""",
        dir_ids
    ) or []

    # Group child rows once instead of rescanning both lists per directive.
    krs_by_dir: dict = {}
    for kr in key_results:
        krs_by_dir.setdefault(kr["directive_id"], []).append(kr)
    links_by_dir: dict = {}
    for lk in links:
        links_by_dir.setdefault(lk["directive_id"], []).append(lk)

    # Build context block
    context_lines = []
    for d in dirs:
        d_krs = krs_by_dir.get(d["id"], [])
        d_links = links_by_dir.get(d["id"], [])
        # NULL values count as 0 so one incomplete key result cannot abort the review.
        kr_sum_cur = sum(float(kr["current_value"] or 0) for kr in d_krs)
        # "or 1" avoids dividing by zero when there are no targets.
        kr_sum_tgt = sum(float(kr["target_value"] or 0) for kr in d_krs) or 1
        pct = round(kr_sum_cur / kr_sum_tgt * 100, 1)
        category = str(d.get("category") or "uncategorized").upper()

        context_lines.append(f"\n## DIRECTIVE: {d['title']} ({category}) — {pct}% complete")
        context_lines.append(f"   Status: {d['status']} | Priority: {d['priority']}/10 | Target: {d.get('target_date') or 'no date'}")
        if d.get("description"):
            context_lines.append(f"   Description: {d['description']}")
        for kr in d_krs:
            context_lines.append(f"   KR: {kr['title']} — {kr['current_value']}/{kr['target_value']} {kr['unit']}")
        for lk in d_links:
            status = lk.get("linked_status") or ""
            context_lines.append(f"   Linked {lk['link_type']}: {lk.get('linked_title') or lk.get('note') or '—'} [{status}]")

    context = "\n".join(context_lines)
    today = datetime.utcnow().strftime("%Y-%m-%d")

    prompt = f"""You are JARVIS, an AI assistant conducting a Mission Directives review for your principal.
Today is {today}.

{context}

Provide a concise executive briefing covering:
1. Overall progress summary (2-3 sentences)
2. For each directive: current status, what's on track, what needs attention
3. Top 3 recommended next actions to move the highest-priority directives forward
4. Any directives at risk of missing their target date

Keep the tone confident and action-oriented. Format with clear sections. Under 400 words."""

    review_text = ""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.anthropic.com/v1/messages",
                headers={"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"},
                json={"model": CLAUDE_MODEL, "max_tokens": 600, "messages": [{"role": "user", "content": prompt}]},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status != 200:
                    # An error body has no "content"; report it instead of
                    # silently storing an empty review.
                    raise RuntimeError(f"Claude API returned HTTP {resp.status}")
                rdata = await resp.json()
                review_text = rdata.get("content", [{}])[0].get("text", "")
                if not review_text:
                    raise RuntimeError("Claude API returned no text")
    except Exception as e:
        review_text = f"[Review generation failed: {e}]"

    # Store in conversations so HUD can surface it
    await db_execute(
        "INSERT INTO conversations (session_id, role, message, created_at) VALUES ('guardian','assistant',%s,NOW())",
        (review_text,)
    )

    return {
        "review": review_text,
        "directives": len(dirs),
        "provider": provider,
    }
|
|
|
|
JOB_HANDLERS["directive_review"] = handle_directive_review
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 10: MEMORY CORE — auto-extraction knowledge graph
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
_MEMORY_EXTRACT_PROMPT = """Extract factual information about the user from this conversation exchange. Focus on:
|
|
- Preferences (likes/dislikes, preferred ways of working)
|
|
- People they mention (names, relationships, roles)
|
|
- Places (home, work, locations)
|
|
- Routines (schedules, habits)
|
|
- Goals (things they want to achieve)
|
|
- Instructions to JARVIS (always/never rules)
|
|
- Key facts about their life/work/projects
|
|
|
|
Return a JSON array. Each element: {{"category": "<preference|person|place|routine|goal|fact|instruction>", "subject": "<short label>", "predicate": "<short verb/attribute>", "object": "<the value>", "confidence": <0.0-1.0>}}
|
|
|
|
Rules:
|
|
- Only extract clearly stated facts, not speculation
|
|
- subject/predicate should be concise (under 50 chars each)
|
|
- confidence: 0.95 for explicit statements, 0.75-0.90 for clear implications
|
|
- Skip ephemeral info (what they asked just now, current queries)
|
|
- Max 8 facts per exchange
|
|
- Return [] if nothing durable worth remembering
|
|
|
|
Exchange:
|
|
USER: {user_msg}
|
|
JARVIS: {asst_msg}
|
|
|
|
Return ONLY valid JSON array, nothing else."""
|
|
|
|
async def handle_memory_extract(payload: dict) -> dict:
    """Extract durable facts from one user/assistant exchange into memory_facts.

    Reads ``user_message``, ``assistant_message`` and ``conversation_id`` from
    *payload*. The exchange (each side truncated to 800 chars) is sent to
    Claude Haiku; if that call fails, ``llm_call`` is tried with the Groq
    provider. The reply is parsed as a JSON array of facts, and each valid
    fact is upserted.

    Returns ``{"ok": True, "extracted", "candidates"}`` on success, otherwise
    ``{"ok": False, "reason": ...}``.
    """
    user_msg = str(payload.get("user_message", "")).strip()
    asst_msg = str(payload.get("assistant_message", "")).strip()
    conv_id = payload.get("conversation_id")

    if not user_msg and not asst_msg:
        return {"ok": False, "reason": "empty exchange"}

    prompt = _MEMORY_EXTRACT_PROMPT.format(
        user_msg=user_msg[:800],
        asst_msg=asst_msg[:800]
    )

    raw = ""
    try:
        # Use Haiku for speed/cost; fall back to Groq
        headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"}
        body = {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 800,
            "messages": [{"role": "user", "content": prompt}]
        }
        async with aiohttp.ClientSession() as session:
            async with session.post("https://api.anthropic.com/v1/messages", json=body,
                                    headers=headers, timeout=aiohttp.ClientTimeout(total=20)) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    raw = data["content"][0]["text"].strip()
                else:
                    raise RuntimeError(f"Claude {resp.status}")
    except Exception as e:
        log.warning(f"[MEMORY] Claude extract failed ({e}), trying Groq")
        try:
            raw = await llm_call([{"role": "user", "content": prompt}], provider="groq")
        except Exception as e2:
            log.error(f"[MEMORY] All providers failed: {e2}")
            return {"ok": False, "reason": str(e2)}

    # Parse JSON — strip code fences if present
    raw = str(raw or "")
    if "```" in raw:
        raw = raw.split("```")[1]
        if raw.startswith("json"):
            raw = raw[4:]
    raw = raw.strip()

    try:
        facts = json.loads(raw)
        if not isinstance(facts, list):
            facts = []
    except Exception:
        log.warning(f"[MEMORY] JSON parse failed. Raw: {raw[:200]}")
        return {"ok": False, "reason": "json_parse_failed"}

    valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"}
    inserted = 0
    for f in facts:
        if not isinstance(f, dict):
            continue
        subj = str(f.get("subject", "")).strip()[:255]
        pred = str(f.get("predicate", "is")).strip()[:255]
        obj = str(f.get("object", "")).strip()
        cat = f.get("category", "fact")
        # The model's output is untrusted: a non-numeric confidence falls back
        # to the default rather than aborting the whole batch.
        try:
            conf = min(1.0, max(0.0, float(f.get("confidence", 0.85))))
        except (TypeError, ValueError):
            conf = 0.85
        # isinstance guard: an unhashable category (list/dict) would make the
        # set membership test raise.
        if not subj or not obj or not isinstance(cat, str) or cat not in valid_cats:
            continue
        try:
            await db_execute(
                """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source, conversation_id)
                   VALUES (%s, %s, %s, %s, %s, 'conversation', %s)
                   ON DUPLICATE KEY UPDATE
                     object=VALUES(object),
                     confidence=GREATEST(confidence, VALUES(confidence)),
                     confirmed_count=confirmed_count+1,
                     last_confirmed_at=NOW(),
                     active=1""",
                (cat, subj, pred, obj, conf, conv_id)
            )
            inserted += 1
        except Exception as e:
            log.warning(f"[MEMORY] Insert ({subj},{pred}) failed: {e}")

    log.info(f"[MEMORY] Stored {inserted}/{len(facts)} facts from conv {conv_id}")
    return {"ok": True, "extracted": inserted, "candidates": len(facts)}
|
|
|
|
|
|
async def handle_memory_store(payload: dict) -> dict:
    """Store one explicitly stated fact (e.g. from a "remember that X" command).

    Subject and predicate default to ``"user"`` / ``"note"`` and are capped at
    255 chars; an unrecognised category is stored as ``"fact"``. The row is
    upserted with confidence 1.0. Returns ``{"ok": True, "id": ...}``, or
    ``{"ok": False, "reason": "empty object"}`` when there is nothing to store.
    """
    fact_value = str(payload.get("object", "")).strip()
    if not fact_value:
        return {"ok": False, "reason": "empty object"}

    subject = str(payload.get("subject", "user")).strip()[:255]
    predicate = str(payload.get("predicate", "note")).strip()[:255]

    allowed = {"preference", "person", "place", "routine", "goal", "fact", "instruction"}
    category = payload.get("category", "fact")
    category = category if category in allowed else "fact"

    row = (category, subject, predicate, fact_value)
    fid = await db_execute(
        """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source)
           VALUES (%s, %s, %s, %s, 1.0, 'explicit')
           ON DUPLICATE KEY UPDATE
             object=VALUES(object), confidence=1.0,
             confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""",
        row
    )
    log.info(f"[MEMORY] Explicit store: [{category}] {subject} {predicate}: {fact_value}")
    return {"ok": True, "id": fid}
|
|
|
|
|
|
JOB_HANDLERS["memory_extract"] = handle_memory_extract
|
|
JOB_HANDLERS["memory_store"] = handle_memory_store
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# PHASE 9: CLEARANCE PROTOCOL — approval gating for high-risk operations
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _clearance_describe(job_type: str, payload: dict) -> str:
|
|
"""Human-readable description of what the job would do."""
|
|
if job_type == "shell":
|
|
cmd = payload.get("command", payload.get("cmd", "?"))
|
|
agent = payload.get("agent_id", payload.get("agent", "unknown"))
|
|
return f"Execute shell command on agent '{agent}': {str(cmd)[:120]}"
|
|
if job_type == "send_email":
|
|
to = payload.get("to_email") or payload.get("target", "?")
|
|
subj = payload.get("subject", "")
|
|
tid = payload.get("triage_id")
|
|
if tid:
|
|
return f"Send SMTP reply to triage item #{tid} (to: {to})"
|
|
return f"Send email to {to}" + (f" — {subj}" if subj else "")
|
|
if job_type == "remote_exec":
|
|
cmd_type = payload.get("command_type", payload.get("command", "?"))
|
|
agent = payload.get("agent_id", payload.get("agent", "?"))
|
|
return f"Remote execute '{cmd_type}' on agent '{agent}'"
|
|
if job_type == "purge":
|
|
return "Purge all completed/failed jobs from the Arc Reactor queue"
|
|
return f"Execute {job_type} job"
|
|
|
|
async def _check_clearance(job_type: str, payload: dict, created_by: str) -> Optional[dict]:
    """Gate a job behind approval when an enabled clearance rule requires it.

    Returns ``None`` when no such rule exists for *job_type* (the job may
    proceed). Otherwise a pending ``clearance_requests`` row is created and a
    "pending_clearance" response dict describing it is returned. When the rule
    defines ``auto_approve_after_min``, the request is given an ``expires_at``
    that many minutes from now.
    """
    rule = await db_fetchone(
        "SELECT * FROM clearance_rules WHERE job_type=%s AND enabled=1 AND require_approval=1",
        (job_type,)
    )
    if not rule:
        return None

    risk = rule["risk_level"]
    summary = _clearance_describe(job_type, payload)

    auto_minutes = rule.get("auto_approve_after_min")
    if auto_minutes is not None:
        expires = datetime.utcnow() + timedelta(minutes=int(auto_minutes))
    else:
        expires = None

    cr_id = await db_execute(
        "INSERT INTO clearance_requests (job_type,job_payload,risk_level,description,requested_by,status,expires_at) "
        "VALUES (%s,%s,%s,%s,%s,'pending',%s)",
        (job_type, json.dumps(payload), risk, summary, created_by, expires)
    )
    log.warning(f"[CLEARANCE] Request #{cr_id} — {risk.upper()} — {summary}")

    response = {
        "status": "pending_clearance",
        "clearance_id": cr_id,
        "risk_level": risk,
        "description": summary,
        "message": f"◈ CLEARANCE REQUIRED — {risk.upper()} risk operation intercepted. Approval needed before execution.",
    }
    return response
|
|
|
|
async def _dispatch_cleared_job(cr_id: int, job_type: str, payload: dict, priority: int = 7, decided_by: str = "admin") -> dict:
    """Queue the job behind a clearance request and mark the request approved.

    Inserts the job into ``arc_jobs`` (created_by ``clearance:<cr_id>``), then
    links it to the request and records who decided. Returns the new job id,
    the clearance id and the ``"approved"`` status.
    """
    job_row = (job_type, json.dumps(payload), priority, f"clearance:{cr_id}")
    jid = await db_execute(
        "INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)",
        job_row
    )

    approval_row = (decided_by, jid, cr_id)
    await db_execute(
        "UPDATE clearance_requests SET status='approved', decided_by=%s, arc_job_id=%s, decided_at=NOW() WHERE id=%s",
        approval_row
    )

    log.info(f"[CLEARANCE] Request #{cr_id} approved by {decided_by} → Job #{jid}")
    return {"job_id": jid, "clearance_id": cr_id, "status": "approved"}
|
|
|
|
# ── Clearance watchdog ─────────────────────────────────────────────────────────
|
|
|
|
async def clearance_watchdog() -> None:
    """Background loop that resolves pending clearance requests past their expiry.

    Runs every 60 seconds after a 20-second startup delay. For each pending
    request whose ``expires_at`` has passed: if the matching rule has
    ``auto_approve_after_min`` set, the job is dispatched as auto-approved;
    otherwise the request is marked expired. Requests with no ``expires_at``
    are never selected. Errors are logged and the loop continues.
    """
    log.info("Clearance watchdog started")
    await asyncio.sleep(20)
    while True:
        try:
            cutoff = datetime.utcnow()
            # Only requests that carry an expiry are considered
            # (expires_at IS NULL means never expires).
            due = await db_fetchall(
                "SELECT cr.*, cru.auto_approve_after_min FROM clearance_requests cr "
                "JOIN clearance_rules cru ON cru.job_type=cr.job_type "
                "WHERE cr.status='pending' AND cr.expires_at IS NOT NULL AND cr.expires_at <= %s",
                (cutoff,)
            ) or []
            for req in due:
                if req.get("auto_approve_after_min") is None:
                    # No auto-approve on the rule: the request simply lapses.
                    await db_execute(
                        "UPDATE clearance_requests SET status='expired', decided_at=NOW() WHERE id=%s",
                        (req["id"],)
                    )
                    log.info(f"[CLEARANCE] Expired request #{req['id']} ({req['job_type']})")
                    continue

                # Auto-approve: decode the stored payload and dispatch the job.
                job_payload = req.get("job_payload") or {}
                if isinstance(job_payload, str):
                    try:
                        job_payload = json.loads(job_payload)
                    except Exception:
                        job_payload = {}
                await _dispatch_cleared_job(req["id"], req["job_type"], job_payload, decided_by="auto_approve")
                log.info(f"[CLEARANCE] Auto-approved request #{req['id']} ({req['job_type']})")
        except Exception as exc:
            log.error(f"Clearance watchdog error: {exc}")
        await asyncio.sleep(60)
|
|
|
|
# ── Mission trigger loop ───────────────────────────────────────────────────────
|
|
|
|
_mission_trigger_state: dict = {} # mission_id → last_triggered ISO
|
|
|
|
async def mission_trigger_loop() -> None:
    """Check scheduled and event-based mission triggers every 30 seconds.

    Trigger types handled for enabled, non-manual missions:

    * ``schedule``: fires when ``interval_minutes`` (default 60) have passed
      since ``last_run_at``, or immediately if the mission has never run.
    * ``guardian_event``: fires when an unacknowledged guardian event newer
      than the last one seen matches the optional ``severity`` /
      ``event_type`` filters.
    * ``email_keyword``: fires when a triaged email newer than the last one
      seen matches any configured keyword (and optional ``category``).

    "Last seen" markers live in the in-memory ``_mission_trigger_state`` dict.
    Each mission is evaluated in isolation, so a broken trigger config on one
    mission cannot block the others.
    """
    log.info("Mission trigger loop started")
    await asyncio.sleep(15)  # stagger startup

    # The event loop holds only weak references to tasks, so keep strong ones
    # until each run finishes.
    in_flight: set = set()

    async def _evaluate(m: dict) -> tuple:
        """Return ``(should_run, source)`` for a single mission row."""
        mid = m["id"]
        ttype = m["trigger_type"]
        cfg = m.get("trigger_config") or {}
        if isinstance(cfg, str):
            try:
                cfg = json.loads(cfg)
            except Exception:
                cfg = {}

        if ttype == "schedule":
            interval_min = int(cfg.get("interval_minutes", 60))
            # Elapsed time is computed by the database: last_run_at is written
            # with NOW(), so comparing it with Python's utcnow() is only
            # correct when the DB session time zone is UTC.
            secs = m.get("secs_since_last_run")
            if secs is None:  # never run (last_run_at IS NULL)
                return True, ttype
            return float(secs) / 60 >= interval_min, ttype

        if ttype == "guardian_event":
            severity = cfg.get("severity", "")
            etype = cfg.get("event_type", "")
            last_key = f"ge_{mid}"
            last_seen = _mission_trigger_state.get(last_key, "1970-01-01")
            wheres = ["acknowledged=0", "created_at > %s"]
            params: list = [last_seen]
            if severity:
                wheres.append("severity=%s")
                params.append(severity)
            if etype:
                wheres.append("event_type=%s")
                params.append(etype)
            row = await db_fetchone(
                f"SELECT id, created_at FROM guardian_events WHERE {' AND '.join(wheres)} ORDER BY created_at DESC LIMIT 1",
                params
            )
            if row:
                _mission_trigger_state[last_key] = str(row["created_at"])
                return True, f"guardian_event:{row['id']}"
            return False, ttype

        if ttype == "email_keyword":
            keywords = cfg.get("keywords", [])
            category = cfg.get("category", "")
            last_key = f"ek_{mid}"
            last_seen = _mission_trigger_state.get(last_key, "1970-01-01")
            if not keywords:
                return False, ttype
            kw_conds = " OR ".join(["subject LIKE %s OR summary LIKE %s"] * len(keywords))
            kw_params: list = []
            for kw in keywords:
                kw_params += [f"%{kw}%", f"%{kw}%"]
            where = f"created_at > %s AND ({kw_conds})"
            params = [last_seen] + kw_params
            if category:
                where += " AND category=%s"
                params.append(category)
            row = await db_fetchone(
                f"SELECT id, created_at FROM email_triage WHERE {where} ORDER BY created_at DESC LIMIT 1",
                params
            )
            if row:
                _mission_trigger_state[last_key] = str(row["created_at"])
                return True, f"email_triage:{row['id']}"
            return False, ttype

        # Unknown trigger type: never fires.
        return False, ttype

    while True:
        try:
            missions = await db_fetchall(
                "SELECT m.*, TIMESTAMPDIFF(SECOND, m.last_run_at, NOW()) AS secs_since_last_run "
                "FROM missions m WHERE m.enabled=1 AND m.trigger_type != 'manual'"
            )
            for m in missions or []:
                try:
                    should_run, source = await _evaluate(m)
                except Exception as exc:
                    log.error(f"Mission trigger loop error (mission {m.get('id')}): {exc}")
                    continue

                if should_run:
                    log.info(f"[MISSION TRIGGER] Mission {m['id']} ({m['name']}) triggered by {source}")
                    task = asyncio.create_task(_execute_mission(m["id"], source))
                    in_flight.add(task)
                    task.add_done_callback(in_flight.discard)

        except Exception as exc:
            log.error(f"Mission trigger loop error: {exc}")
        await asyncio.sleep(30)
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# JOB RUNNER + BACKGROUND TASKS
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
_running_jobs: set = set()
|
|
_stats = {"done": 0, "failed": 0}
|
|
|
|
async def run_job(job: dict) -> None:
    """Execute one queued job row and persist its outcome.

    The job is marked ``running``, its handler from ``JOB_HANDLERS`` is
    awaited with the decoded payload, and the JSON-encoded result is stored
    with status ``done``. Any exception marks the job ``failed`` with the
    error text (capped at 2000 chars). The job id is tracked in
    ``_running_jobs`` for the duration, and ``_stats`` counters are updated.
    """
    jid = job["id"]
    jtype = job["job_type"]
    _running_jobs.add(jid)
    try:
        payload = job.get("payload") or {}
        if isinstance(payload, str):
            payload = json.loads(payload)
        await db_execute("UPDATE arc_jobs SET status='running', started_at=NOW() WHERE id=%s", (jid,))
        log.info(f"[JOB {jid}] Starting {jtype}")
        handler = JOB_HANDLERS.get(jtype)
        if not handler:
            raise ValueError(f"Unknown job type: {jtype}")
        result = await handler(payload)
        result_str = json.dumps(result, default=str)
        await db_execute("UPDATE arc_jobs SET status='done', result=%s, completed_at=NOW() WHERE id=%s", (result_str, jid))
        _stats["done"] += 1
        log.info(f"[JOB {jid}] Done: {jtype}")
    except Exception as exc:
        # Some exceptions (e.g. a bare TimeoutError) stringify to "".
        err = str(exc) or repr(exc)
        _stats["failed"] += 1
        log.warning(f"[JOB {jid}] Failed: {err[:120]}")
        try:
            await db_execute("UPDATE arc_jobs SET status='failed', error=%s, completed_at=NOW() WHERE id=%s", (err[:2000], jid))
        except Exception as db_exc:
            # This runs as a fire-and-forget task: a raise here would surface
            # only as "Task exception was never retrieved".
            log.error(f"[JOB {jid}] Could not record failure: {db_exc}")
    finally:
        _running_jobs.discard(jid)
|
|
|
|
async def job_poller() -> None:
    """Poll ``arc_jobs`` forever and start a task for each newly queued job.

    Every ``POLL_INTERVAL`` seconds, up to five queued jobs are fetched
    (highest priority first, then oldest) and each one not already in
    ``_running_jobs`` is started via ``run_job``. Errors are logged and
    polling continues.
    """
    log.info("Arc Reactor job poller started")
    # The event loop keeps only weak references to tasks; hold strong ones
    # here so a running job cannot be garbage-collected mid-flight.
    in_flight: set = set()
    while True:
        try:
            jobs = await db_fetchall("SELECT * FROM arc_jobs WHERE status='queued' ORDER BY priority DESC, id ASC LIMIT 5")
            for job in jobs or []:
                if job["id"] not in _running_jobs:
                    task = asyncio.create_task(run_job(job))
                    in_flight.add(task)
                    task.add_done_callback(in_flight.discard)
        except Exception as exc:
            log.error(f"Poller error: {exc}")
        await asyncio.sleep(POLL_INTERVAL)
|
|
|
|
async def heartbeat_loop() -> None:
    """Write liveness and job counters to ``arc_status`` on a fixed interval.

    Each pass stores the current time, the number of running jobs and the
    done/failed totals in row ``id=1``, then sleeps ``HEARTBEAT_INTERVAL``
    seconds. Failures are logged and the loop keeps going.
    """
    heartbeat_sql = (
        "UPDATE arc_status SET last_heartbeat=NOW(), active_jobs=%s, jobs_done=%s, jobs_failed=%s WHERE id=1"
    )
    while True:
        try:
            snapshot = (len(_running_jobs), _stats["done"], _stats["failed"])
            await db_execute(heartbeat_sql, snapshot)
        except Exception as exc:
            log.error(f"Heartbeat error: {exc}")
        await asyncio.sleep(HEARTBEAT_INTERVAL)
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
# FASTAPI APP
|
|
# ═══════════════════════════════════════════════════════════════════════════════
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: bring the daemon up, then tear it down cleanly.

    Startup opens the DB pool, stamps ``arc_status`` and launches the
    background loops (job poller, heartbeat, guardian, mission triggers,
    clearance watchdog). Shutdown cancels those loops before closing the pool
    so none of them touches a closed connection.
    """
    log.info(f"◈ JARVIS Arc Reactor v{VERSION} starting on {HOST}:{PORT}")
    await get_pool()
    await db_execute("UPDATE arc_status SET started_at=NOW(), last_heartbeat=NOW(), version=%s WHERE id=1", (VERSION,))
    # Keep references: the event loop holds tasks only weakly, and they are
    # needed for cancellation at shutdown.
    background = [
        asyncio.create_task(job_poller()),
        asyncio.create_task(heartbeat_loop()),
        asyncio.create_task(guardian_loop()),
        asyncio.create_task(mission_trigger_loop()),
        asyncio.create_task(clearance_watchdog()),
    ]
    log.info(f"◈ Arc Reactor online — {len(JOB_HANDLERS)} handlers: {', '.join(JOB_HANDLERS)}")
    try:
        yield
    finally:
        log.info("◈ Arc Reactor shutting down")
        for task in background:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError from each loop.
        await asyncio.gather(*background, return_exceptions=True)
        if _pool and not _pool.closed:
            _pool.close()
            await _pool.wait_closed()
|
|
|
|
app = FastAPI(title="JARVIS Arc Reactor", version=VERSION, lifespan=lifespan)
|
|
|
|
# ── Mission Ops endpoints ──────────────────────────────────────────────────────
|
|
|
|
@app.get("/missions")
async def missions_list(enabled: Optional[int] = None):
    """List missions, newest first, optionally filtered by the ``enabled`` flag."""
    if enabled is None:
        rows = await db_fetchall("SELECT * FROM missions ORDER BY id DESC")
    else:
        rows = await db_fetchall("SELECT * FROM missions WHERE enabled=%s ORDER BY id DESC", (enabled,))
    return rows if rows else []
|
|
|
|
@app.get("/missions/{mission_id}")
async def mission_get(mission_id: int):
    """Return one mission with its ordered steps and its ten most recent runs.

    Raises a 404 when the mission does not exist.
    """
    mission = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,))
    if not mission:
        raise HTTPException(status_code=404, detail="Not found")

    key = (mission_id,)
    step_rows = await db_fetchall(
        "SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC",
        key
    )
    run_rows = await db_fetchall(
        "SELECT id, status, trigger_source, started_at, completed_at FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT 10",
        key
    )

    detail = dict(mission)
    detail["steps"] = step_rows or []
    detail["recent_runs"] = run_rows or []
    return detail
|
|
|
|
@app.post("/missions")
async def mission_create(req: Request):
    """Create a mission and its steps from a JSON body.

    Body fields: ``name`` (required), ``description``, ``trigger_type``
    (default ``"manual"``), ``trigger_config``, ``enabled`` (default 1) and
    ``steps``: a list of objects with ``job_type`` (required), ``label``,
    ``payload`` and ``continue_on_failure``.

    The whole body is validated before anything is written, so a malformed
    step yields a 400 instead of a half-created mission. Returns
    ``{"id": <mission id>, "ok": True}``.
    """
    body = await req.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON object required")
    name = str(body.get("name") or "").strip()
    if not name:
        raise HTTPException(status_code=400, detail="name required")

    steps = body.get("steps") or []
    if not isinstance(steps, list):
        raise HTTPException(status_code=400, detail="steps must be a list")
    for idx, s in enumerate(steps):
        if not isinstance(s, dict) or not s.get("job_type"):
            raise HTTPException(status_code=400, detail=f"steps[{idx}]: job_type required")

    desc = body.get("description", "")
    ttype = body.get("trigger_type", "manual")
    tcfg = json.dumps(body.get("trigger_config") or {})
    try:
        enabled = int(body.get("enabled", 1))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="enabled must be 0 or 1")

    mid = await db_execute(
        "INSERT INTO missions (name,description,trigger_type,trigger_config,enabled) VALUES (%s,%s,%s,%s,%s)",
        (name, desc, ttype, tcfg, enabled)
    )
    for i, s in enumerate(steps):
        await db_execute(
            "INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)",
            (mid, i, s.get("label", ""), s["job_type"], json.dumps(s.get("payload", {})), int(bool(s.get("continue_on_failure", 0))))
        )
    return {"id": mid, "ok": True}
|
|
|
|
@app.put("/missions/{mission_id}")
async def mission_update(mission_id: int, req: Request):
    """Update a mission's fields and/or replace its steps.

    Any of ``name``, ``description``, ``trigger_type``, ``enabled`` and
    ``trigger_config`` present in the JSON body are updated. When ``steps`` is
    present, the existing steps are replaced by the given list.

    Replacement steps are validated before the old ones are deleted, so a bad
    request cannot leave the mission without steps. Raises 404 for an unknown
    mission and 400 for a malformed body.
    """
    body = await req.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON object required")

    existing = await db_fetchone("SELECT id FROM missions WHERE id=%s", (mission_id,))
    if not existing:
        raise HTTPException(status_code=404, detail="Not found")

    new_steps = None
    if "steps" in body:
        new_steps = body["steps"]
        if not isinstance(new_steps, list):
            raise HTTPException(status_code=400, detail="steps must be a list")
        for idx, s in enumerate(new_steps):
            if not isinstance(s, dict) or not s.get("job_type"):
                raise HTTPException(status_code=400, detail=f"steps[{idx}]: job_type required")

    fields: list = []
    params: list = []
    # Column names come from this fixed tuple, never from the request.
    for col in ("name", "description", "trigger_type", "enabled"):
        if col in body:
            fields.append(f"{col}=%s")
            params.append(body[col])
    if "trigger_config" in body:
        fields.append("trigger_config=%s")
        params.append(json.dumps(body["trigger_config"]))
    if fields:
        params.append(mission_id)
        await db_execute(f"UPDATE missions SET {','.join(fields)} WHERE id=%s", params)

    if new_steps is not None:
        await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,))
        for i, s in enumerate(new_steps):
            await db_execute(
                "INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)",
                (mission_id, i, s.get("label", ""), s["job_type"], json.dumps(s.get("payload", {})), int(bool(s.get("continue_on_failure", 0))))
            )
    return {"ok": True}
|
|
|
|
@app.delete("/missions/{mission_id}")
async def mission_delete(mission_id: int):
    """Delete a mission together with its steps and run history."""
    # Child tables first, the mission row last.
    statements = (
        "DELETE FROM mission_steps WHERE mission_id=%s",
        "DELETE FROM mission_runs WHERE mission_id=%s",
        "DELETE FROM missions WHERE id=%s",
    )
    for sql in statements:
        await db_execute(sql, (mission_id,))
    return {"ok": True}
|
|
|
|
@app.post("/missions/{mission_id}/run")
async def mission_run_endpoint(mission_id: int, req: Request):
    """Run a mission immediately and return its run summary.

    The request body is optional; if it is a JSON object, its
    ``trigger_source`` (default ``"manual"``) is recorded on the run.
    """
    try:
        body = await req.json()
    except Exception:
        # Missing or invalid body: fall back to defaults.
        body = {}

    source = "manual"
    if isinstance(body, dict):
        source = body.get("trigger_source", "manual")
    return await _execute_mission(mission_id, source)
|
|
|
|
@app.get("/missions/{mission_id}/runs")
async def mission_runs_list(mission_id: int, limit: int = 20):
    """Return the most recent runs of a mission, newest first.

    ``limit`` caps the number of rows; negative values are treated as 0
    because a negative LIMIT is a SQL error.
    """
    rows = await db_fetchall(
        "SELECT * FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT %s",
        (mission_id, max(limit, 0))
    )
    return rows or []
|
|
|
|
# ── Clearance FastAPI endpoints ────────────────────────────────────────────────
|
|
|
|
@app.get("/clearance/pending")
async def clearance_pending():
    """List all clearance requests still awaiting a decision, newest first."""
    pending = await db_fetchall(
        "SELECT * FROM clearance_requests WHERE status='pending' ORDER BY created_at DESC"
    )
    return pending if pending else []
|
|
|
|
@app.get("/clearance/history")
async def clearance_history(limit: int = 50):
    """Return clearance requests of any status, newest first.

    ``limit`` caps the number of rows; negative values are treated as 0
    because a negative LIMIT is a SQL error.
    """
    rows = await db_fetchall(
        "SELECT * FROM clearance_requests ORDER BY created_at DESC LIMIT %s", (max(limit, 0),)
    )
    return rows or []
|
|
|
|
@app.post("/clearance/{cr_id}/approve")
async def clearance_approve(cr_id: int, req: Request):
    """Approve a pending clearance request and queue its job.

    The optional JSON body may name ``decided_by`` (default ``"admin"``).
    Raises 404 if the request does not exist or is no longer pending.
    """
    try:
        body = await req.json()
    except Exception:
        body = {}

    decided_by = "admin"
    if isinstance(body, dict):
        decided_by = body.get("decided_by", "admin")

    request_row = await db_fetchone(
        "SELECT * FROM clearance_requests WHERE id=%s AND status='pending'", (cr_id,)
    )
    if not request_row:
        raise HTTPException(status_code=404, detail="Clearance request not found or not pending")

    # The stored payload may be a JSON string or already decoded.
    job_payload = request_row.get("job_payload") or {}
    if isinstance(job_payload, str):
        try:
            job_payload = json.loads(job_payload)
        except Exception:
            job_payload = {}

    return await _dispatch_cleared_job(cr_id, request_row["job_type"], job_payload, decided_by=decided_by)
|
|
|
|
@app.post("/clearance/{cr_id}/deny")
async def clearance_deny(cr_id: int, req: Request):
    """Deny a pending clearance request.

    The optional JSON body may carry ``decided_by`` (default ``"admin"``) and
    a ``note``. Like the approve endpoint, only requests that are still
    pending can be decided; otherwise a 404 is raised. This stops a denial
    from overwriting a request that was already approved and dispatched.
    """
    try:
        body = await req.json()
    except Exception:
        body = {}
    decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin"
    note = body.get("note", "") if isinstance(body, dict) else ""

    cr = await db_fetchone("SELECT id FROM clearance_requests WHERE id=%s AND status='pending'", (cr_id,))
    if not cr:
        raise HTTPException(status_code=404, detail="Clearance request not found or not pending")

    # The status guard is repeated in the UPDATE in case the request was
    # decided between the check above and this write.
    await db_execute(
        "UPDATE clearance_requests SET status='denied', decided_by=%s, decision_note=%s, decided_at=NOW() "
        "WHERE id=%s AND status='pending'",
        (decided_by, note, cr_id)
    )
    log.info(f"[CLEARANCE] Request #{cr_id} denied by {decided_by}")
    return {"ok": True, "clearance_id": cr_id, "status": "denied"}
|
|
|
|
@app.get("/clearance/rules")
async def clearance_rules_list():
    """List every clearance rule, ordered by risk level (descending) then job type."""
    rules = await db_fetchall("SELECT * FROM clearance_rules ORDER BY risk_level DESC, job_type ASC")
    return rules if rules else []
|
|
|
|
@app.put("/clearance/rules/{rule_id}")
async def clearance_rule_update(rule_id: int, req: Request):
    """Update the supplied fields of one clearance rule.

    Only ``risk_level``, ``require_approval``, ``auto_approve_after_min``,
    ``enabled`` and ``description`` may be changed. Raises 400 when the body
    contains none of them.
    """
    body = await req.json()

    # Column names come from this fixed tuple, never from the request.
    updatable = ("risk_level", "require_approval", "auto_approve_after_min", "enabled", "description")
    present = [col for col in updatable if col in body]
    if not present:
        raise HTTPException(status_code=400, detail="Nothing to update")

    assignments = ",".join(f"{col}=%s" for col in present)
    params = [body[col] for col in present]
    params.append(rule_id)
    await db_execute(f"UPDATE clearance_rules SET {assignments} WHERE id=%s", params)
    return {"ok": True}
|
|
|
|
@app.post("/clearance/rules")
async def clearance_rule_create(req: Request):
    """Create a clearance rule for a job type, or overwrite the existing one.

    ``job_type`` is required. The remaining fields default to high risk,
    approval required, no auto-approve, empty description and enabled.
    Returns ``{"ok": True, "id": ...}``.
    """
    body = await req.json()
    job_type = body.get("job_type", "").strip()
    if not job_type:
        raise HTTPException(status_code=400, detail="job_type required")

    risk = body.get("risk_level","high")
    needs_approval = int(body.get("require_approval",1))
    auto_minutes = body.get("auto_approve_after_min")
    description = body.get("description","")
    is_enabled = int(body.get("enabled",1))

    # The same five values feed both the INSERT and the ON DUPLICATE KEY UPDATE.
    settings = (risk, needs_approval, auto_minutes, description, is_enabled)
    rid = await db_execute(
        "INSERT INTO clearance_rules (job_type,risk_level,require_approval,auto_approve_after_min,description,enabled) "
        "VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE "
        "risk_level=%s,require_approval=%s,auto_approve_after_min=%s,description=%s,enabled=%s",
        (job_type, *settings, *settings)
    )
    return {"ok": True, "id": rid}
|
|
|
|
# ── Memory Core FastAPI endpoints ──────────────────────────────────────────────
|
|
|
|
@app.get("/memory/facts")
async def memory_facts_list(limit: int = 100, category: str = "", search: str = ""):
    """List active memory facts, most-confirmed and most recent first.

    ``category`` filters by exact category; ``search`` is a substring match
    against subject, predicate and object. ``limit`` caps the number of rows;
    negative values are treated as 0 because a negative LIMIT is a SQL error.
    """
    where = "WHERE active=1"
    params: list = []
    if category:
        where += " AND category=%s"
        params.append(category)
    if search:
        where += " AND (subject LIKE %s OR predicate LIKE %s OR object LIKE %s)"
        params += [f"%{search}%"] * 3
    rows = await db_fetchall(
        f"SELECT * FROM memory_facts {where} ORDER BY confirmed_count DESC, last_confirmed_at DESC LIMIT %s",
        params + [max(limit, 0)]
    )
    return rows or []
|
|
|
|
@app.post("/memory/facts")
async def memory_fact_create(req: Request):
    """Store an explicit memory fact, or re-confirm it on a duplicate key.

    Body fields:
        subject:   required; stringified, stripped, truncated to 255 chars.
        predicate: defaults to "is"; stringified, stripped, truncated to 255 chars.
        object:    required; stringified and stripped.
        category:  one of the known categories; anything else (including
                   non-string JSON values) falls back to "fact".

    On a duplicate key the object is replaced, confidence reset to 1.0, the
    confirmation counter incremented and the fact re-activated.

    Returns:
        {"ok": True, "id": <value returned by db_execute>}

    Raises:
        HTTPException: 400 when subject or object is empty.
    """
    body = await req.json()
    subj = str(body.get("subject", "")).strip()[:255]
    pred = str(body.get("predicate", "is")).strip()[:255]
    obj = str(body.get("object", "")).strip()
    cat = body.get("category", "fact")
    if not subj or not obj:
        raise HTTPException(status_code=400, detail="subject and object required")

    valid = {"preference", "person", "place", "routine", "goal", "fact", "instruction"}
    # A JSON list/dict category is unhashable and would make the set membership
    # test raise TypeError; check the type first so it degrades to "fact".
    if not isinstance(cat, str) or cat not in valid:
        cat = "fact"

    fid = await db_execute(
        "INSERT INTO memory_facts (category, subject, predicate, object, confidence, source)\n"
        "VALUES (%s,%s,%s,%s,1.0,'explicit')\n"
        "ON DUPLICATE KEY UPDATE\n"
        "object=VALUES(object), confidence=1.0,\n"
        "confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1",
        (cat, subj, pred, obj)
    )
    return {"ok": True, "id": fid}
|
|
|
|
@app.delete("/memory/facts/{fact_id}")
async def memory_fact_delete(fact_id: int):
    """Soft-delete one memory fact by setting its active flag to 0."""
    deactivate_sql = "UPDATE memory_facts SET active=0 WHERE id=%s"
    await db_execute(deactivate_sql, (fact_id,))
    return {"ok": True}
|
|
|
|
@app.delete("/memory/facts")
async def memory_facts_clear(category: str = ""):
    """Soft-delete memory facts in bulk.

    With a category, every fact in that category is deactivated; without one,
    every currently active fact is deactivated.
    """
    if not category:
        await db_execute("UPDATE memory_facts SET active=0 WHERE active=1", ())
        return {"ok": True}

    await db_execute("UPDATE memory_facts SET active=0 WHERE category=%s", (category,))
    return {"ok": True}
|
|
|
|
@app.get("/memory/context")
async def memory_context(message: str = "", limit: int = 15):
    """Return relevant memory facts for a given message (for prompt injection).

    Words of the lower-cased message longer than three characters (at most ten
    of them) are matched with LIKE against the subject and object of active
    facts. When fewer than five facts match, the result is padded with the
    most-confirmed remaining facts, never exceeding ``limit`` rows in total.
    When the message yields no usable words, the most-confirmed active facts
    are returned directly.

    Returns:
        {"facts": <rows>, "context": <one "[category] subject predicate: object"
        line per row, newline-joined; "" when there are no rows>}
    """
    words = [w for w in message.lower().split() if len(w) > 3][:10]

    if words:
        like_clauses = " OR ".join(["subject LIKE %s OR object LIKE %s"] * len(words))
        params: list = []
        for w in words:
            params += [f"%{w}%", f"%{w}%"]
        rows = list(await db_fetchall(
            f"SELECT * FROM memory_facts WHERE active=1 AND ({like_clauses}) "
            f"ORDER BY confirmed_count DESC, confidence DESC LIMIT %s",
            params + [limit]
        ) or [])

        # Pad with top general facts if sparse. Only the remaining headroom is
        # requested, so a small `limit` that is already filled is not exceeded.
        remaining = limit - len(rows)
        if len(rows) < 5 and remaining > 0:
            existing = [r["id"] for r in rows]
            excl = ("AND id NOT IN (" + ",".join(["%s"] * len(existing)) + ")") if existing else ""
            extra = await db_fetchall(
                f"SELECT * FROM memory_facts WHERE active=1 {excl} "
                f"ORDER BY confirmed_count DESC LIMIT %s",
                existing + [remaining]
            ) or []
            rows += list(extra)
    else:
        # Blank message, or nothing but short words: fall back to top facts.
        rows = await db_fetchall(
            "SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,)
        ) or []

    lines = [f"[{r['category']}] {r['subject']} {r['predicate']}: {r['object']}" for r in rows]
    return {"facts": rows, "context": "\n".join(lines)}
|
|
|
|
@app.get("/memory/stats")
async def memory_stats():
    """Summarize the active memory facts.

    Returns:
        "total": count of active facts (0 when the count query returns nothing),
        "by_category": per-category counts, largest first,
        "recent": the five most recently confirmed active facts.
    """
    count_row = await db_fetchone("SELECT COUNT(*) cnt FROM memory_facts WHERE active=1")
    per_category = await db_fetchall(
        "SELECT category, COUNT(*) cnt FROM memory_facts WHERE active=1 GROUP BY category ORDER BY cnt DESC"
    )
    latest = await db_fetchall(
        "SELECT * FROM memory_facts WHERE active=1 ORDER BY last_confirmed_at DESC LIMIT 5"
    )

    total = count_row["cnt"] if count_row else 0
    return {
        "total": total,
        "by_category": per_category or [],
        "recent": latest or [],
    }
|
|
|
|
@app.get("/status")
async def status():
    """Report daemon health: heartbeat, counters, queue depth and handlers.

    Heartbeat/start timestamps and the done/failed counters come from the
    arc_status row with id=1; queued and running counts come from arc_jobs.
    Missing rows degrade to None / 0 rather than raising.
    """
    row = await db_fetchone("SELECT * FROM arc_status WHERE id=1")
    queued = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='queued'")
    running = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='running'")

    def _iso(column):
        # ISO-format a timestamp column of the status row, or None if absent.
        stamp = row[column] if row else None
        return stamp.isoformat() if stamp else None

    def _count(result):
        # Pull the cnt column out of a COUNT(*) row, defaulting to 0.
        return result["cnt"] if result else 0

    report = {"online": True, "version": VERSION}
    report["last_heartbeat"] = _iso("last_heartbeat")
    report["started_at"] = _iso("started_at")
    report["jobs_done"] = row["jobs_done"] if row else 0
    report["jobs_failed"] = row["jobs_failed"] if row else 0
    report["active_jobs"] = len(_running_jobs)
    report["queued_jobs"] = _count(queued)
    report["running_jobs"] = _count(running)
    report["handlers"] = list(JOB_HANDLERS.keys())
    return report
|
|
|
|
@app.post("/job")
async def create_job(request: Request):
    """Queue a new job described by the JSON request body.

    Body fields: "type" (required), "payload" (default {}), "priority"
    (default 5, coerced with int()) and "created_by" (default "jarvis").

    The job is first passed to _check_clearance; if that returns a truthy
    value it is returned to the caller as-is and nothing is inserted.

    Returns:
        {"job_id": <value returned by db_execute>, "status": "queued"}, or the
        clearance response.

    Raises:
        HTTPException: 400 when the job type is missing or empty.
    """
    data = await request.json()
    jtype = data.get("type", "")
    payload = data.get("payload", {})
    priority = int(data.get("priority", 5))
    created_by = data.get("created_by", "jarvis")
    if not jtype:
        raise HTTPException(status_code=400, detail="Missing job type")

    gate = await _check_clearance(jtype, payload, created_by)
    if gate:
        return gate

    insert_sql = "INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)"
    serialized = json.dumps(payload)
    jid = await db_execute(insert_sql, (jtype, serialized, priority, created_by))
    return {"job_id": jid, "status": "queued"}
|
|
|
|
@app.get("/job/{job_id}")
async def get_job(job_id: int):
    """Fetch one job with its result and ISO-formatted timestamps.

    A non-empty string result is decoded as JSON on a best-effort basis; if
    decoding fails the stored string is returned unchanged.

    Raises:
        HTTPException: 404 when no job has this id.
    """
    record = await db_fetchone("SELECT * FROM arc_jobs WHERE id=%s", (job_id,))
    if not record:
        raise HTTPException(status_code=404, detail="Job not found")

    outcome = record.get("result")
    if isinstance(outcome, str) and outcome:
        try:
            outcome = json.loads(outcome)
        except Exception:
            # Best effort: keep the raw string when it is not valid JSON.
            pass

    response = {
        "id": record["id"],
        "job_type": record["job_type"],
        "status": record["status"],
        "result": outcome,
        "error": record.get("error"),
    }
    for column in ("created_at", "started_at", "completed_at"):
        stamp = record[column]
        response[column] = stamp.isoformat() if stamp else None
    return response
|
|
|
|
@app.delete("/job/{job_id}")
async def cancel_job(job_id: int):
    """Mark a job as cancelled, but only while it is still queued.

    The response is {"ok": True} whether or not a row was changed.
    """
    cancel_sql = "UPDATE arc_jobs SET status='cancelled' WHERE id=%s AND status='queued'"
    await db_execute(cancel_sql, (job_id,))
    return {"ok": True}
|
|
|
|
@app.get("/jobs")
async def list_jobs(limit: int = 50, status: str = ""):
    """List jobs, newest first, optionally filtered by status.

    Args:
        limit: maximum number of rows to return.
        status: when non-empty, only jobs with exactly this status.

    created_at and completed_at are converted to ISO-8601 strings in place.
    """
    if status:
        rows = await db_fetchall(
            "SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs WHERE status=%s ORDER BY id DESC LIMIT %s",
            (status, limit)
        )
    else:
        rows = await db_fetchall(
            "SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s",
            (limit,)
        )
    # Other endpoints in this file guard db_fetchall results with `or []`;
    # do the same here so an empty/None result cannot break the loop below.
    rows = rows or []
    for r in rows:
        for column in ("created_at", "completed_at"):
            if r.get(column):
                r[column] = r[column].isoformat()
    return rows
|
|
|
|
@app.get("/jobs/recent")
async def recent_jobs(limit: int = 10, job_type: str = ""):
    """List the most recent jobs including their results.

    Args:
        limit: maximum number of rows to return.
        job_type: when non-empty, only jobs of exactly this type.

    Timestamps are converted to ISO-8601 strings, and string results are
    decoded as JSON on a best-effort basis (left untouched when invalid).
    """
    if job_type:
        rows = await db_fetchall(
            "SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs WHERE job_type=%s ORDER BY id DESC LIMIT %s",
            (job_type, limit)
        )
    else:
        rows = await db_fetchall(
            "SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s",
            (limit,)
        )
    # Other endpoints in this file guard db_fetchall results with `or []`;
    # do the same here so an empty/None result cannot break the loop below.
    rows = rows or []
    for r in rows:
        for column in ("created_at", "completed_at"):
            if r.get(column):
                r[column] = r[column].isoformat()
        if r.get("result") and isinstance(r["result"], str):
            try:
                r["result"] = json.loads(r["result"])
            except Exception:
                # Best effort: keep the raw string when it is not valid JSON.
                pass
    return rows
|
|
|
|
@app.get("/triage")
async def get_triage(limit: int = 30, account: str = "gmail"):
    """List triaged e-mails for one account, highest priority first.

    Args:
        limit: maximum number of rows to return.
        account: value matched against the email_triage.account column.

    date_received and created_at are converted with str() before returning.
    """
    rows = await db_fetchall(
        "SELECT id, from_name, from_email, subject, date_received, category, priority,\n"
        "summary, draft_reply, action_taken, created_at\n"
        "FROM email_triage WHERE account=%s\n"
        "ORDER BY priority DESC, created_at DESC LIMIT %s",
        (account, limit)
    )
    # Other endpoints in this file guard db_fetchall results with `or []`;
    # do the same here so an empty/None result cannot break the loop below.
    rows = rows or []
    for r in rows:
        for column in ("date_received", "created_at"):
            if r.get(column):
                r[column] = str(r[column])
    return rows
|
|
|
|
@app.post("/triage/{triage_id}/action")
async def triage_action(triage_id: int, request: Request):
    """Record the action taken on a triaged e-mail.

    The body's "action" value (default "dismissed") is written to
    email_triage.action_taken for the given id.
    """
    payload = await request.json()
    chosen = payload.get("action", "dismissed")
    update_sql = "UPDATE email_triage SET action_taken=%s WHERE id=%s"
    await db_execute(update_sql, (chosen, triage_id))
    return {"ok": True}
|
|
|
|
@app.delete("/jobs/purge")
async def purge_jobs():
    """Delete done, failed and cancelled jobs that completed over 7 days ago."""
    purge_sql = (
        "DELETE FROM arc_jobs WHERE status IN ('done','failed','cancelled') "
        "AND completed_at < DATE_SUB(NOW(), INTERVAL 7 DAY)"
    )
    await db_execute(purge_sql)
    return {"ok": True}
|
|
|
|
# ── VISION PROTOCOL endpoints ─────────────────────────────────────────────────
|
|
|
|
@app.get("/screenshots")
async def list_screenshots(limit: int = 20, agent: str = ""):
    """List screenshot metadata, newest first.

    Args:
        limit: maximum number of rows to return.
        agent: when non-empty, substring-match (LIKE) against hostname.

    Only the metadata and vision-analysis columns named in the query are
    selected, not the full row.
    """
    columns = (
        "SELECT id, agent_id, hostname, method, width, height, file_size, "
        " vision_analysis, vision_provider, created_at "
    )
    if agent:
        query = columns + (
            "FROM agent_screenshots WHERE hostname LIKE %s "
            "ORDER BY created_at DESC LIMIT %s"
        )
        args = (f"%{agent}%", limit)
    else:
        query = columns + "FROM agent_screenshots ORDER BY created_at DESC LIMIT %s"
        args = (limit,)

    shots = await db_fetchall(query, args)
    return shots or []
|
|
|
|
@app.get("/screenshots/{screenshot_id}")
async def get_screenshot(screenshot_id: int):
    """Return the full agent_screenshots row for one screenshot.

    Raises:
        HTTPException: 404 when no screenshot has this id.
    """
    row = await db_fetchone(
        "SELECT * FROM agent_screenshots WHERE id=%s", (screenshot_id,)
    )
    if not row:
        # HTTPException is already imported at module level; the former
        # function-scope re-import was redundant.
        raise HTTPException(status_code=404, detail="Screenshot not found")
    return row
|
|
|
|
# NOTE: the fixed path must be registered before the parameterized one.
# FastAPI matches routes in declaration order, so with the reverse order
# DELETE /screenshots/purge is captured by /screenshots/{screenshot_id},
# fails int validation on "purge", and the purge endpoint is never reached.
@app.delete("/screenshots/purge")
async def purge_screenshots():
    """Delete every screenshot created more than 30 days ago."""
    await db_execute("DELETE FROM agent_screenshots WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY)")
    return {"ok": True}


@app.delete("/screenshots/{screenshot_id}")
async def delete_screenshot(screenshot_id: int):
    """Delete one screenshot row by id.

    The response is {"ok": True} whether or not a row existed.
    """
    await db_execute("DELETE FROM agent_screenshots WHERE id=%s", (screenshot_id,))
    return {"ok": True}
|
|
|
|
# ── GUARDIAN MODE endpoints ───────────────────────────────────────────────────
|
|
|
|
@app.get("/guardian/status")
async def guardian_status():
    """Report Guardian configuration, last-run markers and event counts.

    Configuration comes from _guardian_get_config(), with defaults applied
    here when a key is missing. The counts are aggregated over guardian_events:
    unacknowledged events in total, by critical/warning severity, and all
    events created in the last 24 hours.
    """
    settings = await _guardian_get_config()
    tallies = await db_fetchone(
        "SELECT\n"
        "SUM(acknowledged=0) AS unread,\n"
        "SUM(severity='critical' AND acknowledged=0) AS critical_unread,\n"
        "SUM(severity='warning' AND acknowledged=0) AS warning_unread,\n"
        "SUM(created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)) AS events_24h\n"
        "FROM guardian_events"
    )

    enabled = settings.get("enabled", "1") == "1"
    interval = int(settings.get("scan_interval", 120))
    last_scan = _guardian_state.get("last_scan")
    last_sitrep = _guardian_state.get("last_sitrep")
    thresholds = {
        "cpu": float(settings.get("cpu_threshold", 85)),
        "memory": float(settings.get("mem_threshold", 88)),
        "disk": float(settings.get("disk_threshold", 88)),
        "offline_minutes": int(settings.get("offline_minutes", 3)),
    }
    counts = dict(tallies) if tallies else {}

    return {
        "enabled": enabled,
        "scan_interval": interval,
        "last_scan": last_scan,
        "last_sitrep": last_sitrep,
        "thresholds": thresholds,
        "counts": counts,
    }
|
|
|
|
@app.get("/guardian/events")
async def guardian_events_list(limit: int = 30, unread: bool = False,
                               severity: str = "", since: str = ""):
    """List guardian events, newest first, with optional filters.

    Args:
        limit: maximum number of rows to return.
        unread: when true, only unacknowledged events.
        severity: when non-empty, only events with exactly this severity.
        since: when non-empty, only events with created_at after this value.
    """
    filters = []
    args: list = []
    if unread:
        filters.append("acknowledged = 0")
    if severity:
        filters.append("severity = %s")
        args.append(severity)
    if since:
        filters.append("created_at > %s")
        args.append(since)

    where_sql = ""
    if filters:
        where_sql = "WHERE " + " AND ".join(filters)
    args.append(limit)

    events = await db_fetchall(
        f"SELECT * FROM guardian_events {where_sql} ORDER BY created_at DESC LIMIT %s",
        args
    )
    return events or []
|
|
|
|
@app.post("/guardian/events/{event_id}/ack")
async def guardian_ack(event_id: int):
    """Mark a single guardian event as acknowledged."""
    ack_sql = "UPDATE guardian_events SET acknowledged=1 WHERE id=%s"
    await db_execute(ack_sql, (event_id,))
    return {"ok": True}
|
|
|
|
@app.post("/guardian/events/ack_all")
async def guardian_ack_all():
    """Mark every unacknowledged guardian event as acknowledged."""
    ack_all_sql = "UPDATE guardian_events SET acknowledged=1 WHERE acknowledged=0"
    await db_execute(ack_all_sql)
    return {"ok": True}
|
|
|
|
@app.get("/guardian/chat")
async def guardian_chat_events(since: str = ""):
    """Return proactive guardian messages injected into conversations.

    Without ``since`` the ten newest messages of the 'guardian' session are
    returned, newest first. With ``since`` every message created after that
    value is returned, oldest first.
    """
    if not since:
        latest = await db_fetchall(
            "SELECT id, message, created_at FROM conversations "
            "WHERE session_id='guardian' ORDER BY created_at DESC LIMIT 10"
        )
        return latest or []

    newer = await db_fetchall(
        "SELECT id, message, created_at FROM conversations "
        "WHERE session_id='guardian' AND created_at > %s ORDER BY created_at ASC",
        (since,)
    )
    return newer or []
|
|
|
|
# ── COMMS v2 endpoints ───────────────────────────────────────────────────────
|
|
|
|
@app.get("/comms/sent")
async def comms_sent(limit: int = 50, status: str = ""):
    """List sent-mail records, most recently sent first.

    Args:
        limit: maximum number of rows to return.
        status: when non-empty, only records with exactly this status.
    """
    columns = "SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id "
    if status:
        query = columns + "FROM email_sent WHERE status=%s ORDER BY sent_at DESC LIMIT %s"
        args = (status, limit)
    else:
        query = columns + "FROM email_sent ORDER BY sent_at DESC LIMIT %s"
        args = (limit,)

    sent = await db_fetchall(query, args)
    return sent or []
|
|
|
|
@app.get("/comms/sent/{sent_id}")
async def comms_sent_get(sent_id: int):
    """Return the full email_sent row for one record.

    Raises:
        HTTPException: 404 when no record has this id.
    """
    record = await db_fetchone("SELECT * FROM email_sent WHERE id=%s", (sent_id,))
    if record:
        return record
    raise HTTPException(status_code=404, detail="Not found")
|
|
|
|
@app.delete("/comms/sent/{sent_id}")
async def comms_sent_delete(sent_id: int):
    """Delete one sent-mail record by id.

    The response is {"ok": True} whether or not a row existed.
    """
    delete_sql = "DELETE FROM email_sent WHERE id=%s"
    await db_execute(delete_sql, (sent_id,))
    return {"ok": True}
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app object from the "reactor" module on
    # the configured HOST/PORT, with uvicorn's per-request access log disabled.
    uvicorn.run(
        "reactor:app",
        host=HOST,
        port=PORT,
        log_level="info",
        access_log=False,
    )
|