diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py
index 2448490a6..5848fe176 100644
--- a/src/memos/mem_scheduler/general_scheduler.py
+++ b/src/memos/mem_scheduler/general_scheduler.py
@@ -464,6 +464,63 @@ def send_add_log_messages_to_local_env(
         if events:
             self._submit_web_logs(events, additional_log_info="send_add_log_messages_to_cloud_env")
 
+    def send_add_log_messages_to_cloud_env(
+        self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original
+    ):
+        """
+        Cloud logging path for add/update events.
+        """
+        kb_log_content: list[dict] = []
+        info = msg.info or {}
+        # Process added items
+        for item in prepared_add_items:
+            kb_log_content.append(
+                {
+                    "log_source": "KNOWLEDGE_BASE_LOG",
+                    "trigger_source": info.get("trigger_source", "Messages"),
+                    "operation": "ADD",
+                    "memory_id": item.id,
+                    "content": item.memory,
+                    "original_content": None,
+                    "source_doc_id": getattr(item.metadata, "source_doc_id", None),
+                }
+            )
+
+        # Process updated items
+        for item_data in prepared_update_items_with_original:
+            item = item_data["new_item"]
+            kb_log_content.append(
+                {
+                    "log_source": "KNOWLEDGE_BASE_LOG",
+                    "trigger_source": info.get("trigger_source", "Messages"),
+                    "operation": "UPDATE",
+                    "memory_id": item.id,
+                    "content": item.memory,
+                    "original_content": item_data.get("original_content"),
+                    "source_doc_id": getattr(item.metadata, "source_doc_id", None),
+                }
+            )
+
+        if kb_log_content:
+            logger.info(
+                f"[DIAGNOSTIC] general_scheduler.send_add_log_messages_to_cloud_env: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: {msg.user_id}, mem_cube_id: {msg.mem_cube_id}, task_id: {msg.task_id}. KB content: {json.dumps(kb_log_content, indent=2)}"
+            )
+            event = self.create_event_log(
+                label="knowledgeBaseUpdate",
+                from_memory_type=USER_INPUT_TYPE,
+                to_memory_type=LONG_TERM_MEMORY_TYPE,
+                user_id=msg.user_id,
+                mem_cube_id=msg.mem_cube_id,
+                mem_cube=self.current_mem_cube,
+                memcube_log_content=kb_log_content,
+                metadata=None,
+                memory_len=len(kb_log_content),
+                memcube_name=self._map_memcube_name(msg.mem_cube_id),
+            )
+            event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
+            event.task_id = msg.task_id
+            self._submit_web_logs([event])
+
     def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
         logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")
         # Process the query in a session turn
@@ -502,6 +559,8 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
 
     def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
         try:
+            if not messages:
+                return
             message = messages[0]
             mem_cube = self.current_mem_cube
 
@@ -509,21 +568,31 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
             mem_cube_id = message.mem_cube_id
             content = message.content
 
-            feedback_data = json.loads(content)
-
+            try:
+                feedback_data = json.loads(content) if isinstance(content, str) else content
+                if not isinstance(feedback_data, dict):
+                    logger.error(
+                        f"Failed to decode feedback_data or it is not a dict: {feedback_data}"
+                    )
+                    return
+            except json.JSONDecodeError:
+                logger.error(f"Invalid JSON content for feedback message: {content}", exc_info=True)
+                return
+
+            task_id = feedback_data.get("task_id") or message.task_id
             feedback_result = self.feedback_server.process_feedback(
                 user_id=user_id,
                 user_name=mem_cube_id,
-                session_id=feedback_data["session_id"],
-                chat_history=feedback_data["history"],
-                retrieved_memory_ids=feedback_data["retrieved_memory_ids"],
-                feedback_content=feedback_data["feedback_content"],
-                feedback_time=feedback_data["feedback_time"],
-                task_id=feedback_data["task_id"],
+                session_id=feedback_data.get("session_id"),
+                chat_history=feedback_data.get("history", []),
+                retrieved_memory_ids=feedback_data.get("retrieved_memory_ids", []),
+                feedback_content=feedback_data.get("feedback_content"),
+                feedback_time=feedback_data.get("feedback_time"),
+                task_id=task_id,
             )
 
             logger.info(
-                f"Successfully feedback memories for user_id={user_id}, mem_cube_id={mem_cube_id}"
+                f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}"
             )
 
             should_send_log = (
@@ -533,13 +602,43 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
             )
             if feedback_result and should_send_log:
                 feedback_content = []
-                for _i, mem_item in enumerate(feedback_result):
-                    feedback_content.append(
-                        {
-                            "content": mem_item.memory,
-                            "id": mem_item["id"],
-                        }
-                    )
+                for mem_item in feedback_result:
+                    # Items may be plain dicts or objects exposing attributes; pick the
+                    # accessor explicitly so attribute-style items are not discarded.
+                    if isinstance(mem_item, dict):
+                        mem_id = mem_item.get("id")
+                        mem_memory = mem_item.get("memory")
+                    else:
+                        mem_id = getattr(mem_item, "id", None)
+                        mem_memory = getattr(mem_item, "memory", None)
+
+                    if mem_id and mem_memory:
+                        feedback_content.append(
+                            {
+                                "content": mem_memory,
+                                "id": mem_id,
+                            }
+                        )
+                    else:
+                        logger.warning(
+                            "Skipping malformed mem_item in feedback_result. user_id=%s mem_cube_id=%s task_id=%s item=%s",
+                            user_id,
+                            mem_cube_id,
+                            task_id,
+                            mem_item,
+                            stack_info=True,
+                        )
+
+                if not feedback_content:
+                    logger.warning(
+                        "No valid feedback content generated from feedback_result. user_id=%s mem_cube_id=%s task_id=%s",
+                        user_id,
+                        mem_cube_id,
+                        task_id,
+                        stack_info=True,
+                    )
+                    return
+
                 event = self.create_event_log(
                     label="feedbackMemory",
                     from_memory_type=USER_INPUT_TYPE,
@@ -552,7 +651,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
                     memory_len=len(feedback_content),
                     memcube_name=self._map_memcube_name(mem_cube_id),
                 )
-                event.task_id = message.task_id
+                event.task_id = task_id
                 self._submit_web_logs([event])
 
         except Exception as e: