From 3a7758c3e7c2373eefd44aab5d4d4c25ab15912f Mon Sep 17 00:00:00 2001
From: "glin1993@outlook.com" <glin1993@outlook.com>
Date: Thu, 18 Dec 2025 11:31:33 +0800
Subject: [PATCH] feat: propagate item_id in scheduler and dispatcher logs

---
 .../general_modules/scheduler_logger.py       | 26 ++++++++++++-------
 src/memos/mem_scheduler/general_scheduler.py  | 12 ++++++++++++
 .../task_schedule_modules/dispatcher.py       |  6 ++++++
 3 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py
index 57d78676f..f52d8aa99 100644
--- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py
+++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py
@@ -49,6 +49,7 @@ def create_autofilled_log_item(
         user_id: str,
         mem_cube_id: str,
         mem_cube: GeneralMemCube,
+        item_id: str | None = None,
     ) -> ScheduleLogForWebItem:
         if mem_cube is None:
             logger.error(
@@ -94,16 +95,19 @@ def create_autofilled_log_item(
         )
         memory_capacities["parameter_memory_capacity"] = 1
 
-        log_message = ScheduleLogForWebItem(
-            user_id=user_id,
-            mem_cube_id=mem_cube_id,
-            label=label,
-            from_memory_type=from_memory_type,
-            to_memory_type=to_memory_type,
-            log_content=log_content,
-            current_memory_sizes=current_memory_sizes,
-            memory_capacities=memory_capacities,
-        )
+        log_kwargs = {
+            "user_id": user_id,
+            "mem_cube_id": mem_cube_id,
+            "label": label,
+            "from_memory_type": from_memory_type,
+            "to_memory_type": to_memory_type,
+            "log_content": log_content,
+            "current_memory_sizes": current_memory_sizes,
+            "memory_capacities": memory_capacities,
+        }
+        if item_id:
+            log_kwargs["item_id"] = item_id
+        log_message = ScheduleLogForWebItem(**log_kwargs)
         return log_message
 
     @log_exceptions(logger=logger)
@@ -120,6 +124,7 @@ def create_event_log(
         memory_len: int,
         memcube_name: str | None = None,
         log_content: str | None = None,
+        item_id: str | None = None,
    ) -> ScheduleLogForWebItem:
         item = self.create_autofilled_log_item(
             log_content=log_content or "",
@@ -129,6 +134,7 @@ def create_event_log(
             user_id=user_id,
             mem_cube_id=mem_cube_id,
             mem_cube=mem_cube,
+            item_id=item_id,
         )
         item.memcube_log_content = memcube_log_content
         item.metadata = metadata
diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py
index 86066f346..d3f3794a2 100644
--- a/src/memos/mem_scheduler/general_scheduler.py
+++ b/src/memos/mem_scheduler/general_scheduler.py
@@ -266,6 +266,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
                 metadata=[],
                 memory_len=1,
                 memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                item_id=msg.item_id,
             )
             event.task_id = msg.task_id
             self._submit_web_logs([event])
@@ -322,6 +323,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
                 metadata=[],
                 memory_len=1,
                 memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                item_id=msg.item_id,
             )
             event.task_id = msg.task_id
             self._submit_web_logs([event])
@@ -492,6 +494,7 @@ def send_add_log_messages_to_local_env(
                 metadata=add_meta_legacy,
                 memory_len=len(add_content_legacy),
                 memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                item_id=msg.item_id,
             )
             event.task_id = msg.task_id
             events.append(event)
@@ -507,6 +510,7 @@ def send_add_log_messages_to_local_env(
                 metadata=update_meta_legacy,
                 memory_len=len(update_content_legacy),
                 memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                item_id=msg.item_id,
             )
             event.task_id = msg.task_id
             events.append(event)
@@ -573,6 +577,7 @@ def send_add_log_messages_to_cloud_env(
                 metadata=None,
                 memory_len=len(kb_log_content),
                 memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                item_id=msg.item_id,
             )
             event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
             event.task_id = msg.task_id
@@ -719,6 +724,7 @@ def _extract_fields(mem_item):
                 metadata=None,
                 memory_len=len(kb_log_content),
                 memcube_name=self._map_memcube_name(mem_cube_id),
+                item_id=message.item_id,
             )
             event.log_content = (
                 f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
@@ -788,6 +794,7 @@ def process_message(message: ScheduleMessageItem):
                 user_name=user_name,
                 custom_tags=info.get("custom_tags", None),
                 task_id=message.task_id,
+                item_id=message.item_id,
                 info=info,
             )
 
@@ -815,6 +822,7 @@ def _process_memories_with_reader(
         user_name: str,
         custom_tags: list[str] | None = None,
         task_id: str | None = None,
+        item_id: str | None = None,
         info: dict | None = None,
     ) -> None:
         logger.info(
@@ -934,6 +942,7 @@ def _process_memories_with_reader(
                 metadata=None,
                 memory_len=len(kb_log_content),
                 memcube_name=self._map_memcube_name(mem_cube_id),
+                item_id=item_id,
             )
             event.log_content = (
                 f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
@@ -979,6 +988,7 @@ def _process_memories_with_reader(
                 metadata=add_meta_legacy,
                 memory_len=len(add_content_legacy),
                 memcube_name=self._map_memcube_name(mem_cube_id),
+                item_id=item_id,
             )
             event.task_id = task_id
             self._submit_web_logs([event])
@@ -1045,6 +1055,7 @@ def _process_memories_with_reader(
                 metadata=None,
                 memory_len=len(kb_log_content),
                 memcube_name=self._map_memcube_name(mem_cube_id),
+                item_id=item_id,
             )
             event.log_content = f"Knowledge Base Memory Update failed: {exc!s}"
             event.task_id = task_id
@@ -1212,6 +1223,7 @@ def process_message(message: ScheduleMessageItem):
                 metadata=meta,
                 memory_len=len(keys),
                 memcube_name=self._map_memcube_name(mem_cube_id),
+                item_id=message.item_id,
             )
             self._submit_web_logs([event])
 
diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
index 35df3db64..b048bbf6b 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
@@ -329,6 +329,7 @@ def _maybe_emit_task_completion(
         # messages in one batch can belong to different business task_ids; check each
         task_ids = set()
         task_id_to_doc_id = {}
+        task_id_to_item_id = {}
 
         for msg in messages:
             tid = getattr(msg, "task_id", None)
@@ -340,6 +341,8 @@ def _maybe_emit_task_completion(
                 sid = info.get("source_doc_id")
                 if sid:
                     task_id_to_doc_id[tid] = sid
+            if tid not in task_id_to_item_id:
+                task_id_to_item_id[tid] = msg.item_id
 
         if not task_ids:
             return
@@ -356,6 +359,7 @@ def _maybe_emit_task_completion(
 
         for task_id in task_ids:
             source_doc_id = task_id_to_doc_id.get(task_id)
+            event_item_id = task_id_to_item_id.get(task_id)
             status_data = self.status_tracker.get_task_status_by_business_id(
                 business_task_id=task_id, user_id=user_id
             )
@@ -369,6 +373,7 @@ def _maybe_emit_task_completion(
             # (Although if status is 'completed', local error shouldn't happen theoretically,
             # unless status update lags or is inconsistent. We trust status_tracker here.)
             event = ScheduleLogForWebItem(
+                item_id=event_item_id,
                 task_id=task_id,
                 user_id=user_id,
                 mem_cube_id=mem_cube_id,
@@ -393,6 +398,7 @@ def _maybe_emit_task_completion(
                 error_msg = "Unknown error (check system logs)"
 
             event = ScheduleLogForWebItem(
+                item_id=event_item_id,
                 task_id=task_id,
                 user_id=user_id,
                 mem_cube_id=mem_cube_id,