diff --git a/examples/mem_scheduler/task_stop_rerun.py b/examples/mem_scheduler/task_stop_rerun.py index 809e625ae..b5e62ff8f 100644 --- a/examples/mem_scheduler/task_stop_rerun.py +++ b/examples/mem_scheduler/task_stop_rerun.py @@ -25,7 +25,7 @@ def my_test_handler(messages: list[ScheduleMessageItem]): task_id = str(msg.item_id) file_path = tmp_dir / f"{task_id}.txt" try: - sleep(5) + sleep(1) file_path.write_text(f"Task {task_id} processed.\n") print(f"writing {file_path} done") except Exception as e: diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index ec542ac2e..d945db671 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -148,7 +148,7 @@ def __init__(self, config: BaseSchedulerConfig): self.monitor: SchedulerGeneralMonitor | None = None self.dispatcher_monitor: SchedulerDispatcherMonitor | None = None self.mem_reader = None # Will be set by MOSCore - self.status_tracker: TaskStatusTracker | None = None + self._status_tracker: TaskStatusTracker | None = None self.metrics = metrics self._monitor_thread = None self.memos_message_queue = ScheduleTaskQueue( @@ -156,14 +156,14 @@ def __init__(self, config: BaseSchedulerConfig): maxsize=self.max_internal_message_queue_size, disabled_handlers=self.disabled_handlers, orchestrator=self.orchestrator, - status_tracker=self.status_tracker, + status_tracker=self._status_tracker, ) self.dispatcher = SchedulerDispatcher( config=self.config, memos_message_queue=self.memos_message_queue, max_workers=self.thread_pool_max_workers, enable_parallel_dispatch=self.enable_parallel_dispatch, - status_tracker=self.status_tracker, + status_tracker=self._status_tracker, metrics=self.metrics, submit_web_logs=self._submit_web_logs, orchestrator=self.orchestrator, @@ -293,6 +293,38 @@ def mem_cube(self) -> BaseMemCube: ) return self.current_mem_cube + @property + def status_tracker(self) -> TaskStatusTracker | None: + """Lazy-initialized TaskStatusTracker. + + If the tracker is None, attempt to initialize from the Redis client + available via RedisSchedulerModule. This mirrors the lazy pattern used + by `mem_cube` so downstream modules can safely access the tracker. + """ + if self._status_tracker is None: + try: + self._status_tracker = TaskStatusTracker(self.redis) + # Propagate to submodules when created lazily + if self.dispatcher: + self.dispatcher.status_tracker = self._status_tracker + if self.memos_message_queue: + self.memos_message_queue.set_status_tracker(self._status_tracker) + except Exception as e: + logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True) + return self._status_tracker + + @status_tracker.setter + def status_tracker(self, value: TaskStatusTracker | None) -> None: + """Setter that also propagates tracker to dependent modules.""" + self._status_tracker = value + try: + if self.dispatcher: + self.dispatcher.status_tracker = value + if self.memos_message_queue and value is not None: + self.memos_message_queue.set_status_tracker(value) + except Exception as e: + logger.warning(f"Failed to propagate status_tracker: {e}", exc_info=True) + @property def feedback_server(self) -> SimpleMemFeedback: """The memory cube associated with this MemChat.""" diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index c4e4a66bd..729345dc5 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -109,6 +109,8 @@ def __init__( ) self.metrics = metrics + self._status_tracker: TaskStatusTracker | None = None + # Use setter to allow propagation and keep a single source of truth self.status_tracker = status_tracker self.submit_web_logs = submit_web_logs # ADDED @@ -117,6 +119,37 @@ def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None: return # This is handled in BaseScheduler now + @property + def status_tracker(self) -> TaskStatusTracker | None: + """Lazy-initialized status tracker for the dispatcher. + + If the tracker is None, attempt to initialize from the Redis-backed + components available to the dispatcher (queue or orchestrator). + """ + if self._status_tracker is None: + try: + self._status_tracker = TaskStatusTracker(self.redis) + # Propagate to submodules when created lazily + if self.dispatcher: + self.dispatcher.status_tracker = self._status_tracker + if self.memos_message_queue: + self.memos_message_queue.set_status_tracker(self._status_tracker) + except Exception as e: + logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True) + return self._status_tracker + + @status_tracker.setter + def status_tracker(self, value: TaskStatusTracker | None) -> None: + self._status_tracker = value + # Propagate to the queue if possible + try: + if self.memos_message_queue and hasattr(self.memos_message_queue, "status_tracker"): + self.memos_message_queue.status_tracker = value + except Exception as e: + logger.warning( + f"Failed to propagate dispatcher status_tracker to queue: {e}", exc_info=True + ) + def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem): """ Create a wrapper around the handler to track task execution and capture results. diff --git a/src/memos/mem_scheduler/webservice_modules/redis_service.py b/src/memos/mem_scheduler/webservice_modules/redis_service.py index d7ca6565f..5a056f954 100644 --- a/src/memos/mem_scheduler/webservice_modules/redis_service.py +++ b/src/memos/mem_scheduler/webservice_modules/redis_service.py @@ -46,6 +46,8 @@ def __init__(self): @property def redis(self) -> Any: + if self._redis_conn is None: + self.auto_initialize_redis() return self._redis_conn @redis.setter