diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index fd9d0d827..b94502c67 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -63,8 +63,8 @@ if str(_PLUGIN_DIR) not in sys.path: sys.path.insert(0, str(_PLUGIN_DIR)) -from bridge_client import BridgeError, MemosBridgeClient # noqa: E402 -from daemon_manager import ensure_bridge_running, ensure_viewer_daemon # noqa: E402 +from bridge_client import BridgeError, MemosBridgeClient, MemosHttpClient # noqa: E402 +from daemon_manager import ensure_bridge_running, ensure_viewer_daemon, probe_viewer_status, kill_zombie_bridges, startup_lock_active # noqa: E402 try: # pragma: no cover — host-provided base class, absent in unit tests @@ -243,7 +243,7 @@ class MemTensorProvider(MemoryProvider): """ def __init__(self) -> None: - self._bridge: MemosBridgeClient | None = None + self._bridge: MemosBridgeClient | MemosHttpClient | None = None self._reconnect_lock = threading.Lock() self._session_id: str = "" self._episode_id: str = "" @@ -293,6 +293,23 @@ def is_available(self) -> bool: # type: ignore[override] # ─── Lifecycle ──────────────────────────────────────────────────────── + def _connect_http_bridge(self, session_id: str, *, timeout: float = 60.0) -> bool: + """Try to connect via HTTP bridge. Sets self._bridge on success.""" + http_bridge: MemosHttpClient | None = None + try: + http_bridge = MemosHttpClient() + http_bridge.register_host_handler("host.llm.complete", self._handle_host_llm_complete) + self._bridge = http_bridge + self._open_session(session_id, timeout=timeout) + return True + except Exception as err: + logger.warning("MemOS: HTTP bridge failed, falling back to stdio — %s", err) + if http_bridge is not None: + with contextlib.suppress(Exception): + http_bridge.close() + self._bridge = None + return False + def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[override] """Called once at agent startup. @@ -314,34 +331,73 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov except Exception as err: logger.warning("MemOS: failed to start bridge — %s", err) return + + # Kill zombie bridges from previous sessions before deciding + # how to connect. try: - ensure_viewer_daemon() - except Exception as err: - logger.warning("MemOS: viewer daemon check failed — %s", err) - new_bridge: MemosBridgeClient | None = None - try: - new_bridge = MemosBridgeClient() - # Register the fallback LLM handler BEFORE we open the - # session so it is available the very first time the - # plugin's facade asks for help (e.g. on the first - # `turn.start` retrieval call). - new_bridge.register_host_handler( - "host.llm.complete", - self._handle_host_llm_complete, - ) - self._bridge = new_bridge - self._open_session(session_id) - logger.info( - "MemOS: bridge ready session=%s platform=%s (episode deferred)", - self._session_id, - self._platform, - ) - except Exception as err: - logger.warning("MemOS: bridge init failed — %s", err) - if new_bridge is not None: - with contextlib.suppress(Exception): - new_bridge.close() - self._bridge = None + zombies = kill_zombie_bridges() + if zombies: + logger.info("MemOS: killed %d zombie bridge(s)", zombies) + except Exception: + pass + + # If the daemon is already running on the viewer port, connect + # to it over HTTP instead of spawning a new stdio bridge. This + # eliminates zombie bridge accumulation. + viewer_status = probe_viewer_status() + if viewer_status == "running_memos": + if self._connect_http_bridge(session_id): + logger.info( + "MemOS: bridge ready (HTTP) session=%s platform=%s (episode deferred)", + self._session_id, + self._platform, + ) + else: + viewer_status = "free" # force stdio fallback below + elif viewer_status == "free": + # Re-probe after a short wait only when another process may be + # mid-startup (startup lock is held). On a cold first-launch the + # lock doesn't exist, so we skip the delay entirely. + if startup_lock_active(): + time.sleep(1.0) + viewer_status = probe_viewer_status() + if viewer_status == "running_memos": + if self._connect_http_bridge(session_id): + logger.info( + "MemOS: bridge ready (HTTP, late probe) session=%s platform=%s (episode deferred)", + self._session_id, + self._platform, + ) + + if self._bridge is None: + try: + ensure_viewer_daemon() + except Exception as err: + logger.warning("MemOS: viewer daemon check failed — %s", err) + new_bridge: MemosBridgeClient | None = None + try: + new_bridge = MemosBridgeClient() + # Register the fallback LLM handler BEFORE we open the + # session so it is available the very first time the + # plugin's facade asks for help (e.g. on the first + # `turn.start` retrieval call). + new_bridge.register_host_handler( + "host.llm.complete", + self._handle_host_llm_complete, + ) + self._bridge = new_bridge + self._open_session(session_id, timeout=60.0) + logger.info( + "MemOS: bridge ready (stdio) session=%s platform=%s (episode deferred)", + self._session_id, + self._platform, + ) + except Exception as err: + logger.warning("MemOS: bridge init failed — %s", err) + if new_bridge is not None: + with contextlib.suppress(Exception): + new_bridge.close() + self._bridge = None # Register a Hermes plugin hook to capture tool calls as they # happen. The `post_tool_call` hook fires after every tool # dispatch (write_file, terminal, search_files, etc.) with the @@ -1746,6 +1802,13 @@ def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> N logger.info("MemOS: old bridge closed (pid=%s)", old_pid) ensure_bridge_running() + # Try HTTP first if daemon is running + viewer_status = probe_viewer_status() + if viewer_status == "running_memos": + if self._connect_http_bridge(session_id, timeout=timeout): + logger.info("MemOS: reconnected via HTTP") + return + try: ensure_viewer_daemon() except Exception as err: diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index 19c8a7be0..4bb67f3f9 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -19,6 +19,7 @@ import contextlib import logging import os +import re import shutil import signal import subprocess @@ -304,7 +305,10 @@ def ensure_viewer_daemon(*, probe_only: bool = False) -> bool: logger.warning("MemOS: failed to start viewer daemon — %s", err) return False - deadline = time.time() + 15.0 + # 45s is generous for cold Node.js starts (tsx compile + SQLite + # open + FTS warmup). Fast probes for the first 15s, then back + # off to 2s to avoid hammering a slow-starting daemon. + deadline = time.time() + 45.0 while time.time() < deadline: if _viewer_process.poll() is not None: logger.warning( @@ -324,8 +328,8 @@ def ensure_viewer_daemon(*, probe_only: bool = False) -> bool: HERMES_VIEWER_PORT, ) return False - time.sleep(0.5) - logger.warning("MemOS: viewer daemon did not become healthy within 15s") + time.sleep(0.5 if (deadline - time.time()) > 30 else 2.0) + logger.warning("MemOS: viewer daemon did not become healthy within 45s") return False @@ -336,6 +340,108 @@ def shutdown_bridge() -> None: _bridge_ok = None +def probe_viewer_status() -> str: + """Return the current viewer daemon status without side effects. + + Returns one of: ``"running_memos"``, ``"free"``, ``"blocked"``. + This is a cheap, lock-free probe suitable for deciding whether to + spawn a new stdio bridge or connect to the existing daemon over HTTP. + """ + return _probe_viewer() + + +def startup_lock_active() -> bool: + """Return True when the viewer-start.lock directory exists. + + Used by the provider to skip the cold-start sleep when there is no + concurrent daemon launch in progress. + """ + return (_plugin_root() / "daemon" / "viewer-start.lock").exists() + + +def kill_zombie_bridges() -> int: + """Kill all bridge.cjs processes that are NOT the daemon on port 18800. + + Returns the number of zombies killed. The daemon (the process that + owns port 18800) is left alone. This should be called early in the + provider's lifecycle to clean up leftovers from crashed sessions. + """ + # Find the PID that owns port 18800 (the real daemon). + # ss(8) is Linux-only; fall back to lsof on macOS. + daemon_pid: int | None = None + try: + ss_out = subprocess.check_output( + ["ss", "-tlnp"], + timeout=2.0, + text=True, + ) + for line in ss_out.splitlines(): + if ":18800" in line: + # ss output: users:(("node",pid=21246,fd=24)) + m = re.search(r"pid=(\d+)", line) + if m: + daemon_pid = int(m.group(1)) + break + except Exception: + pass + + if daemon_pid is None: + # macOS fallback — lsof is available on both Linux and macOS. + try: + lsof_out = subprocess.check_output( + ["lsof", "-iTCP:18800", "-sTCP:LISTEN", "-n", "-P"], + timeout=2.0, + text=True, + ) + for line in lsof_out.splitlines()[1:]: # skip header + parts = line.split() + if len(parts) >= 2: + try: + daemon_pid = int(parts[1]) + break + except ValueError: + continue + except Exception: + pass + + # If we still can't identify the daemon PID, skip killing entirely to + # avoid terminating the daemon itself. + if daemon_pid is None: + logger.debug("MemOS: zombie scan skipped — could not identify daemon PID on port 18800") + return 0 + + # Find all bridge.cjs processes + killed = 0 + try: + ps_out = subprocess.check_output( + ["ps", "aux"], + timeout=2.0, + text=True, + ) + for line in ps_out.splitlines(): + if "bridge.cjs" not in line or "grep" in line: + continue + parts = line.split() + if len(parts) < 2: + continue + try: + pid = int(parts[1]) + except ValueError: + continue + if pid == daemon_pid: + continue + try: + os.kill(pid, signal.SIGTERM) + killed += 1 + logger.info("MemOS: killed zombie bridge pid=%d", pid) + except (OSError, ProcessLookupError): + pass + except Exception as err: + logger.debug("MemOS: zombie scan failed — %s", err) + + return killed + + def wait_for_process_exit(pid: int, timeout: float = 5.0) -> bool: """Wait for a process to exit. diff --git a/apps/memos-local-plugin/core/capture/capture.ts b/apps/memos-local-plugin/core/capture/capture.ts index 9d52f749e..85430a551 100644 --- a/apps/memos-local-plugin/core/capture/capture.ts +++ b/apps/memos-local-plugin/core/capture/capture.ts @@ -33,6 +33,7 @@ import { extractSteps } from "./step-extractor.js"; import { createSummarizer, type Summarizer } from "./summarizer.js"; import { tagsForStep } from "./tagger.js"; import { extractErrorSignatures } from "./error-signature.js"; +import { RECOVERY_REASONS } from "../pipeline/recovery-constants.js"; import type { CaptureConfig, CaptureEvent, @@ -396,33 +397,47 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner { for (const tr of existing) traceByTs.set(tr.ts, tr); const orphan = normalized.filter((s) => !traceByTs.has(s.ts)); if (orphan.length > 0) { - log.warn("reflect.orphan_steps", { - episodeId: input.episode.id, - count: orphan.length, - action: "fallback_insert", - }); - // These steps never went through runLite (likely a test path or a - // dropped event). Insert them now with reflection=null so the - // batch pass below can patch them like the rest. - const summStart = now(); - const { summaries } = await runSummarize( - orphan.map((s) => ({ + // During dirty-reward recovery (recoverDirtyClosedEpisodes), the episode + // snapshot is rebuilt from trace_ids_json. Any "orphan" steps here are + // artifact mismatches between snapshot timestamps and DB rows — NOT + // genuinely missing traces. Inserting them would grow trace_ids_json, + // keep reward.traceCount !== traceIds.length, and restart the recovery + // loop on every bridge start. + if (input.episode.meta?.recoveryReason === RECOVERY_REASONS.DIRTY_REWARD_RESCORE) { + log.warn("reflect.orphan_steps_skipped_recovery", { + episodeId: input.episode.id, + count: orphan.length, + reason: "dirty_reward_rescore — skipping insert to break recovery loop", + }); + } else { + log.warn("reflect.orphan_steps", { + episodeId: input.episode.id, + count: orphan.length, + action: "fallback_insert", + }); + // These steps never went through runLite (likely a test path or a + // dropped event). Insert them now with reflection=null so the + // batch pass below can patch them like the rest. + const summStart = now(); + const { summaries } = await runSummarize( + orphan.map((s) => ({ + ...s, + reflection: { text: null, alpha: 0, usable: false, source: "none" }, + })), + summStart, + llmCalls, + warnings, + { episodeId: input.episode.id, phase: "reflect" }, + ); + const orphanScored: ScoredStep[] = orphan.map((s) => ({ ...s, reflection: { text: null, alpha: 0, usable: false, source: "none" }, - })), - summStart, - llmCalls, - warnings, - { episodeId: input.episode.id, phase: "reflect" }, - ); - const orphanScored: ScoredStep[] = orphan.map((s) => ({ - ...s, - reflection: { text: null, alpha: 0, usable: false, source: "none" }, - })); - const { vecs } = await runEmbed(orphanScored, summaries, warnings); - const orphanRows = buildRows(orphanScored, summaries, vecs, input.episode); - await persistRows(orphanRows, input, warnings); - for (const r of orphanRows) traceByTs.set(r.ts, r); + })); + const { vecs } = await runEmbed(orphanScored, summaries, warnings); + const orphanRows = buildRows(orphanScored, summaries, vecs, input.episode); + await persistRows(orphanRows, input, warnings); + for (const r of orphanRows) traceByTs.set(r.ts, r); + } } if (normalized.length === 0) { diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 4974ee16d..3755805cb 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -89,6 +89,7 @@ import { } from "../llm/host-bridge.js"; import { createPipeline } from "./orchestrator.js"; +import { RECOVERY_REASONS } from "./recovery-constants.js"; import { wrapRetrievalRepos } from "./retrieval-repos.js"; import type { PipelineDeps, PipelineHandle } from "./types.js"; import { @@ -1248,10 +1249,10 @@ export function createMemoryCore( handle.repos.episodes.updateMeta(episodeId, { closeReason: "finalized", recoveredAtStartup: endedAt, - recoveryReason: "dirty_reward_rescore", + recoveryReason: RECOVERY_REASONS.DIRTY_REWARD_RESCORE, }); const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, { - recoveryReason: "dirty_reward_rescore", + recoveryReason: RECOVERY_REASONS.DIRTY_REWARD_RESCORE, }); handle.buses.session.emit({ kind: "episode.finalized", diff --git a/apps/memos-local-plugin/core/pipeline/recovery-constants.ts b/apps/memos-local-plugin/core/pipeline/recovery-constants.ts new file mode 100644 index 000000000..7ddc0b99c --- /dev/null +++ b/apps/memos-local-plugin/core/pipeline/recovery-constants.ts @@ -0,0 +1,12 @@ +/** + * Shared string constants for episode recovery reasons. + * + * Both `memory-core.ts` (which sets recoveryReason) and `capture.ts` + * (which checks it) import from here so a rename can't silently + * break the orphan-skip guard. + */ +export const RECOVERY_REASONS = { + DIRTY_REWARD_RESCORE: "dirty_reward_rescore", +} as const; + +export type RecoveryReason = (typeof RECOVERY_REASONS)[keyof typeof RECOVERY_REASONS]; diff --git a/apps/memos-local-plugin/server/routes/auth.ts b/apps/memos-local-plugin/server/routes/auth.ts index bb6dc63a4..d8cb1a5fe 100644 --- a/apps/memos-local-plugin/server/routes/auth.ts +++ b/apps/memos-local-plugin/server/routes/auth.ts @@ -410,10 +410,27 @@ export function requireSession( agent?: string | null, ): boolean { // Public: auth endpoints + health (so the viewer can tell whether - // the backend is up BEFORE unlocking). Other API routes, including - // ping, fall through and are only open when no password is configured. + // the backend is up BEFORE unlocking). RPC endpoint is exempt for + // loopback callers only (local Python adapter, same machine, no + // browser session). Remote callers on the hub's 0.0.0.0 binding + // must still hold a valid session cookie. + // + // ASSUMPTION: the server binds directly (no reverse proxy). If a + // reverse proxy is ever added, socket.remoteAddress will be the + // proxy address and X-Forwarded-For must be consulted instead, or + // this loopback exemption will bypass auth for all proxy traffic. if (pathname.startsWith("/api/v1/auth/")) return true; if (pathname === "/api/v1/health") return true; + if (pathname === "/api/v1/rpc") { + const remote = + (req as unknown as { socket?: { remoteAddress?: string } }).socket + ?.remoteAddress ?? ""; + const isLoopback = + remote === "127.0.0.1" || + remote === "::1" || + remote === "::ffff:127.0.0.1"; + if (isLoopback) return true; + } const state = readAuthState(homeDir); if (!state) return true; // password protection off → open diff --git a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts index 88d5cbbd4..50240cc1c 100644 --- a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts +++ b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts @@ -26,6 +26,7 @@ import { __resetHostLlmBridgeForTests, type HostLlmBridge, } from "../../../core/llm/index.js"; +import { RECOVERY_REASONS } from "../../../core/pipeline/recovery-constants.js"; import { makeTmpDb, type TmpDbHandle } from "../../helpers/tmp-db.js"; import { makeTmpHome, type TmpHomeContext } from "../../helpers/tmp-home.js"; import { fakeEmbedder } from "../../helpers/fake-embedder.js"; @@ -1363,7 +1364,7 @@ algorithm: reward?: { traceCount?: number; traceIds?: string[] }; }; expect(meta.rewardDirty).toBeUndefined(); - expect(meta.recoveryReason).toBe("dirty_reward_rescore"); + expect(meta.recoveryReason).toBe(RECOVERY_REASONS.DIRTY_REWARD_RESCORE); expect(meta.reward?.traceCount).toBe(1); expect(meta.reward?.traceIds).toEqual(["tr_dirty"]); }); @@ -1452,7 +1453,7 @@ algorithm: recoveryReason?: string; reward?: { traceCount?: number; traceIds?: string[] }; }; - expect(meta.recoveryReason).toBe("dirty_reward_rescore"); + expect(meta.recoveryReason).toBe(RECOVERY_REASONS.DIRTY_REWARD_RESCORE); expect(meta.reward?.traceCount).toBe(1); expect(meta.reward?.traceIds).toEqual(["tr_missing_reward"]); });