diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index e2eefb9d8..e7b756a1f 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -250,10 +250,13 @@ def handle_task_queue_status( user_id: str, mem_scheduler: OptimizedScheduler, task_id: str | None = None ) -> TaskQueueResponse: try: - queue = getattr(mem_scheduler, "memos_message_queue", None) - if queue is None: + queue_wrapper = getattr(mem_scheduler, "memos_message_queue", None) + if queue_wrapper is None: raise HTTPException(status_code=503, detail="Scheduler queue is not available") + # Unwrap to the underlying queue if wrapped by ScheduleTaskQueue + queue = getattr(queue_wrapper, "memos_message_queue", queue_wrapper) + # Only support Redis-backed queue for now; try lazy init if not connected redis_conn = getattr(queue, "_redis_conn", None) if redis_conn is None: @@ -269,7 +272,8 @@ def handle_task_queue_status( if redis_conn is None: raise HTTPException(status_code=503, detail="Scheduler queue not connected to Redis") - stream_keys = queue.get_stream_keys() + # Use wrapper to list stream keys so it can adapt to local/redis queue + stream_keys = queue_wrapper.get_stream_keys() # Filter by user_id; stream key format: {prefix}:{user_id}:{mem_cube_id}:{task_label} user_stream_keys = [sk for sk in stream_keys if f":{user_id}:" in sk]