From 5958b5121e8ddc414765b3605d9dedf2bf33fbb1 Mon Sep 17 00:00:00 2001 From: yc111233 Date: Sun, 5 Apr 2026 23:43:02 +0800 Subject: [PATCH] fix(redo): wait for queue manager before recovery and prevent silent task loss MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues in _recover_pending_redo / _enqueue_semantic: 1. Redo recovery runs immediately at start() but the queue manager may not be initialized yet. _enqueue_semantic silently returns when queue_manager is None, then mark_done removes the redo marker → the task is permanently lost and memories are never extracted. 2. The VLM timeout during recovery is 60 s. If VLM is unavailable, each redo task wastes a full minute, and with multiple tasks this delays service readiness by several minutes. 3. _enqueue_semantic silently drops work when queue_manager is None. Any caller that assumes the work was accepted will proceed and delete its redo marker, losing the task. Fixes: - Wait up to 30 s for queue manager to become available before starting redo recovery; if it never appears, defer to next restart. - Reduce VLM timeout during recovery from 60 s to 15 s. - Make _enqueue_semantic raise RuntimeError instead of silently returning when queue_manager is None so callers cannot accidentally mark the task as done. Co-Authored-By: Claude Opus 4.6 --- .../storage/transaction/lock_manager.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index f6e9a15e1..589eae7d4 100644 --- a/openviking/storage/transaction/lock_manager.py +++ b/openviking/storage/transaction/lock_manager.py @@ -246,6 +246,21 @@ def _reconcile_handle(self, handle: LockHandle) -> Optional[LockHandle]: # ------------------------------------------------------------------ async def _recover_pending_redo(self) -> None: + # Wait for the queue manager to become available so that + # _enqueue_semantic does not silently discard work. + from openviking.storage.queuefs import get_queue_manager + + for _ in range(30): # up to 30 s + if get_queue_manager() is not None: + break + await asyncio.sleep(1) + else: + logger.warning( + "Queue manager not available after 30 s — " + "deferring redo recovery to next restart" + ) + return + pending_ids = self._redo_log.list_pending() for task_id in pending_ids: logger.info(f"Recovering pending redo task: {task_id}") @@ -314,7 +329,7 @@ async def _redo_session_memory(self, info: Dict[str, Any]) -> None: session_id=session_id, ctx=ctx, ), - timeout=60.0, + timeout=15.0, # short timeout during recovery to avoid blocking startup ) logger.info(f"Redo: extracted {len(memories)} memories from {archive_uri}") except Exception as e: @@ -337,8 +352,9 @@ async def _enqueue_semantic(self, **params: Any) -> None: queue_manager = get_queue_manager() if queue_manager is None: - logger.debug("No queue manager available, skipping enqueue_semantic") - return + raise RuntimeError( + "Queue manager not available — cannot enqueue semantic processing" + ) uri = params.get("uri") if not uri: