diff --git a/deploy/reactor.py b/deploy/reactor.py new file mode 100644 index 0000000..813847b --- /dev/null +++ b/deploy/reactor.py @@ -0,0 +1,2773 @@ +#!/usr/bin/env python3 +""" +JARVIS Arc Reactor — Core Daemon v9.0 +Phase 1: Job queue, ping/echo/shell +Phase 2: Intel Protocol (research), Iron Protocol (tool loop), LLM router +Phase 3: Gmail Triage (Comms Protocol), Remote Exec (Field Protocol) +Phase 4: Vision Protocol (screenshot, vision, sysinfo) +Phase 5: Guardian Mode (continuous awareness, proactive alerts, SITREP) +Phase 6: Comms v2 — send email, compose, schedule event, meeting prep +Phase 7: Mission Ops — multi-step automated workflows with trigger engine +Phase 8: Mission Directives — OKR/goal tracking with AI progress review +Phase 9: Clearance Protocol — approval gating for high-risk operations +""" + +import asyncio +import email as _email_lib +import imaplib +import json +import logging +import os +import re +import sys +import traceback +from contextlib import asynccontextmanager +from datetime import datetime, timedelta +from email.header import decode_header as _decode_header +from email.utils import parsedate_to_datetime +from typing import Any, Optional + +import aiomysql +import aiohttp +import uvicorn +from fastapi import FastAPI, HTTPException, Request + +# ── CONFIG ──────────────────────────────────────────────────────────────────── +HOST = "127.0.0.1" +PORT = 7474 +VERSION = "9.0.0" +DB_HOST = "localhost" +DB_PORT = 3306 +DB_USER = "jarvis_user" +DB_PASS = "J4rv1s_Pr0t0c0l_2026!" +DB_NAME = "jarvis_db" +LOG_FILE = "/home/jarvis.orbishosting.com/logs/arc_reactor.log" +POLL_INTERVAL = 3 +HEARTBEAT_INTERVAL = 30 + +CLAUDE_API_KEY = "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" +CLAUDE_MODEL = "claude-sonnet-4-6" +GROQ_API_KEY = "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" +GROQ_MODEL = "llama-3.3-70b-versatile" +OLLAMA_HOST = "http://10.48.200.95:11434" +OLLAMA_MODEL = "llama3.2:1b" + +GMAIL_USER = "myronblair@gmail.com" +GMAIL_PASS = "demsvdylwweacbcx" +ICLOUD_USER = "myronblair@icloud.com" +ICLOUD_PASS = "yxfi-yvzu-geqk-japr" + +# ── LOGGING ─────────────────────────────────────────────────────────────────── +os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) +logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[ + logging.FileHandler(LOG_FILE), + logging.StreamHandler(sys.stdout), + ], +) +log = logging.getLogger("arc_reactor") + +# ── DB POOL ─────────────────────────────────────────────────────────────────── +_pool: Optional[aiomysql.Pool] = None + +async def get_pool() -> aiomysql.Pool: + global _pool + if _pool is None or _pool.closed: + _pool = await aiomysql.create_pool( + host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, + db=DB_NAME, autocommit=True, minsize=2, maxsize=10, charset="utf8mb4", + ) + return _pool + +async def db_execute(sql: str, args: tuple = ()) -> int: + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(sql, args) + return cur.lastrowid + +async def db_fetchone(sql: str, args: tuple = ()) -> Optional[dict]: + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchone() + +async def db_fetchall(sql: str, args: tuple = ()) -> list: + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchall() + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 1 HANDLERS +# ═══════════════════════════════════════════════════════════════════════════════ + +async def handle_ping(payload: dict) -> dict: + return {"pong": True, "timestamp": datetime.utcnow().isoformat(), "version": VERSION} + +async def handle_echo(payload: dict) -> dict: + return {"echo": payload.get("message", ""), "timestamp": datetime.utcnow().isoformat()} + +async def handle_shell(payload: dict) -> dict: + cmd = payload.get("command", "").strip() + allowed = ("df ", "free ", "uptime", "hostname", "uname", "ps ", "cat /proc/") + if not any(cmd.startswith(p) for p in allowed): + raise ValueError(f"Command not in whitelist: {cmd[:40]}") + proc = await asyncio.create_subprocess_shell( + cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) + return {"stdout": stdout.decode().strip(), "stderr": stderr.decode().strip(), "exit_code": proc.returncode} + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 2: LLM ROUTER +# ═══════════════════════════════════════════════════════════════════════════════ + +async def llm_call(messages: list, provider: str = "claude", system: str = "") -> str: + if provider == "claude" and CLAUDE_API_KEY: + return await _claude_call(messages, system) + elif provider == "groq" and GROQ_API_KEY: + return await _groq_call(messages, system) + elif provider == "ollama": + return await _ollama_call(messages, system) + for p in ["groq", "ollama"]: + try: + return await llm_call(messages, p, system) + except Exception: + continue + raise RuntimeError("All LLM providers failed") + +async def _claude_call(messages: list, system: str = "") -> str: + payload = {"model": CLAUDE_MODEL, "max_tokens": 4096, "messages": messages} + if system: + payload["system"] = system + headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"} + async with aiohttp.ClientSession() as session: + async with session.post("https://api.anthropic.com/v1/messages", json=payload, headers=headers, + timeout=aiohttp.ClientTimeout(total=60)) as resp: + data = await resp.json() + if resp.status != 200: + raise RuntimeError(f"Claude API error {resp.status}: {data.get('error',{}).get('message','')}") + return data["content"][0]["text"] + +async def _groq_call(messages: list, system: str = "") -> str: + all_msgs = ([{"role": "system", "content": system}] if system else []) + messages + payload = {"model": GROQ_MODEL, "messages": all_msgs, "max_tokens": 4096} + headers = {"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"} + async with aiohttp.ClientSession() as session: + async with session.post("https://api.groq.com/openai/v1/chat/completions", json=payload, + headers=headers, timeout=aiohttp.ClientTimeout(total=45)) as resp: + data = await resp.json() + if resp.status != 200: + raise RuntimeError(f"Groq error {resp.status}") + return data["choices"][0]["message"]["content"] + +async def _ollama_call(messages: list, system: str = "") -> str: + prompt = (system + "\n\n" if system else "") + "\n".join(f"{m['role'].upper()}: {m['content']}" for m in messages) + async with aiohttp.ClientSession() as session: + async with session.post(f"{OLLAMA_HOST}/api/generate", json={"model": OLLAMA_MODEL, "prompt": prompt, "stream": False}, + timeout=aiohttp.ClientTimeout(total=30)) as resp: + data = await resp.json() + return data.get("response", "") + +async def handle_llm(payload: dict) -> dict: + message = payload.get("message", "") + system = payload.get("system", "You are JARVIS, an Iron Man-style AI assistant.") + provider = payload.get("provider", "claude") + if not message: + raise ValueError("Missing message") + result = await llm_call([{"role": "user", "content": message}], provider, system) + return {"response": result, "provider": provider} + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 2: INTEL PROTOCOL — RESEARCH ENGINE +# ═══════════════════════════════════════════════════════════════════════════════ + +async def _web_search(query: str, max_results: int = 6) -> list: + try: + from duckduckgo_search import DDGS + results = [] + with DDGS() as ddgs: + for r in ddgs.text(query, max_results=max_results): + results.append({"url": r.get("href",""), "title": r.get("title",""), "snippet": r.get("body","")}) + return results + except Exception as e: + log.warning(f"DDG search failed: {e}") + return [] + +async def _fetch_url_content(url: str, timeout: int = 12) -> str: + try: + import trafilatura + headers = {"User-Agent": "Mozilla/5.0 (compatible; JARVIS-Research/3.0)"} + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout), + allow_redirects=True, ssl=False) as resp: + if resp.status != 200: + return "" + html = await resp.text(errors="replace") + text = trafilatura.extract(html, include_links=False, include_images=False, no_fallback=False, favor_precision=True) + return (text or "")[:4000] + except Exception as e: + log.debug(f"Fetch failed {url[:60]}: {e}") + return "" + +async def handle_research(payload: dict) -> dict: + query = payload.get("query", "").strip() + depth = payload.get("depth", "standard") + provider = payload.get("provider", "claude") + if not query: + raise ValueError("Missing research query") + + max_sources = {"quick": 3, "standard": 5, "deep": 8}.get(depth, 5) + log.info(f"[INTEL] Research: '{query}' depth={depth} sources={max_sources}") + + search_results = await _web_search(query, max_results=max_sources + 2) + if not search_results: + search_results = [{"url": "", "title": query, "snippet": f"No search results for: {query}"}] + + fetch_tasks = [_fetch_url_content(r["url"]) for r in search_results if r.get("url")] + page_texts = await asyncio.gather(*fetch_tasks, return_exceptions=True) + + sources = [] + for i, r in enumerate(search_results[:max_sources]): + text = page_texts[i] if i < len(page_texts) and isinstance(page_texts[i], str) else "" + sources.append({"url": r["url"], "title": r["title"], "snippet": r["snippet"], "content": text}) + + context_parts = [] + for i, s in enumerate(sources, 1): + body = s["content"] or s["snippet"] + if body: + context_parts.append(f"SOURCE {i}: {s['title']}\nURL: {s['url']}\n{body[:3000]}\n") + + context_text = "\n---\n".join(context_parts) if context_parts else "No page content available." + synthesis_prompt = ( + f'You are JARVIS, an advanced AI research assistant. The user asked: "{query}"\n\n' + f"You have retrieved the following source material:\n\n{context_text}\n\n" + f"Provide a comprehensive research briefing with:\n" + f"1. **EXECUTIVE SUMMARY** (2-3 sentences)\n" + f"2. **KEY FINDINGS** (5-8 bullet points)\n" + f"3. **DETAILS** (thorough synthesis, 2-4 paragraphs)\n" + f"4. **CONFIDENCE** (High/Medium/Low based on source quality)\n\n" + f"Be precise, factual, and cite source numbers where relevant." + ) + + try: + synthesis = await llm_call([{"role": "user", "content": synthesis_prompt}], provider=provider) + except Exception as e: + log.warning(f"[INTEL] Synthesis failed, falling back: {e}") + synthesis = f"**RESEARCH BRIEFING: {query}**\n\n" + "\n\n".join(f"**{s['title']}**\n{s['snippet']}" for s in sources) + + key_points = [] + for line in synthesis.split("\n"): + line = line.strip() + if line.startswith(("- ", "• ", "* ")) and len(line) > 10: + key_points.append(re.sub(r'^[-•*]\s*', '', line)) + elif re.match(r'^\d+\.\s+', line) and len(line) > 10: + key_points.append(re.sub(r'^\d+\.\s+', '', line)) + + clean_sources = [{"url": s["url"], "title": s["title"] or s["url"]} for s in sources if s.get("url")] + log.info(f"[INTEL] Research complete: '{query}' — {len(sources)} sources") + return {"query": query, "depth": depth, "synthesis": synthesis, "key_points": key_points[:10], + "sources": clean_sources, "source_count": len(clean_sources), "provider": provider} + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 2: IRON PROTOCOL — MULTI-STEP TOOL LOOP +# ═══════════════════════════════════════════════════════════════════════════════ + +AGENT_TOOLS = [ + {"name": "web_search", "description": "Search the web for current information.", + "input_schema": {"type": "object", "properties": {"query": {"type": "string"}, "max_results": {"type": "integer", "default": 5}}, "required": ["query"]}}, + {"name": "fetch_url", "description": "Fetch a web page and extract its main text content.", + "input_schema": {"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}}, + {"name": "jarvis_agents", "description": "Get status of all JARVIS agents.", + "input_schema": {"type": "object", "properties": {}}}, + {"name": "jarvis_alerts", "description": "Get current active JARVIS alerts.", + "input_schema": {"type": "object", "properties": {}}}, + {"name": "current_time", "description": "Get current date and time.", + "input_schema": {"type": "object", "properties": {}}}, +] + +async def execute_agent_tool(tool_name: str, tool_input: dict) -> str: + try: + if tool_name == "web_search": + results = await _web_search(tool_input.get("query", ""), tool_input.get("max_results", 5)) + if not results: + return "No search results found." + return "\n".join(f"{i+1}. {r['title']}\n URL: {r['url']}\n {r['snippet']}" for i, r in enumerate(results)) + elif tool_name == "fetch_url": + url = tool_input.get("url", "") + if not url: + return "No URL provided." + return await _fetch_url_content(url) or "Could not extract content." + elif tool_name == "jarvis_agents": + agents = await db_fetchall("SELECT hostname, agent_type, ip_address, status, last_seen FROM registered_agents ORDER BY status='online' DESC, hostname") + if not agents: + return "No agents registered." + return "\n".join(f"- {a['hostname']} ({a['agent_type']}) @ {a['ip_address']} — {a['status'].upper()}" for a in agents) + elif tool_name == "jarvis_alerts": + alerts = await db_fetchall("SELECT title, severity, message FROM alerts WHERE resolved=0 ORDER BY created_at DESC LIMIT 10") + return "\n".join(f"- [{a['severity'].upper()}] {a['title']}: {a['message']}" for a in alerts) or "No active alerts." + elif tool_name == "current_time": + return datetime.utcnow().strftime("UTC: %Y-%m-%d %H:%M:%S") + else: + return f"Unknown tool: {tool_name}" + except Exception as e: + return f"Tool error ({tool_name}): {str(e)[:200]}" + +async def handle_tool_loop(payload: dict) -> dict: + task = payload.get("task", "").strip() + max_iterations = min(int(payload.get("max_iterations", 15)), 200) + system_prompt = payload.get("system", ( + "You are JARVIS, an advanced autonomous AI assistant with access to tools. " + "Use tools methodically to complete the task. Be thorough but concise. " + "When you have enough information, provide a clear final answer." + )) + if not task: + raise ValueError("Missing task") + + log.info(f"[IRON] Tool loop: '{task[:80]}' max_iter={max_iterations}") + + import anthropic + client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY) + messages = [{"role": "user", "content": task}] + iteration = 0 + final_text = "" + + while iteration < max_iterations: + iteration += 1 + response = await client.messages.create( + model=CLAUDE_MODEL, max_tokens=4096, system=system_prompt, + tools=AGENT_TOOLS, messages=messages, + ) + assistant_content = [] + text_parts = [] + tool_uses = [] + for block in response.content: + if block.type == "text": + assistant_content.append({"type": "text", "text": block.text}) + text_parts.append(block.text) + elif block.type == "tool_use": + assistant_content.append({"type": "tool_use", "id": block.id, "name": block.name, "input": block.input}) + tool_uses.append(block) + messages.append({"role": "assistant", "content": assistant_content}) + if text_parts: + final_text = "\n".join(text_parts) + if response.stop_reason == "end_turn" or not tool_uses: + log.info(f"[IRON] Complete after {iteration} iterations") + break + tool_results = [] + for tu in tool_uses: + log.info(f"[IRON] Tool: {tu.name}") + result = await execute_agent_tool(tu.name, tu.input) + tool_results.append({"type": "tool_result", "tool_use_id": tu.id, "content": result[:3000]}) + messages.append({"role": "user", "content": tool_results}) + + tools_used = list({b["name"] for m in messages if isinstance(m["content"], list) + for b in m["content"] if isinstance(b, dict) and b.get("type") == "tool_use"}) + return {"task": task, "result": final_text, "iterations": iteration, "tools_used": tools_used} + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 3: COMMS PROTOCOL — GMAIL TRIAGE +# ═══════════════════════════════════════════════════════════════════════════════ + +def _imap_fetch_sync(account: str, max_msgs: int) -> list: + """Synchronous IMAP fetch — called via run_in_executor.""" + if account == "icloud": + host, port, user, passwd = "imap.mail.me.com", 993, ICLOUD_USER, ICLOUD_PASS + else: + host, port, user, passwd = "imap.gmail.com", 993, GMAIL_USER, GMAIL_PASS + if not user or not passwd: + return [] + try: + mbox = imaplib.IMAP4_SSL(host, port) + mbox.login(user, passwd) + mbox.select("INBOX") + _, data = mbox.search(None, "ALL") + ids = data[0].split()[-max_msgs:] + msgs = [] + for mid in reversed(ids): + _, raw = mbox.fetch(mid, "(RFC822)") + if not raw or not raw[0]: + continue + msg = _email_lib.message_from_bytes(raw[0][1]) + # Subject + subj_raw = msg.get("Subject", "") + subject = "" + for part, enc in _decode_header(subj_raw): + if isinstance(part, bytes): + subject += part.decode(enc or "utf-8", errors="replace") + else: + subject += str(part) + # From + from_raw = msg.get("From", "") + from_name = "" + for part, enc in _decode_header(from_raw): + if isinstance(part, bytes): + from_name += part.decode(enc or "utf-8", errors="replace") + else: + from_name += str(part) + from_name = from_name.strip().strip('"') + em = re.search(r"<([^>]+)>", from_raw) + from_email = em.group(1) if em else from_raw + # Date + date_str = msg.get("Date", "") + # Body preview + body = "" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + payload = part.get_payload(decode=True) + if payload: + body = payload.decode(part.get_content_charset() or "utf-8", errors="replace") + break + else: + payload = msg.get_payload(decode=True) + if payload: + body = payload.decode(msg.get_content_charset() or "utf-8", errors="replace") + body = " ".join(body.split())[:500] + msg_uid = (msg.get("Message-ID", "") or f"{from_email}:{subject}:{date_str}").strip("<>") + msgs.append({ + "msg_id": msg_uid[:255], + "from_name": from_name[:100], + "from_email": from_email[:100], + "subject": subject[:255], + "date_raw": date_str, + "body": body, + }) + mbox.logout() + return msgs + except Exception as e: + log.warning(f"IMAP fetch failed ({account}): {e}") + return [] + +async def handle_gmail_triage(payload: dict) -> dict: + account = payload.get("account", "gmail") + max_emails = min(int(payload.get("max_emails", 20)), 40) + provider = payload.get("provider", "claude") + + log.info(f"[COMMS] Gmail triage: account={account} max={max_emails}") + + loop = asyncio.get_event_loop() + emails = await loop.run_in_executor(None, lambda: _imap_fetch_sync(account, max_emails)) + + if not emails: + return {"account": account, "triaged": 0, "urgent": 0, "action": 0, + "meeting": 0, "items": [], "error": "No emails fetched or IMAP unavailable"} + + email_list = "" + for i, e in enumerate(emails, 1): + email_list += ( + f"EMAIL {i}:\n" + f"From: {e['from_name']} <{e['from_email']}>\n" + f"Subject: {e['subject']}\n" + f"Date: {e['date_raw']}\n" + f"Body: {e['body'][:400]}\n\n" + ) + + triage_prompt = ( + f"You are JARVIS, triaging {len(emails)} emails for Myron Blair.\n\n" + "For each email assign:\n" + "- category: urgent | action | reply | meeting | info | promo | spam\n" + "- priority: 1-10 (10 = drop everything)\n" + "- summary: one sentence\n" + "- draft_reply: for urgent/action/reply/meeting ONLY — brief professional reply. Empty string otherwise.\n\n" + "Return ONLY a valid JSON array. Example:\n" + '[{"index":1,"category":"urgent","priority":9,"summary":"Contract needs signing today","draft_reply":"Hi, I will review and sign today."}]\n\n' + f"EMAILS TO TRIAGE:\n{email_list}" + ) + + try: + raw = await llm_call([{"role": "user", "content": triage_prompt}], provider) + raw = raw.strip() + if raw.startswith("```"): + raw = "\n".join(raw.split("\n")[1:]) + if raw.endswith("```"): + raw = raw[:-3] + triage_items = json.loads(raw.strip()) + except Exception as e: + log.warning(f"[COMMS] Triage parse error: {e}") + triage_items = [{"index": i+1, "category": "info", "priority": 3, + "summary": f"Subject: {e2['subject']}", "draft_reply": ""} + for i, e2 in enumerate(emails)] + + # Save to email_triage table + saved = 0 + for item in triage_items: + idx = int(item.get("index", 0)) - 1 + if idx < 0 or idx >= len(emails): + continue + e = emails[idx] + try: + date_parsed = None + if e.get("date_raw"): + try: + date_parsed = parsedate_to_datetime(e["date_raw"]).strftime("%Y-%m-%d %H:%M:%S") + except Exception: + pass + await db_execute( + """INSERT INTO email_triage + (msg_id, account, from_name, from_email, subject, date_received, + category, priority, summary, draft_reply, action_taken) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'none') + ON DUPLICATE KEY UPDATE + category=VALUES(category), priority=VALUES(priority), + summary=VALUES(summary), draft_reply=VALUES(draft_reply)""", + (e["msg_id"], account, e["from_name"], e["from_email"], e["subject"], + date_parsed, item.get("category", "info"), int(item.get("priority", 3)), + item.get("summary", "")[:500], item.get("draft_reply", "")[:3000]) + ) + saved += 1 + except Exception as ex: + log.debug(f"[COMMS] Save triage error: {ex}") + + counts = {} + for item in triage_items: + c = item.get("category", "info") + counts[c] = counts.get(c, 0) + 1 + + urgent_items = [] + for it in triage_items: + idx = int(it.get("index", 0)) - 1 + if 0 <= idx < len(emails) and it.get("category") in ("urgent", "action", "reply", "meeting"): + urgent_items.append({ + "from": emails[idx]["from_name"], + "from_email": emails[idx]["from_email"], + "subject": emails[idx]["subject"][:80], + "summary": it.get("summary", ""), + "priority": it.get("priority", 0), + "draft_reply": it.get("draft_reply", ""), + "category": it.get("category", ""), + }) + urgent_items.sort(key=lambda x: -x["priority"]) + + log.info(f"[COMMS] Triage complete: {len(emails)} emails, {saved} saved, counts={counts}") + return { + "account": account, + "triaged": len(emails), + "saved": saved, + "counts": counts, + "urgent": counts.get("urgent", 0), + "action": counts.get("action", 0), + "meeting": counts.get("meeting", 0), + "reply": counts.get("reply", 0), + "items": urgent_items[:10], + } + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 3: FIELD PROTOCOL — REMOTE EXEC +# ═══════════════════════════════════════════════════════════════════════════════ + +async def handle_remote_exec(payload: dict) -> dict: + """ + Queue a command to a JARVIS Field Station agent and wait for the result. + payload: { agent, command_type, command_data, timeout } + """ + agent_name = payload.get("agent", "").strip() + cmd_type = payload.get("command_type", "ping") + cmd_data = payload.get("command_data", {}) + timeout_secs = min(int(payload.get("timeout", 35)), 120) + + if not agent_name: + raise ValueError("Missing agent name") + + log.info(f"[FIELD] remote_exec {cmd_type} on {agent_name}") + + # NOTE: _dispatch_agent_command is defined in Phase 4 block below. + # Forward declaration — called at runtime, not at import. + dispatch = await _dispatch_agent_command(agent_name, cmd_type, cmd_data, timeout_secs) + return { + "agent": dispatch["agent"], + "agent_id": dispatch["agent_id"], + "command_type": cmd_type, + "command_data": cmd_data, + "cmd_id": dispatch["cmd_id"], + "status": dispatch["status"], + "result": dispatch["result"], + } + +# ── PHASE 4: VISION PROTOCOL ────────────────────────────────────────────────── + +async def _dispatch_agent_command(agent_name: str, cmd_type: str, cmd_data: dict, + timeout_secs: int = 45) -> dict: + """Shared helper: find agent, queue command, poll for result.""" + agent = await db_fetchone( + """SELECT agent_id, hostname, status FROM registered_agents + WHERE (hostname LIKE %s OR agent_id LIKE %s) AND status='online' LIMIT 1""", + (f"%{agent_name}%", f"%{agent_name}%") + ) + if not agent: + all_agents = await db_fetchall("SELECT hostname FROM registered_agents WHERE status='online'") + names = [a["hostname"] for a in all_agents] + raise ValueError(f"No online agent matching '{agent_name}'. Online: {', '.join(names) or 'none'}") + + cmd_id = await db_execute( + "INSERT INTO agent_commands (agent_id, command_type, command_data) VALUES (%s, %s, %s)", + (agent["agent_id"], cmd_type, json.dumps(cmd_data)) + ) + elapsed = 0 + while elapsed < timeout_secs: + await asyncio.sleep(2) + elapsed += 2 + row = await db_fetchone("SELECT status, result FROM agent_commands WHERE id=%s", (cmd_id,)) + if row and row["status"] in ("executed", "failed"): + result = row.get("result") + if result and isinstance(result, str): + try: + result = json.loads(result) + except Exception: + pass + return {"agent": agent["hostname"], "agent_id": agent["agent_id"], + "cmd_id": cmd_id, "status": row["status"], "result": result or {}} + await db_execute("UPDATE agent_commands SET status='failed' WHERE id=%s AND status='pending'", (cmd_id,)) + raise TimeoutError(f"No response from {agent['hostname']} within {timeout_secs}s") + + +async def handle_screenshot(payload: dict) -> dict: + """ + Capture a screenshot (or system snapshot) from a field agent, then optionally + run vision analysis via Claude. + payload: { agent, analyze: true|false, analyze_prompt: "...", timeout: 45 } + """ + agent_name = payload.get("agent", "").strip() + do_analyze = bool(payload.get("analyze", True)) + analyze_prompt = payload.get("analyze_prompt", "Describe what you see on this screen in detail. Note any important status indicators, errors, running processes, or interesting information.") + timeout_secs = min(int(payload.get("timeout", 45)), 120) + + if not agent_name: + raise ValueError("Missing agent name") + + log.info(f"[VISION] Screenshot request: agent={agent_name} analyze={do_analyze}") + + # Dispatch screenshot command to field agent + dispatch = await _dispatch_agent_command(agent_name, "screenshot", {}, timeout_secs) + result = dispatch.get("result", {}) + hostname = dispatch.get("agent", agent_name) + + if dispatch.get("status") == "failed" or not result.get("success"): + err = result.get("error", "Agent returned failure") if isinstance(result, dict) else str(result) + return {"agent": hostname, "success": False, "error": err} + + image_b64 = result.get("image_b64", "") + method = result.get("method", "unknown") + width = result.get("width", 0) + height = result.get("height", 0) + file_size = result.get("file_size", 0) + + # Run Claude vision analysis if we have an image + analysis = "" + provider_used = "" + if do_analyze and image_b64: + try: + import anthropic + client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY) + msg = await client.messages.create( + model="claude-opus-4-8-20251101", + max_tokens=1024, + messages=[{ + "role": "user", + "content": [ + {"type": "image", "source": {"type": "base64", + "media_type": "image/png", "data": image_b64}}, + {"type": "text", "text": analyze_prompt}, + ], + }], + ) + analysis = msg.content[0].text if msg.content else "" + provider_used = "claude" + log.info(f"[VISION] Claude analysis complete ({len(analysis)} chars)") + except Exception as e: + log.warning(f"[VISION] Claude vision failed: {e}") + analysis = f"Vision analysis unavailable: {e}" + elif do_analyze and not image_b64 and result.get("snapshot_type") == "text": + # Text-only sysinfo snapshot — summarize with LLM + try: + snap_text = json.dumps(result, indent=2)[:3000] + prompt = f"Summarize this server system snapshot for JARVIS. Highlight any concerns:\n\n{snap_text}" + analysis = await llm_call([{"role": "user", "content": prompt}], "claude") + provider_used = "claude" + except Exception as e: + analysis = f"Analysis unavailable: {e}" + + # Store screenshot + screenshot_id = await db_execute( + """INSERT INTO agent_screenshots + (agent_id, hostname, method, image_b64, width, height, file_size, + vision_analysis, vision_provider) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", + (dispatch.get("agent_id", ""), hostname, method, + image_b64, width, height, file_size, analysis, provider_used) + ) + + log.info(f"[VISION] Screenshot saved: id={screenshot_id} agent={hostname} method={method}") + return { + "agent": hostname, + "screenshot_id": screenshot_id, + "method": method, + "width": width, + "height": height, + "file_size": file_size, + "has_image": bool(image_b64), + "analysis": analysis, + "provider": provider_used, + } + + +async def handle_vision(payload: dict) -> dict: + """ + Run Claude vision on an already-stored screenshot or a provided base64 image. + payload: { screenshot_id OR image_b64, prompt, provider } + """ + screenshot_id = payload.get("screenshot_id") + image_b64 = payload.get("image_b64", "") + prompt = payload.get("prompt", "Describe this image in detail.") + provider = payload.get("provider", "claude") + + if screenshot_id: + row = await db_fetchone( + "SELECT image_b64, hostname, method FROM agent_screenshots WHERE id=%s", + (int(screenshot_id),) + ) + if not row or not row["image_b64"]: + raise ValueError(f"Screenshot {screenshot_id} not found or has no image data") + image_b64 = row["image_b64"] + hostname = row.get("hostname", "unknown") + else: + hostname = "direct" + + if not image_b64: + raise ValueError("No image data provided") + + log.info(f"[VISION] Analysis: screenshot_id={screenshot_id} agent={hostname}") + + try: + import anthropic + client = anthropic.AsyncAnthropic(api_key=CLAUDE_API_KEY) + msg = await client.messages.create( + model="claude-opus-4-8-20251101", + max_tokens=2048, + messages=[{ + "role": "user", + "content": [ + {"type": "image", "source": {"type": "base64", + "media_type": "image/png", "data": image_b64}}, + {"type": "text", "text": prompt}, + ], + }], + ) + analysis = msg.content[0].text if msg.content else "" + except Exception as e: + raise RuntimeError(f"Vision analysis failed: {e}") + + # Update stored screenshot if we have an ID + if screenshot_id: + await db_execute( + "UPDATE agent_screenshots SET vision_analysis=%s, vision_provider=%s WHERE id=%s", + (analysis, "claude", int(screenshot_id)) + ) + + return { + "agent": hostname, + "screenshot_id": screenshot_id, + "prompt": prompt, + "analysis": analysis, + "provider": "claude", + } + + +async def handle_sysinfo(payload: dict) -> dict: + """ + Get a structured system snapshot from a field agent (no image). + Optionally ask Claude to summarize/assess the health of the machine. + payload: { agent, analyze: true|false, timeout: 30 } + """ + agent_name = payload.get("agent", "").strip() + do_analyze = bool(payload.get("analyze", True)) + timeout_secs = min(int(payload.get("timeout", 30)), 60) + + if not agent_name: + raise ValueError("Missing agent name") + + dispatch = await _dispatch_agent_command(agent_name, "sysinfo", {}, timeout_secs) + result = dispatch.get("result", {}) + hostname = dispatch.get("agent", agent_name) + + if dispatch.get("status") == "failed": + return {"agent": hostname, "success": False, "error": result.get("error", "Command failed")} + + analysis = "" + if do_analyze: + try: + snap = json.dumps(result, indent=2)[:3000] + summary_prompt = ( + f"You are JARVIS analyzing a field station sysinfo snapshot from '{hostname}'.\n\n" + f"Snapshot:\n{snap}\n\n" + "Provide a concise health assessment: status (healthy/warning/critical), " + "key metrics, any concerns, and recommended actions if needed. " + "Keep it under 150 words, JARVIS style." + ) + analysis = await llm_call([{"role": "user", "content": summary_prompt}], "claude") + except Exception as e: + analysis = f"Analysis unavailable: {e}" + + return { + "agent": hostname, + "success": True, + "snapshot": result, + "analysis": analysis, + } + + +# ═══════════════════════════════════════════════════════════════════════════════ +# JOB HANDLER REGISTRY +# ═══════════════════════════════════════════════════════════════════════════════ + +JOB_HANDLERS = { + "ping": handle_ping, + "echo": handle_echo, + "shell": handle_shell, + "llm": handle_llm, + "research": handle_research, + "tool_loop": handle_tool_loop, + "gmail_triage": handle_gmail_triage, + "remote_exec": handle_remote_exec, + # Phase 4 + "screenshot": handle_screenshot, + "vision": handle_vision, + "sysinfo": handle_sysinfo, + # Phase 5 + "sitrep": None, # registered after guardian functions are defined + "guardian_config": None, +} + +# ── PHASE 5: GUARDIAN MODE ──────────────────────────────────────────────────── + +# In-memory state: tracks last alert time per (agent_id, metric) to debounce +_guardian_state: dict = { + "enabled": True, + "last_scan": None, + "last_sitrep": None, + "alert_cooldown": {}, # key: "agent_id:metric" → last_alert_epoch + "online_snapshot": {}, # agent_id → was_online bool +} +GUARDIAN_COOLDOWN = 600 # seconds between repeat alerts for same metric + + +async def _guardian_get_config() -> dict: + rows = await db_fetchall("SELECT key_name, value FROM guardian_config") + return {r["key_name"]: r["value"] for r in rows} if rows else {} + + +async def _guardian_emit(event_type: str, severity: str, agent_id: str, + hostname: str, metric: str, value: float, + threshold: float, message: str, ai_analysis: str = "") -> int: + return await db_execute( + """INSERT INTO guardian_events + (event_type, severity, agent_id, hostname, metric, value, threshold, + message, ai_analysis) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", + (event_type, severity, agent_id, hostname, metric, value, threshold, + message, ai_analysis) + ) + + +async def _guardian_cooldown_ok(agent_id: str, metric: str) -> bool: + """Return True if enough time has passed since last alert for this agent+metric.""" + import time + key = f"{agent_id}:{metric}" + last = _guardian_state["alert_cooldown"].get(key, 0) + if (time.time() - last) >= GUARDIAN_COOLDOWN: + _guardian_state["alert_cooldown"][key] = time.time() + return True + return False + + +async def guardian_loop() -> None: + """ + Background task — runs every SCAN_INTERVAL seconds. + Checks all agents for anomalies, emits guardian_events, optionally + generates AI analysis for critical findings. + """ + import time + log.info("[GUARDIAN] Guardian Mode activated") + + while True: + try: + cfg = await _guardian_get_config() + if cfg.get("enabled", "1") == "0": + await asyncio.sleep(60) + continue + + scan_interval = int(cfg.get("scan_interval", 120)) + cpu_thresh = float(cfg.get("cpu_threshold", 85)) + mem_thresh = float(cfg.get("mem_threshold", 88)) + disk_thresh = float(cfg.get("disk_threshold", 88)) + offline_mins = int(cfg.get("offline_minutes", 3)) + ai_on = cfg.get("ai_analysis", "1") == "1" + proactive_chat = cfg.get("proactive_chat", "1") == "1" + + _guardian_state["last_scan"] = datetime.utcnow().isoformat() + critical_findings = [] + + # ── 1. Agent online/offline transitions ─────────────────────────── + agents = await db_fetchall( + "SELECT agent_id, hostname, status, last_seen FROM registered_agents" + ) + current_snapshot = {} + for ag in agents: + aid = ag["agent_id"] + hostname = ag["hostname"] + is_online = ag["status"] == "online" + current_snapshot[aid] = is_online + was_online = _guardian_state["online_snapshot"].get(aid) + + if was_online is True and not is_online: + # Agent went offline + if await _guardian_cooldown_ok(aid, "offline"): + eid = await _guardian_emit("agent_offline", "critical", aid, hostname, + "status", 0, 1, f"Field station '{hostname}' went OFFLINE") + critical_findings.append(f"⚠ {hostname} OFFLINE") + log.warning(f"[GUARDIAN] {hostname} went offline → event #{eid}") + + elif was_online is False and is_online: + # Agent came back + if await _guardian_cooldown_ok(aid, "online"): + await _guardian_emit("agent_online", "info", aid, hostname, + "status", 1, 0, f"Field station '{hostname}' back ONLINE") + log.info(f"[GUARDIAN] {hostname} came back online") + + _guardian_state["online_snapshot"] = current_snapshot + + # ── 2. Metrics analysis ─────────────────────────────────────────── + # Get latest metric per online agent + metrics_rows = await db_fetchall( + """SELECT m.agent_id, r.hostname, m.metric_data + FROM agent_metrics m + JOIN registered_agents r ON r.agent_id = m.agent_id + WHERE r.status = 'online' + AND m.recorded_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE) + ORDER BY m.recorded_at DESC""" + ) + seen_agents: set = set() + for row in metrics_rows: + aid = row["agent_id"] + if aid in seen_agents: + continue + seen_agents.add(aid) + hostname = row["hostname"] + + try: + m = json.loads(row["metric_data"]) if isinstance(row["metric_data"], str) else row["metric_data"] + sys_data = m + + cpu = sys_data.get("cpu_percent") + if cpu is not None and float(cpu) >= cpu_thresh: + sev = "critical" if float(cpu) >= 95 else "warning" + if await _guardian_cooldown_ok(aid, "cpu"): + msg = f"{hostname}: CPU at {cpu:.1f}% (threshold: {cpu_thresh}%)" + await _guardian_emit("cpu_high", sev, aid, hostname, + "cpu_percent", float(cpu), cpu_thresh, msg) + critical_findings.append(f"CPU {hostname} {cpu:.0f}%") + + mem = sys_data.get("memory", {}) + mem_pct = mem.get("percent") if isinstance(mem, dict) else None + if mem_pct is not None and float(mem_pct) >= mem_thresh: + sev = "critical" if float(mem_pct) >= 95 else "warning" + if await _guardian_cooldown_ok(aid, "memory"): + msg = f"{hostname}: Memory at {mem_pct:.1f}% (threshold: {mem_thresh}%)" + await _guardian_emit("mem_high", sev, aid, hostname, + "mem_percent", float(mem_pct), mem_thresh, msg) + critical_findings.append(f"MEM {hostname} {mem_pct:.0f}%") + + disks = sys_data.get("disk", []) + if isinstance(disks, list): + for disk in disks: + dpct = disk.get("percent", 0) + if dpct and float(str(dpct).rstrip("%")) >= disk_thresh: + mount = disk.get("mountpoint", "/") + key = f"disk_{mount}" + if await _guardian_cooldown_ok(aid, key): + msg = f"{hostname}: Disk {mount} at {dpct}% (threshold: {disk_thresh}%)" + sev = "critical" if float(str(dpct).rstrip("%")) >= 95 else "warning" + await _guardian_emit("disk_high", sev, aid, hostname, + key, float(str(dpct).rstrip("%")), disk_thresh, msg) + critical_findings.append(f"DISK {hostname}{mount} {dpct}%") + + # Service anomalies + services = sys_data.get("services", []) + for svc in services: + if svc.get("status") == "failed": + svc_name = svc.get("service", "unknown") + if await _guardian_cooldown_ok(aid, f"svc_{svc_name}"): + msg = f"{hostname}: Service '{svc_name}' is FAILED" + await _guardian_emit("service_down", "critical", aid, hostname, + f"service:{svc_name}", 0, 1, msg) + critical_findings.append(f"SVC {hostname}/{svc_name} FAILED") + + except Exception as e: + log.debug(f"[GUARDIAN] Metrics parse error for {row['hostname']}: {e}") + + # ── 3. AI analysis for critical findings ────────────────────────── + if critical_findings and ai_on: + try: + findings_text = "\n".join(f"- {f}" for f in critical_findings) + ai_prompt = ( + f"You are JARVIS. The following anomalies were just detected:\n\n" + f"{findings_text}\n\n" + "Provide a brief (2-3 sentences), Iron Man-style alert message " + "for Myron. Be direct about severity and what action to take. " + "No markdown, no headers." + ) + ai_msg = await llm_call([{"role": "user", "content": ai_prompt}], "claude") + # Update the most recent guardian event with AI analysis + await db_execute( + """UPDATE guardian_events SET ai_analysis=%s + WHERE ai_analysis='' AND created_at > DATE_SUB(NOW(), INTERVAL 1 MINUTE) + ORDER BY id DESC LIMIT 1""", + (ai_msg,) + ) + # Inject proactive chat message if configured + if proactive_chat: + await _guardian_inject_chat(f"◈ GUARDIAN ALERT — {ai_msg}") + except Exception as e: + log.warning(f"[GUARDIAN] AI analysis error: {e}") + + if critical_findings: + log.warning(f"[GUARDIAN] Scan complete — {len(critical_findings)} findings: {', '.join(critical_findings)}") + else: + log.debug(f"[GUARDIAN] Scan complete — all clear") + + except Exception as exc: + log.error(f"[GUARDIAN] Loop error: {exc}") + + await asyncio.sleep(scan_interval) + + +async def _guardian_inject_chat(message: str) -> None: + """Write a proactive JARVIS message into the conversations table so the HUD picks it up.""" + try: + await db_execute( + """INSERT INTO conversations (session_id, role, message, created_at) + VALUES ('guardian', 'assistant', %s, NOW())""", + (message,) + ) + except Exception as e: + log.debug(f"[GUARDIAN] Chat inject error: {e}") + + +async def handle_sitrep(payload: dict) -> dict: + """ + Situation Report — comprehensive health briefing across all field stations. + payload: { detail: brief|full, provider: claude } + """ + detail = payload.get("detail", "full") + provider = payload.get("provider", "claude") + + log.info(f"[GUARDIAN] SITREP requested (detail={detail})") + + # 1. Gather agent status + agents = await db_fetchall( + """SELECT agent_id, hostname, status, ip_address, agent_type, + capabilities, last_seen + FROM registered_agents ORDER BY status DESC, hostname ASC""" + ) + + # 2. Get latest metrics per agent + metrics_map = {} + metrics_rows = await db_fetchall( + """SELECT m.agent_id, m.metric_data, m.recorded_at + FROM agent_metrics m + WHERE m.recorded_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE) + ORDER BY m.recorded_at DESC""" + ) + for row in metrics_rows: + if row["agent_id"] not in metrics_map: + try: + m = json.loads(row["metric_data"]) if isinstance(row["metric_data"], str) else row["metric_data"] + metrics_map[row["agent_id"]] = m + except Exception: + pass + + # 3. Recent guardian events (last 24h) + recent_events = await db_fetchall( + """SELECT event_type, severity, hostname, metric, value, threshold, message, created_at + FROM guardian_events + WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR) + ORDER BY created_at DESC LIMIT 20""" + ) + + # 4. Arc Reactor job stats + arc_stats = await db_fetchone( + """SELECT + SUM(status='queued') AS queued, + SUM(status='running') AS running, + SUM(status='done') AS done, + SUM(status='failed') AS failed + FROM arc_jobs WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)""" + ) + + # 5. Build context for Claude + agent_lines = [] + for ag in agents: + m = metrics_map.get(ag["agent_id"], {}) + sys = m if m else {} + cpu = sys.get("cpu_percent", "?") + mem = sys.get("memory", {}).get("percent", "?") if isinstance(sys.get("memory"), dict) else "?" + disk = next((d.get("percent","?") for d in (sys.get("disk") or []) if d.get("mount",d.get("mountpoint","")) == "/"), "?") + line = (f" {ag['hostname']} ({ag['status'].upper()}) — " + f"CPU:{cpu}% MEM:{mem}% DISK:{disk}%") + agent_lines.append(line) + + event_lines = [ + f" [{e['severity'].upper()}] {e['hostname']}: {e['message']} ({e['created_at']})" + for e in recent_events + ] or [" No events in last 24h"] + + sitrep_context = ( + f"JARVIS SITREP — {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}\n\n" + f"FIELD STATIONS ({len(agents)} total):\n" + "\n".join(agent_lines) + "\n\n" + f"ARC REACTOR (last 24h): queued={arc_stats.get('queued',0)} " + f"running={arc_stats.get('running',0)} done={arc_stats.get('done',0)} " + f"failed={arc_stats.get('failed',0)}\n\n" + f"GUARDIAN EVENTS (last 24h):\n" + "\n".join(event_lines) + ) + + prompt = ( + f"{sitrep_context}\n\n" + f"You are JARVIS. Deliver a {'brief 3-sentence' if detail == 'brief' else 'comprehensive'} " + f"situation report for Myron Blair. Iron Man style — direct, confident, actionable. " + f"Highlight: overall health status, any critical issues requiring immediate attention, " + f"and key metrics. " + + ("Keep it under 80 words." if detail == "brief" else "Structure: Overall Status → Issues → Metrics → Recommendation.") + ) + + analysis = await llm_call([{"role": "user", "content": prompt}], provider) + _guardian_state["last_sitrep"] = datetime.utcnow().isoformat() + + # Log SITREP as guardian event + await _guardian_emit("sitrep", "info", "system", "system", "sitrep", 0, 0, + f"SITREP generated ({detail})", analysis) + + online_count = sum(1 for a in agents if a["status"] == "online") + offline_count = len(agents) - online_count + critical_events = sum(1 for e in recent_events if e["severity"] == "critical") + + return { + "sitrep": analysis, + "agents_online": online_count, + "agents_offline": offline_count, + "agents_total": len(agents), + "events_24h": len(recent_events), + "critical_24h": critical_events, + "arc_stats": dict(arc_stats) if arc_stats else {}, + "timestamp": datetime.utcnow().isoformat(), + "detail": detail, + } + + +async def handle_guardian_config(payload: dict) -> dict: + """ + Get or set Guardian Mode configuration. + payload: { action: get|set, key: ..., value: ... } or { action: set, config: {...} } + """ + action = payload.get("action", "get") + + if action == "get": + cfg = await _guardian_get_config() + return {"config": cfg, "guardian_state": { + "enabled": _guardian_state.get("enabled"), + "last_scan": _guardian_state.get("last_scan"), + "last_sitrep": _guardian_state.get("last_sitrep"), + }} + + elif action == "set": + updates = payload.get("config", {}) + if payload.get("key") and payload.get("value") is not None: + updates[payload["key"]] = str(payload["value"]) + for k, v in updates.items(): + await db_execute( + "INSERT INTO guardian_config (key_name, value) VALUES (%s, %s) " + "ON DUPLICATE KEY UPDATE value=%s, updated_at=NOW()", + (k, str(v), str(v)) + ) + if k == "enabled": + _guardian_state["enabled"] = v not in ("0", "false", "off") + return {"ok": True, "updated": list(updates.keys())} + + raise ValueError(f"Unknown guardian_config action: {action}") + + +# Register Phase 5 handlers +JOB_HANDLERS["sitrep"] = handle_sitrep +JOB_HANDLERS["guardian_config"] = handle_guardian_config + +# ── PHASE 6: COMMS v2 — SEND EMAIL + SCHEDULE + MEETING PREP ───────────────── + +import smtplib +import ssl as _ssl +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.utils import formataddr, make_msgid + +SMTP_SETTINGS = { + "gmail": ("smtp.gmail.com", 587, GMAIL_USER, GMAIL_PASS), + "icloud": ("smtp.mail.me.com", 587, ICLOUD_USER, ICLOUD_PASS), +} + +def _smtp_send_sync(account: str, to_email: str, to_name: str, + subject: str, body: str, + reply_to_msg_id: str = "") -> dict: + """Synchronous SMTP send — run in executor.""" + host, port, user, passwd = SMTP_SETTINGS.get(account, SMTP_SETTINGS["gmail"]) + if not user or not passwd: + return {"success": False, "error": "No credentials configured for account"} + + try: + msg = MIMEMultipart("alternative") + from_addr = formataddr(("JARVIS / Myron Blair", user)) + msg["From"] = from_addr + msg["To"] = formataddr((to_name, to_email)) if to_name else to_email + msg["Subject"] = subject + msg_id = make_msgid(domain=user.split("@")[1]) + msg["Message-ID"] = msg_id + if reply_to_msg_id: + msg["In-Reply-To"] = f"<{reply_to_msg_id.strip('<>')}>" + msg["References"] = f"<{reply_to_msg_id.strip('<>')}>" + + msg.attach(MIMEText(body, "plain", "utf-8")) + + ctx = _ssl.create_default_context() + with smtplib.SMTP(host, port, timeout=20) as smtp: + smtp.ehlo() + smtp.starttls(context=ctx) + smtp.login(user, passwd) + smtp.sendmail(user, [to_email], msg.as_bytes()) + + return {"success": True, "message_id": msg_id} + except Exception as e: + return {"success": False, "error": str(e)} + + +async def handle_send_email(payload: dict) -> dict: + """ + Send an email from Gmail or iCloud. + payload: { + account: gmail|icloud, + to_email: recipient address, + to_name: recipient display name (optional), + subject: subject line, + body: plain-text body, + triage_id: int (optional — if replying to a triage item), + reply_to_msg_id: original Message-ID for threading (optional), + compose: true — ask Claude to draft the body from a prompt first + } + """ + account = payload.get("account", "gmail") + to_email = payload.get("to_email", "").strip() + to_name = payload.get("to_name", "").strip() + subject = payload.get("subject", "").strip() + body = payload.get("body", "").strip() + triage_id = payload.get("triage_id") + reply_msg_id = payload.get("reply_to_msg_id", "") + compose_prompt = payload.get("compose_prompt", "") + + # If triage_id is given, pull recipient + subject + draft from email_triage + if triage_id and not to_email: + row = await db_fetchone( + "SELECT from_email, from_name, subject, draft_reply, msg_id FROM email_triage WHERE id=%s", + (int(triage_id),) + ) + if row: + to_email = to_email or row["from_email"] + to_name = to_name or row["from_name"] + subject = subject or ("Re: " + (row["subject"] or "")) + body = body or row.get("draft_reply", "") + reply_msg_id = reply_msg_id or row.get("msg_id", "") + + if not to_email: + raise ValueError("Missing to_email") + + # If compose requested, use Claude to write the body + if compose_prompt and not body: + draft_prompt = ( + f"You are drafting an email on behalf of Myron Blair.\n" + f"To: {to_name or to_email}\n" + f"Subject: {subject}\n\n" + f"Instructions: {compose_prompt}\n\n" + "Write a professional, concise email body. No subject line, no salutation header. " + "Start with 'Hi [name],' or similar. Sign off as 'Myron Blair'." + ) + body = await llm_call([{"role": "user", "content": draft_prompt}], "claude") + + if not subject: + raise ValueError("Missing subject") + if not body: + raise ValueError("Missing email body — provide body or compose_prompt") + + log.info(f"[COMMS] Sending email: {account} → {to_email} | {subject[:60]}") + + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, lambda: _smtp_send_sync(account, to_email, to_name, subject, body, reply_msg_id) + ) + + # Record in email_sent + status = "sent" if result["success"] else "failed" + await db_execute( + """INSERT INTO email_sent + (account, to_email, to_name, subject, body, triage_id, status, error, message_id) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""", + (account, to_email, to_name, subject, body, + triage_id or None, status, + result.get("error", ""), result.get("message_id", "")) + ) + + # Update triage item if this was a reply + if triage_id and result["success"]: + await db_execute( + "UPDATE email_triage SET action_taken='replied' WHERE id=%s", (int(triage_id),) + ) + + log.info(f"[COMMS] Send result: {status} → {to_email}") + return { + "success": result["success"], + "account": account, + "to_email": to_email, + "subject": subject, + "status": status, + "message_id": result.get("message_id", ""), + "error": result.get("error", ""), + "body": body, + } + + +async def handle_compose_email(payload: dict) -> dict: + """ + Compose an email from natural-language instructions — draft only (no auto-send). + payload: { to_email, to_name, subject_hint, instructions, account, send: false } + """ + to_email = payload.get("to_email", "").strip() + to_name = payload.get("to_name", "").strip() + subject_hint = payload.get("subject_hint", "").strip() + instructions = payload.get("instructions", "").strip() + account = payload.get("account", "gmail") + auto_send = bool(payload.get("send", False)) + + if not instructions: + raise ValueError("Missing instructions for compose") + + # Generate subject if not given + if not subject_hint: + subj_prompt = f"Write a concise email subject line (max 8 words) for an email about: {instructions}" + subject_hint = (await llm_call([{"role": "user", "content": subj_prompt}], "claude")).strip().strip('"') + + # Draft full body + draft_prompt = ( + f"You are drafting an email for Myron Blair.\n" + f"To: {to_name or to_email or 'the recipient'}\n" + f"Subject: {subject_hint}\n\n" + f"Instructions: {instructions}\n\n" + "Write a professional, concise email. Start with 'Hi [name],' or 'Hello,' " + "and sign off as 'Myron Blair'. Return only the email body text." + ) + body = await llm_call([{"role": "user", "content": draft_prompt}], "claude") + + if auto_send and to_email: + return await handle_send_email({ + "account": account, + "to_email": to_email, + "to_name": to_name, + "subject": subject_hint, + "body": body, + }) + + # Draft-only — save to email_sent with status='queued' for review + draft_id = await db_execute( + """INSERT INTO email_sent + (account, to_email, to_name, subject, body, status) + VALUES (%s,%s,%s,%s,%s,'queued')""", + (account, to_email, to_name, subject_hint, body) + ) + + log.info(f"[COMMS] Composed draft #{draft_id}: {to_email} | {subject_hint[:60]}") + return { + "draft_id": draft_id, + "to_email": to_email, + "to_name": to_name, + "subject": subject_hint, + "body": body, + "account": account, + "sent": False, + "status": "queued", + } + + +async def handle_schedule_event(payload: dict) -> dict: + """ + Create or find an appointment in the JARVIS planner from natural language. + payload: { title, description, start_at, end_at, location, category, + natural_language, all_day, reminder_min, generate_ics } + """ + nl = payload.get("natural_language", "").strip() + title = payload.get("title", "").strip() + start_at = payload.get("start_at", "") + end_at = payload.get("end_at", "") + location = payload.get("location", "") + category = payload.get("category", "work") + all_day = bool(payload.get("all_day", False)) + reminder = int(payload.get("reminder_min", 30)) + description = payload.get("description", "") + gen_ics = bool(payload.get("generate_ics", True)) + + # If natural language given, parse with Claude + if nl and (not title or not start_at): + now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") + parse_prompt = ( + f"Current datetime: {now_str}\n\n" + f"Parse this scheduling request into JSON:\n\"{nl}\"\n\n" + "Return ONLY valid JSON with these fields (no markdown):\n" + '{"title":"...","start_at":"YYYY-MM-DD HH:MM:SS","end_at":"YYYY-MM-DD HH:MM:SS or null",' + '"location":"...","category":"personal|work|medical|other","all_day":false,' + '"description":"..."}\n' + "Use 24h time. Default duration 1 hour if not specified. " + "If only a date (no time) is given, set all_day=true." + ) + raw = await llm_call([{"role": "user", "content": parse_prompt}], "claude") + raw = raw.strip().strip("```json").strip("```").strip() + try: + parsed = json.loads(raw) + title = title or parsed.get("title", nl[:100]) + start_at = start_at or parsed.get("start_at", "") + end_at = end_at or parsed.get("end_at") or "" + location = location or parsed.get("location", "") + category = parsed.get("category", category) + all_day = parsed.get("all_day", all_day) + description = description or parsed.get("description", "") + except Exception as e: + log.warning(f"[COMMS] Schedule parse failed: {e} — raw: {raw[:100]}") + if not title: + title = nl[:100] + + if not title: + raise ValueError("Could not determine event title from input") + if not start_at: + raise ValueError("Could not determine start time from input") + + # Insert into appointments table + appt_id = await db_execute( + """INSERT INTO appointments + (title, description, category, start_at, end_at, location, + all_day, reminder_min, external_source) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,'manual')""", + (title, description, category, start_at, + end_at or None, location, int(all_day), reminder) + ) + + # Generate ICS if requested + ics_content = "" + if gen_ics and start_at: + try: + from datetime import datetime as dt + uid = f"jarvis-{appt_id}@orbishosting.com" + dtstart = start_at.replace(" ", "T").replace("-", "").replace(":", "") + dtend = (end_at or start_at).replace(" ", "T").replace("-", "").replace(":", "") + dtstamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + ics_content = ( + "BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//JARVIS//EN\r\n" + "BEGIN:VEVENT\r\n" + f"UID:{uid}\r\n" + f"DTSTAMP:{dtstamp}\r\n" + f"DTSTART:{dtstart}\r\n" + f"DTEND:{dtend}\r\n" + f"SUMMARY:{title}\r\n" + f"DESCRIPTION:{description}\r\n" + f"LOCATION:{location}\r\n" + "END:VEVENT\r\nEND:VCALENDAR\r\n" + ) + except Exception: + pass + + log.info(f"[COMMS] Event scheduled: #{appt_id} '{title}' @ {start_at}") + return { + "appointment_id": appt_id, + "title": title, + "start_at": start_at, + "end_at": end_at, + "location": location, + "category": category, + "all_day": all_day, + "ics": ics_content, + "description": description, + } + + +async def handle_meeting_prep(payload: dict) -> dict: + """ + Prepare a briefing for an upcoming meeting. + payload: { appointment_id OR title OR timeframe, research: true } + """ + appt_id = payload.get("appointment_id") + title = payload.get("title", "").strip() + timeframe = payload.get("timeframe", "today") + do_research = bool(payload.get("research", True)) + + # Find the appointment + appt = None + if appt_id: + appt = await db_fetchone("SELECT * FROM appointments WHERE id=%s", (int(appt_id),)) + elif title: + appt = await db_fetchone( + "SELECT * FROM appointments WHERE title LIKE %s AND start_at >= NOW() ORDER BY start_at LIMIT 1", + (f"%{title}%",) + ) + else: + appt = await db_fetchone( + """SELECT * FROM appointments + WHERE start_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 24 HOUR) + ORDER BY start_at LIMIT 1""" + ) + + if not appt: + # Check tasks too + task = await db_fetchone( + "SELECT * FROM tasks WHERE title LIKE %s AND status != 'done' ORDER BY due_date LIMIT 1", + (f"%{title}%",) + ) + if task: + appt = {"title": task["title"], "description": task.get("notes",""), + "start_at": str(task.get("due_date") or ""), "location": ""} + + if not appt: + return {"success": False, "error": f"No upcoming appointment found matching '{title or timeframe}'"} + + appt_title = appt.get("title", "") + appt_start = str(appt.get("start_at", "")) + appt_desc = appt.get("description", "") + appt_loc = appt.get("location", "") + + # Optional: kick off a research job on the meeting topic/attendees + research_summary = "" + if do_research and appt_title: + try: + research_result = await handle_research({ + "query": f"background information on: {appt_title}", + "depth": "quick", + "provider": "claude", + }) + research_summary = research_result.get("synthesis", "")[:1500] + except Exception: + pass + + # Build meeting briefing + context = ( + f"Meeting: {appt_title}\n" + f"Time: {appt_start}\n" + f"Location: {appt_loc or 'Not specified'}\n" + f"Notes: {appt_desc or 'None'}\n" + ) + if research_summary: + context += f"\nBackground Research:\n{research_summary}" + + prompt = ( + f"You are JARVIS. Prepare a concise meeting briefing for Myron Blair.\n\n" + f"{context}\n\n" + "Format: Meeting summary → Key context/background → Suggested talking points → " + "Action items to prepare. Keep it sharp and actionable. Iron Man style." + ) + briefing = await llm_call([{"role": "user", "content": prompt}], "claude") + + log.info(f"[COMMS] Meeting prep complete: '{appt_title}'") + return { + "appointment_id": appt.get("id"), + "title": appt_title, + "start_at": appt_start, + "location": appt_loc, + "briefing": briefing, + "has_research": bool(research_summary), + } + + +# Register Phase 6 handlers +JOB_HANDLERS["send_email"] = handle_send_email +JOB_HANDLERS["compose_email"] = handle_compose_email +JOB_HANDLERS["schedule_event"] = handle_schedule_event +JOB_HANDLERS["meeting_prep"] = handle_meeting_prep + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 7: MISSION OPS — multi-step automated workflows +# ═══════════════════════════════════════════════════════════════════════════════ + +import re as _re + +def _render_template(text: str, context: dict) -> str: + """Replace {{key.subkey}} tokens in a string/JSON with values from context.""" + if not isinstance(text, str): + return text + def replacer(m): + path = m.group(1).strip().split(".") + val = context + for p in path: + if isinstance(val, dict): + val = val.get(p, m.group(0)) + else: + return m.group(0) + return str(val) if not isinstance(val, (dict, list)) else json.dumps(val) + return _re.sub(r'\{\{([^}]+)\}\}', replacer, text) + +def _apply_templates(payload: Any, context: dict) -> Any: + """Recursively apply template substitution to a payload dict/list/str.""" + if isinstance(payload, str): + return _render_template(payload, context) + if isinstance(payload, dict): + return {k: _apply_templates(v, context) for k, v in payload.items()} + if isinstance(payload, list): + return [_apply_templates(v, context) for v in payload] + return payload + +async def _execute_mission(mission_id: int, trigger_source: str = "manual") -> dict: + """Run all steps of a mission sequentially, log results, return summary.""" + mission = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,)) + if not mission: + return {"error": f"Mission {mission_id} not found"} + + steps = await db_fetchall( + "SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC", + (mission_id,) + ) + if not steps: + return {"error": "Mission has no steps"} + + # Create run record + run_id = await db_execute( + "INSERT INTO mission_runs (mission_id, status, trigger_source, steps_log) VALUES (%s,'running',%s,'[]')", + (mission_id, trigger_source) + ) + await db_execute( + "UPDATE missions SET last_run_at=NOW(), run_count=run_count+1 WHERE id=%s", + (mission_id,) + ) + + context = {"trigger": {"source": trigger_source, "mission_id": mission_id}} + steps_log = [] + overall_status = "done" + + for i, step in enumerate(steps): + step_label = step.get("label") or step["job_type"] + raw_payload = step.get("job_payload") or {} + if isinstance(raw_payload, str): + try: + raw_payload = json.loads(raw_payload) + except Exception: + raw_payload = {} + + payload = _apply_templates(raw_payload, context) + step_entry = { + "step": i, + "label": step_label, + "job_type": step["job_type"], + "status": "running", + "result": None, + "error": None, + } + + try: + handler = JOB_HANDLERS.get(step["job_type"]) + if not handler: + raise ValueError(f"Unknown job type: {step['job_type']}") + result = await handler(payload) + step_entry["status"] = "done" + step_entry["result"] = result + context[f"step_{i}"] = result + log.info(f"[MISSION {mission_id} RUN {run_id}] Step {i} ({step_label}) done") + except Exception as exc: + step_entry["status"] = "failed" + step_entry["error"] = str(exc)[:500] + log.warning(f"[MISSION {mission_id} RUN {run_id}] Step {i} ({step_label}) failed: {exc}") + if not step.get("continue_on_failure"): + overall_status = "failed" + steps_log.append(step_entry) + break + + steps_log.append(step_entry) + + await db_execute( + "UPDATE mission_runs SET status=%s, steps_log=%s, completed_at=NOW() WHERE id=%s", + (overall_status, json.dumps(steps_log, default=str), run_id) + ) + return { + "run_id": run_id, + "status": overall_status, + "steps": len(steps_log), + "steps_log": steps_log, + } + +async def handle_run_mission(payload: dict) -> dict: + mission_id = int(payload.get("mission_id") or 0) + if not mission_id: + return {"error": "Missing mission_id"} + source = payload.get("trigger_source", "manual") + return await _execute_mission(mission_id, source) + +JOB_HANDLERS["run_mission"] = handle_run_mission + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 8: MISSION DIRECTIVES — OKR / goal tracking with AI review +# ═══════════════════════════════════════════════════════════════════════════════ + +async def handle_directive_review(payload: dict) -> dict: + """AI-powered review of active directives: progress analysis + recommendations.""" + directive_id = payload.get("directive_id") + provider = payload.get("provider", "claude") + + # Fetch directives + if directive_id: + dirs = await db_fetchall( + "SELECT * FROM directives WHERE id=%s", (directive_id,) + ) + else: + dirs = await db_fetchall( + "SELECT * FROM directives WHERE status='active' ORDER BY priority DESC LIMIT 10" + ) + + if not dirs: + return {"error": "No active directives found"} + + dir_ids = [d["id"] for d in dirs] + placeholders = ",".join(["%s"] * len(dir_ids)) + + key_results = await db_fetchall( + f"SELECT * FROM directive_key_results WHERE directive_id IN ({placeholders}) ORDER BY directive_id,id", + dir_ids + ) or [] + + links = await db_fetchall( + f"""SELECT dl.directive_id, dl.link_type, dl.note, + COALESCE(t.title, a.title) AS linked_title, + COALESCE(t.status, 'scheduled') AS linked_status + FROM directive_links dl + LEFT JOIN tasks t ON dl.link_type='task' AND t.id=dl.link_id + LEFT JOIN appointments a ON dl.link_type='appointment' AND a.id=dl.link_id + WHERE dl.directive_id IN ({placeholders})""", + dir_ids + ) or [] + + # Build context block + context_lines = [] + for d in dirs: + d_krs = [kr for kr in key_results if kr["directive_id"] == d["id"]] + d_links = [lk for lk in links if lk["directive_id"] == d["id"]] + kr_sum_cur = sum(float(kr["current_value"]) for kr in d_krs) + kr_sum_tgt = sum(float(kr["target_value"]) for kr in d_krs) or 1 + pct = round(kr_sum_cur / kr_sum_tgt * 100, 1) + + context_lines.append(f"\n## DIRECTIVE: {d['title']} ({d['category'].upper()}) — {pct}% complete") + context_lines.append(f" Status: {d['status']} | Priority: {d['priority']}/10 | Target: {d.get('target_date') or 'no date'}") + if d.get("description"): + context_lines.append(f" Description: {d['description']}") + for kr in d_krs: + context_lines.append(f" KR: {kr['title']} — {kr['current_value']}/{kr['target_value']} {kr['unit']}") + for lk in d_links: + status = lk.get("linked_status") or "" + context_lines.append(f" Linked {lk['link_type']}: {lk.get('linked_title') or lk.get('note') or '—'} [{status}]") + + context = "\n".join(context_lines) + today = datetime.utcnow().strftime("%Y-%m-%d") + + prompt = f"""You are JARVIS, an AI assistant conducting a Mission Directives review for your principal. +Today is {today}. + +{context} + +Provide a concise executive briefing covering: +1. Overall progress summary (2-3 sentences) +2. For each directive: current status, what's on track, what needs attention +3. Top 3 recommended next actions to move the highest-priority directives forward +4. Any directives at risk of missing their target date + +Keep the tone confident and action-oriented. Format with clear sections. Under 400 words.""" + + review_text = "" + try: + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.anthropic.com/v1/messages", + headers={"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"}, + json={"model": CLAUDE_MODEL, "max_tokens": 600, "messages": [{"role": "user", "content": prompt}]}, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + rdata = await resp.json() + review_text = rdata.get("content", [{}])[0].get("text", "") + except Exception as e: + review_text = f"[Review generation failed: {e}]" + + # Store in conversations so HUD can surface it + await db_execute( + "INSERT INTO conversations (session_id, role, message, created_at) VALUES ('guardian','assistant',%s,NOW())", + (review_text,) + ) + + return { + "review": review_text, + "directives": len(dirs), + "provider": provider, + } + +JOB_HANDLERS["directive_review"] = handle_directive_review + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 10: MEMORY CORE — auto-extraction knowledge graph +# ═══════════════════════════════════════════════════════════════════════════════ + +_MEMORY_EXTRACT_PROMPT = """Extract factual information about the user from this conversation exchange. Focus on: +- Preferences (likes/dislikes, preferred ways of working) +- People they mention (names, relationships, roles) +- Places (home, work, locations) +- Routines (schedules, habits) +- Goals (things they want to achieve) +- Instructions to JARVIS (always/never rules) +- Key facts about their life/work/projects + +Return a JSON array. Each element: {{"category": "", "subject": "", "predicate": "", "object": "", "confidence": <0.0-1.0>}} + +Rules: +- Only extract clearly stated facts, not speculation +- subject/predicate should be concise (under 50 chars each) +- confidence: 0.95 for explicit statements, 0.75-0.90 for clear implications +- Skip ephemeral info (what they asked just now, current queries) +- Max 8 facts per exchange +- Return [] if nothing durable worth remembering + +Exchange: +USER: {user_msg} +JARVIS: {asst_msg} + +Return ONLY valid JSON array, nothing else.""" + +async def handle_memory_extract(payload: dict) -> dict: + user_msg = str(payload.get("user_message", "")).strip() + asst_msg = str(payload.get("assistant_message", "")).strip() + conv_id = payload.get("conversation_id") + + if not user_msg and not asst_msg: + return {"ok": False, "reason": "empty exchange"} + + prompt = _MEMORY_EXTRACT_PROMPT.format( + user_msg=user_msg[:800], + asst_msg=asst_msg[:800] + ) + + raw = "" + try: + # Use Haiku for speed/cost; fall back to Groq + headers = {"x-api-key": CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json"} + body = { + "model": "claude-haiku-4-5-20251001", + "max_tokens": 800, + "messages": [{"role": "user", "content": prompt}] + } + async with aiohttp.ClientSession() as session: + async with session.post("https://api.anthropic.com/v1/messages", json=body, + headers=headers, timeout=aiohttp.ClientTimeout(total=20)) as resp: + if resp.status == 200: + data = await resp.json() + raw = data["content"][0]["text"].strip() + else: + raise RuntimeError(f"Claude {resp.status}") + except Exception as e: + log.warning(f"[MEMORY] Claude extract failed ({e}), trying Groq") + try: + raw = await llm_call([{"role": "user", "content": prompt}], provider="groq") + except Exception as e2: + log.error(f"[MEMORY] All providers failed: {e2}") + return {"ok": False, "reason": str(e2)} + + # Parse JSON — strip code fences if present + if "```" in raw: + raw = raw.split("```")[1] + if raw.startswith("json"): + raw = raw[4:] + raw = raw.strip() + + try: + facts = json.loads(raw) + if not isinstance(facts, list): + facts = [] + except Exception: + log.warning(f"[MEMORY] JSON parse failed. Raw: {raw[:200]}") + return {"ok": False, "reason": "json_parse_failed"} + + valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"} + inserted = 0 + for f in facts: + if not isinstance(f, dict): + continue + subj = str(f.get("subject", "")).strip()[:255] + pred = str(f.get("predicate", "is")).strip()[:255] + obj = str(f.get("object", "")).strip() + cat = f.get("category", "fact") + conf = min(1.0, max(0.0, float(f.get("confidence", 0.85)))) + if not subj or not obj or cat not in valid_cats: + continue + try: + await db_execute( + """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source, conversation_id) + VALUES (%s, %s, %s, %s, %s, 'conversation', %s) + ON DUPLICATE KEY UPDATE + object=VALUES(object), + confidence=GREATEST(confidence, VALUES(confidence)), + confirmed_count=confirmed_count+1, + last_confirmed_at=NOW(), + active=1""", + (cat, subj, pred, obj, conf, conv_id) + ) + inserted += 1 + except Exception as e: + log.warning(f"[MEMORY] Insert ({subj},{pred}) failed: {e}") + + log.info(f"[MEMORY] Stored {inserted}/{len(facts)} facts from conv {conv_id}") + return {"ok": True, "extracted": inserted, "candidates": len(facts)} + + +async def handle_memory_store(payload: dict) -> dict: + """Explicit memory insertion — from 'remember that X' voice commands.""" + subject = str(payload.get("subject", "user")).strip()[:255] + predicate = str(payload.get("predicate", "note")).strip()[:255] + obj = str(payload.get("object", "")).strip() + category = payload.get("category", "fact") + if not obj: + return {"ok": False, "reason": "empty object"} + valid_cats = {"preference", "person", "place", "routine", "goal", "fact", "instruction"} + if category not in valid_cats: + category = "fact" + fid = await db_execute( + """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source) + VALUES (%s, %s, %s, %s, 1.0, 'explicit') + ON DUPLICATE KEY UPDATE + object=VALUES(object), confidence=1.0, + confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""", + (category, subject, predicate, obj) + ) + log.info(f"[MEMORY] Explicit store: [{category}] {subject} {predicate}: {obj}") + return {"ok": True, "id": fid} + + +JOB_HANDLERS["memory_extract"] = handle_memory_extract +JOB_HANDLERS["memory_store"] = handle_memory_store + +# ═══════════════════════════════════════════════════════════════════════════════ +# PHASE 9: CLEARANCE PROTOCOL — approval gating for high-risk operations +# ═══════════════════════════════════════════════════════════════════════════════ + +def _clearance_describe(job_type: str, payload: dict) -> str: + """Human-readable description of what the job would do.""" + if job_type == "shell": + cmd = payload.get("command", payload.get("cmd", "?")) + agent = payload.get("agent_id", payload.get("agent", "unknown")) + return f"Execute shell command on agent '{agent}': {str(cmd)[:120]}" + if job_type == "send_email": + to = payload.get("to_email") or payload.get("target", "?") + subj = payload.get("subject", "") + tid = payload.get("triage_id") + if tid: + return f"Send SMTP reply to triage item #{tid} (to: {to})" + return f"Send email to {to}" + (f" — {subj}" if subj else "") + if job_type == "remote_exec": + cmd_type = payload.get("command_type", payload.get("command", "?")) + agent = payload.get("agent_id", payload.get("agent", "?")) + return f"Remote execute '{cmd_type}' on agent '{agent}'" + if job_type == "purge": + return "Purge all completed/failed jobs from the Arc Reactor queue" + return f"Execute {job_type} job" + +async def _check_clearance(job_type: str, payload: dict, created_by: str) -> Optional[dict]: + """ + Returns a clearance-pending response dict if approval is required, + or None if the job can proceed immediately. + """ + rule = await db_fetchone( + "SELECT * FROM clearance_rules WHERE job_type=%s AND enabled=1 AND require_approval=1", + (job_type,) + ) + if not rule: + return None + desc = _clearance_describe(job_type, payload) + expires = None + if rule.get("auto_approve_after_min") is not None: + expires = datetime.utcnow() + timedelta(minutes=int(rule["auto_approve_after_min"])) + cr_id = await db_execute( + "INSERT INTO clearance_requests (job_type,job_payload,risk_level,description,requested_by,status,expires_at) " + "VALUES (%s,%s,%s,%s,%s,'pending',%s)", + (job_type, json.dumps(payload), rule["risk_level"], desc, created_by, expires) + ) + log.warning(f"[CLEARANCE] Request #{cr_id} — {rule['risk_level'].upper()} — {desc}") + return { + "status": "pending_clearance", + "clearance_id": cr_id, + "risk_level": rule["risk_level"], + "description": desc, + "message": f"◈ CLEARANCE REQUIRED — {rule['risk_level'].upper()} risk operation intercepted. Approval needed before execution.", + } + +async def _dispatch_cleared_job(cr_id: int, job_type: str, payload: dict, priority: int = 7, decided_by: str = "admin") -> dict: + """Approve a clearance request and dispatch the job into the queue.""" + jid = await db_execute( + "INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)", + (job_type, json.dumps(payload), priority, f"clearance:{cr_id}") + ) + await db_execute( + "UPDATE clearance_requests SET status='approved', decided_by=%s, arc_job_id=%s, decided_at=NOW() WHERE id=%s", + (decided_by, jid, cr_id) + ) + log.info(f"[CLEARANCE] Request #{cr_id} approved by {decided_by} → Job #{jid}") + return {"job_id": jid, "clearance_id": cr_id, "status": "approved"} + +# ── Clearance watchdog ───────────────────────────────────────────────────────── + +async def clearance_watchdog() -> None: + """Expire timed-out pending requests; auto-approve those with auto_approve_after_min set.""" + log.info("Clearance watchdog started") + await asyncio.sleep(20) + while True: + try: + now = datetime.utcnow() + # Expire requests past their expiry with no auto-approve (expires_at IS NULL means never expires) + expired = await db_fetchall( + "SELECT cr.*, cru.auto_approve_after_min FROM clearance_requests cr " + "JOIN clearance_rules cru ON cru.job_type=cr.job_type " + "WHERE cr.status='pending' AND cr.expires_at IS NOT NULL AND cr.expires_at <= %s", + (now,) + ) or [] + for req in expired: + if req.get("auto_approve_after_min") is not None: + # Auto-approve: dispatch the job + payload = req.get("job_payload") or {} + if isinstance(payload, str): + try: payload = json.loads(payload) + except Exception: payload = {} + await _dispatch_cleared_job(req["id"], req["job_type"], payload, decided_by="auto_approve") + log.info(f"[CLEARANCE] Auto-approved request #{req['id']} ({req['job_type']})") + else: + await db_execute( + "UPDATE clearance_requests SET status='expired', decided_at=NOW() WHERE id=%s", + (req["id"],) + ) + log.info(f"[CLEARANCE] Expired request #{req['id']} ({req['job_type']})") + except Exception as exc: + log.error(f"Clearance watchdog error: {exc}") + await asyncio.sleep(60) + +# ── Mission trigger loop ─────────────────────────────────────────────────────── + +_mission_trigger_state: dict = {} # mission_id → last_triggered ISO + +async def mission_trigger_loop() -> None: + """Check scheduled and event-based mission triggers every 30 seconds.""" + log.info("Mission trigger loop started") + await asyncio.sleep(15) # stagger startup + while True: + try: + missions = await db_fetchall( + "SELECT * FROM missions WHERE enabled=1 AND trigger_type != 'manual'" + ) + for m in missions: + mid = m["id"] + ttype = m["trigger_type"] + cfg = m.get("trigger_config") or {} + if isinstance(cfg, str): + try: cfg = json.loads(cfg) + except Exception: cfg = {} + + should_run = False + source = ttype + + if ttype == "schedule": + interval_min = int(cfg.get("interval_minutes", 60)) + last_run = m.get("last_run_at") + if last_run is None: + should_run = True + else: + last_dt = last_run if isinstance(last_run, datetime) else datetime.fromisoformat(str(last_run)) + elapsed = (datetime.utcnow() - last_dt).total_seconds() / 60 + should_run = elapsed >= interval_min + + elif ttype == "guardian_event": + severity = cfg.get("severity", "") + etype = cfg.get("event_type", "") + last_key = f"ge_{mid}" + last_seen = _mission_trigger_state.get(last_key, "1970-01-01") + wheres = ["acknowledged=0", "created_at > %s"] + params: list = [last_seen] + if severity: wheres.append("severity=%s"); params.append(severity) + if etype: wheres.append("event_type=%s"); params.append(etype) + row = await db_fetchone( + f"SELECT id, created_at FROM guardian_events WHERE {' AND '.join(wheres)} ORDER BY created_at DESC LIMIT 1", + params + ) + if row: + should_run = True + _mission_trigger_state[last_key] = str(row["created_at"]) + source = f"guardian_event:{row['id']}" + + elif ttype == "email_keyword": + keywords = cfg.get("keywords", []) + category = cfg.get("category", "") + last_key = f"ek_{mid}" + last_seen = _mission_trigger_state.get(last_key, "1970-01-01") + if keywords: + kw_conds = " OR ".join(["subject LIKE %s OR summary LIKE %s"] * len(keywords)) + kw_params = [] + for kw in keywords: + kw_params += [f"%{kw}%", f"%{kw}%"] + where = f"created_at > %s AND ({kw_conds})" + params = [last_seen] + kw_params + if category: + where += " AND category=%s" + params.append(category) + row = await db_fetchone( + f"SELECT id, created_at FROM email_triage WHERE {where} ORDER BY created_at DESC LIMIT 1", + params + ) + if row: + should_run = True + _mission_trigger_state[last_key] = str(row["created_at"]) + source = f"email_triage:{row['id']}" + + if should_run: + log.info(f"[MISSION TRIGGER] Mission {mid} ({m['name']}) triggered by {source}") + asyncio.create_task(_execute_mission(mid, source)) + + except Exception as exc: + log.error(f"Mission trigger loop error: {exc}") + await asyncio.sleep(30) + +# ═══════════════════════════════════════════════════════════════════════════════ +# JOB RUNNER + BACKGROUND TASKS +# ═══════════════════════════════════════════════════════════════════════════════ + +_running_jobs: set = set() +_stats = {"done": 0, "failed": 0} + +async def run_job(job: dict) -> None: + jid = job["id"] + jtype = job["job_type"] + _running_jobs.add(jid) + try: + payload = job.get("payload") or {} + if isinstance(payload, str): + payload = json.loads(payload) + await db_execute("UPDATE arc_jobs SET status='running', started_at=NOW() WHERE id=%s", (jid,)) + log.info(f"[JOB {jid}] Starting {jtype}") + handler = JOB_HANDLERS.get(jtype) + if not handler: + raise ValueError(f"Unknown job type: {jtype}") + result = await handler(payload) + result_str = json.dumps(result, default=str) + await db_execute("UPDATE arc_jobs SET status='done', result=%s, completed_at=NOW() WHERE id=%s", (result_str, jid)) + _stats["done"] += 1 + log.info(f"[JOB {jid}] Done: {jtype}") + except Exception as exc: + err = str(exc) + await db_execute("UPDATE arc_jobs SET status='failed', error=%s, completed_at=NOW() WHERE id=%s", (err[:2000], jid)) + _stats["failed"] += 1 + log.warning(f"[JOB {jid}] Failed: {err[:120]}") + finally: + _running_jobs.discard(jid) + +async def job_poller() -> None: + log.info("Arc Reactor job poller started") + while True: + try: + jobs = await db_fetchall("SELECT * FROM arc_jobs WHERE status='queued' ORDER BY priority DESC, id ASC LIMIT 5") + for job in jobs: + if job["id"] not in _running_jobs: + asyncio.create_task(run_job(job)) + except Exception as exc: + log.error(f"Poller error: {exc}") + await asyncio.sleep(POLL_INTERVAL) + +async def heartbeat_loop() -> None: + while True: + try: + await db_execute( + "UPDATE arc_status SET last_heartbeat=NOW(), active_jobs=%s, jobs_done=%s, jobs_failed=%s WHERE id=1", + (len(_running_jobs), _stats["done"], _stats["failed"]) + ) + except Exception as exc: + log.error(f"Heartbeat error: {exc}") + await asyncio.sleep(HEARTBEAT_INTERVAL) + +# ═══════════════════════════════════════════════════════════════════════════════ +# FASTAPI APP +# ═══════════════════════════════════════════════════════════════════════════════ + +@asynccontextmanager +async def lifespan(app: FastAPI): + log.info(f"◈ JARVIS Arc Reactor v{VERSION} starting on {HOST}:{PORT}") + await get_pool() + await db_execute("UPDATE arc_status SET started_at=NOW(), last_heartbeat=NOW(), version=%s WHERE id=1", (VERSION,)) + asyncio.create_task(job_poller()) + asyncio.create_task(heartbeat_loop()) + asyncio.create_task(guardian_loop()) + asyncio.create_task(mission_trigger_loop()) + asyncio.create_task(clearance_watchdog()) + log.info(f"◈ Arc Reactor online — {len(JOB_HANDLERS)} handlers: {', '.join(JOB_HANDLERS)}") + yield + log.info("◈ Arc Reactor shutting down") + if _pool and not _pool.closed: + _pool.close() + await _pool.wait_closed() + +app = FastAPI(title="JARVIS Arc Reactor", version=VERSION, lifespan=lifespan) + +# ── Mission Ops endpoints ────────────────────────────────────────────────────── + +@app.get("/missions") +async def missions_list(enabled: Optional[int] = None): + if enabled is not None: + rows = await db_fetchall("SELECT * FROM missions WHERE enabled=%s ORDER BY id DESC", (enabled,)) + else: + rows = await db_fetchall("SELECT * FROM missions ORDER BY id DESC") + return rows or [] + +@app.get("/missions/{mission_id}") +async def mission_get(mission_id: int): + m = await db_fetchone("SELECT * FROM missions WHERE id=%s", (mission_id,)) + if not m: + raise HTTPException(status_code=404, detail="Not found") + steps = await db_fetchall( + "SELECT * FROM mission_steps WHERE mission_id=%s ORDER BY step_order ASC", + (mission_id,) + ) + runs = await db_fetchall( + "SELECT id, status, trigger_source, started_at, completed_at FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT 10", + (mission_id,) + ) + return {**m, "steps": steps or [], "recent_runs": runs or []} + +@app.post("/missions") +async def mission_create(req: Request): + body = await req.json() + name = body.get("name", "").strip() + if not name: + raise HTTPException(status_code=400, detail="name required") + desc = body.get("description", "") + ttype = body.get("trigger_type", "manual") + tcfg = json.dumps(body.get("trigger_config") or {}) + enabled = int(body.get("enabled", 1)) + mid = await db_execute( + "INSERT INTO missions (name,description,trigger_type,trigger_config,enabled) VALUES (%s,%s,%s,%s,%s)", + (name, desc, ttype, tcfg, enabled) + ) + steps = body.get("steps") or [] + for i, s in enumerate(steps): + await db_execute( + "INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)", + (mid, i, s.get("label",""), s["job_type"], json.dumps(s.get("payload",{})), int(s.get("continue_on_failure",0))) + ) + return {"id": mid, "ok": True} + +@app.put("/missions/{mission_id}") +async def mission_update(mission_id: int, req: Request): + body = await req.json() + fields: list = [] + params: list = [] + for col in ("name", "description", "trigger_type", "enabled"): + if col in body: + fields.append(f"{col}=%s") + params.append(body[col]) + if "trigger_config" in body: + fields.append("trigger_config=%s") + params.append(json.dumps(body["trigger_config"])) + if fields: + params.append(mission_id) + await db_execute(f"UPDATE missions SET {','.join(fields)} WHERE id=%s", params) + if "steps" in body: + await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,)) + for i, s in enumerate(body["steps"]): + await db_execute( + "INSERT INTO mission_steps (mission_id,step_order,label,job_type,job_payload,continue_on_failure) VALUES (%s,%s,%s,%s,%s,%s)", + (mission_id, i, s.get("label",""), s["job_type"], json.dumps(s.get("payload",{})), int(s.get("continue_on_failure",0))) + ) + return {"ok": True} + +@app.delete("/missions/{mission_id}") +async def mission_delete(mission_id: int): + await db_execute("DELETE FROM mission_steps WHERE mission_id=%s", (mission_id,)) + await db_execute("DELETE FROM mission_runs WHERE mission_id=%s", (mission_id,)) + await db_execute("DELETE FROM missions WHERE id=%s", (mission_id,)) + return {"ok": True} + +@app.post("/missions/{mission_id}/run") +async def mission_run_endpoint(mission_id: int, req: Request): + try: + body = await req.json() + except Exception: + body = {} + source = body.get("trigger_source", "manual") if isinstance(body, dict) else "manual" + return await _execute_mission(mission_id, source) + +@app.get("/missions/{mission_id}/runs") +async def mission_runs_list(mission_id: int, limit: int = 20): + rows = await db_fetchall( + "SELECT * FROM mission_runs WHERE mission_id=%s ORDER BY id DESC LIMIT %s", + (mission_id, limit) + ) + return rows or [] + +# ── Clearance FastAPI endpoints ──────────────────────────────────────────────── + +@app.get("/clearance/pending") +async def clearance_pending(): + rows = await db_fetchall( + "SELECT * FROM clearance_requests WHERE status='pending' ORDER BY created_at DESC" + ) + return rows or [] + +@app.get("/clearance/history") +async def clearance_history(limit: int = 50): + rows = await db_fetchall( + "SELECT * FROM clearance_requests ORDER BY created_at DESC LIMIT %s", (limit,) + ) + return rows or [] + +@app.post("/clearance/{cr_id}/approve") +async def clearance_approve(cr_id: int, req: Request): + try: body = await req.json() + except Exception: body = {} + decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin" + cr = await db_fetchone("SELECT * FROM clearance_requests WHERE id=%s AND status='pending'", (cr_id,)) + if not cr: + raise HTTPException(status_code=404, detail="Clearance request not found or not pending") + payload = cr.get("job_payload") or {} + if isinstance(payload, str): + try: payload = json.loads(payload) + except Exception: payload = {} + return await _dispatch_cleared_job(cr_id, cr["job_type"], payload, decided_by=decided_by) + +@app.post("/clearance/{cr_id}/deny") +async def clearance_deny(cr_id: int, req: Request): + try: body = await req.json() + except Exception: body = {} + decided_by = body.get("decided_by", "admin") if isinstance(body, dict) else "admin" + note = body.get("note", "") if isinstance(body, dict) else "" + await db_execute( + "UPDATE clearance_requests SET status='denied', decided_by=%s, decision_note=%s, decided_at=NOW() WHERE id=%s", + (decided_by, note, cr_id) + ) + log.info(f"[CLEARANCE] Request #{cr_id} denied by {decided_by}") + return {"ok": True, "clearance_id": cr_id, "status": "denied"} + +@app.get("/clearance/rules") +async def clearance_rules_list(): + rows = await db_fetchall("SELECT * FROM clearance_rules ORDER BY risk_level DESC, job_type ASC") + return rows or [] + +@app.put("/clearance/rules/{rule_id}") +async def clearance_rule_update(rule_id: int, req: Request): + body = await req.json() + fields: list = [] + params: list = [] + for col in ("risk_level", "require_approval", "auto_approve_after_min", "enabled", "description"): + if col in body: + fields.append(f"{col}=%s") + params.append(body[col]) + if not fields: + raise HTTPException(status_code=400, detail="Nothing to update") + params.append(rule_id) + await db_execute(f"UPDATE clearance_rules SET {','.join(fields)} WHERE id=%s", params) + return {"ok": True} + +@app.post("/clearance/rules") +async def clearance_rule_create(req: Request): + body = await req.json() + job_type = body.get("job_type", "").strip() + if not job_type: + raise HTTPException(status_code=400, detail="job_type required") + rid = await db_execute( + "INSERT INTO clearance_rules (job_type,risk_level,require_approval,auto_approve_after_min,description,enabled) " + "VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE " + "risk_level=%s,require_approval=%s,auto_approve_after_min=%s,description=%s,enabled=%s", + (job_type, + body.get("risk_level","high"), int(body.get("require_approval",1)), + body.get("auto_approve_after_min"), body.get("description",""), int(body.get("enabled",1)), + body.get("risk_level","high"), int(body.get("require_approval",1)), + body.get("auto_approve_after_min"), body.get("description",""), int(body.get("enabled",1))) + ) + return {"ok": True, "id": rid} + +# ── Memory Core FastAPI endpoints ────────────────────────────────────────────── + +@app.get("/memory/facts") +async def memory_facts_list(limit: int = 100, category: str = "", search: str = ""): + where = "WHERE active=1" + params: list = [] + if category: + where += " AND category=%s" + params.append(category) + if search: + where += " AND (subject LIKE %s OR predicate LIKE %s OR object LIKE %s)" + params += [f"%{search}%"] * 3 + rows = await db_fetchall( + f"SELECT * FROM memory_facts {where} ORDER BY confirmed_count DESC, last_confirmed_at DESC LIMIT %s", + params + [limit] + ) + return rows or [] + +@app.post("/memory/facts") +async def memory_fact_create(req: Request): + body = await req.json() + subj = str(body.get("subject", "")).strip()[:255] + pred = str(body.get("predicate", "is")).strip()[:255] + obj = str(body.get("object", "")).strip() + cat = body.get("category", "fact") + if not subj or not obj: + raise HTTPException(status_code=400, detail="subject and object required") + valid = {"preference","person","place","routine","goal","fact","instruction"} + if cat not in valid: cat = "fact" + fid = await db_execute( + """INSERT INTO memory_facts (category, subject, predicate, object, confidence, source) + VALUES (%s,%s,%s,%s,1.0,'explicit') + ON DUPLICATE KEY UPDATE + object=VALUES(object), confidence=1.0, + confirmed_count=confirmed_count+1, last_confirmed_at=NOW(), active=1""", + (cat, subj, pred, obj) + ) + return {"ok": True, "id": fid} + +@app.delete("/memory/facts/{fact_id}") +async def memory_fact_delete(fact_id: int): + await db_execute("UPDATE memory_facts SET active=0 WHERE id=%s", (fact_id,)) + return {"ok": True} + +@app.delete("/memory/facts") +async def memory_facts_clear(category: str = ""): + if category: + await db_execute("UPDATE memory_facts SET active=0 WHERE category=%s", (category,)) + else: + await db_execute("UPDATE memory_facts SET active=0 WHERE active=1", ()) + return {"ok": True} + +@app.get("/memory/context") +async def memory_context(message: str = "", limit: int = 15): + """Return relevant memory facts for a given message (for prompt injection).""" + params: list = [] + if message.strip(): + words = [w for w in message.lower().split() if len(w) > 3][:10] + if words: + like_clauses = " OR ".join(["subject LIKE %s OR object LIKE %s"] * len(words)) + for w in words: + params += [f"%{w}%", f"%{w}%"] + rows = await db_fetchall( + f"SELECT * FROM memory_facts WHERE active=1 AND ({like_clauses}) " + f"ORDER BY confirmed_count DESC, confidence DESC LIMIT %s", + params + [limit] + ) or [] + # Pad with top general facts if sparse + if len(rows) < 5: + existing = [r["id"] for r in rows] + excl = ("AND id NOT IN (" + ",".join(["%s"]*len(existing)) + ")") if existing else "" + extra = await db_fetchall( + f"SELECT * FROM memory_facts WHERE active=1 {excl} " + f"ORDER BY confirmed_count DESC LIMIT %s", + existing + [max(1, limit - len(rows))] + ) or [] + rows = rows + extra + else: + rows = await db_fetchall( + "SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,) + ) or [] + else: + rows = await db_fetchall( + "SELECT * FROM memory_facts WHERE active=1 ORDER BY confirmed_count DESC LIMIT %s", (limit,) + ) or [] + lines = [f"[{r['category']}] {r['subject']} {r['predicate']}: {r['object']}" for r in rows] + return {"facts": rows, "context": "\n".join(lines) if lines else ""} + +@app.get("/memory/stats") +async def memory_stats(): + total = await db_fetchone("SELECT COUNT(*) cnt FROM memory_facts WHERE active=1") + by_cat = await db_fetchall( + "SELECT category, COUNT(*) cnt FROM memory_facts WHERE active=1 GROUP BY category ORDER BY cnt DESC" + ) + recent = await db_fetchall( + "SELECT * FROM memory_facts WHERE active=1 ORDER BY last_confirmed_at DESC LIMIT 5" + ) + return { + "total": total["cnt"] if total else 0, + "by_category": by_cat or [], + "recent": recent or [], + } + +@app.get("/status") +async def status(): + row = await db_fetchone("SELECT * FROM arc_status WHERE id=1") + queued = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='queued'") + running = await db_fetchone("SELECT COUNT(*) cnt FROM arc_jobs WHERE status='running'") + return { + "online": True, "version": VERSION, + "last_heartbeat": row["last_heartbeat"].isoformat() if row and row["last_heartbeat"] else None, + "started_at": row["started_at"].isoformat() if row and row["started_at"] else None, + "jobs_done": row["jobs_done"] if row else 0, + "jobs_failed": row["jobs_failed"] if row else 0, + "active_jobs": len(_running_jobs), + "queued_jobs": queued["cnt"] if queued else 0, + "running_jobs": running["cnt"] if running else 0, + "handlers": list(JOB_HANDLERS.keys()), + } + +@app.post("/job") +async def create_job(request: Request): + body = await request.json() + jtype = body.get("type", "") + payload = body.get("payload", {}) + priority = int(body.get("priority", 5)) + created_by = body.get("created_by", "jarvis") + if not jtype: + raise HTTPException(status_code=400, detail="Missing job type") + clearance = await _check_clearance(jtype, payload, created_by) + if clearance: + return clearance + jid = await db_execute( + "INSERT INTO arc_jobs (job_type, payload, priority, created_by) VALUES (%s, %s, %s, %s)", + (jtype, json.dumps(payload), priority, created_by) + ) + return {"job_id": jid, "status": "queued"} + +@app.get("/job/{job_id}") +async def get_job(job_id: int): + job = await db_fetchone("SELECT * FROM arc_jobs WHERE id=%s", (job_id,)) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + result = job.get("result") + if result and isinstance(result, str): + try: + result = json.loads(result) + except Exception: + pass + return { + "id": job["id"], "job_type": job["job_type"], "status": job["status"], + "result": result, "error": job.get("error"), + "created_at": job["created_at"].isoformat() if job["created_at"] else None, + "started_at": job["started_at"].isoformat() if job["started_at"] else None, + "completed_at": job["completed_at"].isoformat() if job["completed_at"] else None, + } + +@app.delete("/job/{job_id}") +async def cancel_job(job_id: int): + await db_execute("UPDATE arc_jobs SET status='cancelled' WHERE id=%s AND status='queued'", (job_id,)) + return {"ok": True} + +@app.get("/jobs") +async def list_jobs(limit: int = 50, status: str = ""): + if status: + rows = await db_fetchall( + "SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs WHERE status=%s ORDER BY id DESC LIMIT %s", + (status, limit) + ) + else: + rows = await db_fetchall( + "SELECT id,job_type,status,priority,created_by,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s", + (limit,) + ) + for r in rows: + if r.get("created_at"): r["created_at"] = r["created_at"].isoformat() + if r.get("completed_at"): r["completed_at"] = r["completed_at"].isoformat() + return rows + +@app.get("/jobs/recent") +async def recent_jobs(limit: int = 10, job_type: str = ""): + if job_type: + rows = await db_fetchall( + "SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs WHERE job_type=%s ORDER BY id DESC LIMIT %s", + (job_type, limit) + ) + else: + rows = await db_fetchall( + "SELECT id,job_type,status,result,created_at,completed_at FROM arc_jobs ORDER BY id DESC LIMIT %s", + (limit,) + ) + for r in rows: + if r.get("created_at"): r["created_at"] = r["created_at"].isoformat() + if r.get("completed_at"): r["completed_at"] = r["completed_at"].isoformat() + if r.get("result") and isinstance(r["result"], str): + try: + r["result"] = json.loads(r["result"]) + except Exception: + pass + return rows + +@app.get("/triage") +async def get_triage(limit: int = 30, account: str = "gmail"): + rows = await db_fetchall( + """SELECT id, from_name, from_email, subject, date_received, category, priority, + summary, draft_reply, action_taken, created_at + FROM email_triage WHERE account=%s + ORDER BY priority DESC, created_at DESC LIMIT %s""", + (account, limit) + ) + for r in rows: + if r.get("date_received"): r["date_received"] = str(r["date_received"]) + if r.get("created_at"): r["created_at"] = str(r["created_at"]) + return rows + +@app.post("/triage/{triage_id}/action") +async def triage_action(triage_id: int, request: Request): + body = await request.json() + action = body.get("action", "dismissed") + await db_execute("UPDATE email_triage SET action_taken=%s WHERE id=%s", (action, triage_id)) + return {"ok": True} + +@app.delete("/jobs/purge") +async def purge_jobs(): + await db_execute("DELETE FROM arc_jobs WHERE status IN ('done','failed','cancelled') AND completed_at < DATE_SUB(NOW(), INTERVAL 7 DAY)") + return {"ok": True} + +# ── VISION PROTOCOL endpoints ───────────────────────────────────────────────── + +@app.get("/screenshots") +async def list_screenshots(limit: int = 20, agent: str = ""): + if agent: + rows = await db_fetchall( + "SELECT id, agent_id, hostname, method, width, height, file_size, " + " vision_analysis, vision_provider, created_at " + "FROM agent_screenshots WHERE hostname LIKE %s " + "ORDER BY created_at DESC LIMIT %s", + (f"%{agent}%", limit) + ) + else: + rows = await db_fetchall( + "SELECT id, agent_id, hostname, method, width, height, file_size, " + " vision_analysis, vision_provider, created_at " + "FROM agent_screenshots ORDER BY created_at DESC LIMIT %s", + (limit,) + ) + return rows or [] + +@app.get("/screenshots/{screenshot_id}") +async def get_screenshot(screenshot_id: int): + row = await db_fetchone( + "SELECT * FROM agent_screenshots WHERE id=%s", (screenshot_id,) + ) + if not row: + from fastapi import HTTPException + raise HTTPException(status_code=404, detail="Screenshot not found") + return row + +@app.delete("/screenshots/{screenshot_id}") +async def delete_screenshot(screenshot_id: int): + await db_execute("DELETE FROM agent_screenshots WHERE id=%s", (screenshot_id,)) + return {"ok": True} + +@app.delete("/screenshots/purge") +async def purge_screenshots(): + await db_execute("DELETE FROM agent_screenshots WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY)") + return {"ok": True} + +# ── GUARDIAN MODE endpoints ─────────────────────────────────────────────────── + +@app.get("/guardian/status") +async def guardian_status(): + cfg = await _guardian_get_config() + counts = await db_fetchone( + """SELECT + SUM(acknowledged=0) AS unread, + SUM(severity='critical' AND acknowledged=0) AS critical_unread, + SUM(severity='warning' AND acknowledged=0) AS warning_unread, + SUM(created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR)) AS events_24h + FROM guardian_events""" + ) + return { + "enabled": cfg.get("enabled", "1") == "1", + "scan_interval": int(cfg.get("scan_interval", 120)), + "last_scan": _guardian_state.get("last_scan"), + "last_sitrep": _guardian_state.get("last_sitrep"), + "thresholds": { + "cpu": float(cfg.get("cpu_threshold", 85)), + "memory": float(cfg.get("mem_threshold", 88)), + "disk": float(cfg.get("disk_threshold", 88)), + "offline_minutes": int(cfg.get("offline_minutes", 3)), + }, + "counts": dict(counts) if counts else {}, + } + +@app.get("/guardian/events") +async def guardian_events_list(limit: int = 30, unread: bool = False, + severity: str = "", since: str = ""): + wheres = [] + params: list = [] + if unread: + wheres.append("acknowledged = 0") + if severity: + wheres.append("severity = %s"); params.append(severity) + if since: + wheres.append("created_at > %s"); params.append(since) + where_sql = ("WHERE " + " AND ".join(wheres)) if wheres else "" + params.append(limit) + rows = await db_fetchall( + f"SELECT * FROM guardian_events {where_sql} ORDER BY created_at DESC LIMIT %s", + params + ) + return rows or [] + +@app.post("/guardian/events/{event_id}/ack") +async def guardian_ack(event_id: int): + await db_execute("UPDATE guardian_events SET acknowledged=1 WHERE id=%s", (event_id,)) + return {"ok": True} + +@app.post("/guardian/events/ack_all") +async def guardian_ack_all(): + await db_execute("UPDATE guardian_events SET acknowledged=1 WHERE acknowledged=0") + return {"ok": True} + +@app.get("/guardian/chat") +async def guardian_chat_events(since: str = ""): + """Return proactive guardian messages injected into conversations.""" + if since: + rows = await db_fetchall( + "SELECT id, message, created_at FROM conversations " + "WHERE session_id='guardian' AND created_at > %s ORDER BY created_at ASC", + (since,) + ) + else: + rows = await db_fetchall( + "SELECT id, message, created_at FROM conversations " + "WHERE session_id='guardian' ORDER BY created_at DESC LIMIT 10" + ) + return rows or [] + +# ── COMMS v2 endpoints ─────────────────────────────────────────────────────── + +@app.get("/comms/sent") +async def comms_sent(limit: int = 50, status: str = ""): + if status: + rows = await db_fetchall( + "SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id " + "FROM email_sent WHERE status=%s ORDER BY sent_at DESC LIMIT %s", + (status, limit) + ) + else: + rows = await db_fetchall( + "SELECT id,account,to_email,to_name,subject,status,sent_at,triage_id " + "FROM email_sent ORDER BY sent_at DESC LIMIT %s", + (limit,) + ) + return rows or [] + +@app.get("/comms/sent/{sent_id}") +async def comms_sent_get(sent_id: int): + row = await db_fetchone("SELECT * FROM email_sent WHERE id=%s", (sent_id,)) + if not row: + raise HTTPException(status_code=404, detail="Not found") + return row + +@app.delete("/comms/sent/{sent_id}") +async def comms_sent_delete(sent_id: int): + await db_execute("DELETE FROM email_sent WHERE id=%s", (sent_id,)) + return {"ok": True} + +if __name__ == "__main__": + uvicorn.run("reactor:app", host=HOST, port=PORT, log_level="info", access_log=False) diff --git a/public_html/.gitignore b/public_html/.gitignore new file mode 100644 index 0000000..fee9217 --- /dev/null +++ b/public_html/.gitignore @@ -0,0 +1 @@ +*.conf