From 647b4c08efeffec285e936e1e0e80f603695d99a Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 19:37:34 +0800 Subject: [PATCH 1/5] Add cloud add-log handler fallback for schedulers --- src/memos/mem_scheduler/general_scheduler.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 2448490a6..6734b104a 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -464,6 +464,25 @@ def send_add_log_messages_to_local_env( if events: self._submit_web_logs(events, additional_log_info="send_add_log_messages_to_cloud_env") + def send_add_log_messages_to_cloud_env( + self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original + ): + """ + Cloud logging path for add/update events. + + Currently reuses local env logging to avoid missing method errors in subclasses. + """ + logger.info( + "send_add_log_messages_to_cloud_env fallback to local handler. user_id=%s mem_cube_id=%s task_id=%s item_id=%s", + msg.user_id, + msg.mem_cube_id, + msg.task_id, + msg.item_id, + ) + return self.send_add_log_messages_to_local_env( + msg, prepared_add_items, prepared_update_items_with_original + ) + def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") # Process the query in a session turn From e6009db0640bfed46810ee5bb1eb8b336ce27c89 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 19:42:04 +0800 Subject: [PATCH 2/5] Implement cloud add log handler for optimized scheduler --- src/memos/mem_scheduler/general_scheduler.py | 62 ++++++++++++++++---- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 6734b104a..2b6766e33 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -469,19 +469,57 @@ def send_add_log_messages_to_cloud_env( ): """ Cloud logging path for add/update events. - - Currently reuses local env logging to avoid missing method errors in subclasses. """ - logger.info( - "send_add_log_messages_to_cloud_env fallback to local handler. user_id=%s mem_cube_id=%s task_id=%s item_id=%s", - msg.user_id, - msg.mem_cube_id, - msg.task_id, - msg.item_id, - ) - return self.send_add_log_messages_to_local_env( - msg, prepared_add_items, prepared_update_items_with_original - ) + kb_log_content: list[dict] = [] + info = msg.info or {} + # Process added items + for item in prepared_add_items: + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": info.get("trigger_source", "Messages"), + "operation": "ADD", + "memory_id": item.id, + "content": item.memory, + "original_content": None, + "source_doc_id": getattr(item.metadata, "source_doc_id", None), + } + ) + + # Process updated items + for item_data in prepared_update_items_with_original: + item = item_data["new_item"] + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": info.get("trigger_source", "Messages"), + "operation": "UPDATE", + "memory_id": item.id, + "content": item.memory, + "original_content": item_data.get("original_content"), + "source_doc_id": getattr(item.metadata, "source_doc_id", None), + } + ) + + if kb_log_content: + logger.info( + f"[DIAGNOSTIC] general_scheduler.send_add_log_messages_to_cloud_env: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: {msg.user_id}, mem_cube_id: {msg.mem_cube_id}, task_id: {msg.task_id}. KB content: {json.dumps(kb_log_content, indent=2)}" + ) + event = self.create_event_log( + label="knowledgeBaseUpdate", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=kb_log_content, + metadata=None, + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes." + event.task_id = msg.task_id + self._submit_web_logs([event]) def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.") From 21662bf5dc85974345d20936e2ec5e9a0c0c91a7 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 19:51:52 +0800 Subject: [PATCH 3/5] Refine cloud add log handler output --- src/memos/mem_scheduler/general_scheduler.py | 51 ++++++++++++++------ 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 2b6766e33..99ff175de 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -559,6 +559,8 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: try: + if not messages: + return message = messages[0] mem_cube = self.current_mem_cube @@ -566,21 +568,28 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> mem_cube_id = message.mem_cube_id content = message.content - feedback_data = json.loads(content) + try: + feedback_data = json.loads(content) if isinstance(content, str) else content + if not isinstance(feedback_data, dict): + logger.error(f"Failed to decode feedback_data or it is not a dict: {feedback_data}") + return + except json.JSONDecodeError: + logger.error(f"Invalid JSON content for feedback message: {content}", exc_info=True) + return feedback_result = self.feedback_server.process_feedback( user_id=user_id, user_name=mem_cube_id, - session_id=feedback_data["session_id"], - chat_history=feedback_data["history"], - retrieved_memory_ids=feedback_data["retrieved_memory_ids"], - feedback_content=feedback_data["feedback_content"], - feedback_time=feedback_data["feedback_time"], - task_id=feedback_data["task_id"], + session_id=feedback_data.get("session_id"), + chat_history=feedback_data.get("history", []), + retrieved_memory_ids=feedback_data.get("retrieved_memory_ids", []), + feedback_content=feedback_data.get("feedback_content"), + feedback_time=feedback_data.get("feedback_time"), + task_id=feedback_data.get("task_id"), ) logger.info( - f"Successfully feedback memories for user_id={user_id}, mem_cube_id={mem_cube_id}" + f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}" ) should_send_log = ( @@ -590,13 +599,25 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> ) if feedback_result and should_send_log: feedback_content = [] - for _i, mem_item in enumerate(feedback_result): - feedback_content.append( - { - "content": mem_item.memory, - "id": mem_item["id"], - } - ) + for mem_item in feedback_result: + # Safely access attributes, assuming mem_item could be dict or object + mem_id = getattr(mem_item, 'id', None) or mem_item.get('id') if isinstance(mem_item, dict) else None + mem_memory = getattr(mem_item, 'memory', None) or mem_item.get('memory') if isinstance(mem_item, dict) else None + + if mem_id and mem_memory: + feedback_content.append( + { + "content": mem_memory, + "id": mem_id, + } + ) + else: + logger.warning(f"Skipping malformed mem_item in feedback_result: {mem_item}") + + if not feedback_content: + logger.warning("No valid feedback content generated from feedback_result.") + return + event = self.create_event_log( label="feedbackMemory", from_memory_type=USER_INPUT_TYPE, From a4f5cb50f2517de60b20570990a2e37a182d2592 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 19:55:09 +0800 Subject: [PATCH 4/5] Format general_scheduler with ruff --- src/memos/mem_scheduler/general_scheduler.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 99ff175de..1186c7b77 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -571,7 +571,9 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> try: feedback_data = json.loads(content) if isinstance(content, str) else content if not isinstance(feedback_data, dict): - logger.error(f"Failed to decode feedback_data or it is not a dict: {feedback_data}") + logger.error( + f"Failed to decode feedback_data or it is not a dict: {feedback_data}" + ) return except json.JSONDecodeError: logger.error(f"Invalid JSON content for feedback message: {content}", exc_info=True) @@ -601,8 +603,16 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> feedback_content = [] for mem_item in feedback_result: # Safely access attributes, assuming mem_item could be dict or object - mem_id = getattr(mem_item, 'id', None) or mem_item.get('id') if isinstance(mem_item, dict) else None - mem_memory = getattr(mem_item, 'memory', None) or mem_item.get('memory') if isinstance(mem_item, dict) else None + mem_id = ( + getattr(mem_item, "id", None) or mem_item.get("id") + if isinstance(mem_item, dict) + else None + ) + mem_memory = ( + getattr(mem_item, "memory", None) or mem_item.get("memory") + if isinstance(mem_item, dict) + else None + ) if mem_id and mem_memory: feedback_content.append( @@ -612,7 +622,9 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> } ) else: - logger.warning(f"Skipping malformed mem_item in feedback_result: {mem_item}") + logger.warning( + f"Skipping malformed mem_item in feedback_result: {mem_item}" + ) if not feedback_content: logger.warning("No valid feedback content generated from feedback_result.") From 87214c0a461cfafaab42f2b15031936464f6ef2c Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 19:59:34 +0800 Subject: [PATCH 5/5] Add stack_info to scheduler logging and format --- src/memos/mem_scheduler/general_scheduler.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 1186c7b77..5848fe176 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -579,6 +579,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> logger.error(f"Invalid JSON content for feedback message: {content}", exc_info=True) return + task_id = feedback_data.get("task_id") or message.task_id feedback_result = self.feedback_server.process_feedback( user_id=user_id, user_name=mem_cube_id, @@ -587,7 +588,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> retrieved_memory_ids=feedback_data.get("retrieved_memory_ids", []), feedback_content=feedback_data.get("feedback_content"), feedback_time=feedback_data.get("feedback_time"), - task_id=feedback_data.get("task_id"), + task_id=task_id, ) logger.info( @@ -623,11 +624,22 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> ) else: logger.warning( - f"Skipping malformed mem_item in feedback_result: {mem_item}" + "Skipping malformed mem_item in feedback_result. user_id=%s mem_cube_id=%s task_id=%s item=%s", + user_id, + mem_cube_id, + task_id, + mem_item, + stack_info=True, ) if not feedback_content: - logger.warning("No valid feedback content generated from feedback_result.") + logger.warning( + "No valid feedback content generated from feedback_result. user_id=%s mem_cube_id=%s task_id=%s", + user_id, + mem_cube_id, + task_id, + stack_info=True, + ) return event = self.create_event_log( @@ -642,7 +654,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> memory_len=len(feedback_content), memcube_name=self._map_memcube_name(mem_cube_id), ) - event.task_id = message.task_id + event.task_id = task_id self._submit_web_logs([event]) except Exception as e: