From 9c3b3373556b96a093eaa204d107ad4736c6d468 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 18 Dec 2025 11:31:33 +0800 Subject: [PATCH 1/4] feat: propagate item_id in scheduler and dispatcher logs --- .../general_modules/scheduler_logger.py | 26 ++++++++++++------- src/memos/mem_scheduler/general_scheduler.py | 12 +++++++++ .../task_schedule_modules/dispatcher.py | 6 +++++ 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index 57d78676f..f52d8aa99 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -49,6 +49,7 @@ def create_autofilled_log_item( user_id: str, mem_cube_id: str, mem_cube: GeneralMemCube, + item_id: str | None = None, ) -> ScheduleLogForWebItem: if mem_cube is None: logger.error( @@ -94,16 +95,19 @@ def create_autofilled_log_item( ) memory_capacities["parameter_memory_capacity"] = 1 - log_message = ScheduleLogForWebItem( - user_id=user_id, - mem_cube_id=mem_cube_id, - label=label, - from_memory_type=from_memory_type, - to_memory_type=to_memory_type, - log_content=log_content, - current_memory_sizes=current_memory_sizes, - memory_capacities=memory_capacities, - ) + log_kwargs = { + "user_id": user_id, + "mem_cube_id": mem_cube_id, + "label": label, + "from_memory_type": from_memory_type, + "to_memory_type": to_memory_type, + "log_content": log_content, + "current_memory_sizes": current_memory_sizes, + "memory_capacities": memory_capacities, + } + if item_id: + log_kwargs["item_id"] = item_id + log_message = ScheduleLogForWebItem(**log_kwargs) return log_message @log_exceptions(logger=logger) @@ -120,6 +124,7 @@ def create_event_log( memory_len: int, memcube_name: str | None = None, log_content: str | None = None, + item_id: str | None = None, ) -> ScheduleLogForWebItem: item = self.create_autofilled_log_item( log_content=log_content or "", @@ -129,6 +134,7 @@ def create_event_log( user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube, + item_id=item_id, ) item.memcube_log_content = memcube_log_content item.metadata = metadata diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 86066f346..d3f3794a2 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -266,6 +266,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: metadata=[], memory_len=1, memcube_name=self._map_memcube_name(msg.mem_cube_id), + item_id=msg.item_id, ) event.task_id = msg.task_id self._submit_web_logs([event]) @@ -322,6 +323,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: metadata=[], memory_len=1, memcube_name=self._map_memcube_name(msg.mem_cube_id), + item_id=msg.item_id, ) event.task_id = msg.task_id self._submit_web_logs([event]) @@ -492,6 +494,7 @@ def send_add_log_messages_to_local_env( metadata=add_meta_legacy, memory_len=len(add_content_legacy), memcube_name=self._map_memcube_name(msg.mem_cube_id), + item_id=msg.item_id, ) event.task_id = msg.task_id events.append(event) @@ -507,6 +510,7 @@ def send_add_log_messages_to_local_env( metadata=update_meta_legacy, memory_len=len(update_content_legacy), memcube_name=self._map_memcube_name(msg.mem_cube_id), + item_id=msg.item_id, ) event.task_id = msg.task_id events.append(event) @@ -573,6 +577,7 @@ def send_add_log_messages_to_cloud_env( metadata=None, memory_len=len(kb_log_content), memcube_name=self._map_memcube_name(msg.mem_cube_id), + item_id=msg.item_id, ) event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes." event.task_id = msg.task_id @@ -719,6 +724,7 @@ def _extract_fields(mem_item): metadata=None, memory_len=len(kb_log_content), memcube_name=self._map_memcube_name(mem_cube_id), + item_id=message.item_id, ) event.log_content = ( f"Knowledge Base Memory Update: {len(kb_log_content)} changes." @@ -788,6 +794,7 @@ def process_message(message: ScheduleMessageItem): user_name=user_name, custom_tags=info.get("custom_tags", None), task_id=message.task_id, + item_id=message.item_id, info=info, ) @@ -815,6 +822,7 @@ def _process_memories_with_reader( user_name: str, custom_tags: list[str] | None = None, task_id: str | None = None, + item_id: str | None = None, info: dict | None = None, ) -> None: logger.info( @@ -934,6 +942,7 @@ def _process_memories_with_reader( metadata=None, memory_len=len(kb_log_content), memcube_name=self._map_memcube_name(mem_cube_id), + item_id=item_id, ) event.log_content = ( f"Knowledge Base Memory Update: {len(kb_log_content)} changes." @@ -979,6 +988,7 @@ def _process_memories_with_reader( metadata=add_meta_legacy, memory_len=len(add_content_legacy), memcube_name=self._map_memcube_name(mem_cube_id), + item_id=item_id, ) event.task_id = task_id self._submit_web_logs([event]) @@ -1045,6 +1055,7 @@ def _process_memories_with_reader( metadata=None, memory_len=len(kb_log_content), memcube_name=self._map_memcube_name(mem_cube_id), + item_id=item_id, ) event.log_content = f"Knowledge Base Memory Update failed: {exc!s}" event.task_id = task_id @@ -1212,6 +1223,7 @@ def process_message(message: ScheduleMessageItem): metadata=meta, memory_len=len(keys), memcube_name=self._map_memcube_name(mem_cube_id), + item_id=message.item_id, ) self._submit_web_logs([event]) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 35df3db64..b048bbf6b 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -329,6 +329,7 @@ def _maybe_emit_task_completion( # messages in one batch can belong to different business task_ids; check each task_ids = set() task_id_to_doc_id = {} + task_id_to_item_id = {} for msg in messages: tid = getattr(msg, "task_id", None) @@ -340,6 +341,8 @@ def _maybe_emit_task_completion( sid = info.get("source_doc_id") if sid: task_id_to_doc_id[tid] = sid + if tid not in task_id_to_item_id: + task_id_to_item_id[tid] = msg.item_id if not task_ids: return @@ -356,6 +359,7 @@ def _maybe_emit_task_completion( for task_id in task_ids: source_doc_id = task_id_to_doc_id.get(task_id) + event_item_id = task_id_to_item_id.get(task_id) status_data = self.status_tracker.get_task_status_by_business_id( business_task_id=task_id, user_id=user_id ) @@ -369,6 +373,7 @@ def _maybe_emit_task_completion( # (Although if status is 'completed', local error shouldn't happen theoretically, # unless status update lags or is inconsistent. We trust status_tracker here.) event = ScheduleLogForWebItem( + item_id=event_item_id, task_id=task_id, user_id=user_id, mem_cube_id=mem_cube_id, @@ -393,6 +398,7 @@ def _maybe_emit_task_completion( error_msg = "Unknown error (check system logs)" event = ScheduleLogForWebItem( + item_id=event_item_id, task_id=task_id, user_id=user_id, mem_cube_id=mem_cube_id, From 175123ca50464652fc961bf978f20f22256c66e1 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 18 Dec 2025 12:59:15 +0800 Subject: [PATCH 2/4] remove local web log queue --- src/memos/mem_scheduler/base_scheduler.py | 70 +---------------------- 1 file changed, 2 insertions(+), 68 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 8e4ca9fcb..b2b4f6d0d 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -25,7 +25,6 @@ from memos.mem_cube.general import GeneralMemCube from memos.mem_feedback.simple_feedback import SimpleMemFeedback from memos.mem_scheduler.general_modules.init_components_for_scheduler import init_components -from memos.mem_scheduler.general_modules.misc import AutoDroppingQueue as Queue from memos.mem_scheduler.general_modules.scheduler_logger import SchedulerLoggerModule from memos.mem_scheduler.memory_manage_modules.retriever import SchedulerRetriever from memos.mem_scheduler.monitors.dispatcher_monitor import SchedulerDispatcherMonitor @@ -37,7 +36,6 @@ DEFAULT_CONSUME_INTERVAL_SECONDS, DEFAULT_CONTEXT_WINDOW_SIZE, DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE, - DEFAULT_MAX_WEB_LOG_QUEUE_SIZE, DEFAULT_STARTUP_MODE, DEFAULT_THREAD_POOL_MAX_WORKERS, DEFAULT_TOP_K, @@ -121,12 +119,6 @@ def __init__(self, config: BaseSchedulerConfig): # optional configs self.disabled_handlers: list | None = self.config.get("disabled_handlers", None) - self.max_web_log_queue_size = self.config.get( - "max_web_log_queue_size", DEFAULT_MAX_WEB_LOG_QUEUE_SIZE - ) - self._web_log_message_queue: Queue[ScheduleLogForWebItem] = Queue( - maxsize=self.max_web_log_queue_size - ) self._consumer_thread = None # Reference to our consumer thread/process self._consumer_process = None # Reference to our consumer process self._running = False @@ -853,11 +845,6 @@ def _submit_web_logs( return for message in messages: - try: - self._web_log_message_queue.put(message) - except Exception as e: - logger.warning(f"Failed to put message to web log queue: {e}", stack_info=True) - message_info = message.debug_info() logger.debug(f"Submitted Scheduling log for web: {message_info}") @@ -867,67 +854,14 @@ def _submit_web_logs( ) self.rabbitmq_publish_message(message=message.to_dict()) logger.debug( - f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}" + f"{len(messages)} submitted. additional_log_info: {additional_log_info}" ) def get_web_log_messages(self) -> list[dict]: """ Retrieve structured log messages from the queue and return JSON-serializable dicts. """ - raw_items: list[ScheduleLogForWebItem] = [] - while True: - try: - raw_items.append(self._web_log_message_queue.get_nowait()) - except Exception: - break - - def _map_label(label: str) -> str: - mapping = { - QUERY_TASK_LABEL: "addMessage", - ANSWER_TASK_LABEL: "addMessage", - ADD_TASK_LABEL: "addMemory", - MEM_UPDATE_TASK_LABEL: "updateMemory", - MEM_ORGANIZE_TASK_LABEL: "mergeMemory", - MEM_ARCHIVE_TASK_LABEL: "archiveMemory", - } - return mapping.get(label, label) - - def _normalize_item(item: ScheduleLogForWebItem) -> dict: - data = item.to_dict() - data["label"] = _map_label(data.get("label")) - memcube_content = getattr(item, "memcube_log_content", None) or [] - metadata = getattr(item, "metadata", None) or [] - - memcube_name = getattr(item, "memcube_name", None) - if not memcube_name and hasattr(self, "_map_memcube_name"): - memcube_name = self._map_memcube_name(item.mem_cube_id) - data["memcube_name"] = memcube_name - - memory_len = getattr(item, "memory_len", None) - if memory_len is None: - if data["label"] == "mergeMemory": - memory_len = len([c for c in memcube_content if c.get("type") != "postMerge"]) - elif memcube_content: - memory_len = len(memcube_content) - else: - memory_len = 1 if item.log_content else 0 - - data["memcube_log_content"] = memcube_content - data["memory_len"] = memory_len - - def _with_memory_time(meta: dict) -> dict: - enriched = dict(meta) - if "memory_time" not in enriched: - enriched["memory_time"] = enriched.get("updated_at") or enriched.get( - "update_at" - ) - return enriched - - data["metadata"] = [_with_memory_time(m) for m in metadata] - data["log_title"] = "" - return data - - return [_normalize_item(it) for it in raw_items] + return [] def _message_consumer(self) -> None: """ From 3dad771710273a72f4c5330478bfd5ff4815fbc3 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 18 Dec 2025 13:00:52 +0800 Subject: [PATCH 3/4] style: ruff format scheduler logs --- src/memos/mem_scheduler/base_scheduler.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index b2b4f6d0d..9103c44f4 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -49,12 +49,6 @@ ) from memos.mem_scheduler.schemas.monitor_schemas import MemoryMonitorItem from memos.mem_scheduler.schemas.task_schemas import ( - ADD_TASK_LABEL, - ANSWER_TASK_LABEL, - MEM_ARCHIVE_TASK_LABEL, - MEM_ORGANIZE_TASK_LABEL, - MEM_UPDATE_TASK_LABEL, - QUERY_TASK_LABEL, TaskPriorityLevel, ) from memos.mem_scheduler.task_schedule_modules.dispatcher import SchedulerDispatcher @@ -853,9 +847,7 @@ def _submit_web_logs( f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}" ) self.rabbitmq_publish_message(message=message.to_dict()) - logger.debug( - f"{len(messages)} submitted. additional_log_info: {additional_log_info}" - ) + logger.debug(f"{len(messages)} submitted. additional_log_info: {additional_log_info}") def get_web_log_messages(self) -> list[dict]: """ From f7a025f72be9cc77bbe425f90206bda399a45b2a Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 18 Dec 2025 13:09:00 +0800 Subject: [PATCH 4/4] test: adjust submit web logs for queue removal --- tests/mem_scheduler/test_scheduler.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tests/mem_scheduler/test_scheduler.py b/tests/mem_scheduler/test_scheduler.py index 5b68a8bad..743480876 100644 --- a/tests/mem_scheduler/test_scheduler.py +++ b/tests/mem_scheduler/test_scheduler.py @@ -139,15 +139,9 @@ def test_submit_web_logs(self): }, ) - # Empty the queue by consuming all elements - while not self.scheduler._web_log_message_queue.empty(): - self.scheduler._web_log_message_queue.get() - # Submit the log message self.scheduler._submit_web_logs(messages=log_message) - - # Verify the message was added to the queue - self.assertEqual(self.scheduler._web_log_message_queue.qsize(), 1) + # No local web log queue; ensure submission completes without error. # Get the actual message from the queue actual_message = self.scheduler._web_log_message_queue.get()