Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 93 additions & 30 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
if str(_PLUGIN_DIR) not in sys.path:
sys.path.insert(0, str(_PLUGIN_DIR))

from bridge_client import BridgeError, MemosBridgeClient # noqa: E402
from daemon_manager import ensure_bridge_running, ensure_viewer_daemon # noqa: E402
from bridge_client import BridgeError, MemosBridgeClient, MemosHttpClient # noqa: E402
from daemon_manager import ensure_bridge_running, ensure_viewer_daemon, probe_viewer_status, kill_zombie_bridges, startup_lock_active # noqa: E402


try: # pragma: no cover — host-provided base class, absent in unit tests
Expand Down Expand Up @@ -243,7 +243,7 @@ class MemTensorProvider(MemoryProvider):
"""

def __init__(self) -> None:
self._bridge: MemosBridgeClient | None = None
self._bridge: MemosBridgeClient | MemosHttpClient | None = None
self._reconnect_lock = threading.Lock()
self._session_id: str = ""
self._episode_id: str = ""
Expand Down Expand Up @@ -293,6 +293,23 @@ def is_available(self) -> bool: # type: ignore[override]

# ─── Lifecycle ────────────────────────────────────────────────────────

def _connect_http_bridge(self, session_id: str, *, timeout: float = 60.0) -> bool:
"""Try to connect via HTTP bridge. Sets self._bridge on success."""
http_bridge: MemosHttpClient | None = None
try:
http_bridge = MemosHttpClient()
http_bridge.register_host_handler("host.llm.complete", self._handle_host_llm_complete)
self._bridge = http_bridge
self._open_session(session_id, timeout=timeout)
return True
except Exception as err:
logger.warning("MemOS: HTTP bridge failed, falling back to stdio — %s", err)
if http_bridge is not None:
with contextlib.suppress(Exception):
http_bridge.close()
self._bridge = None
return False

def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[override]
"""Called once at agent startup.

Expand All @@ -314,34 +331,73 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
except Exception as err:
logger.warning("MemOS: failed to start bridge — %s", err)
return

# Kill zombie bridges from previous sessions before deciding
# how to connect.
try:
ensure_viewer_daemon()
except Exception as err:
logger.warning("MemOS: viewer daemon check failed — %s", err)
new_bridge: MemosBridgeClient | None = None
try:
new_bridge = MemosBridgeClient()
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
new_bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._bridge = new_bridge
self._open_session(session_id)
logger.info(
"MemOS: bridge ready session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
except Exception as err:
logger.warning("MemOS: bridge init failed — %s", err)
if new_bridge is not None:
with contextlib.suppress(Exception):
new_bridge.close()
self._bridge = None
zombies = kill_zombie_bridges()
if zombies:
logger.info("MemOS: killed %d zombie bridge(s)", zombies)
except Exception:
pass

# If the daemon is already running on the viewer port, connect
# to it over HTTP instead of spawning a new stdio bridge. This
# eliminates zombie bridge accumulation.
viewer_status = probe_viewer_status()
if viewer_status == "running_memos":
if self._connect_http_bridge(session_id):
logger.info(
"MemOS: bridge ready (HTTP) session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
else:
viewer_status = "free" # force stdio fallback below
elif viewer_status == "free":
# Re-probe after a short wait only when another process may be
# mid-startup (startup lock is held). On a cold first-launch the
# lock doesn't exist, so we skip the delay entirely.
if startup_lock_active():
time.sleep(1.0)
viewer_status = probe_viewer_status()
if viewer_status == "running_memos":
if self._connect_http_bridge(session_id):
logger.info(
"MemOS: bridge ready (HTTP, late probe) session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)

if self._bridge is None:
try:
ensure_viewer_daemon()
except Exception as err:
logger.warning("MemOS: viewer daemon check failed — %s", err)
new_bridge: MemosBridgeClient | None = None
try:
new_bridge = MemosBridgeClient()
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
new_bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._bridge = new_bridge
self._open_session(session_id, timeout=60.0)
logger.info(
"MemOS: bridge ready (stdio) session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
except Exception as err:
logger.warning("MemOS: bridge init failed — %s", err)
if new_bridge is not None:
with contextlib.suppress(Exception):
new_bridge.close()
self._bridge = None
# Register a Hermes plugin hook to capture tool calls as they
# happen. The `post_tool_call` hook fires after every tool
# dispatch (write_file, terminal, search_files, etc.) with the
Expand Down Expand Up @@ -1746,6 +1802,13 @@ def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> N
logger.info("MemOS: old bridge closed (pid=%s)", old_pid)

ensure_bridge_running()
# Try HTTP first if daemon is running
viewer_status = probe_viewer_status()
if viewer_status == "running_memos":
if self._connect_http_bridge(session_id, timeout=timeout):
logger.info("MemOS: reconnected via HTTP")
return

try:
ensure_viewer_daemon()
except Exception as err:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import contextlib
import logging
import os
import re
import shutil
import signal
import subprocess
Expand Down Expand Up @@ -304,7 +305,10 @@ def ensure_viewer_daemon(*, probe_only: bool = False) -> bool:
logger.warning("MemOS: failed to start viewer daemon — %s", err)
return False

deadline = time.time() + 15.0
# 45s is generous for cold Node.js starts (tsx compile + SQLite
# open + FTS warmup). Fast probes for the first 15s, then back
# off to 2s to avoid hammering a slow-starting daemon.
deadline = time.time() + 45.0
while time.time() < deadline:
if _viewer_process.poll() is not None:
logger.warning(
Expand All @@ -324,8 +328,8 @@ def ensure_viewer_daemon(*, probe_only: bool = False) -> bool:
HERMES_VIEWER_PORT,
)
return False
time.sleep(0.5)
logger.warning("MemOS: viewer daemon did not become healthy within 15s")
time.sleep(0.5 if (deadline - time.time()) > 30 else 2.0)
logger.warning("MemOS: viewer daemon did not become healthy within 45s")
return False


Expand All @@ -336,6 +340,108 @@ def shutdown_bridge() -> None:
_bridge_ok = None


def probe_viewer_status() -> str:
"""Return the current viewer daemon status without side effects.

Returns one of: ``"running_memos"``, ``"free"``, ``"blocked"``.
This is a cheap, lock-free probe suitable for deciding whether to
spawn a new stdio bridge or connect to the existing daemon over HTTP.
"""
return _probe_viewer()


def startup_lock_active() -> bool:
"""Return True when the viewer-start.lock directory exists.

Used by the provider to skip the cold-start sleep when there is no
concurrent daemon launch in progress.
"""
return (_plugin_root() / "daemon" / "viewer-start.lock").exists()


def kill_zombie_bridges() -> int:
"""Kill all bridge.cjs processes that are NOT the daemon on port 18800.

Returns the number of zombies killed. The daemon (the process that
owns port 18800) is left alone. This should be called early in the
provider's lifecycle to clean up leftovers from crashed sessions.
"""
# Find the PID that owns port 18800 (the real daemon).
# ss(8) is Linux-only; fall back to lsof on macOS.
daemon_pid: int | None = None
try:
ss_out = subprocess.check_output(
["ss", "-tlnp"],
timeout=2.0,
text=True,
)
for line in ss_out.splitlines():
if ":18800" in line:
# ss output: users:(("node",pid=21246,fd=24))
m = re.search(r"pid=(\d+)", line)
if m:
daemon_pid = int(m.group(1))
break
except Exception:
pass

if daemon_pid is None:
# macOS fallback — lsof is available on both Linux and macOS.
try:
lsof_out = subprocess.check_output(
["lsof", "-iTCP:18800", "-sTCP:LISTEN", "-n", "-P"],
timeout=2.0,
text=True,
)
for line in lsof_out.splitlines()[1:]: # skip header
parts = line.split()
if len(parts) >= 2:
try:
daemon_pid = int(parts[1])
break
except ValueError:
continue
except Exception:
pass

# If we still can't identify the daemon PID, skip killing entirely to
# avoid terminating the daemon itself.
if daemon_pid is None:
logger.debug("MemOS: zombie scan skipped — could not identify daemon PID on port 18800")
return 0

# Find all bridge.cjs processes
killed = 0
try:
ps_out = subprocess.check_output(
["ps", "aux"],
timeout=2.0,
text=True,
)
for line in ps_out.splitlines():
if "bridge.cjs" not in line or "grep" in line:
continue
parts = line.split()
if len(parts) < 2:
continue
try:
pid = int(parts[1])
except ValueError:
continue
if pid == daemon_pid:
continue
try:
os.kill(pid, signal.SIGTERM)
killed += 1
logger.info("MemOS: killed zombie bridge pid=%d", pid)
except (OSError, ProcessLookupError):
pass
except Exception as err:
logger.debug("MemOS: zombie scan failed — %s", err)

return killed


def wait_for_process_exit(pid: int, timeout: float = 5.0) -> bool:
"""Wait for a process to exit.

Expand Down
65 changes: 40 additions & 25 deletions apps/memos-local-plugin/core/capture/capture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { extractSteps } from "./step-extractor.js";
import { createSummarizer, type Summarizer } from "./summarizer.js";
import { tagsForStep } from "./tagger.js";
import { extractErrorSignatures } from "./error-signature.js";
import { RECOVERY_REASONS } from "../pipeline/recovery-constants.js";
import type {
CaptureConfig,
CaptureEvent,
Expand Down Expand Up @@ -396,33 +397,47 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
for (const tr of existing) traceByTs.set(tr.ts, tr);
const orphan = normalized.filter((s) => !traceByTs.has(s.ts));
if (orphan.length > 0) {
log.warn("reflect.orphan_steps", {
episodeId: input.episode.id,
count: orphan.length,
action: "fallback_insert",
});
// These steps never went through runLite (likely a test path or a
// dropped event). Insert them now with reflection=null so the
// batch pass below can patch them like the rest.
const summStart = now();
const { summaries } = await runSummarize(
orphan.map((s) => ({
// During dirty-reward recovery (recoverDirtyClosedEpisodes), the episode
// snapshot is rebuilt from trace_ids_json. Any "orphan" steps here are
// artifact mismatches between snapshot timestamps and DB rows — NOT
// genuinely missing traces. Inserting them would grow trace_ids_json,
// keep reward.traceCount !== traceIds.length, and restart the recovery
// loop on every bridge start.
if (input.episode.meta?.recoveryReason === RECOVERY_REASONS.DIRTY_REWARD_RESCORE) {
log.warn("reflect.orphan_steps_skipped_recovery", {
episodeId: input.episode.id,
count: orphan.length,
reason: "dirty_reward_rescore — skipping insert to break recovery loop",
});
} else {
log.warn("reflect.orphan_steps", {
episodeId: input.episode.id,
count: orphan.length,
action: "fallback_insert",
});
// These steps never went through runLite (likely a test path or a
// dropped event). Insert them now with reflection=null so the
// batch pass below can patch them like the rest.
const summStart = now();
const { summaries } = await runSummarize(
orphan.map((s) => ({
...s,
reflection: { text: null, alpha: 0, usable: false, source: "none" },
})),
summStart,
llmCalls,
warnings,
{ episodeId: input.episode.id, phase: "reflect" },
);
const orphanScored: ScoredStep[] = orphan.map((s) => ({
...s,
reflection: { text: null, alpha: 0, usable: false, source: "none" },
})),
summStart,
llmCalls,
warnings,
{ episodeId: input.episode.id, phase: "reflect" },
);
const orphanScored: ScoredStep[] = orphan.map((s) => ({
...s,
reflection: { text: null, alpha: 0, usable: false, source: "none" },
}));
const { vecs } = await runEmbed(orphanScored, summaries, warnings);
const orphanRows = buildRows(orphanScored, summaries, vecs, input.episode);
await persistRows(orphanRows, input, warnings);
for (const r of orphanRows) traceByTs.set(r.ts, r);
}));
const { vecs } = await runEmbed(orphanScored, summaries, warnings);
const orphanRows = buildRows(orphanScored, summaries, vecs, input.episode);
await persistRows(orphanRows, input, warnings);
for (const r of orphanRows) traceByTs.set(r.ts, r);
}
}

if (normalized.length === 0) {
Expand Down
Loading