From 4ef6bc24f6fac0a525b9725c5dfb38e827d5e1e8 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Mon, 8 Dec 2025 21:21:22 +0800 Subject: [PATCH 1/8] Add task completion log event for cloud tasks --- .../task_schedule_modules/dispatcher.py | 54 ++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index e3ce0d4e9..085a562c0 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -1,4 +1,5 @@ import concurrent +import os import threading import time @@ -19,7 +20,7 @@ from memos.mem_scheduler.schemas.general_schemas import ( DEFAULT_STOP_WAIT, ) -from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem +from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue @@ -200,6 +201,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): if self.status_tracker: for msg in messages: self.status_tracker.task_completed(task_id=msg.item_id, user_id=msg.user_id) + self._maybe_emit_task_completion(messages) self.metrics.task_completed(user_id=m.user_id, task_type=m.label) emit_monitor_event( @@ -284,6 +286,56 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): return wrapped_handler + def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> None: + """If all item_ids under a business task are completed, emit a single completion log.""" + if not self.submit_web_logs or not self.status_tracker: + return + + # messages in one batch can belong to different business task_ids; check each + task_ids = {getattr(msg, "task_id", None) for msg in messages} + task_ids.discard(None) + if not task_ids: + return + + # Use the first message only for shared fields; mem_cube_id is same within a batch + first = messages[0] + user_id = first.user_id + mem_cube_id = first.mem_cube_id + + try: + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + ) + if not is_cloud_env: + return + + for task_id in task_ids: + status_data = self.status_tracker.get_task_status_by_business_id( + business_task_id=task_id, user_id=user_id + ) + if not status_data or status_data.get("status") != "completed": + continue + + event = ScheduleLogForWebItem( + task_id=task_id, + user_id=user_id, + mem_cube_id=mem_cube_id, + label="taskStatus", + from_memory_type="status", + to_memory_type="status", + log_content=f"Task {task_id} completed", + status="completed", + ) + self.submit_web_logs(event) + except Exception: + logger.warning( + "Failed to emit task completion log. user_id=%s mem_cube_id=%s task_ids=%s", + user_id, + mem_cube_id, + list(task_ids), + exc_info=True, + ) + def get_running_tasks( self, filter_func: Callable[[RunningTaskItem], bool] | None = None ) -> dict[str, RunningTaskItem]: From edd83890aa134dcd260dbbaaddac308f270329d7 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Mon, 8 Dec 2025 21:36:49 +0800 Subject: [PATCH 2/8] Style: reformat dispatcher.py with ruff --- src/memos/mem_scheduler/task_schedule_modules/dispatcher.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 085a562c0..bbec254dd 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -303,9 +303,7 @@ def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> No mem_cube_id = first.mem_cube_id try: - is_cloud_env = ( - os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - ) + is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" if not is_cloud_env: return From 1dc8ffbc694d21904f9c75185d25e859428cdbed Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 9 Dec 2025 10:57:57 +0800 Subject: [PATCH 3/8] feat(scheduler): report task failure to web logs and fix exception handling --- .../task_schedule_modules/dispatcher.py | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index bbec254dd..1065701ea 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -239,6 +239,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): self.status_tracker.task_failed( task_id=msg.item_id, user_id=msg.user_id, error_message=str(e) ) + self._maybe_emit_task_completion(messages, error=e) emit_monitor_event( "finish", m, @@ -286,7 +287,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): return wrapped_handler - def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> None: + def _maybe_emit_task_completion( + self, messages: list[ScheduleMessageItem], error: Exception | None = None + ) -> None: """If all item_ids under a business task are completed, emit a single completion log.""" if not self.submit_web_logs or not self.status_tracker: return @@ -311,20 +314,35 @@ def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> No status_data = self.status_tracker.get_task_status_by_business_id( business_task_id=task_id, user_id=user_id ) - if not status_data or status_data.get("status") != "completed": + if not status_data: continue - event = ScheduleLogForWebItem( - task_id=task_id, - user_id=user_id, - mem_cube_id=mem_cube_id, - label="taskStatus", - from_memory_type="status", - to_memory_type="status", - log_content=f"Task {task_id} completed", - status="completed", - ) - self.submit_web_logs(event) + status = status_data.get("status") + + if status == "completed" and error is None: + event = ScheduleLogForWebItem( + task_id=task_id, + user_id=user_id, + mem_cube_id=mem_cube_id, + label="taskStatus", + from_memory_type="status", + to_memory_type="status", + log_content=f"Task {task_id} completed", + status="completed", + ) + self.submit_web_logs(event) + elif status == "failed" and error is not None: + event = ScheduleLogForWebItem( + task_id=task_id, + user_id=user_id, + mem_cube_id=mem_cube_id, + label="taskStatus", + from_memory_type="status", + to_memory_type="status", + log_content=f"Task {task_id} failed: {error!s}", + status="failed", + ) + self.submit_web_logs(event) except Exception: logger.warning( "Failed to emit task completion log. user_id=%s mem_cube_id=%s task_ids=%s", From fbf82ad521dbfd7e13ef4f94a45d2628582d9cb8 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 9 Dec 2025 11:01:00 +0800 Subject: [PATCH 4/8] fix(scheduler): fix SchedulerRedisQueue status_tracker missing attribute error --- .../mem_scheduler/task_schedule_modules/redis_queue.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index c6a8c3d47..553d721b7 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -21,8 +21,8 @@ DEFAULT_STREAM_KEY_PREFIX, DEFAULT_STREAM_KEYS_REFRESH_INTERVAL_SEC, ) -from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator -from memos.mem_scheduler.webservice_modules.redis_service import RedisSchedulerModule +from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso +from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker logger = get_logger(__name__) @@ -51,6 +51,7 @@ def __init__( consumer_name: str | None = "scheduler_consumer", max_len: int | None = None, auto_delete_acked: bool = True, # Whether to automatically delete acknowledged messages + status_tracker: TaskStatusTracker | None = None, ): """ Initialize the Redis queue. @@ -62,6 +63,7 @@ def __init__( max_len: Maximum length of the stream (for memory management) maxsize: Maximum size of the queue (for Queue compatibility, ignored) auto_delete_acked: Whether to automatically delete acknowledged messages from stream + status_tracker: TaskStatusTracker instance for tracking task status """ super().__init__() # Stream configuration @@ -101,6 +103,7 @@ def __init__( self.message_pack_cache = deque() self.orchestrator = SchedulerOrchestrator() if orchestrator is None else orchestrator + self.status_tracker = status_tracker # Cached stream keys and refresh control self._stream_keys_cache: list[str] = [] From 4ed2eecac52a9f2a4fa492505c9a2597735760e8 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 9 Dec 2025 11:14:18 +0800 Subject: [PATCH 5/8] feat(scheduler): implement status-driven failure logging and fix redis_queue status_tracker init --- .../task_schedule_modules/dispatcher.py | 20 ++++++++++++++++--- .../task_schedule_modules/redis_queue.py | 3 ++- .../mem_scheduler/utils/status_tracker.py | 4 ++++ 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 1065701ea..ca6798726 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -319,7 +319,10 @@ def _maybe_emit_task_completion( status = status_data.get("status") - if status == "completed" and error is None: + if status == "completed": + # Only emit success log if we didn't just catch an exception locally + # (Although if status is 'completed', local error shouldn't happen theoretically, + # unless status update lags or is inconsistent. We trust status_tracker here.) event = ScheduleLogForWebItem( task_id=task_id, user_id=user_id, @@ -331,7 +334,18 @@ def _maybe_emit_task_completion( status="completed", ) self.submit_web_logs(event) - elif status == "failed" and error is not None: + + elif status == "failed": + # Construct error message + error_msg = str(error) if error else None + if not error_msg: + # Try to get errors from status_tracker aggregation + errors = status_data.get("errors", []) + if errors: + error_msg = "; ".join(errors) + else: + error_msg = "Unknown error (check system logs)" + event = ScheduleLogForWebItem( task_id=task_id, user_id=user_id, @@ -339,7 +353,7 @@ def _maybe_emit_task_completion( label="taskStatus", from_memory_type="status", to_memory_type="status", - log_content=f"Task {task_id} failed: {error!s}", + log_content=f"Task {task_id} failed: {error_msg}", status="failed", ) self.submit_web_logs(event) diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index 553d721b7..ea803f6da 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -21,8 +21,9 @@ DEFAULT_STREAM_KEY_PREFIX, DEFAULT_STREAM_KEYS_REFRESH_INTERVAL_SEC, ) -from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso +from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker +from memos.mem_scheduler.webservice_modules.redis_service import RedisSchedulerModule logger = get_logger(__name__) diff --git a/src/memos/mem_scheduler/utils/status_tracker.py b/src/memos/mem_scheduler/utils/status_tracker.py index f2edc5aea..d8c8d2cee 100644 --- a/src/memos/mem_scheduler/utils/status_tracker.py +++ b/src/memos/mem_scheduler/utils/status_tracker.py @@ -142,11 +142,14 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) -> # Get statuses for all items key = self._get_key(user_id) item_statuses = [] + errors = [] for item_id in item_ids: item_data_json = self.redis.hget(key, item_id) if item_data_json: item_data = json.loads(item_data_json) item_statuses.append(item_data["status"]) + if item_data.get("status") == "failed" and "error" in item_data: + errors.append(item_data["error"]) if not item_statuses: return None @@ -167,6 +170,7 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) -> "business_task_id": business_task_id, "item_count": len(item_ids), "item_statuses": item_statuses, + "errors": errors, } def get_all_tasks_global(self) -> dict[str, dict[str, dict]]: From f5b009ec5560a4319a4b70f1c17b6e89af959241 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 9 Dec 2025 11:20:29 +0800 Subject: [PATCH 6/8] fix(scheduler): propagate status_tracker to SchedulerRedisQueue in ScheduleTaskQueue --- src/memos/mem_scheduler/task_schedule_modules/task_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index a01bc3fce..8c5e7da03 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -42,6 +42,7 @@ def __init__( consumer_group="scheduler_group", consumer_name="scheduler_consumer", orchestrator=self.orchestrator, + status_tracker=self.status_tracker, # Propagate status_tracker ) else: self.memos_message_queue = SchedulerLocalQueue(maxsize=self.maxsize) From 2e8ade86bae3567686385d2bcb0aa1510406c1ab Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 9 Dec 2025 11:34:30 +0800 Subject: [PATCH 7/8] fix(scheduler): propagate status_tracker via setter to ensure proper initialization --- src/memos/mem_scheduler/base_scheduler.py | 3 ++- .../task_schedule_modules/task_queue.py | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index bc218172e..64f7474f8 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -224,7 +224,8 @@ def initialize_modules( if self.dispatcher: self.dispatcher.status_tracker = self.status_tracker if self.memos_message_queue: - self.memos_message_queue.status_tracker = self.status_tracker + # Use the setter to propagate to the inner queue (e.g. SchedulerRedisQueue) + self.memos_message_queue.set_status_tracker(self.status_tracker) # initialize submodules self.chat_llm = chat_llm self.process_llm = process_llm diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index 8c5e7da03..c20243242 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -49,6 +49,19 @@ def __init__( self.disabled_handlers = disabled_handlers + def set_status_tracker(self, status_tracker: TaskStatusTracker) -> None: + """ + Set the status tracker for this queue and propagate it to the underlying queue implementation. + + This allows the tracker to be injected after initialization (e.g., when Redis connection becomes available). + """ + self.status_tracker = status_tracker + if self.memos_message_queue and hasattr(self.memos_message_queue, "status_tracker"): + # SchedulerRedisQueue has status_tracker attribute (from our previous fix) + # SchedulerLocalQueue can also accept it dynamically if it doesn't use __slots__ + self.memos_message_queue.status_tracker = status_tracker + logger.info("Propagated status_tracker to underlying message queue") + def ack_message( self, user_id: str, From e71b9d8a451f33ac8fa985fe33308e4390cdebe5 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 9 Dec 2025 11:54:08 +0800 Subject: [PATCH 8/8] fix: remove redundant task completion status update in redis queue ack --- src/memos/mem_scheduler/task_schedule_modules/redis_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index ea803f6da..a90644bc0 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -358,7 +358,6 @@ def ack_message( self._redis_conn.xack(stream_key, self.consumer_group, redis_message_id) if message: - self.status_tracker.task_completed(task_id=message.item_id, user_id=message.user_id) logger.info( f"Message {message.item_id} | {message.label} | {message.content} has been acknowledged." )