#!/usr/bin/env python3 """ JARVIS Arc Reactor — Core Daemon v9.0 Phase 1: Job queue, ping/echo/shell Phase 2: Intel Protocol (research), Iron Protocol (tool loop), LLM router Phase 3: Gmail Triage (Comms Protocol), Remote Exec (Field Protocol) Phase 4: Vision Protocol (screenshot, vision, sysinfo) Phase 5: Guardian Mode (continuous awareness, proactive alerts, SITREP) Phase 6: Comms v2 — send email, compose, schedule event, meeting prep Phase 7: Mission Ops — multi-step automated workflows with trigger engine Phase 8: Mission Directives — OKR/goal tracking with AI progress review Phase 9: Clearance Protocol — approval gating for high-risk operations """ import asyncio import email as _email_lib import imaplib import json import logging import os import re import sys import traceback from contextlib import asynccontextmanager from datetime import datetime, timedelta from email.header import decode_header as _decode_header from email.utils import parsedate_to_datetime from typing import Any, Optional import aiomysql import aiohttp import uvicorn from fastapi import FastAPI, HTTPException, Request # ── CONFIG ──────────────────────────────────────────────────────────────────── HOST = "127.0.0.1" PORT = 7474 VERSION = "9.0.0" DB_HOST = "localhost" DB_PORT = 3306 DB_USER = "jarvis_user" DB_PASS = "J4rv1s_Pr0t0c0l_2026!" DB_NAME = "jarvis_db" LOG_FILE = "/home/jarvis.orbishosting.com/logs/arc_reactor.log" POLL_INTERVAL = 3 HEARTBEAT_INTERVAL = 30 CLAUDE_API_KEY = "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" CLAUDE_MODEL = "claude-sonnet-4-6" GROQ_API_KEY = "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" GROQ_MODEL = "llama-3.3-70b-versatile" OLLAMA_HOST = "http://10.48.200.95:11434" OLLAMA_MODEL = "llama3.2:1b" GMAIL_USER = "myronblair@gmail.com" GMAIL_PASS = "demsvdylwweacbcx" ICLOUD_USER = "myronblair@icloud.com" ICLOUD_PASS = "yxfi-yvzu-geqk-japr" # ── LOGGING ─────────────────────────────────────────────────────────────────── os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) logging.basicConfig( level=logging.INFO, format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout), ], ) log = logging.getLogger("arc_reactor") # ── DB POOL ─────────────────────────────────────────────────────────────────── _pool: Optional[aiomysql.Pool] = None async def get_pool() -> aiomysql.Pool: global _pool if _pool is None or _pool.closed: _pool = await aiomysql.create_pool( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, db=DB_NAME, autocommit=True, minsize=2, maxsize=10, charset="utf8mb4", ) return _pool async def db_execute(sql: str, args: tuple = ()) -> int: pool = await get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql, args) return cur.lastrowid async def db_fetchone(sql: str, args: tuple = ()) -> Optional[dict]: pool = await get_pool() async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql, args) return await cur.fetchone() async def db_fetchall(sql: str, args: tuple = ()) -> list: pool = await get_pool() async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql, args) return await cur.fetchall() # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 1 HANDLERS # ═══════════════════════════════════════════════════════════════════════════════ async def handle_ping(payload: dict) -> dict: return {"pong": True, "timestamp": datetime.utcnow().isoformat(), "version": VERSION} async def handle_echo(payload: dict) -> dict: return {"echo": payload.get("message", ""), "timestamp": datetime.utcnow().isoformat()} async def handle_shell(payload: dict) -> dict: cmd = payload.get("command", "").strip() allowed = ("df ", "free ", "uptime", "hostname", "uname", "ps ", "cat /proc/") if not any(cmd.startswith(p) for p in allowed): raise ValueError(f"Command not in whitelist: {cmd[:40]}") proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) return {"stdout": stdout.decode().strip(), "stderr": stderr.decode().strip(), "exit_code": proc.returncode} # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 2: LLM ROUTER # ═══════════════════════════════════════════════════════════════════════════════ async def llm_call(messages: list, provider: str = "claude", system: str = "") -> str: if provider == "claude" and CLAUDE_API_KEY: return await _claude_call(messages, system) elif provider == "groq" and GROQ_API_KEY: return await _groq_call(messages, system) elif provider == "ollama": return await _ollama_call(messages, system) for p in ["groq", "ollama"]: try: return await llm_call(messages, p, system) except Exception: continue raise RuntimeError("All LLM providers failed") async def _claude_call(messages: list, system: str = "") -> str: payload = {"model": CLAUDE_MODEL, "max_tokens": 4096, "messages": messages} if system: payload["system"] = system headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"} async with aiohttp.ClientSession() as session: async with session.post("https://api.anthropic.com/v1/messages", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp: data = await resp.json() if resp.status != 200: raise RuntimeError(f"Claude API error {resp.status}: {data.get('error',{}).get('message','')}") return data["content"][0]["text"] async def _groq_call(messages: list, system: str = "") -> str: all_msgs = ([{"role": "system", "content": system}] if system else []) + messages payload = {"model": GROQ_MODEL, "messages": all_msgs, "max_tokens": 4096} headers = {"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"} async with aiohttp.ClientSession() as session: async with session.post("https://api.groq.com/openai/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=45)) as resp: data = await resp.json() if resp.status != 200: raise RuntimeError(f"Groq error {resp.status}") return data["choices"][0]["message"]["content"] async def _ollama_call(messages: list, system: str = "") -> str: prompt = (system + "\n\n" if system else "") + "\n".join(f"{m['role'].upper()}: {m['content']}" for m in messages) async with aiohttp.ClientSession() as session: async with session.post(f"{OLLAMA_HOST}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, timeout=aiohttp.ClientTimeout(total=30)) as resp: data = await resp.json() return data.get("response", "") async def handle_llm(payload: dict) -> dict: message = payload.get("message", "") system = payload.get("system", "You are JARVIS, an Iron Man-style AI assistant.") provider = payload.get("provider", "claude") if not message: raise ValueError("Missing message") result = await llm_call([{"role": "user", "content": message}], provider, system) return {"response": result, "provider": provider} # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 2: INTEL PROTOCOL — RESEARCH ENGINE # ═══════════════════════════════════════════════════════════════════════════════ async def _web_search(query: str, max_results: int = 6) -> list: try: from duckduckgo_search import DDGS results = [] with DDGS() as ddgs: for r in ddgs.text(query, max_results=max_results): results.append({"url": r.get("href",""), "title": r.get("title",""), "snippet": r.get("body","")}) return results except Exception as e: log.warning(f"DDG search failed: {e}") return [] async def _fetch_url_content(url: str, timeout: int = 12) -> str: try: import trafilatura headers = {"User-Agent": "Mozilla/5.0 (compatible; JARVIS-Research/3.0)"} async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout), allow_redirects=True, ssl=False) as resp: if resp.status != 200: return "" html = await resp.text(errors="replace") text = trafilatura.extract(html, include_links=False, include_images=False, no_fallback=False, favor_precision=True) return (text or "")[:4000] except Exception as e: log.debug(f"Fetch failed {url[:60]}: {e}") return "" async def handle_research(payload: dict) -> dict: query = payload.get("query", "").strip() depth = payload.get("depth", "standard") provider = payload.get("provider", "claude") if not query: raise ValueError("Missing research query") max_sources = {"quick": 3, "standard": 5, "deep": 8}.get(depth, 5) log.info(f"[INTEL] Research: '{query}' depth={depth} sources={max_sources}") search_results = await _web_search(query, max_results=max_sources + 2) if not search_results: search_results = [{"url": "", "title": query, "snippet": f"No search results for: {query}"}] fetch_tasks = [_fetch_url_content(r["url"]) for r in search_results if r.get("url")] page_texts = await asyncio.gather(*fetch_tasks, return_exceptions=True) sources = [] for i, r in enumerate(search_results[:max_sources]): text = page_texts[i] if i < len(page_texts) and isinstance(page_texts[i], str) else "" sources.append({"url": r["url"], "title": r["title"], "snippet": r["snippet"], "content": text}) context_parts = [] for i, s in enumerate(sources, 1): body = s["content"] or s["snippet"] if body: context_parts.append(f"SOURCE {i}: {s['title']}\nURL: {s['url']}\n{body[:3000]}\n") context_text = "\n---\n".join(context_parts) if context_parts else "No page content available." synthesis_prompt = ( f'You are JARVIS, an advanced AI research assistant. The user asked: "{query}"\n\n' f"You have retrieved the following source material:\n\n{context_text}\n\n" f"Provide a comprehensive research briefing with:\n" f"1. **EXECUTIVE SUMMARY** (2-3 sentences)\n" f"2. **KEY FINDINGS** (5-8 bullet points)\n" f"3. **DETAILS** (thorough synthesis, 2-4 paragraphs)\n" f"4. **CONFIDENCE** (High/Medium/Low based on source quality)\n\n" f"Be precise, factual, and cite source numbers where relevant." ) try: synthesis = await llm_call([{"role": "user", "content": synthesis_prompt}], provider=provider) except Exception as e: log.warning(f"[INTEL] Synthesis failed, falling back: {e}") synthesis = f"**RESEARCH BRIEFING: {query}**\n\n" + "\n\n".join(f"**{s['title']}**\n{s['snippet']}" for s in sources) key_points = [] for line in synthesis.split("\n"): line = line.strip() if line.startswith(("- ", "• ", "* ")) and len(line) > 10: key_points.append(re.sub(r'^[-•*]\s*', '', line)) elif re.match(r'^\d+\.\s+', line) and len(line) > 10: key_points.append(re.sub(r'^\d+\.\s+', '', line)) clean_sources = [{"url": s["url"], "title": s["title"] or s["url"]} for s in sources if s.get("url")] log.info(f"[INTEL] Research complete: '{query}' — {len(sources)} sources") return {"query": query, "depth": depth, "synthesis": synthesis, "key_points": key_points[:10], "sources": clean_sources, "source_count": len(clean_sources), "provider": provider} # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 2: IRON PROTOCOL — MULTI-STEP TOOL LOOP # ═══════════════════════════════════════════════════════════════════════════════ AGENT_TOOLS = [ {"name": "web_search", "description": "Search the web for current information.", "input_schema": {"type": "object", "properties": {"query": {"type": "string"}, "max_results": {"type": "integer", "default": 5}}, "required": ["query"]}}, {"name": "fetch_url", "description": "Fetch a web page and extract its main text content.", "input_schema": {"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}}, {"name": "jarvis_agents", "description": "Get status of all JARVIS agents.", "input_schema": {"type": "object", "properties": {}}}, {"name": "jarvis_alerts", "description": "Get current active JARVIS alerts.", "input_schema": {"type": "object", "properties": {}}}, {"name": "current_time", "description": "Get current date and time.", "input_schema": {"type": "object", "properties": {}}}, ] async def execute_agent_tool(tool_name: str, tool_input: dict) -> str: try: if tool_name == "web_search": results = await _web_search(tool_input.get("query", ""), tool_input.get("max_results", 5)) if not results: return "No search results found." return "\n".join(f"{i+1}. {r['title']}\n URL: {r['url']}\n {r['snippet']}" for i, r in enumerate(results)) elif tool_name == "fetch_url": url = tool_input.get("url", "") if not url: return "No URL provided." return await _fetch_url_content(url) or "Could not extract content." elif tool_name == "jarvis_agents": agents = await db_fetchall("SELECT hostname, agent_type, ip_address, status, last_seen FROM registered_agents ORDER BY status='online' DESC, hostname") if not agents: return "No agents registered." return "\n".join(f"- {a['hostname']} ({a['agent_type']}) @ {a['ip_address']} — {a['status'].upper()}" for a in agents) elif tool_name == "jarvis_alerts": alerts = await db_fetchall("SELECT title, severity, message FROM alerts WHERE resolved=0 ORDER BY created_at DESC LIMIT 10") return "\n".join(f"- [{a['severity'].upper()}] {a['title']}: {a['message']}" for a in alerts) or "No active alerts." elif tool_name == "current_time": return datetime.utcnow().strftime("UTC: %Y-%m-%d %H:%M:%S") else: return f"Unknown tool: {tool_name}" except Exception as e: return f"Tool error ({tool_name}): {str(e)[:200]}" async def handle_tool_loop(payload: dict) -> dict: task = payload.get("task", "").strip() max_iterations = min(int(payload.get("max_iterations", 15)), 200) system_prompt = payload.get("system", ( "You are JARVIS, an advanced autonomous AI assistant with access to tools. " "Use tools methodically to complete the task. Be thorough but concise. " "When you have enough information, provide a clear final answer." )) if not task: raise ValueError("Missing task") log.info(f"[IRON] Tool loop: '{task[:80]}' max_iter={max_iterations}") import anthropic client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY) messages = [{"role": "user", "content": task}] iteration = 0 final_text = "" while iteration < max_iterations: iteration += 1 response = await client.messages.create( model=CLAUDE_MODEL, max_tokens=4096, system=system_prompt, tools=AGENT_TOOLS, messages=messages, ) assistant_content = [] text_parts = [] tool_uses = [] for block in response.content: if block.type == "text": assistant_content.append({"type": "text", "text": block.text}) text_parts.append(block.text) elif block.type == "tool_use": assistant_content.append({"type": "tool_use", "id": block.id, "name": block.name, "input": block.input}) tool_uses.append(block) messages.append({"role": "assistant", "content": assistant_content}) if text_parts: final_text = "\n".join(text_parts) if response.stop_reason == "end_turn" or not tool_uses: log.info(f"[IRON] Complete after {iteration} iterations") break tool_results = [] for tu in tool_uses: log.info(f"[IRON] Tool: {tu.name}") result = await execute_agent_tool(tu.name, tu.input) tool_results.append({"type": "tool_result", "tool_use_id": tu.id, "content": result[:3000]}) messages.append({"role": "user", "content": tool_results}) tools_used = list({b["name"] for m in messages if isinstance(m["content"], list) for b in m["content"] if isinstance(b, dict) and b.get("type") == "tool_use"}) return {"task": task, "result": final_text, "iterations": iteration, "tools_used": tools_used} # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 3: COMMS PROTOCOL — GMAIL TRIAGE # ═══════════════════════════════════════════════════════════════════════════════ def _imap_fetch_sync(account: str, max_msgs: int) -> list: """Synchronous IMAP fetch — called via run_in_executor.""" if account == "icloud": host, port, user, passwd = "imap.mail.me.com", 993, ICLOUD_USER, ICLOUD_PASS else: host, port, user, passwd = "imap.gmail.com", 993, GMAIL_USER, GMAIL_PASS if not user or not passwd: return [] try: mbox = imaplib.IMAP4_SSL(host, port) mbox.login(user, passwd) mbox.select("INBOX") _, data = mbox.search(None, "ALL") ids = data[0].split()[-max_msgs:] msgs = [] for mid in reversed(ids): _, raw = mbox.fetch(mid, "(RFC822)") if not raw or not raw[0]: continue msg = _email_lib.message_from_bytes(raw[0][1]) # Subject subj_raw = msg.get("Subject", "") subject = "" for part, enc in _decode_header(subj_raw): if isinstance(part, bytes): subject += part.decode(enc or "utf-8", errors="replace") else: subject += str(part) # From from_raw = msg.get("From", "") from_name = "" for part, enc in _decode_header(from_raw): if isinstance(part, bytes): from_name += part.decode(enc or "utf-8", errors="replace") else: from_name += str(part) from_name = from_name.strip().strip('"') em = re.search(r"<([^>]+)>", from_raw) from_email = em.group(1) if em else from_raw # Date date_str = msg.get("Date", "") # Body preview body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) if payload: body = payload.decode(part.get_content_charset() or "utf-8", errors="replace") break else: payload = msg.get_payload(decode=True) if payload: body = payload.decode(msg.get_content_charset() or "utf-8", errors="replace") body = " ".join(body.split())[:500] msg_uid = (msg.get("Message-ID", "") or f"{from_email}:{subject}:{date_str}").strip("<>") msgs.append({ "msg_id": msg_uid[:255], "from_name": from_name[:100], "from_email": from_email[:100], "subject": subject[:255], "date_raw": date_str, "body": body, }) mbox.logout() return msgs except Exception as e: log.warning(f"IMAP fetch failed ({account}): {e}") return [] async def handle_gmail_triage(payload: dict) -> dict: account = payload.get("account", "gmail") max_emails = min(int(payload.get("max_emails", 20)), 40) provider = payload.get("provider", "claude") log.info(f"[COMMS] Gmail triage: account={account} max={max_emails}") loop = asyncio.get_event_loop() emails = await loop.run_in_executor(None, lambda: _imap_fetch_sync(account, max_emails)) if not emails: return {"account": account, "triaged": 0, "urgent": 0, "action": 0, "meeting": 0, "items": [], "error": "No emails fetched or IMAP unavailable"} email_list = "" for i, e in enumerate(emails, 1): email_list += ( f"EMAIL {i}:\n" f"From: {e['from_name']} <{e['from_email']}>\n" f"Subject: {e['subject']}\n" f"Date: {e['date_raw']}\n" f"Body: {e['body'][:400]}\n\n" ) triage_prompt = ( f"You are JARVIS, triaging {len(emails)} emails for Myron Blair.\n\n" "For each email assign:\n" "- category: urgent | action | reply | meeting | info | promo | spam\n" "- priority: 1-10 (10 = drop everything)\n" "- summary: one sentence\n" "- draft_reply: for urgent/action/reply/meeting ONLY — brief professional reply. Empty string otherwise.\n\n" "Return ONLY a valid JSON array. Example:\n" '[{"index":1,"category":"urgent","priority":9,"summary":"Contract needs signing today","draft_reply":"Hi, I will review and sign today."}]\n\n' f"EMAILS TO TRIAGE:\n{email_list}" ) try: raw = await llm_call([{"role": "user", "content": triage_prompt}], provider) raw = raw.strip() if raw.startswith("```"): raw = "\n".join(raw.split("\n")[1:]) if raw.endswith("```"): raw = raw[:-3] triage_items = json.loads(raw.strip()) except Exception as e: log.warning(f"[COMMS] Triage parse error: {e}") triage_items = [{"index": i+1, "category": "info", "priority": 3, "summary": f"Subject: {e2['subject']}", "draft_reply": ""} for i, e2 in enumerate(emails)] # Save to email_triage table saved = 0 for item in triage_items: idx = int(item.get("index", 0)) - 1 if idx < 0 or idx >= len(emails): continue e = emails[idx] try: date_parsed = None if e.get("date_raw"): try: date_parsed = parsedate_to_datetime(e["date_raw"]).strftime("%Y-%m-%d %H:%M:%S") except Exception: pass await db_execute( """INSERT INTO email_triage (msg_id, account, from_name, from_email, subject, date_received, category, priority, summary, draft_reply, action_taken) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'none') ON DUPLICATE KEY UPDATE category=VALUES(category), priority=VALUES(priority), summary=VALUES(summary), draft_reply=VALUES(draft_reply)""", (e["msg_id"], account, e["from_name"], e["from_email"], e["subject"], date_parsed, item.get("category", "info"), int(item.get("priority", 3)), item.get("summary", "")[:500], item.get("draft_reply", "")[:3000]) ) saved += 1 except Exception as ex: log.debug(f"[COMMS] Save triage error: {ex}") counts = {} for item in triage_items: c = item.get("category", "info") counts[c] = counts.get(c, 0) + 1 urgent_items = [] for it in triage_items: idx = int(it.get("index", 0)) - 1 if 0 <= idx < len(emails) and it.get("category") in ("urgent", "action", "reply", "meeting"): urgent_items.append({ "from": emails[idx]["from_name"], "from_email": emails[idx]["from_email"], "subject": emails[idx]["subject"][:80], "summary": it.get("summary", ""), "priority": it.get("priority", 0), "draft_reply": it.get("draft_reply", ""), "category": it.get("category", ""), }) urgent_items.sort(key=lambda x: -x["priority"]) log.info(f"[COMMS] Triage complete: {len(emails)} emails, {saved} saved, counts={counts}") return { "account": account, "triaged": len(emails), "saved": saved, "counts": counts, "urgent": counts.get("urgent", 0), "action": counts.get("action", 0), "meeting": counts.get("meeting", 0), "reply": counts.get("reply", 0), "items": urgent_items[:10], } # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 3: FIELD PROTOCOL — REMOTE EXEC # ═══════════════════════════════════════════════════════════════════════════════ async def handle_remote_exec(payload: dict) -> dict: """ Queue a command to a JARVIS Field Station agent and wait for the result. payload: { agent, command_type, command_data, timeout } """ agent_name = payload.get("agent", "").strip() cmd_type = payload.get("command_type", "ping") cmd_data = payload.get("command_data", {}) timeout_secs = min(int(payload.get("timeout", 35)), 120) if not agent_name: raise ValueError("Missing agent name") log.info(f"[FIELD] remote_exec {cmd_type} on {agent_name}") # NOTE: _dispatch_agent_command is defined in Phase 4 block below. # Forward declaration — called at runtime, not at import. dispatch = await _dispatch_agent_command(agent_name, cmd_type, cmd_data, timeout_secs) return { "agent": dispatch["agent"], "agent_id": dispatch["agent_id"], "command_type": cmd_type, "command_data": cmd_data, "cmd_id": dispatch["cmd_id"], "status": dispatch["status"], "result": dispatch["result"], } # ── PHASE 4: VISION PROTOCOL ────────────────────────────────────────────────── async def _dispatch_agent_command(agent_name: str, cmd_type: str, cmd_data: dict, timeout_secs: int = 45) -> dict: """Shared helper: find agent, queue command, poll for result.""" agent = await db_fetchone( """SELECT agent_id, hostname, status FROM registered_agents WHERE (hostname LIKE %s OR agent_id LIKE %s) AND status='online' LIMIT 1""", (f"%{agent_name}%", f"%{agent_name}%") ) if not agent: all_agents = await db_fetchall("SELECT hostname FROM registered_agents WHERE status='online'") names = [a["hostname"] for a in all_agents] raise ValueError(f"No online agent matching '{agent_name}'. Online: {', '.join(names) or 'none'}") cmd_id = await db_execute( "INSERT INTO agent_commands (agent_id, command_type, command_data) VALUES (%s, %s, %s)", (agent["agent_id"], cmd_type, json.dumps(cmd_data)) ) elapsed = 0 while elapsed < timeout_secs: await asyncio.sleep(2) elapsed += 2 row = await db_fetchone("SELECT status, result FROM agent_commands WHERE id=%s", (cmd_id,)) if row and row["status"] in ("executed", "failed"): result = row.get("result") if result and isinstance(result, str): try: result = json.loads(result) except Exception: pass return {"agent": agent["hostname"], "agent_id": agent["agent_id"], "cmd_id": cmd_id, "status": row["status"], "result": result or {}} await db_execute("UPDATE agent_commands SET status='failed' WHERE id=%s AND status='pending'", (cmd_id,)) raise TimeoutError(f"No response from {agent['hostname']} within {timeout_secs}s") async def handle_screenshot(payload: dict) -> dict: """ Capture a screenshot (or system snapshot) from a field agent, then optionally run vision analysis via Claude. payload: { agent, analyze: true|false, analyze_prompt: "...", timeout: 45 } """ agent_name = payload.get("agent", "").strip() do_analyze = bool(payload.get("analyze", True)) analyze_prompt = payload.get("analyze_prompt", "Describe what you see on this screen in detail. Note any important status indicators, errors, running processes, or interesting information.") timeout_secs = min(int(payload.get("timeout", 45)), 120) if not agent_name: raise ValueError("Missing agent name") log.info(f"[VISION] Screenshot request: agent={agent_name} analyze={do_analyze}") # Dispatch screenshot command to field agent dispatch = await _dispatch_agent_command(agent_name, "screenshot", {}, timeout_secs) result = dispatch.get("result", {}) hostname = dispatch.get("agent", agent_name) if dispatch.get("status") == "failed" or not result.get("success"): err = result.get("error", "Agent returned failure") if isinstance(result, dict) else str(result) return {"agent": hostname, "success": False, "error": err} image_b64 = result.get("image_b64", "") method = result.get("method", "unknown") width = result.get("width", 0) height = result.get("height", 0) file_size = result.get("file_size", 0) # Run Claude vision analysis if we have an image analysis = "" provider_used = "" if do_analyze and image_b64: try: import anthropic client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY) msg = await client.messages.create( model="claude-opus-4-8-20251101", max_tokens=1024, messages=[{ "role": "user", "content": [ {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image_b64}}, {"type": "text", "text": analyze_prompt}, ], }], ) analysis = msg.content[0].text if msg.content else "" provider_used = "claude" log.info(f"[VISION] Claude analysis complete ({len(analysis)} chars)") except Exception as e: log.warning(f"[VISION] Claude vision failed: {e}") analysis = f"Vision analysis unavailable: {e}" elif do_analyze and not image_b64 and result.get("snapshot_type") == "text": # Text-only sysinfo snapshot — summarize with LLM try: snap_text = json.dumps(result, indent=2)[:3000] prompt = f"Summarize this server system snapshot for JARVIS. Highlight any concerns:\n\n{snap_text}" analysis = await llm_call([{"role": "user", "content": prompt}], "claude") provider_used = "claude" except Exception as e: analysis = f"Analysis unavailable: {e}" # Store screenshot screenshot_id = await db_execute( """INSERT INTO agent_screenshots (agent_id, hostname, method, image_b64, width, height, file_size, vision_analysis, vision_provider) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", (dispatch.get("agent_id", ""), hostname, method, image_b64, width, height, file_size, analysis, provider_used) ) log.info(f"[VISION] Screenshot saved: id={screenshot_id} agent={hostname} method={method}") return { "agent": hostname, "screenshot_id": screenshot_id, "method": method, "width": width, "height": height, "file_size": file_size, "has_image": bool(image_b64), "analysis": analysis, "provider": provider_used, } async def handle_vision(payload: dict) -> dict: """ Run Claude vision on an already-stored screenshot or a provided base64 image. payload: { screenshot_id OR image_b64, prompt, provider } """ screenshot_id = payload.get("screenshot_id") image_b64 = payload.get("image_b64", "") prompt = payload.get("prompt", "Describe this image in detail.") provider = payload.get("provider", "claude") if screenshot_id: row = await db_fetchone( "SELECT image_b64, hostname, method FROM agent_screenshots WHERE id=%s", (int(screenshot_id),) ) if not row or not row["image_b64"]: raise ValueError(f"Screenshot {screenshot_id} not found or has no image data") image_b64 = row["image_b64"] hostname = row.get("hostname", "unknown") else: hostname = "direct" if not image_b64: raise ValueError("No image data provided") log.info(f"[VISION] Analysis: screenshot_id={screenshot_id} agent={hostname}") try: import anthropic client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY) msg = await client.messages.create( model="claude-opus-4-8-20251101", max_tokens=2048, messages=[{ "role": "user", "content": [ {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image_b64}}, {"type": "text", "text": prompt}, ], }], ) analysis = msg.content[0].text if msg.content else "" except Exception as e: raise RuntimeError(f"Vision analysis failed: {e}") # Update stored screenshot if we have an ID if screenshot_id: await db_execute( "UPDATE agent_screenshots SET vision_analysis=%s, vision_provider=%s WHERE id=%s", (analysis, "claude", int(screenshot_id)) ) return { "agent": hostname, "screenshot_id": screenshot_id, "prompt": prompt, "analysis": analysis, "provider": "claude", } async def handle_sysinfo(payload: dict) -> dict: """ Get a structured system snapshot from a field agent (no image). Optionally ask Claude to summarize/assess the health of the machine. payload: { agent, analyze: true|false, timeout: 30 } """ agent_name = payload.get("agent", "").strip() do_analyze = bool(payload.get("analyze", True)) timeout_secs = min(int(payload.get("timeout", 30)), 60) if not agent_name: raise ValueError("Missing agent name") dispatch = await _dispatch_agent_command(agent_name, "sysinfo", {}, timeout_secs) result = dispatch.get("result", {}) hostname = dispatch.get("agent", agent_name) if dispatch.get("status") == "failed": return {"agent": hostname, "success": False, "error": result.get("error", "Command failed")} analysis = "" if do_analyze: try: snap = json.dumps(result, indent=2)[:3000] summary_prompt = ( f"You are JARVIS analyzing a field station sysinfo snapshot from '{hostname}'.\n\n" f"Snapshot:\n{snap}\n\n" "Provide a concise health assessment: status (healthy/warning/critical), " "key metrics, any concerns, and recommended actions if needed. " "Keep it under 150 words, JARVIS style." ) analysis = await llm_call([{"role": "user", "content": summary_prompt}], "claude") except Exception as e: analysis = f"Analysis unavailable: {e}" return { "agent": hostname, "success": True, "snapshot": result, "analysis": analysis, } # ═══════════════════════════════════════════════════════════════════════════════ # JOB HANDLER REGISTRY # ═══════════════════════════════════════════════════════════════════════════════ JOB_HANDLERS = { "ping": handle_ping, "echo": handle_echo, "shell": handle_shell, "llm": handle_llm, "research": handle_research, "tool_loop": handle_tool_loop, "gmail_triage": handle_gmail_triage, "remote_exec": handle_remote_exec, # Phase 4 "screenshot": handle_screenshot, "vision": handle_vision, "sysinfo": handle_sysinfo, # Phase 5 "sitrep": None, # registered after guardian functions are defined "guardian_config": None, } # ── PHASE 5: GUARDIAN MODE ──────────────────────────────────────────────────── # In-memory state: tracks last alert time per (agent_id, metric) to debounce _guardian_state: dict = { "enabled": True, "last_scan": None, "last_sitrep": None, "alert_cooldown": {}, # key: "agent_id:metric" → last_alert_epoch "online_snapshot": {}, # agent_id → was_online bool } GUARDIAN_COOLDOWN = 600 # seconds between repeat alerts for same metric async def _guardian_get_config() -> dict: rows = await db_fetchall("SELECT key_name, value FROM guardian_config") return {r["key_name"]: r["value"] for r in rows} if rows else {} async def _guardian_emit(event_type: str, severity: str, agent_id: str, hostname: str, metric: str, value: float, threshold: float, message: str, ai_analysis: str = "") -> int: return await db_execute( """INSERT INTO guardian_events (event_type, severity, agent_id, hostname, metric, value, threshold, message, ai_analysis) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", (event_type, severity, agent_id, hostname, metric, value, threshold, message, ai_analysis) ) async def _guardian_cooldown_ok(agent_id: str, metric: str) -> bool: """Return True if enough time has passed since last alert for this agent+metric.""" import time key = f"{agent_id}:{metric}" last = _guardian_state["alert_cooldown"].get(key, 0) if (time.time() - last) >= GUARDIAN_COOLDOWN: _guardian_state["alert_cooldown"][key] = time.time() return True return False async def guardian_loop() -> None: """ Background task — runs every SCAN_INTERVAL seconds. Checks all agents for anomalies, emits guardian_events, optionally generates AI analysis for critical findings. """ import time log.info("[GUARDIAN] Guardian Mode activated") while True: try: cfg = await _guardian_get_config() if cfg.get("enabled", "1") == "0": await asyncio.sleep(60) continue scan_interval = int(cfg.get("scan_interval", 120)) cpu_thresh = float(cfg.get("cpu_threshold", 85)) mem_thresh = float(cfg.get("mem_threshold", 88)) disk_thresh = float(cfg.get("disk_threshold", 88)) offline_mins = int(cfg.get("offline_minutes", 3)) ai_on = cfg.get("ai_analysis", "1") == "1" proactive_chat = cfg.get("proactive_chat", "1") == "1" _guardian_state["last_scan"] = datetime.utcnow().isoformat() critical_findings = [] # ── 1. Agent online/offline transitions ─────────────────────────── agents = await db_fetchall( "SELECT agent_id, hostname, status, last_seen FROM registered_agents" ) current_snapshot = {} for ag in agents: aid = ag["agent_id"] hostname = ag["hostname"] is_online = ag["status"] == "online" current_snapshot[aid] = is_online was_online = _guardian_state["online_snapshot"].get(aid) if was_online is True and not is_online: # Agent went offline if await _guardian_cooldown_ok(aid, "offline"): eid = await _guardian_emit("agent_offline", "critical", aid, hostname, "status", 0, 1, f"Field station '{hostname}' went OFFLINE") critical_findings.append(f"⚠ {hostname} OFFLINE") log.warning(f"[GUARDIAN] {hostname} went offline → event #{eid}") elif was_online is False and is_online: # Agent came back if await _guardian_cooldown_ok(aid, "online"): await _guardian_emit("agent_online", "info", aid, hostname, "status", 1, 0, f"Field station '{hostname}' back ONLINE") log.info(f"[GUARDIAN] {hostname} came back online") _guardian_state["online_snapshot"] = current_snapshot # ── 2. Metrics analysis ─────────────────────────────────────────── # Get latest metric per online agent metrics_rows = await db_fetchall( """SELECT m.agent_id, r.hostname, m.metrics_json FROM agent_metrics m JOIN registered_agents r ON r.agent_id = m.agent_id WHERE r.status = 'online' AND m.recorded_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE) ORDER BY m.recorded_at DESC""" ) seen_agents: set = set() for row in metrics_rows: aid = row["agent_id"] if aid in seen_agents: continue seen_agents.add(aid) hostname = row["hostname"] try: m = json.loads(row["metrics_json"]) if isinstance(row["metrics_json"], str) else row["metrics_json"] sys_data = m.get("system", {}) cpu = sys_data.get("cpu_percent") if cpu is not None and float(cpu) >= cpu_thresh: sev = "critical" if float(cpu) >= 95 else "warning" if await _guardian_cooldown_ok(aid, "cpu"): msg = f"{hostname}: CPU at {cpu:.1f}% (threshold: {cpu_thresh}%)" await _guardian_emit("cpu_high", sev, aid, hostname, "cpu_percent", float(cpu), cpu_thresh, msg) critical_findings.append(f"CPU {hostname} {cpu:.0f}%") mem = sys_data.get("memory", {}) mem_pct = mem.get("percent") if isinstance(mem, dict) else None if mem_pct is not None and float(mem_pct) >= mem_thresh: sev = "critical" if float(mem_pct) >= 95 else "warning" if await _guardian_cooldown_ok(aid, "memory"): msg = f"{hostname}: Memory at {mem_pct:.1f}% (threshold: {mem_thresh}%)" await _guardian_emit("mem_high", sev, aid, hostname, "mem_percent", float(mem_pct), mem_thresh, msg) critical_findings.append(f"MEM {hostname} {mem_pct:.0f}%") disks = sys_data.get("disk", []) if isinstance(disks, list): for disk in disks: dpct = disk.get("percent", 0) if dpct and float(str(dpct).rstrip("%")) >= disk_thresh: mount = disk.get("mountpoint", "/") key = f"disk_{mount}" if await _guardian_cooldown_ok(aid, key): msg = f"{hostname}: Disk {mount} at {dpct}% (threshold: {disk_thresh}%)" sev = "critical" if float(str(dpct).rstrip("%")) >= 95 else "warning" await _guardian_emit("disk_high", sev, aid, hostname, key, float(str(dpct).rstrip("%")), disk_thresh, msg) critical_findings.append(f"DISK {hostname}{mount} {dpct}%") # Service anomalies services = sys_data.get("services", []) for svc in services: if svc.get("status") == "failed": svc_name = svc.get("service", "unknown") if await _guardian_cooldown_ok(aid, f"svc_{svc_name}"): msg = f"{hostname}: Service '{svc_name}' is FAILED" await _guardian_emit("service_down", "critical", aid, hostname, f"service:{svc_name}", 0, 1, msg) critical_findings.append(f"SVC {hostname}/{svc_name} FAILED") except Exception as e: log.debug(f"[GUARDIAN] Metrics parse error for {row['hostname']}: {e}") # ── 3. AI analysis for critical findings ────────────────────────── if critical_findings and ai_on: try: findings_text = "\n".join(f"- {f}" for f in critical_findings) ai_prompt = ( f"You are JARVIS. The following anomalies were just detected:\n\n" f"{findings_text}\n\n" "Provide a brief (2-3 sentences), Iron Man-style alert message " "for Myron. Be direct about severity and what action to take. " "No markdown, no headers." ) ai_msg = await llm_call([{"role": "user", "content": ai_prompt}], "claude") # Update the most recent guardian event with AI analysis await db_execute( """UPDATE guardian_events SET ai_analysis=%s WHERE ai_analysis='' AND created_at > DATE_SUB(NOW(), INTERVAL 1 MINUTE) ORDER BY id DESC LIMIT 1""", (ai_msg,) ) # Inject proactive chat message if configured if proactive_chat: await _guardian_inject_chat(f"◈ GUARDIAN ALERT — {ai_msg}") except Exception as e: log.warning(f"[GUARDIAN] AI analysis error: {e}") if critical_findings: log.warning(f"[GUARDIAN] Scan complete — {len(critical_findings)} findings: {', '.join(critical_findings)}") else: log.debug(f"[GUARDIAN] Scan complete — all clear") except Exception as exc: log.error(f"[GUARDIAN] Loop error: {exc}") await asyncio.sleep(scan_interval) async def _guardian_inject_chat(message: str) -> None: """Write a proactive JARVIS message into the conversations table so the HUD picks it up.""" try: await db_execute( """INSERT INTO conversations (session_id, role, message, created_at) VALUES ('guardian', 'assistant', %s, NOW())""", (message,) ) except Exception as e: log.debug(f"[GUARDIAN] Chat inject error: {e}") async def handle_sitrep(payload: dict) -> dict: """ Situation Report — comprehensive health briefing across all field stations. payload: { detail: brief|full, provider: claude } """ detail = payload.get("detail", "full") provider = payload.get("provider", "claude") log.info(f"[GUARDIAN] SITREP requested (detail={detail})") # 1. Gather agent status agents = await db_fetchall( """SELECT agent_id, hostname, status, ip_address, agent_type, capabilities, last_seen FROM registered_agents ORDER BY status DESC, hostname ASC""" ) # 2. Get latest metrics per agent metrics_map = {} metrics_rows = await db_fetchall( """SELECT m.agent_id, m.metrics_json, m.recorded_at FROM agent_metrics m WHERE m.recorded_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE) ORDER BY m.recorded_at DESC""" ) for row in metrics_rows: if row["agent_id"] not in metrics_map: try: m = json.loads(row["metrics_json"]) if isinstance(row["metrics_json"], str) else row["metrics_json"] metrics_map[row["agent_id"]] = m except Exception: pass # 3. Recent guardian events (last 24h) recent_events = await db_fetchall( """SELECT event_type, severity, hostname, metric, value, threshold, message, created_at FROM guardian_events WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR) ORDER BY created_at DESC LIMIT 20""" ) # 4. Arc Reactor job stats arc_stats = await db_fetchone( """SELECT SUM(status='queued') AS queued, SUM(status='running') AS running, SUM(status='done') AS done, SUM(status='failed') AS failed FROM arc_jobs WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)""" ) # 5. Build context for Claude agent_lines = [] for ag in agents: m = metrics_map.get(ag["agent_id"], {}) sys = m.get("system", {}) if m else {} cpu = sys.get("cpu_percent", "?") mem = sys.get("memory", {}).get("percent", "?") if isinstance(sys.get("memory"), dict) else "?" disk = next((d.get("percent","?") for d in (sys.get("disk") or []) if d.get("mountpoint") == "/"), "?") line = (f" {ag['hostname']} ({ag['status'].upper()}) — " f"CPU:{cpu}% MEM:{mem}% DISK:{disk}%") agent_lines.append(line) event_lines = [ f" [{e['severity'].upper()}] {e['hostname']}: {e['message']} ({e['created_at']})" for e in recent_events ] or [" No events in last 24h"] sitrep_context = ( f"JARVIS SITREP — {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}\n\n" f"FIELD STATIONS ({len(agents)} total):\n" + "\n".join(agent_lines) + "\n\n" f"ARC REACTOR (last 24h): queued={arc_stats.get('queued',0)} " f"running={arc_stats.get('running',0)} done={arc_stats.get('done',0)} " f"failed={arc_stats.get('failed',0)}\n\n" f"GUARDIAN EVENTS (last 24h):\n" + "\n".join(event_lines) ) prompt = ( f"{sitrep_context}\n\n" f"You are JARVIS. Deliver a {'brief 3-sentence' if detail == 'brief' else 'comprehensive'} " f"situation report for Myron Blair. Iron Man style — direct, confident, actionable. " f"Highlight: overall health status, any critical issues requiring immediate attention, " f"and key metrics. " + ("Keep it under 80 words." if detail == "brief" else "Structure: Overall Status → Issues → Metrics → Recommendation.") ) analysis = await llm_call([{"role": "user", "content": prompt}], provider) _guardian_state["last_sitrep"] = datetime.utcnow().isoformat() # Log SITREP as guardian event await _guardian_emit("sitrep", "info", "system", "system", "sitrep", 0, 0, f"SITREP generated ({detail})", analysis) online_count = sum(1 for a in agents if a["status"] == "online") offline_count = len(agents) - online_count critical_events = sum(1 for e in recent_events if e["severity"] == "critical") return { "sitrep": analysis, "agents_online": online_count, "agents_offline": offline_count, "agents_total": len(agents), "events_24h": len(recent_events), "critical_24h": critical_events, "arc_stats": dict(arc_stats) if arc_stats else {}, "timestamp": datetime.utcnow().isoformat(), "detail": detail, } async def handle_guardian_config(payload: dict) -> dict: """ Get or set Guardian Mode configuration. payload: { action: get|set, key: ..., value: ... } or { action: set, config: {...} } """ action = payload.get("action", "get") if action == "get": cfg = await _guardian_get_config() return {"config": cfg, "guardian_state": { "enabled": _guardian_state.get("enabled"), "last_scan": _guardian_state.get("last_scan"), "last_sitrep": _guardian_state.get("last_sitrep"), }} elif action == "set": updates = payload.get("config", {}) if payload.get("key") and payload.get("value") is not None: updates[payload["key"]] = str(payload["value"]) for k, v in updates.items(): await db_execute( "INSERT INTO guardian_config (key_name, value) VALUES (%s, %s) " "ON DUPLICATE KEY UPDATE value=%s, updated_at=NOW()", (k, str(v), str(v)) ) if k == "enabled": _guardian_state["enabled"] = v not in ("0", "false", "off") return {"ok": True, "updated": list(updates.keys())} raise ValueError(f"Unknown guardian_config action: {action}") # Register Phase 5 handlers JOB_HANDLERS["sitrep"] = handle_sitrep JOB_HANDLERS["guardian_config"] = handle_guardian_config # ── PHASE 6: COMMS v2 — SEND EMAIL + SCHEDULE + MEETING PREP ───────────────── import smtplib import ssl as _ssl from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import formataddr, make_msgid SMTP_SETTINGS = { "gmail": ("smtp.gmail.com", 587, GMAIL_USER, GMAIL_PASS), "icloud": ("smtp.mail.me.com", 587, ICLOUD_USER, ICLOUD_PASS), } def _smtp_send_sync(account: str, to_email: str, to_name: str, subject: str, body: str, reply_to_msg_id: str = "") -> dict: """Synchronous SMTP send — run in executor.""" host, port, user, passwd = SMTP_SETTINGS.get(account, SMTP_SETTINGS["gmail"]) if not user or not passwd: return {"success": False, "error": "No credentials configured for account"} try: msg = MIMEMultipart("alternative") from_addr = formataddr(("JARVIS / Myron Blair", user)) msg["From"] = from_addr msg["To"] = formataddr((to_name, to_email)) if to_name else to_email msg["Subject"] = subject msg_id = make_msgid(domain=user.split("@")[1]) msg["Message-ID"] = msg_id if reply_to_msg_id: msg["In-Reply-To"] = f"<{reply_to_msg_id.strip('<>')}>" msg["References"] = f"<{reply_to_msg_id.strip('<>')}>" msg.attach(MIMEText(body, "plain", "utf-8")) ctx = _ssl.create_default_context() with smtplib.SMTP(host, port, timeout=20) as smtp: smtp.ehlo() smtp.starttls(context=ctx) smtp.login(user, passwd) smtp.sendmail(user, [to_email], msg.as_bytes()) return {"success": True, "message_id": msg_id} except Exception as e: return {"success": False, "error": str(e)} async def handle_send_email(payload: dict) -> dict: """ Send an email from Gmail or iCloud. payload: { account: gmail|icloud, to_email: recipient address, to_name: recipient display name (optional), subject: subject line, body: plain-text body, triage_id: int (optional — if replying to a triage item), reply_to_msg_id: original Message-ID for threading (optional), compose: true — ask Claude to draft the body from a prompt first } """ account = payload.get("account", "gmail") to_email = payload.get("to_email", "").strip() to_name = payload.get("to_name", "").strip() subject = payload.get("subject", "").strip() body = payload.get("body", "").strip() triage_id = payload.get("triage_id") reply_msg_id = payload.get("reply_to_msg_id", "") compose_prompt = payload.get("compose_prompt", "") # If triage_id is given, pull recipient + subject + draft from email_triage if triage_id and not to_email: row = await db_fetchone( "SELECT from_email, from_name, subject, draft_reply, msg_id FROM email_triage WHERE id=%s", (int(triage_id),) ) if row: to_email = to_email or row["from_email"] to_name = to_name or row["from_name"] subject = subject or ("Re: " + (row["subject"] or "")) body = body or row.get("draft_reply", "") reply_msg_id = reply_msg_id or row.get("msg_id", "") if not to_email: raise ValueError("Missing to_email") # If compose requested, use Claude to write the body if compose_prompt and not body: draft_prompt = ( f"You are drafting an email on behalf of Myron Blair.\n" f"To: {to_name or to_email}\n" f"Subject: {subject}\n\n" f"Instructions: {compose_prompt}\n\n" "Write a professional, concise email body. No subject line, no salutation header. " "Start with 'Hi [name],' or similar. Sign off as 'Myron Blair'." ) body = await llm_call([{"role": "user", "content": draft_prompt}], "claude") if not subject: raise ValueError("Missing subject") if not body: raise ValueError("Missing email body — provide body or compose_prompt") log.info(f"[COMMS] Sending email: {account} → {to_email} | {subject[:60]}") loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: _smtp_send_sync(account, to_email, to_name, subject, body, reply_msg_id) ) # Record in email_sent status = "sent" if result["success"] else "failed" await db_execute( """INSERT INTO email_sent (account, to_email, to_name, subject, body, triage_id, status, error, message_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""", (account, to_email, to_name, subject, body, triage_id or None, status, result.get("error", ""), result.get("message_id", "")) ) # Update triage item if this was a reply if triage_id and result["success"]: await db_execute( "UPDATE email_triage SET action_taken='replied' WHERE id=%s", (int(triage_id),) ) log.info(f"[COMMS] Send result: {status} → {to_email}") return { "success": result["success"], "account": account, "to_email": to_email, "subject": subject, "status": status, "message_id": result.get("message_id", ""), "error": result.get("error", ""), "body": body, } async def handle_compose_email(payload: dict) -> dict: """ Compose an email from natural-language instructions — draft only (no auto-send). payload: { to_email, to_name, subject_hint, instructions, account, send: false } """ to_email = payload.get("to_email", "").strip() to_name = payload.get("to_name", "").strip() subject_hint = payload.get("subject_hint", "").strip() instructions = payload.get("instructions", "").strip() account = payload.get("account", "gmail") auto_send = bool(payload.get("send", False)) if not instructions: raise ValueError("Missing instructions for compose") # Generate subject if not given if not subject_hint: subj_prompt = f"Write a concise email subject line (max 8 words) for an email about: {instructions}" subject_hint = (await llm_call([{"role": "user", "content": subj_prompt}], "claude")).strip().strip('"') # Draft full body draft_prompt = ( f"You are drafting an email for Myron Blair.\n" f"To: {to_name or to_email or 'the recipient'}\n" f"Subject: {subject_hint}\n\n" f"Instructions: {instructions}\n\n" "Write a professional, concise email. Start with 'Hi [name],' or 'Hello,' " "and sign off as 'Myron Blair'. Return only the email body text." ) body = await llm_call([{"role": "user", "content": draft_prompt}], "claude") if auto_send and to_email: return await handle_send_email({ "account": account, "to_email": to_email, "to_name": to_name, "subject": subject_hint, "body": body, }) # Draft-only — save to email_sent with status='queued' for review draft_id = await db_execute( """INSERT INTO email_sent (account, to_email, to_name, subject, body, status) VALUES (%s,%s,%s,%s,%s,'queued')""", (account, to_email, to_name, subject_hint, body) ) log.info(f"[COMMS] Composed draft #{draft_id}: {to_email} | {subject_hint[:60]}") return { "draft_id": draft_id, "to_email": to_email, "to_name": to_name, "subject": subject_hint, "body": body, "account": account, "sent": False, "status": "queued", } async def handle_schedule_event(payload: dict) -> dict: """ Create or find an appointment in the JARVIS planner from natural language. payload: { title, description, start_at, end_at, location, category, natural_language, all_day, reminder_min, generate_ics } """ nl = payload.get("natural_language", "").strip() title = payload.get("title", "").strip() start_at = payload.get("start_at", "") end_at = payload.get("end_at", "") location = payload.get("location", "") category = payload.get("category", "work") all_day = bool(payload.get("all_day", False)) reminder = int(payload.get("reminder_min", 30)) description = payload.get("description", "") gen_ics = bool(payload.get("generate_ics", True)) # If natural language given, parse with Claude if nl and (not title or not start_at): now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") parse_prompt = ( f"Current datetime: {now_str}\n\n" f"Parse this scheduling request into JSON:\n\"{nl}\"\n\n" "Return ONLY valid JSON with these fields (no markdown):\n" '{"title":"...","start_at":"YYYY-MM-DD HH:MM:SS","end_at":"YYYY-MM-DD HH:MM:SS or null",' '"location":"...","category":"personal|work|medical|other","all_day":false,' '"description":"..."}\n' "Use 24h time. Default duration 1 hour if not specified. " "If only a date (no time) is given, set all_day=true." ) raw = await llm_call([{"role": "user", "content": parse_prompt}], "claude") raw = raw.strip().strip("```json").strip("```").strip() try: parsed = json.loads(raw) title = title or parsed.get("title", nl[:100]) start_at = start_at or parsed.get("start_at", "") end_at = end_at or parsed.get("end_at") or "" location = location or parsed.get("location", "") category = parsed.get("category", category) all_day = parsed.get("all_day", all_day) description = description or parsed.get("description", "") except Exception as e: log.warning(f"[COMMS] Schedule parse failed: {e} — raw: {raw[:100]}") if not title: title = nl[:100] if not title: raise ValueError("Could not determine event title from input") if not start_at: raise ValueError("Could not determine start time from input") # Insert into appointments table appt_id = await db_execute( """INSERT INTO appointments (title, description, category, start_at, end_at, location, all_day, reminder_min, external_source) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,'manual')""", (title, description, category, start_at, end_at or None, location, int(all_day), reminder) ) # Generate ICS if requested ics_content = "" if gen_ics and start_at: try: from datetime import datetime as dt uid = f"jarvis-{appt_id}@orbishosting.com" dtstart = start_at.replace(" ", "T").replace("-", "").replace(":", "") dtend = (end_at or start_at).replace(" ", "T").replace("-", "").replace(":", "") dtstamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") ics_content = ( "BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//JARVIS//EN\r\n" "BEGIN:VEVENT\r\n" f"UID:{uid}\r\n" f"DTSTAMP:{dtstamp}\r\n" f"DTSTART:{dtstart}\r\n" f"DTEND:{dtend}\r\n" f"SUMMARY:{title}\r\n" f"DESCRIPTION:{description}\r\n" f"LOCATION:{location}\r\n" "END:VEVENT\r\nEND:VCALENDAR\r\n" ) except Exception: pass log.info(f"[COMMS] Event scheduled: #{appt_id} '{title}' @ {start_at}") return { "appointment_id": appt_id, "title": title, "start_at": start_at, "end_at": end_at, "location": location, "category": category, "all_day": all_day, "ics": ics_content, "description": description, } async def handle_meeting_prep(payload: dict) -> dict: """ Prepare a briefing for an upcoming meeting. payload: { appointment_id OR title OR timeframe, research: true } """ appt_id = payload.get("appointment_id") title = payload.get("title", "").strip() timeframe = payload.get("timeframe", "today") do_research = bool(payload.get("research", True)) # Find the appointment appt = None if appt_id: appt = await db_fetchone("SELECT * FROM appointments WHERE id=%s", (int(appt_id),)) elif title: appt = await db_fetchone( "SELECT * FROM appointments WHERE title LIKE %s AND start_at >= NOW() ORDER BY start_at LIMIT 1", (f"%{title}%",) ) else: appt = await db_fetchone( """SELECT * FROM appointments WHERE start_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 24 HOUR) ORDER BY start_at LIMIT 1""" ) if not appt: # Check tasks too task = await db_fetchone( "SELECT * FROM tasks WHERE title LIKE %s AND status != 'done' ORDER BY due_date LIMIT 1", (f"%{title}%",) ) if task: appt = {"title": task["title"], "description": task.get("notes",""), "start_at": str(task.get("due_date") or ""), "location": ""} if not appt: return {"success": False, "error": f"No upcoming appointment found matching '{title or timeframe}'"} appt_title = appt.get("title", "") appt_start = str(appt.get("start_at", "")) appt_desc = appt.get("description", "") appt_loc = appt.get("location", "") # Optional: kick off a research job on the meeting topic/attendees research_summary = "" if do_research and appt_title: try: research_result = await handle_research({ "query": f"background information on: {appt_title}", "depth": "quick", "provider": "claude", }) research_summary = research_result.get("synthesis", "")[:1500] except Exception: pass # Build meeting briefing context = ( f"Meeting: {appt_title}\n" f"Time: {appt_start}\n" f"Location: {appt_loc or 'Not specified'}\n" f"Notes: {appt_desc or 'None'}\n" ) if research_summary: context += f"\nBackground Research:\n{research_summary}" prompt = ( f"You are JARVIS. Prepare a concise meeting briefing for Myron Blair.\n\n" f"{context}\n\n" "Format: Meeting summary → Key context/background → Suggested talking points → " "Action items to prepare. Keep it sharp and actionable. Iron Man style." ) briefing = await llm_call([{"role": "user", "content": prompt}], "claude") log.info(f"[COMMS] Meeting prep complete: '{appt_title}'") return { "appointment_id": appt.get("id"), "title": appt_title, "start_at": appt_start, "location": appt_loc, "briefing": briefing, "has_research": bool(research_summary), } # Register Phase 6 handlers JOB_HANDLERS["send_email"] = handle_send_email JOB_HANDLERS["compose_email"] = handle_compose_email JOB_HANDLERS["schedule_event"] = handle_schedule_event JOB_HANDLERS["meeting_prep"] = handle_meeting_prep # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 7: MISSION OPS — multi-step automated workflows # ═══════════════════════════════════════════════════════════════════════════════ import re as _re def _render_template(text: str, context: dict) -> str: """Replace {{key.subkey}} tokens in a string/JSON with values from context.""" if not isinstance(text, str): return text def replacer(m): path = m.group(1).strip().split(".") val = context for p in path: if isinstance(val, dict): val = val.get(p, m.group(0)) else: return m.group(0) return str(val) if not isinstance(val, (dict, list)) else json.dumps(val) return _re.sub(r'\{\{([^}]+)\}\}', replacer, text) def _apply_templates(payload: Any, context: dict) -> Any: """Recursively apply template substitution to a payload dict/list/str.""" if isinstance(payload, str): return _render_template(payload, context) if isinstance(payload, dict): return {k: _apply_templates(v, context) for k, v in payload.items()} if isinstance(payload, list): return [_apply_templates(v, context) for v in payload] return payload async def _execute_mission(mission_id: int, trigger_source: str = "manual") -> dict: """Run all steps of a mission sequentially, log results, return summary.""" mission = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,)) if not mission: return {"error": f"Mission {mission_id} not found"} steps = await db_fetchall( "SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC", (mission_id,) ) if not steps: return {"error": "Mission has no steps"} # Create run record run_id = await db_execute( "INSERT INTO mission_runs (mission_id, status, trigger_source, steps_log) VALUES (%s,'running',%s,'[]')", (mission_id, trigger_source) ) await db_execute( "UPDATE missions SET last_run_at=NOW(), run_count=run_count+1 WHERE id=%s", (mission_id,) ) context = {"trigger": {"source": trigger_source, "mission_id": mission_id}} steps_log = [] overall_status = "done" for i, step in enumerate(steps): step_label = step.get("label") or step["job_type"] raw_payload = step.get("job_payload") or {} if isinstance(raw_payload, str): try: raw_payload = json.loads(raw_payload) except Exception: raw_payload = {} payload = _apply_templates(raw_payload, context) step_entry = { "step": i, "label": step_label, "job_type": step["job_type"], "status": "running", "result": None, "error": None, } try: handler = JOB_HANDLERS.get(step["job_type"]) if not handler: raise ValueError(f"Unknown job type: {step['job_type']}") result = await handler(payload) step_entry["status"] = "done" step_entry["result"] = result context[f"step_{i}"] = result log.info(f"[MISSION {mission_id} RUN {run_id}] Step {i} ({step_label}) done") except Exception as exc: step_entry["status"] = "failed" step_entry["error"] = str(exc)[:500] log.warning(f"[MISSION {mission_id} RUN {run_id}] Step {i} ({step_label}) failed: {exc}") if not step.get("continue_on_failure"): overall_status = "failed" steps_log.append(step_entry) break steps_log.append(step_entry) await db_execute( "UPDATE mission_runs SET status=%s, steps_log=%s, completed_at=NOW() WHERE id=%s", (overall_status, json.dumps(steps_log, default=str), run_id) ) return { "run_id": run_id, "status": overall_status, "steps": len(steps_log), "steps_log": steps_log, } async def handle_run_mission(payload: dict) -> dict: mission_id = int(payload.get("mission_id") or 0) if not mission_id: return {"error": "Missing mission_id"} source = payload.get("trigger_source", "manual") return await _execute_mission(mission_id, source) JOB_HANDLERS["run_mission"] = handle_run_mission # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 8: MISSION DIRECTIVES — OKR / goal tracking with AI review # ═══════════════════════════════════════════════════════════════════════════════ async def handle_directive_review(payload: dict) -> dict: """AI-powered review of active directives: progress analysis + recommendations.""" directive_id = payload.get("directive_id") provider = payload.get("provider", "claude") # Fetch directives if directive_id: dirs = await db_fetchall( "SELECT * FROM directives WHERE id=%s", (directive_id,) ) else: dirs = await db_fetchall( "SELECT * FROM directives WHERE status='active' ORDER BY priority DESC LIMIT 10" ) if not dirs: return {"error": "No active directives found"} dir_ids = [d["id"] for d in dirs] placeholders = ",".join(["%s"] * len(dir_ids)) key_results = await db_fetchall( f"SELECT * FROM directive_key_results WHERE directive_id IN ({placeholders}) ORDER BY directive_id,id", dir_ids ) or [] links = await db_fetchall( f"""SELECT dl.directive_id, dl.link_type, dl.note, COALESCE(t.title, a.title) AS linked_title, COALESCE(t.status, 'scheduled') AS linked_status FROM directive_links dl LEFT JOIN tasks t ON dl.link_type='task' AND t.id=dl.link_id LEFT JOIN appointments a ON dl.link_type='appointment' AND a.id=dl.link_id WHERE dl.directive_id IN ({placeholders})""", dir_ids ) or [] # Build context block context_lines = [] for d in dirs: d_krs = [kr for kr in key_results if kr["directive_id"] == d["id"]] d_links = [lk for lk in links if lk["directive_id"] == d["id"]] kr_sum_cur = sum(float(kr["current_value"]) for kr in d_krs) kr_sum_tgt = sum(float(kr["target_value"]) for kr in d_krs) or 1 pct = round(kr_sum_cur / kr_sum_tgt * 100, 1) context_lines.append(f"\n## DIRECTIVE: {d['title']} ({d['category'].upper()}) — {pct}% complete") context_lines.append(f" Status: {d['status']} | Priority: {d['priority']}/10 | Target: {d.get('target_date') or 'no date'}") if d.get("description"): context_lines.append(f" Description: {d['description']}") for kr in d_krs: context_lines.append(f" KR: {kr['title']} — {kr['current_value']}/{kr['target_value']} {kr['unit']}") for lk in d_links: status = lk.get("linked_status") or "" context_lines.append(f" Linked {lk['link_type']}: {lk.get('linked_title') or lk.get('note') or '—'} [{status}]") context = "\n".join(context_lines) today = datetime.utcnow().strftime("%Y-%m-%d") prompt = f"""You are JARVIS, an AI assistant conducting a Mission Directives review for your principal. Today is {today}. {context} Provide a concise executive briefing covering: 1. Overall progress summary (2-3 sentences) 2. For each directive: current status, what's on track, what needs attention 3. Top 3 recommended next actions to move the highest-priority directives forward 4. Any directives at risk of missing their target date Keep the tone confident and action-oriented. Format with clear sections. Under 400 words.""" review_text = "" try: async with aiohttp.ClientSession() as session: async with session.post( "https://api.anthropic.com/v1/messages", headers={"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"}, json={"model": CLAUDE_MODEL, "max_tokens": 600, "messages": [{"role": "user", "content": prompt}]}, timeout=aiohttp.ClientTimeout(total=30), ) as resp: rdata = await resp.json() review_text = rdata.get("content", [{}])[0].get("text", "") except Exception as e: review_text = f"[Review generation failed: {e}]" # Store in conversations so HUD can surface it await db_execute( "INSERT INTO conversations (session_id, role, message, created_at) VALUES ('guardian','assistant',%s,NOW())", (review_text,) ) return { "review": review_text, "directives": len(dirs), "provider": provider, } JOB_HANDLERS["directive_review"] = handle_directive_review # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 10: MEMORY CORE — auto-extraction knowledge graph # ═══════════════════════════════════════════════════════════════════════════════ _MEMORY_EXTRACT_PROMPT = """Extract factual information about the user from this conversation exchange. Focus on: - Preferences (likes/dislikes, preferred ways of working) - People they mention (names, relationships, roles) - Places (home, work, locations) - Routines (schedules, habits) - Goals (things they want to achieve) - Instructions to JARVIS (always/never rules) - Key facts about their life/work/projects Return a JSON array. Each element: {{"category": "", "subject": "", "predicate": "", "object": "", "confidence": <0.0-1.0>}} Rules: - Only extract clearly stated facts, not speculation - subject/predicate should be concise (under 50 chars each) - confidence: 0.95 for explicit statements, 0.75-0.90 for clear implications - Skip ephemeral info (what they asked just now, current queries) - Max 8 facts per exchange - Return [] if nothing durable worth remembering Exchange: USER: {user_msg} JARVIS: {asst_msg} Return ONLY valid JSON array, nothing else.""" async def handle_memory_extract(payload: dict) -> dict: user_msg = str(payload.get("user_message", "")).strip() asst_msg = str(payload.get("assistant_message", "")).strip() conv_id = payload.get("conversation_id") if not user_msg and not asst_msg: return {"ok": False, "reason": "empty exchange"} prompt = _MEMORY_EXTRACT_PROMPT.format( user_msg=user_msg[:800], asst_msg=asst_msg[:800] ) raw = "" try: # Use Haiku for speed/cost; fall back to Groq headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"} body = { "model": "claude-haiku-4-5-20251001", "max_tokens": 800, "messages": [{"role": "user", "content": prompt}] } async with aiohttp.ClientSession() as session: async with session.post("https://api.anthropic.com/v1/messages", json=body, headers=headers, timeout=aiohttp.ClientTimeout(total=20)) as resp: if resp.status == 200: data = await resp.json() raw = data["content"][0]["text"].strip() else: raise RuntimeError(f"Claude {resp.status}") except Exception as e: log.warning(f"[MEMORY] Claude extract failed ({e}), trying Groq") try: raw = await llm_call([{"role": "user", "content": prompt}], provider="groq") except Exception as e2: log.error(f"[MEMORY] All providers failed: {e2}") return {"ok": False, "reason": str(e2)} # Parse JSON — strip code fences if present if "```" in raw: raw = raw.split("```")[1] if raw.startswith("json"): raw = raw[4:] raw = raw.strip() try: facts = json.loads(raw) if not isinstance(facts, list): facts = [] except Exception: log.warning(f"[MEMORY] JSON parse failed. Raw: {raw[:200]}") return {"ok": False, "reason": "json_parse_failed"} valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"} inserted = 0 for f in facts: if not isinstance(f, dict): continue subj = str(f.get("subject", "")).strip()[:255] pred = str(f.get("predicate", "is")).strip()[:255] obj = str(f.get("object", "")).strip() cat = f.get("category", "fact") conf = min(1.0, max(0.0, float(f.get("confidence", 0.85)))) if not subj or not obj or cat not in valid_cats: continue try: await db_execute( """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source, conversation_id) VALUES (%s, %s, %s, %s, %s, 'conversation', %s) ON DUPLICATE KEY UPDATE object=VALUES(object), confidence=GREATEST(confidence, VALUES(confidence)), confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""", (cat, subj, pred, obj, conf, conv_id) ) inserted += 1 except Exception as e: log.warning(f"[MEMORY] Insert ({subj},{pred}) failed: {e}") log.info(f"[MEMORY] Stored {inserted}/{len(facts)} facts from conv {conv_id}") return {"ok": True, "extracted": inserted, "candidates": len(facts)} async def handle_memory_store(payload: dict) -> dict: """Explicit memory insertion — from 'remember that X' voice commands.""" subject = str(payload.get("subject", "user")).strip()[:255] predicate = str(payload.get("predicate", "note")).strip()[:255] obj = str(payload.get("object", "")).strip() category = payload.get("category", "fact") if not obj: return {"ok": False, "reason": "empty object"} valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"} if category not in valid_cats: category = "fact" fid = await db_execute( """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source) VALUES (%s, %s, %s, %s, 1.0, 'explicit') ON DUPLICATE KEY UPDATE object=VALUES(object), confidence=1.0, confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""", (category, subject, predicate, obj) ) log.info(f"[MEMORY] Explicit store: [{category}] {subject} {predicate}: {obj}") return {"ok": True, "id": fid} JOB_HANDLERS["memory_extract"] = handle_memory_extract JOB_HANDLERS["memory_store"] = handle_memory_store # ═══════════════════════════════════════════════════════════════════════════════ # PHASE 9: CLEARANCE PROTOCOL — approval gating for high-risk operations # ═══════════════════════════════════════════════════════════════════════════════ def _clearance_describe(job_type: str, payload: dict) -> str: """Human-readable description of what the job would do.""" if job_type == "shell": cmd = payload.get("command", payload.get("cmd", "?")) agent = payload.get("agent_id", payload.get("agent", "unknown")) return f"Execute shell command on agent '{agent}': {str(cmd)[:120]}" if job_type == "send_email": to = payload.get("to_email") or payload.get("target", "?") subj = payload.get("subject", "") tid = payload.get("triage_id") if tid: return f"Send SMTP reply to triage item #{tid} (to: {to})" return f"Send email to {to}" + (f" — {subj}" if subj else "") if job_type == "remote_exec": cmd_type = payload.get("command_type", payload.get("command", "?")) agent = payload.get("agent_id", payload.get("agent", "?")) return f"Remote execute '{cmd_type}' on agent '{agent}'" if job_type == "purge": return "Purge all completed/failed jobs from the Arc Reactor queue" return f"Execute {job_type} job" async def _check_clearance(job_type: str, payload: dict, created_by: str) -> Optional[dict]: """ Returns a clearance-pending response dict if approval is required, or None if the job can proceed immediately. """ rule = await db_fetchone( "SELECT * FROM clearance_rules WHERE job_type=%s AND enabled=1 AND require_approval=1", (job_type,) ) if not rule: return None desc = _clearance_describe(job_type, payload) expires = None if rule.get("auto_approve_after_min") is not None: expires = datetime.utcnow() + timedelta(minutes=int(rule["auto_approve_after_min"])) cr_id = await db_execute( "INSERT INTO clearance_requests (job_type,job_payload,risk_level,description,requested_by,status,expires_at) " "VALUES (%s,%s,%s,%s,%s,'pending',%s)", (job_type, json.dumps(payload), rule["risk_level"], desc, created_by, expires) ) log.warning(f"[CLEARANCE] Request #{cr_id} — {rule['risk_level'].upper()} — {desc}") return { "status": "pending_clearance", "clearance_id": cr_id, "risk_level": rule["risk_level"], "description": desc, "message": f"◈ CLEARANCE REQUIRED — {rule['risk_level'].upper()} risk operation intercepted. Approval needed before execution.", } async def _dispatch_cleared_job(cr_id: int, job_type: str, payload: dict, priority: int = 7, decided_by: str = "admin") -> dict: """Approve a clearance request and dispatch the job into the queue.""" jid = await db_execute( "INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)", (job_type, json.dumps(payload), priority, f"clearance:{cr_id}") ) await db_execute( "UPDATE clearance_requests SET status='approved', decided_by=%s, arc_job_id=%s, decided_at=NOW() WHERE id=%s", (decided_by, jid, cr_id) ) log.info(f"[CLEARANCE] Request #{cr_id} approved by {decided_by} → Job #{jid}") return {"job_id": jid, "clearance_id": cr_id, "status": "approved"} # ── Clearance watchdog ───────────────────────────────────────────────────────── async def clearance_watchdog() -> None: """Expire timed-out pending requests; auto-approve those with auto_approve_after_min set.""" log.info("Clearance watchdog started") await asyncio.sleep(20) while True: try: now = datetime.utcnow() # Expire requests past their expiry with no auto-approve (expires_at IS NULL means never expires) expired = await db_fetchall( "SELECT cr.*, cru.auto_approve_after_min FROM clearance_requests cr " "JOIN clearance_rules cru ON cru.job_type=cr.job_type " "WHERE cr.status='pending' AND cr.expires_at IS NOT NULL AND cr.expires_at <= %s", (now,) ) or [] for req in expired: if req.get("auto_approve_after_min") is not None: # Auto-approve: dispatch the job payload = req.get("job_payload") or {} if isinstance(payload, str): try: payload = json.loads(payload) except Exception: payload = {} await _dispatch_cleared_job(req["id"], req["job_type"], payload, decided_by="auto_approve") log.info(f"[CLEARANCE] Auto-approved request #{req['id']} ({req['job_type']})") else: await db_execute( "UPDATE clearance_requests SET status='expired', decided_at=NOW() WHERE id=%s", (req["id"],) ) log.info(f"[CLEARANCE] Expired request #{req['id']} ({req['job_type']})") except Exception as exc: log.error(f"Clearance watchdog error: {exc}") await asyncio.sleep(60) # ── Mission trigger loop ─────────────────────────────────────────────────────── _mission_trigger_state: dict = {} # mission_id → last_triggered ISO async def mission_trigger_loop() -> None: """Check scheduled and event-based mission triggers every 30 seconds.""" log.info("Mission trigger loop started") await asyncio.sleep(15) # stagger startup while True: try: missions = await db_fetchall( "SELECT * FROM missions WHERE enabled=1 AND trigger_type != 'manual'" ) for m in missions: mid = m["id"] ttype = m["trigger_type"] cfg = m.get("trigger_config") or {} if isinstance(cfg, str): try: cfg = json.loads(cfg) except Exception: cfg = {} should_run = False source = ttype if ttype == "schedule": interval_min = int(cfg.get("interval_minutes", 60)) last_run = m.get("last_run_at") if last_run is None: should_run = True else: last_dt = last_run if isinstance(last_run, datetime) else datetime.fromisoformat(str(last_run)) elapsed = (datetime.utcnow() - last_dt).total_seconds() / 60 should_run = elapsed >= interval_min elif ttype == "guardian_event": severity = cfg.get("severity", "") etype = cfg.get("event_type", "") last_key = f"ge_{mid}" last_seen = _mission_trigger_state.get(last_key, "1970-01-01") wheres = ["acknowledged=0", "created_at > %s"] params: list = [last_seen] if severity: wheres.append("severity=%s"); params.append(severity) if etype: wheres.append("event_type=%s"); params.append(etype) row = await db_fetchone( f"SELECT id, created_at FROM guardian_events WHERE {' AND '.join(wheres)} ORDER BY created_at DESC LIMIT 1", params ) if row: should_run = True _mission_trigger_state[last_key] = str(row["created_at"]) source = f"guardian_event:{row['id']}" elif ttype == "email_keyword": keywords = cfg.get("keywords", []) category = cfg.get("category", "") last_key = f"ek_{mid}" last_seen = _mission_trigger_state.get(last_key, "1970-01-01") if keywords: kw_conds = " OR ".join(["subject LIKE %s OR summary LIKE %s"] * len(keywords)) kw_params = [] for kw in keywords: kw_params += [f"%{kw}%", f"%{kw}%"] where = f"created_at > %s AND ({kw_conds})" params = [last_seen] + kw_params if category: where += " AND category=%s" params.append(category) row = await db_fetchone( f"SELECT id, created_at FROM email_triage WHERE {where} ORDER BY created_at DESC LIMIT 1", params ) if row: should_run = True _mission_trigger_state[last_key] = str(row["created_at"]) source = f"email_triage:{row['id']}" if should_run: log.info(f"[MISSION TRIGGER] Mission {mid} ({m['name']}) triggered by {source}") asyncio.create_task(_execute_mission(mid, source)) except Exception as exc: log.error(f"Mission trigger loop error: {exc}") await asyncio.sleep(30) # ═══════════════════════════════════════════════════════════════════════════════ # JOB RUNNER + BACKGROUND TASKS # ═══════════════════════════════════════════════════════════════════════════════ _running_jobs: set = set() _stats = {"done": 0, "failed": 0} async def run_job(job: dict) -> None: jid = job["id"] jtype = job["job_type"] _running_jobs.add(jid) try: payload = job.get("payload") or {} if isinstance(payload, str): payload = json.loads(payload) await db_execute("UPDATE arc_jobs SET status='running', started_at=NOW() WHERE id=%s", (jid,)) log.info(f"[JOB {jid}] Starting {jtype}") handler = JOB_HANDLERS.get(jtype) if not handler: raise ValueError(f"Unknown job type: {jtype}") result = await handler(payload) result_str = json.dumps(result, default=str) await db_execute("UPDATE arc_jobs SET status='done', result=%s, completed_at=NOW() WHERE id=%s", (result_str, jid)) _stats["done"] += 1 log.info(f"[JOB {jid}] Done: {jtype}") except Exception as exc: err = str(exc) await db_execute("UPDATE arc_jobs SET status='failed', error=%s, completed_at=NOW() WHERE id=%s", (err[:2000], jid)) _stats["failed"] += 1 log.warning(f"[JOB {jid}] Failed: {err[:120]}") finally: _running_jobs.discard(jid) async def job_poller() -> None: log.info("Arc Reactor job poller started") while True: try: jobs = await db_fetchall("SELECT * FROM arc_jobs WHERE status='queued' ORDER BY priority DESC, id ASC LIMIT 5") for job in jobs: if job["id"] not in _running_jobs: asyncio.create_task(run_job(job)) except Exception as exc: log.error(f"Poller error: {exc}") await asyncio.sleep(POLL_INTERVAL) async def heartbeat_loop() -> None: while True: try: await db_execute( "UPDATE arc_status SET last_heartbeat=NOW(), active_jobs=%s, jobs_done=%s, jobs_failed=%s WHERE id=1", (len(_running_jobs), _stats["done"], _stats["failed"]) ) except Exception as exc: log.error(f"Heartbeat error: {exc}") await asyncio.sleep(HEARTBEAT_INTERVAL) # ═══════════════════════════════════════════════════════════════════════════════ # FASTAPI APP # ═══════════════════════════════════════════════════════════════════════════════ @asynccontextmanager async def lifespan(app: FastAPI): log.info(f"◈ JARVIS Arc Reactor v{VERSION} starting on {HOST}:{PORT}") await get_pool() await db_execute("UPDATE arc_status SET started_at=NOW(), last_heartbeat=NOW(), version=%s WHERE id=1", (VERSION,)) asyncio.create_task(job_poller()) asyncio.create_task(heartbeat_loop()) asyncio.create_task(guardian_loop()) asyncio.create_task(mission_trigger_loop()) asyncio.create_task(clearance_watchdog()) log.info(f"◈ Arc Reactor online — {len(JOB_HANDLERS)} handlers: {', '.join(JOB_HANDLERS)}") yield log.info("◈ Arc Reactor shutting down") if _pool and not _pool.closed: _pool.close() await _pool.wait_closed() app = FastAPI(title="JARVIS Arc Reactor", version=VERSION, lifespan=lifespan) # ── Mission Ops endpoints ────────────────────────────────────────────────────── @app.get("/missions") async def missions_list(enabled: Optional[int] = None): if enabled is not None: rows = await db_fetchall("SELECT * FROM missions WHERE enabled=%s ORDER BY id DESC", (enabled,)) else: rows = await db_fetchall("SELECT * FROM missions ORDER BY id DESC") return rows or [] @app.get("/missions/{mission_id}") async def mission_get(mission_id: int): m = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,)) if not m: raise HTTPException(status_code=404, detail="Not found") steps = await db_fetchall( "SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC", (mission_id,) ) runs = await db_fetchall( "SELECT id, status, trigger_source, started_at, completed_at FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT 10", (mission_id,) ) return {**m, "steps": steps or [], "recent_runs": runs or []} @app.post("/missions") async def mission_create(req: Request): body = await req.json() name = body.get("name", "").strip() if not name: raise HTTPException(status_code=400, detail="name required") desc = body.get("description", "") ttype = body.get("trigger_type", "manual") tcfg = json.dumps(body.get("trigger_config") or {}) enabled = int(body.get("enabled", 1)) mid = await db_execute( "INSERT INTO missions (name,description,trigger_type,trigger_config,enabled) VALUES (%s,%s,%s,%s,%s)", (name, desc, ttype, tcfg, enabled) ) steps = body.get("steps") or [] for i, s in enumerate(steps): await db_execute( "INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)", (mid, i, s.get("label",""), s["job_type"], json.dumps(s.get("payload",{})), int(s.get("continue_on_failure",0))) ) return {"id": mid, "ok": True} @app.put("/missions/{mission_id}") async def mission_update(mission_id: int, req: Request): body = await req.json() fields: list = [] params: list = [] for col in ("name", "description", "trigger_type", "enabled"): if col in body: fields.append(f"{col}=%s") params.append(body[col]) if "trigger_config" in body: fields.append("trigger_config=%s") params.append(json.dumps(body["trigger_config"])) if fields: params.append(mission_id) await db_execute(f"UPDATE missions SET {','.join(fields)} WHERE id=%s", params) if "steps" in body: await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,)) for i, s in enumerate(body["steps"]): await db_execute( "INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)", (mission_id, i, s.get("label",""), s["job_type"], json.dumps(s.get("payload",{})), int(s.get("continue_on_failure",0))) ) return {"ok": True} @app.delete("/missions/{mission_id}") async def mission_delete(mission_id: int): await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,)) await db_execute("DELETE FROM mission_runs WHERE mission_id=%s", (mission_id,)) await db_execute("DELETE FROM missions WHERE id=%s", (mission_id,)) return {"ok": True} @app.post("/missions/{mission_id}/run") async def mission_run_endpoint(mission_id: int, req: Request): try: body = await req.json() except Exception: body = {} source = body.get("trigger_source", "manual") if isinstance(body, dict) else "manual" return await _execute_mission(mission_id, source) @app.get("/missions/{mission_id}/runs") async def mission_runs_list(mission_id: int, limit: int = 20): rows = await db_fetchall( "SELECT * FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT %s", (mission_id, limit) ) return rows or [] # ── Clearance FastAPI endpoints ──────────────────────────────────────────────── @app.get("/clearance/pending") async def clearance_pending(): rows = await db_fetchall( "SELECT * FROM clearance_requests WHERE status='pending' ORDER BY created_at DESC" ) return rows or [] @app.get("/clearance/history") async def clearance_history(limit: int = 50): rows = await db_fetchall( "SELECT * FROM clearance_requests ORDER BY created_at DESC LIMIT %s", (limit,) ) return rows or [] @app.post("/clearance/{cr_id}/approve") async def clearance_approve(cr_id: int, req: Request): try: body = await req.json() except Exception: body = {} decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin" cr = await db_fetchone("SELECT * FROM clearance_requests WHERE id=%s AND status='pending'", (cr_id,)) if not cr: raise HTTPException(status_code=404, detail="Clearance request not found or not pending") payload = cr.get("job_payload") or {} if isinstance(payload, str): try: payload = json.loads(payload) except Exception: payload = {} return await _dispatch_cleared_job(cr_id, cr["job_type"], payload, decided_by=decided_by) @app.post("/clearance/{cr_id}/deny") async def clearance_deny(cr_id: int, req: Request): try: body = await req.json() except Exception: body = {} decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin" note = body.get("note", "") if isinstance(body, dict) else "" await db_execute( "UPDATE clearance_requests SET status='denied', decided_by=%s, decision_note=%s, decided_at=NOW() WHERE id=%s", (decided_by, note, cr_id) ) log.info(f"[CLEARANCE] Request #{cr_id} denied by {decided_by}") return {"ok": True, "clearance_id": cr_id, "status": "denied"} @app.get("/clearance/rules") async def clearance_rules_list(): rows = await db_fetchall("SELECT * FROM clearance_rules ORDER BY risk_level DESC, job_type ASC") return rows or [] @app.put("/clearance/rules/{rule_id}") async def clearance_rule_update(rule_id: int, req: Request): body = await req.json() fields: list = [] params: list = [] for col in ("risk_level", "require_approval", "auto_approve_after_min", "enabled", "description"): if col in body: fields.append(f"{col}=%s") params.append(body[col]) if not fields: raise HTTPException(status_code=400, detail="Nothing to update") params.append(rule_id) await db_execute(f"UPDATE clearance_rules SET {','.join(fields)} WHERE id=%s", params) return {"ok": True} @app.post("/clearance/rules") async def clearance_rule_create(req: Request): body = await req.json() job_type = body.get("job_type", "").strip() if not job_type: raise HTTPException(status_code=400, detail="job_type required") rid = await db_execute( "INSERT INTO clearance_rules (job_type,risk_level,require_approval,auto_approve_after_min,description,enabled) " "VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE " "risk_level=%s,require_approval=%s,auto_approve_after_min=%s,description=%s,enabled=%s", (job_type, body.get("risk_level","high"), int(body.get("require_approval",1)), body.get("auto_approve_after_min"), body.get("description",""), int(body.get("enabled",1)), body.get("risk_level","high"), int(body.get("require_approval",1)), body.get("auto_approve_after_min"), body.get("description",""), int(body.get("enabled",1))) ) return {"ok": True, "id": rid} # ── Memory Core FastAPI endpoints ────────────────────────────────────────────── @app.get("/memory/facts") async def memory_facts_list(limit: int = 100, category: str = "", search: str = ""): where = "WHERE active=1" params: list = [] if category: where += " AND category=%s" params.append(category) if search: where += " AND (subject LIKE %s OR predicate LIKE %s OR object LIKE %s)" params += [f"%{search}%"] * 3 rows = await db_fetchall( f"SELECT * FROM memory_facts {where} ORDER BY confirmed_count DESC, last_confirmed_at DESC LIMIT %s", params + [limit] ) return rows or [] @app.post("/memory/facts") async def memory_fact_create(req: Request): body = await req.json() subj = str(body.get("subject", "")).strip()[:255] pred = str(body.get("predicate", "is")).strip()[:255] obj = str(body.get("object", "")).strip() cat = body.get("category", "fact") if not subj or not obj: raise HTTPException(status_code=400, detail="subject and object required") valid = {"preference","person","place","routine","goal","fact","instruction"} if cat not in valid: cat = "fact" fid = await db_execute( """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source) VALUES (%s,%s,%s,%s,1.0,'explicit') ON DUPLICATE KEY UPDATE object=VALUES(object), confidence=1.0, confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""", (cat, subj, pred, obj) ) return {"ok": True, "id": fid} @app.delete("/memory/facts/{fact_id}") async def memory_fact_delete(fact_id: int): await db_execute("UPDATE memory_facts SET active=0 WHERE id=%s", (fact_id,)) return {"ok": True} @app.delete("/memory/facts") async def memory_facts_clear(category: str = ""): if category: await db_execute("UPDATE memory_facts SET active=0 WHERE category=%s", (category,)) else: await db_execute("UPDATE memory_facts SET active=0 WHERE active=1", ()) return {"ok": True} @app.get("/memory/context") async def memory_context(message: str = "", limit: int = 15): """Return relevant memory facts for a given message (for prompt injection).""" params: list = [] if message.strip(): words = [w for w in message.lower().split() if len(w) > 3][:10] if words: like_clauses = " OR ".join(["subject LIKE %s OR object LIKE %s"] * len(words)) for w in words: params += [f"%{w}%", f"%{w}%"] rows = await db_fetchall( f"SELECT * FROM memory_facts WHERE active=1 AND ({like_clauses}) " f"ORDER BY confirmed_count DESC, confidence DESC LIMIT %s", params + [limit] ) or [] # Pad with top general facts if sparse if len(rows) < 5: existing = [r["id"] for r in rows] excl = ("AND id NOT IN (" + ",".join(["%s"]*len(existing)) + ")") if existing else "" extra = await db_fetchall( f"SELECT * FROM memory_facts WHERE active=1 {excl} " f"ORDER BY confirmed_count DESC LIMIT %s", existing + [max(1, limit - len(rows))] ) or [] rows = rows + extra else: rows = await db_fetchall( "SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,) ) or [] else: rows = await db_fetchall( "SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,) ) or [] lines = [f"[{r['category']}] {r['subject']} {r['predicate']}: {r['object']}" for r in rows] return {"facts": rows, "context": "\n".join(lines) if lines else ""} @app.get("/memory/stats") async def memory_stats(): total = await db_fetchone("SELECT COUNT(*) cnt FROM memory_facts WHERE active=1") by_cat = await db_fetchall( "SELECT category, COUNT(*) cnt FROM memory_facts WHERE active=1 GROUP BY category ORDER BY cnt DESC" ) recent = await db_fetchall( "SELECT * FROM memory_facts WHERE active=1 ORDER BY last_confirmed_at DESC LIMIT 5" ) return { "total": total["cnt"] if total else 0, "by_category": by_cat or [], "recent": recent or [], } @app.get("/status") async def status(): row = await db_fetchone("SELECT * FROM arc_status WHERE id=1") queued = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='queued'") running = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='running'") return { "online": True, "version": VERSION, "last_heartbeat": row["last_heartbeat"].isoformat() if row and row["last_heartbeat"] else None, "started_at": row["started_at"].isoformat() if row and row["started_at"] else None, "jobs_done": row["jobs_done"] if row else 0, "jobs_failed": row["jobs_failed"] if row else 0, "active_jobs": len(_running_jobs), "queued_jobs": queued["cnt"] if queued else 0, "running_jobs": running["cnt"] if running else 0, "handlers": list(JOB_HANDLERS.keys()), } @app.post("/job") async def create_job(request: Request): body = await request.json() jtype = body.get("type", "") payload = body.get("payload", {}) priority = int(body.get("priority", 5)) created_by = body.get("created_by", "jarvis") if not jtype: raise HTTPException(status_code=400, detail="Missing job type") clearance = await _check_clearance(jtype, payload, created_by) if clearance: return clearance jid = await db_execute( "INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)", (jtype, json.dumps(payload), priority, created_by) ) return {"job_id": jid, "status": "queued"} @app.get("/job/{job_id}") async def get_job(job_id: int): job = await db_fetchone("SELECT * FROM arc_jobs WHERE id=%s", (job_id,)) if not job: raise HTTPException(status_code=404, detail="Job not found") result = job.get("result") if result and isinstance(result, str): try: result = json.loads(result) except Exception: pass return { "id": job["id"], "job_type": job["job_type"], "status": job["status"], "result": result, "error": job.get("error"), "created_at": job["created_at"].isoformat() if job["created_at"] else None, "started_at": job["started_at"].isoformat() if job["started_at"] else None, "completed_at": job["completed_at"].isoformat() if job["completed_at"] else None, } @app.delete("/job/{job_id}") async def cancel_job(job_id: int): await db_execute("UPDATE arc_jobs SET status='cancelled' WHERE id=%s AND status='queued'", (job_id,)) return {"ok": True} @app.get("/jobs") async def list_jobs(limit: int = 50, status: str = ""): if status: rows = await db_fetchall( "SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs WHERE status=%s ORDER BY id DESC LIMIT %s", (status, limit) ) else: rows = await db_fetchall( "SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s", (limit,) ) for r in rows: if r.get("created_at"): r["created_at"] = r["created_at"].isoformat() if r.get("completed_at"): r["completed_at"] = r["completed_at"].isoformat() return rows @app.get("/jobs/recent") async def recent_jobs(limit: int = 10, job_type: str = ""): if job_type: rows = await db_fetchall( "SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs WHERE job_type=%s ORDER BY id DESC LIMIT %s", (job_type, limit) ) else: rows = await db_fetchall( "SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s", (limit,) ) for r in rows: if r.get("created_at"): r["created_at"] = r["created_at"].isoformat() if r.get("completed_at"): r["completed_at"] = r["completed_at"].isoformat() if r.get("result") and isinstance(r["result"], str): try: r["result"] = json.loads(r["result"]) except Exception: pass return rows @app.get("/triage") async def get_triage(limit: int = 30, account: str = "gmail"): rows = await db_fetchall( """SELECT id, from_name, from_email, subject, date_received, category, priority, summary, draft_reply, action_taken, created_at FROM email_triage WHERE account=%s ORDER BY priority DESC, created_at DESC LIMIT %s""", (account, limit) ) for r in rows: if r.get("date_received"): r["date_received"] = str(r["date_received"]) if r.get("created_at"): r["created_at"] = str(r["created_at"]) return rows @app.post("/triage/{triage_id}/action") async def triage_action(triage_id: int, request: Request): body = await request.json() action = body.get("action", "dismissed") await db_execute("UPDATE email_triage SET action_taken=%s WHERE id=%s", (action, triage_id)) return {"ok": True} @app.delete("/jobs/purge") async def purge_jobs(): await db_execute("DELETE FROM arc_jobs WHERE status IN ('done','failed','cancelled') AND completed_at < DATE_SUB(NOW(), INTERVAL 7 DAY)") return {"ok": True} # ── VISION PROTOCOL endpoints ───────────────────────────────────────────────── @app.get("/screenshots") async def list_screenshots(limit: int = 20, agent: str = ""): if agent: rows = await db_fetchall( "SELECT id, agent_id, hostname, method, width, height, file_size, " " vision_analysis, vision_provider, created_at " "FROM agent_screenshots WHERE hostname LIKE %s " "ORDER BY created_at DESC LIMIT %s", (f"%{agent}%", limit) ) else: rows = await db_fetchall( "SELECT id, agent_id, hostname, method, width, height, file_size, " " vision_analysis, vision_provider, created_at " "FROM agent_screenshots ORDER BY created_at DESC LIMIT %s", (limit,) ) return rows or [] @app.get("/screenshots/{screenshot_id}") async def get_screenshot(screenshot_id: int): row = await db_fetchone( "SELECT * FROM agent_screenshots WHERE id=%s", (screenshot_id,) ) if not row: from fastapi import HTTPException raise HTTPException(status_code=404, detail="Screenshot not found") return row @app.delete("/screenshots/{screenshot_id}") async def delete_screenshot(screenshot_id: int): await db_execute("DELETE FROM agent_screenshots WHERE id=%s", (screenshot_id,)) return {"ok": True} @app.delete("/screenshots/purge") async def purge_screenshots(): await db_execute("DELETE FROM agent_screenshots WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY)") return {"ok": True} # ── GUARDIAN MODE endpoints ─────────────────────────────────────────────────── @app.get("/guardian/status") async def guardian_status(): cfg = await _guardian_get_config() counts = await db_fetchone( """SELECT SUM(acknowledged=0) AS unread, SUM(severity='critical' AND acknowledged=0) AS critical_unread, SUM(severity='warning' AND acknowledged=0) AS warning_unread, SUM(created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)) AS events_24h FROM guardian_events""" ) return { "enabled": cfg.get("enabled", "1") == "1", "scan_interval": int(cfg.get("scan_interval", 120)), "last_scan": _guardian_state.get("last_scan"), "last_sitrep": _guardian_state.get("last_sitrep"), "thresholds": { "cpu": float(cfg.get("cpu_threshold", 85)), "memory": float(cfg.get("mem_threshold", 88)), "disk": float(cfg.get("disk_threshold", 88)), "offline_minutes": int(cfg.get("offline_minutes", 3)), }, "counts": dict(counts) if counts else {}, } @app.get("/guardian/events") async def guardian_events_list(limit: int = 30, unread: bool = False, severity: str = "", since: str = ""): wheres = [] params: list = [] if unread: wheres.append("acknowledged = 0") if severity: wheres.append("severity = %s"); params.append(severity) if since: wheres.append("created_at > %s"); params.append(since) where_sql = ("WHERE " + " AND ".join(wheres)) if wheres else "" params.append(limit) rows = await db_fetchall( f"SELECT * FROM guardian_events {where_sql} ORDER BY created_at DESC LIMIT %s", params ) return rows or [] @app.post("/guardian/events/{event_id}/ack") async def guardian_ack(event_id: int): await db_execute("UPDATE guardian_events SET acknowledged=1 WHERE id=%s", (event_id,)) return {"ok": True} @app.post("/guardian/events/ack_all") async def guardian_ack_all(): await db_execute("UPDATE guardian_events SET acknowledged=1 WHERE acknowledged=0") return {"ok": True} @app.get("/guardian/chat") async def guardian_chat_events(since: str = ""): """Return proactive guardian messages injected into conversations.""" if since: rows = await db_fetchall( "SELECT id, message, created_at FROM conversations " "WHERE session_id='guardian' AND created_at > %s ORDER BY created_at ASC", (since,) ) else: rows = await db_fetchall( "SELECT id, message, created_at FROM conversations " "WHERE session_id='guardian' ORDER BY created_at DESC LIMIT 10" ) return rows or [] # ── COMMS v2 endpoints ─────────────────────────────────────────────────────── @app.get("/comms/sent") async def comms_sent(limit: int = 50, status: str = ""): if status: rows = await db_fetchall( "SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id " "FROM email_sent WHERE status=%s ORDER BY sent_at DESC LIMIT %s", (status, limit) ) else: rows = await db_fetchall( "SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id " "FROM email_sent ORDER BY sent_at DESC LIMIT %s", (limit,) ) return rows or [] @app.get("/comms/sent/{sent_id}") async def comms_sent_get(sent_id: int): row = await db_fetchone("SELECT * FROM email_sent WHERE id=%s", (sent_id,)) if not row: raise HTTPException(status_code=404, detail="Not found") return row @app.delete("/comms/sent/{sent_id}") async def comms_sent_delete(sent_id: int): await db_execute("DELETE FROM email_sent WHERE id=%s", (sent_id,)) return {"ok": True} if __name__ == "__main__": uvicorn.run("reactor:app", host=HOST, port=PORT, log_level="info", access_log=False)