From 8428193ad966e319b570c7d3bfe49e6d96d532f5 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 28 Nov 2025 14:19:47 +0800 Subject: [PATCH 1/3] feat: Add consistent logging for async memory addition --- src/memos/mem_scheduler/general_scheduler.py | 76 ++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 2093083e6..3f2a3393c 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -504,6 +504,8 @@ def process_message(message: ScheduleMessageItem): text_mem=text_mem, user_name=user_name, custom_tags=info.get("custom_tags", None), + task_id=message.task_id, + info=info, ) logger.info( @@ -529,6 +531,8 @@ def _process_memories_with_reader( text_mem: TreeTextMemory, user_name: str, custom_tags: list[str] | None = None, + task_id: str | None = None, + info: dict | None = None, ) -> None: """ Process memories using mem_reader for enhanced memory processing. @@ -602,6 +606,78 @@ def _process_memories_with_reader( logger.info( f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}" ) + + # LOGGING BLOCK START + # This block is replicated from _add_message_consumer to ensure consistent logging + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") + == "memos-memory-change" + ) + if is_cloud_env: + # New: Knowledge Base Logging (Cloud Service) + kb_log_content = [] + for item in flattened_memories: + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": info.get("trigger_source", "Messages") if info else "Messages", + "operation": "ADD", + "memory_id": item.id, + "content": item.memory, + "original_content": None, + "source_doc_id": getattr(item.metadata, "source_doc_id", None), + } + ) + if kb_log_content: + event = self.create_event_log( + label="knowledgeBaseUpdate", + log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.", + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=kb_log_content, + metadata=None, + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = task_id + self._submit_web_logs([event]) + else: + # Existing: Playground/Default Logging + add_content_legacy: list[dict] = [] + add_meta_legacy: list[dict] = [] + for item_id, item in zip(enhanced_mem_ids, flattened_memories): + key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory) + add_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item_id}) + add_meta_legacy.append( + { + "ref_id": item_id, + "id": item_id, + "key": item.metadata.key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) or getattr(item.metadata, "update_at", None), + } + ) + if add_content_legacy: + event = self.create_event_log( + label="addMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=add_content_legacy, + metadata=add_meta_legacy, + memory_len=len(add_content_legacy), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = task_id + self._submit_web_logs([event]) + # LOGGING BLOCK END else: logger.info("No enhanced memories generated by mem_reader") else: From 146a09d5d851d8b4e3d6ac6e9d5ee67e36d89da7 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 28 Nov 2025 14:41:37 +0800 Subject: [PATCH 2/3] fix: log mem_reader failures with task status --- src/memos/mem_scheduler/general_scheduler.py | 36 ++++++++++++- .../task_schedule_modules/dispatcher.py | 54 +++++++++---------- 2 files changed, 62 insertions(+), 28 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 3f2a3393c..1e5f841f0 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -544,6 +544,7 @@ def _process_memories_with_reader( text_mem: Text memory instance custom_tags: Optional list of custom tags for memory processing """ + kb_log_content: list[dict] = [] try: # Get the mem_reader from the parent MOSCore if not hasattr(self, "mem_reader") or self.mem_reader is None: @@ -706,10 +707,43 @@ def _process_memories_with_reader( logger.info("Remove and Refresh Memories") logger.debug(f"Finished add {user_id} memory: {mem_ids}") - except Exception: + except Exception as exc: logger.error( f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True ) + with contextlib.suppress(Exception): + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + ) + if is_cloud_env: + if not kb_log_content: + trigger_source = info.get("trigger_source", "Messages") if info else "Messages" + kb_log_content = [ + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": trigger_source, + "operation": "ADD", + "memory_id": mem_id, + "content": None, + "original_content": None, + "source_doc_id": None, + } + for mem_id in mem_ids + ] + event = self.create_event_log( + label="knowledgeBaseUpdate", + log_content=f"Knowledge Base Memory Update failed: {exc!s}", + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=kb_log_content, + metadata=None, + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = task_id + event.status = "failed" + self._submit_web_logs([event]) def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {MEM_ORGANIZE_LABEL} handler.") diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index df3e2055e..e31de6036 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -159,19 +159,19 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): ) self.metrics.task_completed(user_id=m.user_id, task_type=m.label) - is_cloud_env = ( - os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - ) - if self.submit_web_logs and is_cloud_env: - status_log = ScheduleLogForWebItem( - user_id=task_item.user_id, - mem_cube_id=task_item.mem_cube_id, - item_id=task_item.item_id, - label=m.label, - log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.", - status="completed", - ) - self.submit_web_logs([status_log]) + # is_cloud_env = ( + # os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + # ) + # if self.submit_web_logs and is_cloud_env: + # status_log = ScheduleLogForWebItem( + # user_id=task_item.user_id, + # mem_cube_id=task_item.mem_cube_id, + # item_id=task_item.item_id, + # label=m.label, + # log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.", + # status="completed", + # ) + # self.submit_web_logs([status_log]) # acknowledge redis messages if self.use_redis_queue and self.memos_message_queue is not None: @@ -211,20 +211,20 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): self._completed_tasks.pop(0) logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}") - is_cloud_env = ( - os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - ) - if self.submit_web_logs and is_cloud_env: - status_log = ScheduleLogForWebItem( - user_id=task_item.user_id, - mem_cube_id=task_item.mem_cube_id, - item_id=task_item.item_id, - label=m.label, - log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.", - status="failed", - exception=str(e), - ) - self.submit_web_logs([status_log]) + # is_cloud_env = ( + # os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + # ) + # if self.submit_web_logs and is_cloud_env: + # status_log = ScheduleLogForWebItem( + # user_id=task_item.user_id, + # mem_cube_id=task_item.mem_cube_id, + # item_id=task_item.item_id, + # label=m.label, + # log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.", + # status="failed", + # exception=str(e), + # ) + # self.submit_web_logs([status_log]) raise return wrapped_handler From 75092cf9fb2a18a7062f74cf06a3388f561f6a7f Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Fri, 28 Nov 2025 14:48:37 +0800 Subject: [PATCH 3/3] chore: format scheduler logging files --- src/memos/mem_scheduler/general_scheduler.py | 26 +++++++++++----- .../task_schedule_modules/dispatcher.py | 31 +------------------ 2 files changed, 19 insertions(+), 38 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 1e5f841f0..c3dba6d8c 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -611,8 +611,7 @@ def _process_memories_with_reader( # LOGGING BLOCK START # This block is replicated from _add_message_consumer to ensure consistent logging is_cloud_env = ( - os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") - == "memos-memory-change" + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" ) if is_cloud_env: # New: Knowledge Base Logging (Cloud Service) @@ -621,7 +620,9 @@ def _process_memories_with_reader( kb_log_content.append( { "log_source": "KNOWLEDGE_BASE_LOG", - "trigger_source": info.get("trigger_source", "Messages") if info else "Messages", + "trigger_source": info.get("trigger_source", "Messages") + if info + else "Messages", "operation": "ADD", "memory_id": item.id, "content": item.memory, @@ -647,9 +648,15 @@ def _process_memories_with_reader( # Existing: Playground/Default Logging add_content_legacy: list[dict] = [] add_meta_legacy: list[dict] = [] - for item_id, item in zip(enhanced_mem_ids, flattened_memories): - key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory) - add_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item_id}) + for item_id, item in zip( + enhanced_mem_ids, flattened_memories, strict=False + ): + key = getattr(item.metadata, "key", None) or transform_name_to_key( + name=item.memory + ) + add_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item_id} + ) add_meta_legacy.append( { "ref_id": item_id, @@ -660,7 +667,8 @@ def _process_memories_with_reader( "status": item.metadata.status, "confidence": item.metadata.confidence, "tags": item.metadata.tags, - "updated_at": getattr(item.metadata, "updated_at", None) or getattr(item.metadata, "update_at", None), + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), } ) if add_content_legacy: @@ -717,7 +725,9 @@ def _process_memories_with_reader( ) if is_cloud_env: if not kb_log_content: - trigger_source = info.get("trigger_source", "Messages") if info else "Messages" + trigger_source = ( + info.get("trigger_source", "Messages") if info else "Messages" + ) kb_log_content = [ { "log_source": "KNOWLEDGE_BASE_LOG", diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index e31de6036..c361a77a2 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -1,5 +1,4 @@ import concurrent -import os import threading import time @@ -15,7 +14,7 @@ from memos.mem_scheduler.schemas.general_schemas import ( DEFAULT_STOP_WAIT, ) -from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker @@ -159,20 +158,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): ) self.metrics.task_completed(user_id=m.user_id, task_type=m.label) - # is_cloud_env = ( - # os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - # ) - # if self.submit_web_logs and is_cloud_env: - # status_log = ScheduleLogForWebItem( - # user_id=task_item.user_id, - # mem_cube_id=task_item.mem_cube_id, - # item_id=task_item.item_id, - # label=m.label, - # log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.", - # status="completed", - # ) - # self.submit_web_logs([status_log]) - # acknowledge redis messages if self.use_redis_queue and self.memos_message_queue is not None: for msg in messages: @@ -211,20 +196,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): self._completed_tasks.pop(0) logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}") - # is_cloud_env = ( - # os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - # ) - # if self.submit_web_logs and is_cloud_env: - # status_log = ScheduleLogForWebItem( - # user_id=task_item.user_id, - # mem_cube_id=task_item.mem_cube_id, - # item_id=task_item.item_id, - # label=m.label, - # log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.", - # status="failed", - # exception=str(e), - # ) - # self.submit_web_logs([status_log]) raise return wrapped_handler