From 8e1eb6299b93bc4fe438916e33a1b8d83ffb18a6 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 14:37:20 +0800 Subject: [PATCH 1/7] Route mem_feedback async through scheduler tracking --- src/memos/multi_mem_cube/single_cube.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 88c0f87c7..081056473 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -157,9 +157,8 @@ def feedback_memories(self, feedback_req: APIFeedbackRequest) -> dict[str, Any]: content=feedback_req_str, timestamp=datetime.utcnow(), ) - self.mem_scheduler.memos_message_queue.submit_messages( - messages=[message_item_feedback] - ) + # Use scheduler submission to ensure tracking and metrics + self.mem_scheduler.submit_messages(messages=[message_item_feedback]) self.logger.info(f"[SingleCubeView] cube={self.cube_id} Submitted FEEDBACK async") except Exception as e: self.logger.error( From 39fd9e4607f3caa4e8a0b12c8b70c406099fabd7 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 11:35:18 +0800 Subject: [PATCH 2/7] Add scheduler allstatus endpoint and fix redis scan --- src/memos/api/handlers/scheduler_handler.py | 66 ++++++++++++++++++- src/memos/api/product_models.py | 15 +++++ src/memos/api/routers/server_router.py | 13 ++++ .../mem_scheduler/utils/status_tracker.py | 28 ++++++++ 4 files changed, 121 insertions(+), 1 deletion(-) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index 697822a77..b92ddf301 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -15,14 +15,78 @@ from fastapi.responses import StreamingResponse # Imports for new implementation -from memos.api.product_models import StatusResponse, StatusResponseItem +from memos.api.product_models import ( + AllStatusResponse, + AllStatusResponseData, + StatusResponse, + StatusResponseItem, +) from memos.log import get_logger +from memos.mem_scheduler.base_scheduler import BaseScheduler from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker logger = get_logger(__name__) +def handle_scheduler_allstatus( + mem_scheduler: BaseScheduler, + status_tracker: TaskStatusTracker, +) -> AllStatusResponse: + """ + Get detailed scheduler status including running tasks and queue metrics. + + This handler aggregates: + 1. Currently running tasks from Redis (via TaskStatusTracker) - PERSISTENT. + 2. Queue status (counts of running/remaining tasks) from the monitor - PERSISTENT. + + Args: + mem_scheduler: The BaseScheduler instance. + status_tracker: The TaskStatusTracker instance. + + Returns: + AllStatusResponse with detailed status data. + """ + try: + # 1. Get running tasks from Redis (persistent status) + # Flatten the user -> task structure into a single dict for the response + # or keep it nested? The AllStatusResponseData expects a dict. + # Let's flatten it to task_id -> task_data for now, or update model to support nesting. + # Given AllStatusResponseData.running_tasks is dict[str, Any], we can put whatever. + # Let's return {user_id: {task_id: ...}} which is structured. + + global_tasks = status_tracker.get_all_tasks_global() + + # Filter for 'running' tasks only? Or return all? + # The name is "running_tasks", implying active ones. + # But "status" endpoint returns all. + # Let's return ALL tasks found in Redis, but maybe organized by user. + + # flatten to task_id -> detail if unique, but task_ids are UUIDs so unique. + flattened_tasks = {} + for user_id, tasks in global_tasks.items(): + for task_id, detail in tasks.items(): + # Inject user_id into detail for clarity + detail["user_id"] = user_id + flattened_tasks[task_id] = detail + + # 2. Get queue status from the monitor + # The monitor has get_tasks_status() which returns queue metrics + queue_status_data = {} + if mem_scheduler.task_schedule_monitor: + queue_status_data = mem_scheduler.task_schedule_monitor.get_tasks_status() + + return AllStatusResponse( + data=AllStatusResponseData( + running_tasks=flattened_tasks, + queue_status=queue_status_data, + ) + ) + except Exception as err: + logger.error(f"Failed to get full scheduler status: {traceback.format_exc()}") + raise HTTPException(status_code=500, detail="Failed to get full scheduler status") from err + + def handle_scheduler_status( user_id: str, status_tracker: TaskStatusTracker, task_id: str | None = None ) -> StatusResponse: diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 9dfd872b0..d62f3aae8 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -865,3 +865,18 @@ class StatusResponse(BaseResponse[list[StatusResponseItem]]): """Response model for scheduler status operations.""" message: str = "Memory get status successfully" + + +class AllStatusResponseData(BaseModel): + """Data model for full scheduler status.""" + + running_tasks: dict[str, Any] = Field(..., description="Details of currently running tasks") + queue_status: dict[str, Any] = Field( + ..., description="Status of the task queue (running/remaining counts)" + ) + + +class AllStatusResponse(BaseResponse[AllStatusResponseData]): + """Response model for full scheduler status operations.""" + + message: str = "Full scheduler status retrieved successfully" diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index 5b2107b6c..ea50a6bb0 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -28,6 +28,7 @@ APIChatCompleteRequest, APIFeedbackRequest, APISearchRequest, + AllStatusResponse, ChatRequest, DeleteMemoryRequest, DeleteMemoryResponse, @@ -114,6 +115,18 @@ def add_memories(add_req: APIADDRequest): # ============================================================================= +@router.get( # Changed from post to get + "/scheduler/allstatus", + summary="Get detailed scheduler status", + response_model=AllStatusResponse, +) +def scheduler_allstatus(): + """Get detailed scheduler status including running tasks and queue metrics.""" + return handlers.scheduler_handler.handle_scheduler_allstatus( + mem_scheduler=mem_scheduler, status_tracker=status_tracker + ) + + @router.get( # Changed from post to get "/scheduler/status", summary="Get scheduler running status", response_model=StatusResponse ) diff --git a/src/memos/mem_scheduler/utils/status_tracker.py b/src/memos/mem_scheduler/utils/status_tracker.py index 9a8fa53df..f2edc5aea 100644 --- a/src/memos/mem_scheduler/utils/status_tracker.py +++ b/src/memos/mem_scheduler/utils/status_tracker.py @@ -168,3 +168,31 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) -> "item_count": len(item_ids), "item_statuses": item_statuses, } + + def get_all_tasks_global(self) -> dict[str, dict[str, dict]]: + """ + Retrieve all tasks for all users from Redis. + + Returns: + dict: {user_id: {task_id: task_data, ...}, ...} + """ + all_users_tasks = {} + cursor: int | str = 0 + while True: + cursor, keys = self.redis.scan(cursor=cursor, match="memos:task_meta:*", count=100) + for key in keys: + # key format: memos:task_meta:{user_id} + parts = key.split(":") + if len(parts) < 3: + continue + user_id = parts[2] + + tasks = self.redis.hgetall(key) + if tasks: + user_tasks = {tid: json.loads(t_data) for tid, t_data in tasks.items()} + all_users_tasks[user_id] = user_tasks + + if cursor == 0 or cursor == "0": + break + + return all_users_tasks From 7bbc58716776852f8e37b6340d6343996551ac3c Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 12:05:13 +0800 Subject: [PATCH 3/7] Summarize scheduler allstatus response --- src/memos/api/handlers/scheduler_handler.py | 87 +++++++++++++-------- src/memos/api/product_models.py | 23 ++++-- 2 files changed, 73 insertions(+), 37 deletions(-) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index b92ddf301..a425e1a0a 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -8,6 +8,7 @@ import json import time import traceback +from collections import Counter from typing import Any @@ -20,6 +21,7 @@ AllStatusResponseData, StatusResponse, StatusResponseItem, + TaskSummary, ) from memos.log import get_logger from memos.mem_scheduler.base_scheduler import BaseScheduler @@ -34,52 +36,73 @@ def handle_scheduler_allstatus( status_tracker: TaskStatusTracker, ) -> AllStatusResponse: """ - Get detailed scheduler status including running tasks and queue metrics. - - This handler aggregates: - 1. Currently running tasks from Redis (via TaskStatusTracker) - PERSISTENT. - 2. Queue status (counts of running/remaining tasks) from the monitor - PERSISTENT. + Get aggregated scheduler status metrics (no per-task payload). Args: mem_scheduler: The BaseScheduler instance. status_tracker: The TaskStatusTracker instance. Returns: - AllStatusResponse with detailed status data. + AllStatusResponse with aggregated status data. """ + + def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary: + """Aggregate counts by status for the provided task details.""" + counter = Counter() + for detail in task_details: + status = detail.get("status") + if status: + counter[status] += 1 + + total = sum(counter.values()) + return TaskSummary( + waiting=counter.get("waiting", 0), + in_progress=counter.get("in_progress", 0), + completed=counter.get("completed", 0), + failed=counter.get("failed", 0), + cancelled=counter.get("cancelled", 0), + total=total, + ) + try: - # 1. Get running tasks from Redis (persistent status) - # Flatten the user -> task structure into a single dict for the response - # or keep it nested? The AllStatusResponseData expects a dict. - # Let's flatten it to task_id -> task_data for now, or update model to support nesting. - # Given AllStatusResponseData.running_tasks is dict[str, Any], we can put whatever. - # Let's return {user_id: {task_id: ...}} which is structured. - + # Get all task details from Redis and aggregate counts global_tasks = status_tracker.get_all_tasks_global() - - # Filter for 'running' tasks only? Or return all? - # The name is "running_tasks", implying active ones. - # But "status" endpoint returns all. - # Let's return ALL tasks found in Redis, but maybe organized by user. - - # flatten to task_id -> detail if unique, but task_ids are UUIDs so unique. - flattened_tasks = {} + all_task_details: list[dict[str, Any]] = [] for user_id, tasks in global_tasks.items(): - for task_id, detail in tasks.items(): - # Inject user_id into detail for clarity - detail["user_id"] = user_id - flattened_tasks[task_id] = detail - - # 2. Get queue status from the monitor - # The monitor has get_tasks_status() which returns queue metrics - queue_status_data = {} + all_task_details.extend(tasks.values()) + + all_tasks_summary = _summarize_tasks(all_task_details) + + # Summarize scheduler queue metrics (running/remaining) if available + scheduler_waiting = 0 + scheduler_in_progress = 0 if mem_scheduler.task_schedule_monitor: - queue_status_data = mem_scheduler.task_schedule_monitor.get_tasks_status() + queue_status_data = mem_scheduler.task_schedule_monitor.get_tasks_status() or {} + for key, value in queue_status_data.items(): + if not key.startswith("scheduler:"): + continue + scheduler_in_progress += int(value.get("running", 0) or 0) + scheduler_waiting += int(value.get("remaining", 0) or 0) + + scheduler_summary = TaskSummary( + waiting=scheduler_waiting, + in_progress=scheduler_in_progress, + completed=all_tasks_summary.completed, + failed=all_tasks_summary.failed, + cancelled=all_tasks_summary.cancelled, + total=( + scheduler_waiting + + scheduler_in_progress + + all_tasks_summary.completed + + all_tasks_summary.failed + + all_tasks_summary.cancelled + ), + ) return AllStatusResponse( data=AllStatusResponseData( - running_tasks=flattened_tasks, - queue_status=queue_status_data, + scheduler_summary=scheduler_summary, + all_tasks_summary=all_tasks_summary, ) ) except Exception as err: diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index d62f3aae8..00a7c497e 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -867,16 +867,29 @@ class StatusResponse(BaseResponse[list[StatusResponseItem]]): message: str = "Memory get status successfully" +class TaskSummary(BaseModel): + """Aggregated counts of tasks by status.""" + + waiting: int = Field(0, description="Number of tasks waiting to run") + in_progress: int = Field(0, description="Number of tasks currently running") + completed: int = Field(0, description="Number of tasks completed") + failed: int = Field(0, description="Number of tasks failed") + cancelled: int = Field(0, description="Number of tasks cancelled") + total: int = Field(0, description="Total number of tasks counted") + + class AllStatusResponseData(BaseModel): - """Data model for full scheduler status.""" + """Aggregated scheduler status metrics.""" - running_tasks: dict[str, Any] = Field(..., description="Details of currently running tasks") - queue_status: dict[str, Any] = Field( - ..., description="Status of the task queue (running/remaining counts)" + scheduler_summary: TaskSummary = Field( + ..., description="Aggregated status for scheduler-managed tasks" + ) + all_tasks_summary: TaskSummary = Field( + ..., description="Aggregated status for all tracked tasks" ) class AllStatusResponse(BaseResponse[AllStatusResponseData]): """Response model for full scheduler status operations.""" - message: str = "Full scheduler status retrieved successfully" + message: str = "Scheduler status summary retrieved successfully" From 49f62acdde5847de6254d7e79b4b8d3782364587 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 12:31:17 +0800 Subject: [PATCH 4/7] Refine scheduler allstatus aggregation --- src/memos/api/handlers/scheduler_handler.py | 39 ++++++++++++--------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index a425e1a0a..846afe367 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -47,7 +47,7 @@ def handle_scheduler_allstatus( """ def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary: - """Aggregate counts by status for the provided task details.""" + """Aggregate counts by status for the provided task details (tracker data).""" counter = Counter() for detail in task_details: status = detail.get("status") @@ -73,30 +73,37 @@ def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary: all_tasks_summary = _summarize_tasks(all_task_details) - # Summarize scheduler queue metrics (running/remaining) if available - scheduler_waiting = 0 - scheduler_in_progress = 0 + # Scheduler view: assume tracker contains scheduler tasks; overlay queue monitor for live queue depth + sched_waiting = all_tasks_summary.waiting + sched_in_progress = all_tasks_summary.in_progress + sched_completed = all_tasks_summary.completed + sched_failed = all_tasks_summary.failed + sched_cancelled = all_tasks_summary.cancelled + + # If queue monitor is available, prefer its live waiting/in_progress counts if mem_scheduler.task_schedule_monitor: queue_status_data = mem_scheduler.task_schedule_monitor.get_tasks_status() or {} + scheduler_waiting = 0 + scheduler_in_progress = 0 for key, value in queue_status_data.items(): if not key.startswith("scheduler:"): continue scheduler_in_progress += int(value.get("running", 0) or 0) scheduler_waiting += int(value.get("remaining", 0) or 0) + sched_waiting = scheduler_waiting + sched_in_progress = scheduler_in_progress scheduler_summary = TaskSummary( - waiting=scheduler_waiting, - in_progress=scheduler_in_progress, - completed=all_tasks_summary.completed, - failed=all_tasks_summary.failed, - cancelled=all_tasks_summary.cancelled, - total=( - scheduler_waiting - + scheduler_in_progress - + all_tasks_summary.completed - + all_tasks_summary.failed - + all_tasks_summary.cancelled - ), + waiting=sched_waiting, + in_progress=sched_in_progress, + completed=sched_completed, + failed=sched_failed, + cancelled=sched_cancelled, + total=sched_waiting + + sched_in_progress + + sched_completed + + sched_failed + + sched_cancelled, ) return AllStatusResponse( From 3bc37ffc1290fc2c3b02a46c6045eeed59b1fb4c Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 14:06:10 +0800 Subject: [PATCH 5/7] Optimize scheduler allstatus aggregation --- src/memos/api/handlers/scheduler_handler.py | 58 ++++++++++++++++++--- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index 846afe367..8e5a024fa 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -64,14 +64,58 @@ def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary: total=total, ) - try: - # Get all task details from Redis and aggregate counts - global_tasks = status_tracker.get_all_tasks_global() - all_task_details: list[dict[str, Any]] = [] - for user_id, tasks in global_tasks.items(): - all_task_details.extend(tasks.values()) + def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | None: + """Stream status counts directly from Redis to avoid loading all task payloads.""" + redis_client = getattr(tracker, "redis", None) + if not redis_client: + return None + + counter = Counter() + + # Scan task_meta keys, then hscan each hash in batches + cursor: int | str = 0 + while True: + cursor, keys = redis_client.scan(cursor=cursor, match="memos:task_meta:*", count=200) + for key in keys: + h_cursor: int | str = 0 + while True: + h_cursor, fields = redis_client.hscan(key, cursor=h_cursor, count=500) + for value in fields.values(): + try: + payload = json.loads(value.decode("utf-8") if isinstance(value, bytes) else value) + status = payload.get("status") + if status: + counter[status] += 1 + except Exception: + continue + if h_cursor == 0 or h_cursor == "0": + break + if cursor == 0 or cursor == "0": + break + + if not counter: + return TaskSummary() # Empty summary if nothing found - all_tasks_summary = _summarize_tasks(all_task_details) + total = sum(counter.values()) + return TaskSummary( + waiting=counter.get("waiting", 0), + in_progress=counter.get("in_progress", 0), + completed=counter.get("completed", 0), + failed=counter.get("failed", 0), + cancelled=counter.get("cancelled", 0), + total=total, + ) + + try: + # Prefer streaming aggregation to avoid pulling all task payloads + all_tasks_summary = _aggregate_counts_from_redis(status_tracker) + if all_tasks_summary is None: + # Fallback: load all details then aggregate + global_tasks = status_tracker.get_all_tasks_global() + all_task_details: list[dict[str, Any]] = [] + for user_id, tasks in global_tasks.items(): + all_task_details.extend(tasks.values()) + all_tasks_summary = _summarize_tasks(all_task_details) # Scheduler view: assume tracker contains scheduler tasks; overlay queue monitor for live queue depth sched_waiting = all_tasks_summary.waiting From adff41f19a178a84ac2ab68b14430906e57a16a4 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 14:21:20 +0800 Subject: [PATCH 6/7] Add pending metrics and age filter to scheduler allstatus --- src/memos/api/handlers/scheduler_handler.py | 23 ++++++++++++++++++- src/memos/api/product_models.py | 1 + .../monitors/task_schedule_monitor.py | 13 +++++++---- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index 8e5a024fa..960b39f56 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -6,6 +6,7 @@ """ import json +from datetime import datetime, timezone import time import traceback from collections import Counter @@ -59,18 +60,22 @@ def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary: waiting=counter.get("waiting", 0), in_progress=counter.get("in_progress", 0), completed=counter.get("completed", 0), + pending=counter.get("pending", counter.get("in_progress", 0)), failed=counter.get("failed", 0), cancelled=counter.get("cancelled", 0), total=total, ) - def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | None: + def _aggregate_counts_from_redis( + tracker: TaskStatusTracker, max_age_seconds: float = 86400 + ) -> TaskSummary | None: """Stream status counts directly from Redis to avoid loading all task payloads.""" redis_client = getattr(tracker, "redis", None) if not redis_client: return None counter = Counter() + now = datetime.now(timezone.utc).timestamp() # Scan task_meta keys, then hscan each hash in batches cursor: int | str = 0 @@ -83,6 +88,16 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No for value in fields.values(): try: payload = json.loads(value.decode("utf-8") if isinstance(value, bytes) else value) + # Skip stale entries to reduce noise and load + ts = payload.get("submitted_at") or payload.get("started_at") + if ts: + try: + ts_dt = datetime.fromisoformat(ts) + ts_seconds = ts_dt.timestamp() + except Exception: + ts_seconds = None + if ts_seconds and (now - ts_seconds) > max_age_seconds: + continue status = payload.get("status") if status: counter[status] += 1 @@ -101,6 +116,7 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No waiting=counter.get("waiting", 0), in_progress=counter.get("in_progress", 0), completed=counter.get("completed", 0), + pending=counter.get("pending", counter.get("in_progress", 0)), failed=counter.get("failed", 0), cancelled=counter.get("cancelled", 0), total=total, @@ -120,6 +136,7 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No # Scheduler view: assume tracker contains scheduler tasks; overlay queue monitor for live queue depth sched_waiting = all_tasks_summary.waiting sched_in_progress = all_tasks_summary.in_progress + sched_pending = all_tasks_summary.pending sched_completed = all_tasks_summary.completed sched_failed = all_tasks_summary.failed sched_cancelled = all_tasks_summary.cancelled @@ -129,17 +146,21 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No queue_status_data = mem_scheduler.task_schedule_monitor.get_tasks_status() or {} scheduler_waiting = 0 scheduler_in_progress = 0 + scheduler_pending = 0 for key, value in queue_status_data.items(): if not key.startswith("scheduler:"): continue scheduler_in_progress += int(value.get("running", 0) or 0) + scheduler_pending += int(value.get("pending", value.get("running", 0)) or 0) scheduler_waiting += int(value.get("remaining", 0) or 0) sched_waiting = scheduler_waiting sched_in_progress = scheduler_in_progress + sched_pending = scheduler_pending scheduler_summary = TaskSummary( waiting=sched_waiting, in_progress=sched_in_progress, + pending=sched_pending, completed=sched_completed, failed=sched_failed, cancelled=sched_cancelled, diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 00a7c497e..004bd81ea 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -872,6 +872,7 @@ class TaskSummary(BaseModel): waiting: int = Field(0, description="Number of tasks waiting to run") in_progress: int = Field(0, description="Number of tasks currently running") + pending: int = Field(0, description="Number of tasks fetched by workers but not yet acknowledged") completed: int = Field(0, description="Number of tasks completed") failed: int = Field(0, description="Number of tasks failed") cancelled: int = Field(0, description="Number of tasks cancelled") diff --git a/src/memos/mem_scheduler/monitors/task_schedule_monitor.py b/src/memos/mem_scheduler/monitors/task_schedule_monitor.py index 82e43d858..6e1fc695e 100644 --- a/src/memos/mem_scheduler/monitors/task_schedule_monitor.py +++ b/src/memos/mem_scheduler/monitors/task_schedule_monitor.py @@ -29,7 +29,7 @@ def __init__( @staticmethod def init_task_status() -> dict: - return {"running": 0, "remaining": 0} + return {"running": 0, "remaining": 0, "pending": 0} def get_tasks_status(self) -> dict: if isinstance(self.queue, SchedulerRedisQueue): @@ -158,6 +158,7 @@ def _get_local_tasks_status(self) -> dict: # running from dispatcher if available if self.dispatcher and hasattr(self.dispatcher, "get_running_task_count"): task_status["running"] = int(self.dispatcher.get_running_task_count()) + task_status["pending"] = task_status["running"] except Exception as e: logger.warning(f"Failed to collect local queue status: {e}") return task_status @@ -200,11 +201,13 @@ async def _collect_async() -> dict: if group.get("name") == self.queue.consumer_group: pending = int(group.get("pending", 0)) break - # Remaining = total messages (xlen) - pending for our group - remaining = max(0, int(xlen_val or 0)) + total_messages = max(0, int(xlen_val or 0)) + remaining = max(0, total_messages - pending) local[stream_key]["running"] += pending + local[stream_key]["pending"] += pending local[stream_key]["remaining"] += remaining local["running"] += pending + local["pending"] += pending local["remaining"] += remaining return local @@ -234,10 +237,12 @@ async def _collect_async() -> dict: for group in groups_info: if group.get("name") == self.queue.consumer_group: pending = int(group.get("pending", 0)) - remaining = max(0, xlen_val) + remaining = max(0, xlen_val - pending) task_status[stream_key]["running"] += pending + task_status[stream_key]["pending"] += pending task_status[stream_key]["remaining"] += remaining task_status["running"] += pending + task_status["pending"] += pending task_status["remaining"] += remaining break From e32315cf937ee67707f50aa12aaf03a25ee76c2e Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 5 Dec 2025 14:56:56 +0800 Subject: [PATCH 7/7] Adjust scheduler status pending semantics and ruff --- src/memos/api/handlers/scheduler_handler.py | 16 +++++++++------- src/memos/api/product_models.py | 4 +++- src/memos/api/routers/server_router.py | 2 +- .../monitors/task_schedule_monitor.py | 17 +++++++++++------ 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index 960b39f56..d12a8ace4 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -6,11 +6,11 @@ """ import json -from datetime import datetime, timezone import time import traceback -from collections import Counter +from collections import Counter +from datetime import datetime, timezone from typing import Any from fastapi import HTTPException @@ -60,7 +60,7 @@ def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary: waiting=counter.get("waiting", 0), in_progress=counter.get("in_progress", 0), completed=counter.get("completed", 0), - pending=counter.get("pending", counter.get("in_progress", 0)), + pending=counter.get("pending", counter.get("waiting", 0)), failed=counter.get("failed", 0), cancelled=counter.get("cancelled", 0), total=total, @@ -87,7 +87,9 @@ def _aggregate_counts_from_redis( h_cursor, fields = redis_client.hscan(key, cursor=h_cursor, count=500) for value in fields.values(): try: - payload = json.loads(value.decode("utf-8") if isinstance(value, bytes) else value) + payload = json.loads( + value.decode("utf-8") if isinstance(value, bytes) else value + ) # Skip stale entries to reduce noise and load ts = payload.get("submitted_at") or payload.get("started_at") if ts: @@ -116,7 +118,7 @@ def _aggregate_counts_from_redis( waiting=counter.get("waiting", 0), in_progress=counter.get("in_progress", 0), completed=counter.get("completed", 0), - pending=counter.get("pending", counter.get("in_progress", 0)), + pending=counter.get("pending", counter.get("waiting", 0)), failed=counter.get("failed", 0), cancelled=counter.get("cancelled", 0), total=total, @@ -129,7 +131,7 @@ def _aggregate_counts_from_redis( # Fallback: load all details then aggregate global_tasks = status_tracker.get_all_tasks_global() all_task_details: list[dict[str, Any]] = [] - for user_id, tasks in global_tasks.items(): + for _, tasks in global_tasks.items(): all_task_details.extend(tasks.values()) all_tasks_summary = _summarize_tasks(all_task_details) @@ -151,7 +153,7 @@ def _aggregate_counts_from_redis( if not key.startswith("scheduler:"): continue scheduler_in_progress += int(value.get("running", 0) or 0) - scheduler_pending += int(value.get("pending", value.get("running", 0)) or 0) + scheduler_pending += int(value.get("pending", value.get("remaining", 0)) or 0) scheduler_waiting += int(value.get("remaining", 0) or 0) sched_waiting = scheduler_waiting sched_in_progress = scheduler_in_progress diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 004bd81ea..e77aee755 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -872,7 +872,9 @@ class TaskSummary(BaseModel): waiting: int = Field(0, description="Number of tasks waiting to run") in_progress: int = Field(0, description="Number of tasks currently running") - pending: int = Field(0, description="Number of tasks fetched by workers but not yet acknowledged") + pending: int = Field( + 0, description="Number of tasks fetched by workers but not yet acknowledged" + ) completed: int = Field(0, description="Number of tasks completed") failed: int = Field(0, description="Number of tasks failed") cancelled: int = Field(0, description="Number of tasks cancelled") diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index ea50a6bb0..576cca55e 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -24,11 +24,11 @@ from memos.api.handlers.feedback_handler import FeedbackHandler from memos.api.handlers.search_handler import SearchHandler from memos.api.product_models import ( + AllStatusResponse, APIADDRequest, APIChatCompleteRequest, APIFeedbackRequest, APISearchRequest, - AllStatusResponse, ChatRequest, DeleteMemoryRequest, DeleteMemoryResponse, diff --git a/src/memos/mem_scheduler/monitors/task_schedule_monitor.py b/src/memos/mem_scheduler/monitors/task_schedule_monitor.py index 6e1fc695e..14bed8316 100644 --- a/src/memos/mem_scheduler/monitors/task_schedule_monitor.py +++ b/src/memos/mem_scheduler/monitors/task_schedule_monitor.py @@ -154,11 +154,12 @@ def _get_local_tasks_status(self) -> dict: try: # remaining is the sum of per-stream qsize qsize_map = self.queue.qsize() - task_status["remaining"] = sum(v for k, v in qsize_map.items() if isinstance(v, int)) + remaining_total = sum(v for k, v in qsize_map.items() if isinstance(v, int)) + task_status["remaining"] = remaining_total + task_status["pending"] = remaining_total # running from dispatcher if available if self.dispatcher and hasattr(self.dispatcher, "get_running_task_count"): task_status["running"] = int(self.dispatcher.get_running_task_count()) - task_status["pending"] = task_status["running"] except Exception as e: logger.warning(f"Failed to collect local queue status: {e}") return task_status @@ -203,11 +204,13 @@ async def _collect_async() -> dict: break total_messages = max(0, int(xlen_val or 0)) remaining = max(0, total_messages - pending) + # running = in-progress (delivered, not yet acked) local[stream_key]["running"] += pending - local[stream_key]["pending"] += pending + # pending = not yet delivered (remaining) + local[stream_key]["pending"] += remaining local[stream_key]["remaining"] += remaining local["running"] += pending - local["pending"] += pending + local["pending"] += remaining local["remaining"] += remaining return local @@ -238,11 +241,13 @@ async def _collect_async() -> dict: if group.get("name") == self.queue.consumer_group: pending = int(group.get("pending", 0)) remaining = max(0, xlen_val - pending) + # running = in-progress (delivered, not yet acked) task_status[stream_key]["running"] += pending - task_status[stream_key]["pending"] += pending + # pending = not yet delivered (remaining) + task_status[stream_key]["pending"] += remaining task_status[stream_key]["remaining"] += remaining task_status["running"] += pending - task_status["pending"] += pending + task_status["pending"] += remaining task_status["remaining"] += remaining break