def _connect_http_bridge(self, session_id: str, *, timeout: float = 60.0) -> bool:
    """Attempt to attach to the daemon over HTTP.

    On success ``self._bridge`` holds the new ``MemosHttpClient`` and the
    session has been opened with the given ``timeout``; returns ``True``.
    On any failure the client is closed (best effort), ``self._bridge`` is
    reset to ``None`` and ``False`` is returned so the caller can fall back
    to another transport.
    """
    client: MemosHttpClient | None = None
    try:
        client = MemosHttpClient()
        # Register the host LLM handler before opening the session, in the
        # same order the stdio path uses.
        client.register_host_handler("host.llm.complete", self._handle_host_llm_complete)
        self._bridge = client
        self._open_session(session_id, timeout=timeout)
    except Exception as err:
        logger.warning("MemOS: HTTP bridge failed, falling back to stdio — %s", err)
        if client is not None:
            # Closing is best effort; a failure here must not mask `err`.
            with contextlib.suppress(Exception):
                client.close()
        self._bridge = None
        return False
    return True
- new_bridge.register_host_handler( - "host.llm.complete", - self._handle_host_llm_complete, - ) - self._bridge = new_bridge - self._open_session(session_id) - logger.info( - "MemOS: bridge ready session=%s platform=%s (episode deferred)", - self._session_id, - self._platform, - ) - except Exception as err: - logger.warning("MemOS: bridge init failed — %s", err) - if new_bridge is not None: - with contextlib.suppress(Exception): - new_bridge.close() - self._bridge = None + zombies = kill_zombie_bridges() + if zombies: + logger.info("MemOS: killed %d zombie bridge(s)", zombies) + except Exception: + pass + + # If the daemon is already running on the viewer port, connect + # to it over HTTP instead of spawning a new stdio bridge. This + # eliminates zombie bridge accumulation. + viewer_status = probe_viewer_status() + if viewer_status == "running_memos": + if self._connect_http_bridge(session_id): + logger.info( + "MemOS: bridge ready (HTTP) session=%s platform=%s (episode deferred)", + self._session_id, + self._platform, + ) + else: + viewer_status = "free" # force stdio fallback below + elif viewer_status == "free": + # Re-probe after a short wait only when another process may be + # mid-startup (startup lock is held). On a cold first-launch the + # lock doesn't exist, so we skip the delay entirely. 
+ if startup_lock_active(): + time.sleep(1.0) + viewer_status = probe_viewer_status() + if viewer_status == "running_memos": + if self._connect_http_bridge(session_id): + logger.info( + "MemOS: bridge ready (HTTP, late probe) session=%s platform=%s (episode deferred)", + self._session_id, + self._platform, + ) + + if self._bridge is None: + try: + ensure_viewer_daemon() + except Exception as err: + logger.warning("MemOS: viewer daemon check failed — %s", err) + new_bridge: MemosBridgeClient | None = None + try: + new_bridge = MemosBridgeClient() + # Register the fallback LLM handler BEFORE we open the + # session so it is available the very first time the + # plugin's facade asks for help (e.g. on the first + # `turn.start` retrieval call). + new_bridge.register_host_handler( + "host.llm.complete", + self._handle_host_llm_complete, + ) + self._bridge = new_bridge + self._open_session(session_id, timeout=60.0) + logger.info( + "MemOS: bridge ready (stdio) session=%s platform=%s (episode deferred)", + self._session_id, + self._platform, + ) + except Exception as err: + logger.warning("MemOS: bridge init failed — %s", err) + if new_bridge is not None: + with contextlib.suppress(Exception): + new_bridge.close() + self._bridge = None # Register a Hermes plugin hook to capture tool calls as they # happen. The `post_tool_call` hook fires after every tool # dispatch (write_file, terminal, search_files, etc.) 
class MemosHttpClient:
    """JSON-RPC 2.0 client that talks to the daemon bridge over HTTP.

    Drop-in replacement for ``MemosBridgeClient`` when a daemon is already
    running on the viewer port. Instead of spawning a new subprocess, this
    client POSTs JSON-RPC envelopes to ``/api/v1/rpc`` on the daemon's HTTP
    server.

    Differences from the stdio client:
    - No reverse-direction RPC: handlers given to ``register_host_handler``
      are stored but never invoked by this class.
    - ``notify()`` is sent as an ordinary request (it carries an ``id``);
      its result and any failure are discarded.
    - ``on_event`` / ``on_log`` accept a callback and ignore it.

    Every transport or protocol failure raised by ``request()`` is a
    ``BridgeError`` so callers need a single ``except`` clause.
    """

    # Upper bound on the number of bytes accepted from one HTTP response.
    _MAX_BODY_BYTES = 4_194_304  # 4 MiB

    def __init__(
        self,
        *,
        port: int = 18800,
        host: str = "127.0.0.1",
        api_key: str | None = None,
    ) -> None:
        """Create a client for ``http://{host}:{port}/api/v1/rpc``.

        No connection is opened here; each ``request()`` performs its own
        HTTP POST. When ``api_key`` is set it is sent as a Bearer token.
        """
        self._base_url = f"http://{host}:{port}/api/v1/rpc"
        self._lock = threading.Lock()  # guards _next_id
        self._next_id = 1
        self._closed = False
        self._api_key = api_key
        self._host_handlers: dict[str, Callable[[dict[str, Any]], Any]] = {}

    @property
    def pid(self) -> int:
        """Return 0 — HTTP client has no subprocess."""
        return 0

    # ─── Public API (matches MemosBridgeClient) ──

    def request(
        self,
        method: str,
        params: Any = None,
        *,
        timeout: float = 30.0,
    ) -> dict[str, Any]:
        """POST one JSON-RPC request and return its ``result``.

        A missing or falsy ``result`` is returned as ``{}``.

        Raises:
            BridgeError: ``transport_closed`` when the client is closed or
                the HTTP exchange fails (connection refused/reset, timeout,
                truncated response); ``internal`` for an unparseable,
                oversized or non-object response; otherwise the code
                carried by the JSON-RPC ``error`` object.
        """
        # Local import: only needed to name the exception family below.
        import http.client

        if self._closed:
            raise BridgeError("transport_closed", "HTTP client is closed")

        req = self._build_request(method, params)
        limit = self._MAX_BODY_BYTES
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                # Read one byte past the cap so truncation is detectable.
                body = resp.read(limit + 1)
        except urllib.error.HTTPError as exc:
            raise self._error_from_http(exc) from exc
        except (OSError, http.client.HTTPException) as exc:
            # URLError, socket timeouts during read and connection resets
            # are all OSError subclasses; HTTPException covers malformed or
            # incomplete responses.
            raise BridgeError("transport_closed", str(exc)) from exc

        if len(body) > limit:
            raise BridgeError("internal", f"response body exceeds {limit} bytes")
        return self._decode_result(body)

    def notify(self, method: str, params: Any = None) -> None:
        """Send ``method`` as a best-effort request and discard the outcome.

        The HTTP transport has no true notification path here, so this
        issues a normal ``request()`` with a 5 second timeout.
        """
        try:
            self.request(method, params, timeout=5.0)
        except Exception:
            # Deliberately swallowed: notifications must never fail the caller.
            pass

    def on_event(self, cb: Callable[[dict[str, Any]], None]) -> None:
        """No-op — SSE events not supported over HTTP transport."""

    def on_log(self, cb: Callable[[dict[str, Any]], None]) -> None:
        """No-op — SSE logs not supported over HTTP transport."""

    def register_host_handler(
        self,
        method: str,
        handler: Callable[[dict[str, Any]], Any],
    ) -> None:
        """Store ``handler`` under ``method``; this class never calls it."""
        self._host_handlers[method] = handler

    def close(self) -> None:
        """Mark the client closed; later ``request()`` calls raise."""
        self._closed = True

    # ─── Internals ──

    def _build_request(self, method: str, params: Any) -> urllib.request.Request:
        """Allocate the next request id and build the POST request object."""
        with self._lock:
            rpc_id = self._next_id
            self._next_id += 1

        envelope = {
            "jsonrpc": "2.0",
            "id": rpc_id,
            "method": method,
            "params": params,
        }
        req = urllib.request.Request(
            self._base_url,
            data=json.dumps(envelope, ensure_ascii=False).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        if self._api_key:
            req.add_header("Authorization", f"Bearer {self._api_key}")
        return req

    @staticmethod
    def _to_bridge_error(err_obj: Any, default_message: str) -> "BridgeError":
        """Convert a JSON-RPC ``error`` member into a ``BridgeError``.

        Tolerates a missing/non-dict error object and a ``null`` ``data``
        member. Prefers ``data.code`` over the numeric JSON-RPC code.
        """
        if not isinstance(err_obj, dict):
            err_obj = {}
        data = err_obj.get("data")
        detail = data if isinstance(data, dict) else {}
        code = detail.get("code") or str(err_obj.get("code", "internal"))
        return BridgeError(code, err_obj.get("message", default_message), data)

    @classmethod
    def _error_from_http(cls, exc: urllib.error.HTTPError) -> "BridgeError":
        """Build a ``BridgeError`` from a non-2xx HTTP response."""
        import http.client

        try:
            raw = exc.read(cls._MAX_BODY_BYTES)
            payload = json.loads(raw.decode("utf-8", errors="replace"))
        except (OSError, ValueError, http.client.HTTPException):
            # Body unreadable or not JSON: report the bare HTTP status.
            return BridgeError("internal", f"HTTP {exc.code}: {exc.reason}")
        if not isinstance(payload, dict):
            return BridgeError("internal", f"HTTP {exc.code}: {exc.reason}")
        return cls._to_bridge_error(payload.get("error"), f"HTTP {exc.code}")

    @classmethod
    def _decode_result(cls, body: bytes) -> dict[str, Any]:
        """Parse a JSON-RPC response body and return its ``result``."""
        try:
            payload = json.loads(body.decode("utf-8", errors="replace"))
        except ValueError as exc:
            raise BridgeError("internal", f"invalid JSON-RPC response: {exc}") from exc
        if not isinstance(payload, dict):
            raise BridgeError("internal", "invalid JSON-RPC response: expected an object")
        if "error" in payload:
            raise cls._to_bridge_error(payload["error"], "unknown error")
        return payload.get("result") or {}
def _listener_pid_from_ss(ss_output: str, port: int) -> "int | None":
    """Extract the PID listening on *port* from ``ss -tlnp`` output."""
    # Match the port exactly so that e.g. 188001 is not mistaken for 18800.
    port_pattern = re.compile(rf":{port}(?!\d)")
    for line in ss_output.splitlines():
        if not port_pattern.search(line):
            continue
        # Process column looks like: users:(("node",pid=21246,fd=24))
        found = re.search(r"pid=(\d+)", line)
        if found:
            return int(found.group(1))
    return None


def _listener_pid_from_lsof(lsof_output: str) -> "int | None":
    """Extract the first PID from ``lsof`` output (second column)."""
    for line in lsof_output.splitlines()[1:]:  # skip header
        columns = line.split()
        if len(columns) >= 2 and columns[1].isdigit():
            return int(columns[1])
    return None


def _capture_output(argv: "list[str]") -> "str | None":
    """Run *argv* with a 2 s timeout; return stdout, or ``None`` on failure."""
    try:
        return subprocess.check_output(
            argv,
            timeout=2.0,
            text=True,
            errors="replace",  # odd bytes in a command line must not abort the scan
            stderr=subprocess.DEVNULL,  # keep tool noise off the host's stderr
        )
    except (OSError, subprocess.SubprocessError):
        # Tool missing, non-zero exit (lsof exits 1 when nothing matches) or timeout.
        return None


def kill_zombie_bridges(port: int = 18800) -> int:
    """Send SIGTERM to every ``bridge.cjs`` process except the daemon.

    The daemon is identified as the process listening on *port*
    (``ss`` first, ``lsof`` as a fallback). If it cannot be identified
    nothing is signalled, to avoid terminating the daemon itself.

    NOTE(review): any process whose ``ps aux`` line contains
    ``bridge.cjs`` is treated as a zombie. That presumably includes live
    stdio bridges owned by other concurrently running sessions — confirm
    this is acceptable before relying on it.

    Args:
        port: TCP port owned by the daemon. Defaults to 18800.

    Returns:
        The number of processes that were successfully signalled.
    """
    daemon_pid: "int | None" = None

    # ss(8) is Linux-only; lsof is the fallback (e.g. on macOS).
    ss_out = _capture_output(["ss", "-tlnp"])
    if ss_out is not None:
        daemon_pid = _listener_pid_from_ss(ss_out, port)
    if daemon_pid is None:
        lsof_out = _capture_output(
            ["lsof", f"-iTCP:{port}", "-sTCP:LISTEN", "-n", "-P"]
        )
        if lsof_out is not None:
            daemon_pid = _listener_pid_from_lsof(lsof_out)

    if daemon_pid is None:
        logger.debug(
            "MemOS: zombie scan skipped — could not identify daemon PID on port %d",
            port,
        )
        return 0

    ps_out = _capture_output(["ps", "aux"])
    if ps_out is None:
        logger.debug("MemOS: zombie scan failed — could not list processes")
        return 0

    # Never signal the daemon or the current process.
    protected = {daemon_pid, os.getpid()}
    killed = 0
    for line in ps_out.splitlines():
        if "bridge.cjs" not in line or "grep" in line:
            continue
        columns = line.split()
        if len(columns) < 2 or not columns[1].isdigit():
            continue
        pid = int(columns[1])
        if pid in protected:
            continue
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            # Already gone, or owned by another user — nothing to do.
            continue
        killed += 1
        logger.info("MemOS: killed zombie bridge pid=%d", pid)

    return killed
/**
 * JSON-RPC-over-HTTP route — `/api/v1/rpc`.
 *
 * Accepts a single JSON-RPC 2.0 request envelope (or a batch) and
 * dispatches via the same `makeDispatcher` the stdio bridge uses.
 * This allows the Python adapter to talk to an already-running daemon
 * over HTTP instead of spawning a new stdio bridge subprocess.
 *
 * ## Protocol
 *
 * `POST /api/v1/rpc` accepts:
 *   - A single JSON-RPC request: `{"jsonrpc":"2.0","id":1,"method":"turn.start","params":{...}}`
 *   - A non-empty batch: `[{...}, {...}]`
 *
 * Notifications (no `id`, or `id: null`) are dispatched and produce no
 * response object. Unparseable bodies yield a -32700 Parse error;
 * structurally invalid envelopes (including an empty batch) yield a
 * -32600 Invalid Request error.
 *
 * ## Security
 *
 * Access is decided by `requireSession` in `auth.ts`, which exempts
 * loopback callers of this path from the session check.
 * NOTE(review): that exemption applies to ANY local process able to
 * reach the port; confirm whether browser-originated requests to
 * 127.0.0.1 need to be excluded (e.g. by requiring
 * `Content-Type: application/json`).
 */

import type { Routes } from "./registry.js";
import type { ServerDeps } from "../types.js";
import { makeDispatcher, errorCodeOf } from "../../bridge/methods.js";
import { MemosError } from "../../agent-contract/errors.js";
import type { ErrorCode } from "../../agent-contract/errors.js";

/** Incoming JSON-RPC 2.0 request envelope. */
interface JsonRpcRequest {
  jsonrpc: "2.0";
  id?: number | string | null;
  method: string;
  params?: unknown;
}

/** Outgoing JSON-RPC 2.0 response envelope. */
interface JsonRpcResponse {
  jsonrpc: "2.0";
  id: number | string | null;
  result?: unknown;
  error?: {
    code: number;
    message: string;
    data?: { code: ErrorCode };
  };
}

type Dispatcher = ReturnType<typeof makeDispatcher>;

// Standard JSON-RPC 2.0 error codes.
const JSONRPC_PARSE_ERROR = -32700;
const JSONRPC_INVALID_REQUEST = -32600;
const JSONRPC_METHOD_NOT_FOUND = -32601;
const JSONRPC_INVALID_PARAMS = -32602;
const JSONRPC_INTERNAL_ERROR = -32603;

/** Map a MemOS error code onto the closest JSON-RPC numeric code. */
function memosErrorCodeToJsonRpc(code: ErrorCode): number {
  switch (code) {
    case "unknown_method":
      return JSONRPC_METHOD_NOT_FOUND;
    case "invalid_argument":
      return JSONRPC_INVALID_PARAMS;
    default:
      return JSONRPC_INTERNAL_ERROR;
  }
}

/** Build an error response that is not tied to a request id. */
function protocolError(code: number, message: string): JsonRpcResponse {
  return { jsonrpc: "2.0", id: null, error: { code, message } };
}

/** Register `POST /api/v1/rpc` on the route table. */
export function registerRpcRoutes(routes: Routes, deps: ServerDeps): void {
  const dispatch = makeDispatcher(deps.core, { strict: false });

  routes.set("POST /api/v1/rpc", async (ctx) => {
    let parsed: unknown;
    try {
      parsed = JSON.parse(ctx.body.toString("utf-8"));
    } catch {
      // Body is not valid JSON: this is a Parse error, not an Invalid Request.
      return protocolError(JSONRPC_PARSE_ERROR, "Parse error");
    }

    if (!Array.isArray(parsed)) {
      return await handleSingle(parsed, dispatch);
    }

    // Per JSON-RPC 2.0 §6 an empty batch is itself an invalid request.
    if (parsed.length === 0) {
      return protocolError(JSONRPC_INVALID_REQUEST, "Invalid Request");
    }

    const all = await Promise.all(
      parsed.map((item) => handleSingle(item, dispatch)),
    );
    // Notifications produce no response object; drop them so they do not
    // serialize as null entries.
    const results = all.filter(
      (r): r is JsonRpcResponse => r !== undefined,
    );
    // If every item was a notification, return nothing (204).
    if (results.length === 0) return undefined as unknown as JsonRpcResponse;
    return results as unknown as JsonRpcResponse;
  });
}

/**
 * Dispatch one envelope. Resolves to a response object, or `undefined`
 * for a notification (the caller turns that into an empty reply).
 */
async function handleSingle(
  raw: unknown,
  dispatch: Dispatcher,
): Promise<JsonRpcResponse | undefined> {
  if (!isRpcRequest(raw)) {
    return protocolError(JSONRPC_INVALID_REQUEST, "Invalid Request");
  }

  // Notification — fire and forget
  if (raw.id === undefined || raw.id === null) {
    try {
      await dispatch(raw.method, raw.params);
    } catch {
      // Notifications must not return errors per spec
    }
    return undefined;
  }

  try {
    const result = await dispatch(raw.method, raw.params);
    return { jsonrpc: "2.0", id: raw.id, result };
  } catch (err) {
    const code = errorCodeOf(err);
    const message = err instanceof MemosError ? err.message : String(err);
    return {
      jsonrpc: "2.0",
      id: raw.id,
      error: {
        code: memosErrorCodeToJsonRpc(code),
        message,
        data: { code },
      },
    };
  }
}

/** Structural check for a JSON-RPC 2.0 request envelope. */
function isRpcRequest(v: unknown): v is JsonRpcRequest {
  if (typeof v !== "object" || v === null) return false;
  const obj = v as Record<string, unknown>;
  if (obj.jsonrpc !== "2.0" || typeof obj.method !== "string") return false;
  // `id`, when present, must be a string, a number or null.
  const id = obj.id;
  return (
    id === undefined ||
    id === null ||
    typeof id === "string" ||
    typeof id === "number"
  );
}