diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index f6e9a15e1..589eae7d4 100644 --- a/openviking/storage/transaction/lock_manager.py +++ b/openviking/storage/transaction/lock_manager.py @@ -246,6 +246,21 @@ def _reconcile_handle(self, handle: LockHandle) -> Optional[LockHandle]: # ------------------------------------------------------------------ async def _recover_pending_redo(self) -> None: + # Wait for the queue manager to become available so that + # _enqueue_semantic does not silently discard work. + from openviking.storage.queuefs import get_queue_manager + + for _ in range(30): # up to 30 s + if get_queue_manager() is not None: + break + await asyncio.sleep(1) + else: + logger.warning( + "Queue manager not available after 30 s — " + "deferring redo recovery to next restart" + ) + return + pending_ids = self._redo_log.list_pending() for task_id in pending_ids: logger.info(f"Recovering pending redo task: {task_id}") @@ -314,7 +329,7 @@ async def _redo_session_memory(self, info: Dict[str, Any]) -> None: session_id=session_id, ctx=ctx, ), - timeout=60.0, + timeout=15.0, # short timeout during recovery to avoid blocking startup ) logger.info(f"Redo: extracted {len(memories)} memories from {archive_uri}") except Exception as e: @@ -337,8 +352,9 @@ async def _enqueue_semantic(self, **params: Any) -> None: queue_manager = get_queue_manager() if queue_manager is None: - logger.debug("No queue manager available, skipping enqueue_semantic") - return + raise RuntimeError( + "Queue manager not available — cannot enqueue semantic processing" + ) uri = params.get("uri") if not uri: