Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions openviking/storage/transaction/lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,21 @@ def _reconcile_handle(self, handle: LockHandle) -> Optional[LockHandle]:
# ------------------------------------------------------------------

async def _recover_pending_redo(self) -> None:
# Wait for the queue manager to become available so that
# _enqueue_semantic does not silently discard work.
from openviking.storage.queuefs import get_queue_manager

for _ in range(30): # up to 30 s
if get_queue_manager() is not None:
break
await asyncio.sleep(1)
else:
logger.warning(
"Queue manager not available after 30 s — "
"deferring redo recovery to next restart"
)
return

pending_ids = self._redo_log.list_pending()
for task_id in pending_ids:
logger.info(f"Recovering pending redo task: {task_id}")
Expand Down Expand Up @@ -314,7 +329,7 @@ async def _redo_session_memory(self, info: Dict[str, Any]) -> None:
session_id=session_id,
ctx=ctx,
),
timeout=60.0,
timeout=15.0, # short timeout during recovery to avoid blocking startup
)
logger.info(f"Redo: extracted {len(memories)} memories from {archive_uri}")
except Exception as e:
Expand All @@ -337,8 +352,9 @@ async def _enqueue_semantic(self, **params: Any) -> None:

queue_manager = get_queue_manager()
if queue_manager is None:
logger.debug("No queue manager available, skipping enqueue_semantic")
return
raise RuntimeError(
"Queue manager not available — cannot enqueue semantic processing"
)

uri = params.get("uri")
if not uri:
Expand Down
Loading