diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py
index 50f21a092..a7441ec39 100644
--- a/src/memos/mem_scheduler/base_scheduler.py
+++ b/src/memos/mem_scheduler/base_scheduler.py
@@ -589,7 +589,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
         self.memos_message_queue.submit_messages(messages=messages)

     def _submit_web_logs(
-        self, messages: ScheduleLogForWebItem | list[ScheduleLogForWebItem]
+        self,
+        messages: ScheduleLogForWebItem | list[ScheduleLogForWebItem],
+        additional_log_info: str | None = None,
     ) -> None:
         """Submit log messages to the web log queue and optionally to RabbitMQ.

@@ -620,7 +622,9 @@ def _submit_web_logs(
         if self.is_rabbitmq_connected():
             logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
             self.rabbitmq_publish_message(message=message.to_dict())
-        logger.debug(f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue.")
+        logger.debug(
+            f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
+        )

     def get_web_log_messages(self) -> list[dict]:
         """
diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py
index f7c8e9d32..fecfba53d 100644
--- a/src/memos/mem_scheduler/general_scheduler.py
+++ b/src/memos/mem_scheduler/general_scheduler.py
@@ -241,6 +241,211 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
         except Exception:
             logger.exception("Failed to record addMessage log for answer")

+    def log_add_messages(self, msg: ScheduleMessageItem) -> tuple[list, list]:
+        """Split the memory ids in msg.content into newly added items and
+        updates, fetching the original content for each update."""
+        try:
+            userinput_memory_ids = json.loads(msg.content)
+        except Exception as e:
+            logger.error(f"Failed to parse memory ids: {e}. Content: {msg.content}", exc_info=True)
+            userinput_memory_ids = []
+
+        # Prepare data for both logging paths, fetching original content for updates
+        prepared_add_items = []
+        prepared_update_items_with_original = []
+
+        for memory_id in userinput_memory_ids:
+            try:
+                # This mem_item represents the NEW content that was just added/processed
+                mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get(
+                    memory_id=memory_id
+                )
+                # Check if a memory with the same key already exists (determining if it's an update)
+                key = getattr(mem_item.metadata, "key", None) or transform_name_to_key(
+                    name=mem_item.memory
+                )
+                exists = False
+                original_content = None
+                original_item_id = None
+
+                # Only check graph_store if a key exists and the text_mem has a graph_store
+                if key and hasattr(self.current_mem_cube.text_mem, "graph_store"):
+                    candidates = self.current_mem_cube.text_mem.graph_store.get_by_metadata(
+                        [
+                            {"field": "key", "op": "=", "value": key},
+                            {
+                                "field": "memory_type",
+                                "op": "=",
+                                "value": mem_item.metadata.memory_type,
+                            },
+                        ]
+                    )
+                    if candidates:
+                        exists = True
+                        original_item_id = candidates[0]
+                        # Crucial step: Fetch the original content for updates
+                        # This `get` is for the *existing* memory that will be updated
+                        original_mem_item = self.current_mem_cube.text_mem.get(
+                            memory_id=original_item_id
+                        )
+                        original_content = original_mem_item.memory
+
+                if exists:
+                    prepared_update_items_with_original.append(
+                        {
+                            "new_item": mem_item,
+                            "original_content": original_content,
+                            "original_item_id": original_item_id,
+                        }
+                    )
+                else:
+                    prepared_add_items.append(mem_item)
+
+            except Exception:
+                logger.warning(
+                    f"MemoryItem {memory_id} has already been deleted or failed during preparation.",
+                    stack_info=True,
+                )
+        return prepared_add_items, prepared_update_items_with_original
+
+    def send_add_log_messages_to_cloud_env(
+        self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original
+    ) -> None:
+        # New: Knowledge Base Logging (Cloud Service)
+        kb_log_content = []
+        for item in prepared_add_items:
+            kb_log_content.append(
+                {
+                    "log_source": "KNOWLEDGE_BASE_LOG",
+                    "trigger_source": msg.info.get("trigger_source", "Messages")
+                    if msg.info
+                    else "Messages",  # msg.info may be None; fall back to "Messages"
+                    "operation": "ADD",
+                    "memory_id": item.id,
+                    "content": item.memory,
+                    "original_content": None,
+                    "source_doc_id": getattr(item.metadata, "source_doc_id", None),
+                }
+            )
+        for item_data in prepared_update_items_with_original:
+            new_item = item_data["new_item"]
+            kb_log_content.append(
+                {
+                    "log_source": "KNOWLEDGE_BASE_LOG",
+                    "trigger_source": msg.info.get("trigger_source", "Messages")
+                    if msg.info
+                    else "Messages",
+                    "operation": "UPDATE",
+                    "memory_id": new_item.id,
+                    "content": new_item.memory,
+                    "original_content": item_data["original_content"],  # fetched during preparation
+                    "source_doc_id": getattr(new_item.metadata, "source_doc_id", None),
+                }
+            )
+
+        if kb_log_content:
+            event = self.create_event_log(
+                label="knowledgeBaseUpdate",
+                # log_content is intentionally omitted here and assigned below;
+                # the memory types are passed explicitly
+                from_memory_type=USER_INPUT_TYPE,
+                to_memory_type=LONG_TERM_MEMORY_TYPE,
+                user_id=msg.user_id,
+                mem_cube_id=msg.mem_cube_id,
+                mem_cube=self.current_mem_cube,
+                memcube_log_content=kb_log_content,
+                metadata=None,
+                memory_len=len(kb_log_content),
+                memcube_name=self._map_memcube_name(msg.mem_cube_id),
+            )
+            # Assign log_content after creation
+            event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
+            event.task_id = msg.task_id
+            self._submit_web_logs([event], additional_log_info="send_add_log_messages_to_cloud_env")
+
+    def send_add_log_messages_to_local_env(
+        self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original
+    ) -> None:
+        # Existing: Playground/Default Logging
+        # Reconstruct add_content/add_meta/update_content/update_meta from the prepared items
+        # so the existing logging path keeps working with its pre-existing data structures
+        add_content_legacy: list[dict] = []
+        add_meta_legacy: list[dict] = []
+        update_content_legacy: list[dict] = []
+        update_meta_legacy: list[dict] = []
+
+        for item in prepared_add_items:
+            key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory)
+            add_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item.id})
+            add_meta_legacy.append(
+                {
+                    "ref_id": item.id,
+                    "id": item.id,
+                    "key": item.metadata.key,
+                    "memory": item.memory,
+                    "memory_type": item.metadata.memory_type,
+                    "status": item.metadata.status,
+                    "confidence": item.metadata.confidence,
+                    "tags": item.metadata.tags,
+                    "updated_at": getattr(item.metadata, "updated_at", None)
+                    or getattr(item.metadata, "update_at", None),
+                }
+            )
+
+        for item_data in prepared_update_items_with_original:
+            item = item_data["new_item"]
+            key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory)
+            update_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item.id})
+            update_meta_legacy.append(
+                {
+                    "ref_id": item.id,
+                    "id": item.id,
+                    "key": item.metadata.key,
+                    "memory": item.memory,
+                    "memory_type": item.metadata.memory_type,
+                    "status": item.metadata.status,
+                    "confidence": item.metadata.confidence,
+                    "tags": item.metadata.tags,
+                    "updated_at": getattr(item.metadata, "updated_at", None)
+                    or getattr(item.metadata, "update_at", None),
+                }
+            )
+
+        events = []
+        if add_content_legacy:
+            event = self.create_event_log(
+                label="addMemory",
+                from_memory_type=USER_INPUT_TYPE,
+                to_memory_type=LONG_TERM_MEMORY_TYPE,
+                user_id=msg.user_id,
+                mem_cube_id=msg.mem_cube_id,
+                mem_cube=self.current_mem_cube,
+                memcube_log_content=add_content_legacy,
+                metadata=add_meta_legacy,
+                memory_len=len(add_content_legacy),
+                memcube_name=self._map_memcube_name(msg.mem_cube_id),
+            )
+            event.task_id = msg.task_id
+            events.append(event)
+        if update_content_legacy:
+            event = self.create_event_log(
+                label="updateMemory",
+                from_memory_type=LONG_TERM_MEMORY_TYPE,
+                to_memory_type=LONG_TERM_MEMORY_TYPE,
+                user_id=msg.user_id,
+                mem_cube_id=msg.mem_cube_id,
+                mem_cube=self.current_mem_cube,
+                memcube_log_content=update_content_legacy,
+                metadata=update_meta_legacy,
+                memory_len=len(update_content_legacy),
+                memcube_name=self._map_memcube_name(msg.mem_cube_id),
+            )
+            event.task_id = msg.task_id
+            events.append(event)
+        logger.info(f"send_add_log_messages_to_local_env: submitting {len(events)} events")
+        if events:
+            self._submit_web_logs(events, additional_log_info="send_add_log_messages_to_local_env")
+
     def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
         logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")
         # Process the query in a session turn
@@ -256,71 +461,9 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
             # Process each message in the batch
             for msg in batch:
-                try:
-                    userinput_memory_ids = json.loads(msg.content)
-                except Exception as e:
-                    logger.error(f"Error: {e}. Content: {msg.content}", exc_info=True)
-                    userinput_memory_ids = []
-
-                # Prepare data for both logging paths, fetching original content for updates
-                prepared_add_items = []
-                prepared_update_items_with_original = []
-
-                for memory_id in userinput_memory_ids:
-                    try:
-                        # This mem_item represents the NEW content that was just added/processed
-                        mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get(
-                            memory_id=memory_id
-                        )
-                        # Check if a memory with the same key already exists (determining if it's an update)
-                        key = getattr(
-                            mem_item.metadata, "key", None
-                        ) or transform_name_to_key(name=mem_item.memory)
-                        exists = False
-                        original_content = None
-                        original_item_id = None
-
-                        # Only check graph_store if a key exists and the text_mem has a graph_store
-                        if key and hasattr(self.current_mem_cube.text_mem, "graph_store"):
-                            candidates = (
-                                self.current_mem_cube.text_mem.graph_store.get_by_metadata(
-                                    [
-                                        {"field": "key", "op": "=", "value": key},
-                                        {
-                                            "field": "memory_type",
-                                            "op": "=",
-                                            "value": mem_item.metadata.memory_type,
-                                        },
-                                    ]
-                                )
-                            )
-                            if candidates:
-                                exists = True
-                                original_item_id = candidates[0]
-                                # Crucial step: Fetch the original content for updates
-                                # This `get` is for the *existing* memory that will be updated
-                                original_mem_item = self.current_mem_cube.text_mem.get(
-                                    memory_id=original_item_id
-                                )
-                                original_content = original_mem_item.memory
-
-                        if exists:
-                            prepared_update_items_with_original.append(
-                                {
-                                    "new_item": mem_item,
-                                    "original_content": original_content,
-                                    "original_item_id": original_item_id,
-                                }
-                            )
-                        else:
-                            prepared_add_items.append(mem_item)
-
-                    except Exception:
-                        logger.warning(
-                            f"This MemoryItem {memory_id} has already been deleted or an error occurred during preparation."
-                        )
-                        continue
-
+                prepared_add_items, prepared_update_items_with_original = (
+                    self.log_add_messages(msg=msg)
+                )

                 # Conditional Logging: Knowledge Base (Cloud Service) vs. Playground/Default
                 is_cloud_env = (
                     os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
@@ -328,152 +471,13 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
                 )

                 if is_cloud_env:
-                    # New: Knowledge Base Logging (Cloud Service)
-                    kb_log_content = []
-                    for item in prepared_add_items:
-                        kb_log_content.append(
-                            {
-                                "log_source": "KNOWLEDGE_BASE_LOG",
-                                "trigger_source": msg.info.get("trigger_source", "Messages")
-                                if msg.info
-                                else "Messages",  # Assuming msg.info is available and contains trigger_source
-                                "operation": "ADD",
-                                "memory_id": item.id,
-                                "content": item.memory,
-                                "original_content": None,
-                                "source_doc_id": getattr(
-                                    item.metadata, "source_doc_id", None
-                                ),
-                            }
-                        )
-                    for item_data in prepared_update_items_with_original:
-                        new_item = item_data["new_item"]
-                        kb_log_content.append(
-                            {
-                                "log_source": "KNOWLEDGE_BASE_LOG",
-                                "trigger_source": msg.info.get("trigger_source", "Messages")
-                                if msg.info
-                                else "Messages",
-                                "operation": "UPDATE",
-                                "memory_id": new_item.id,
-                                "content": new_item.memory,
-                                "original_content": item_data[
-                                    "original_content"
-                                ],  # Now correctly fetched
-                                "source_doc_id": getattr(
-                                    new_item.metadata, "source_doc_id", None
-                                ),
-                            }
-                        )
-
-                    if kb_log_content:
-                        event = self.create_event_log(
-                            label="knowledgeBaseUpdate",
-                            # 1. Remove the log_content parameter
-                            # 2. Add memory_type
-                            from_memory_type=USER_INPUT_TYPE,
-                            to_memory_type=LONG_TERM_MEMORY_TYPE,
-                            user_id=msg.user_id,
-                            mem_cube_id=msg.mem_cube_id,
-                            mem_cube=self.current_mem_cube,
-                            memcube_log_content=kb_log_content,
-                            metadata=None,
-                            memory_len=len(kb_log_content),
-                            memcube_name=self._map_memcube_name(msg.mem_cube_id),
-                        )
-                        # 3. Assign log_content afterwards
-                        event.log_content = (
-                            f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
-                        )
-                        event.task_id = msg.task_id
-                        self._submit_web_logs([event])
+                    self.send_add_log_messages_to_cloud_env(
+                        msg, prepared_add_items, prepared_update_items_with_original
+                    )
                 else:
-                    # Existing: Playground/Default Logging
-                    # Reconstruct add_content/add_meta/update_content/update_meta from prepared_items
-                    # This ensures existing logging path continues to work with pre-existing data structures
-                    add_content_legacy: list[dict] = []
-                    add_meta_legacy: list[dict] = []
-                    update_content_legacy: list[dict] = []
-                    update_meta_legacy: list[dict] = []
-
-                    for item in prepared_add_items:
-                        key = getattr(item.metadata, "key", None) or transform_name_to_key(
-                            name=item.memory
-                        )
-                        add_content_legacy.append(
-                            {"content": f"{key}: {item.memory}", "ref_id": item.id}
-                        )
-                        add_meta_legacy.append(
-                            {
-                                "ref_id": item.id,
-                                "id": item.id,
-                                "key": item.metadata.key,
-                                "memory": item.memory,
-                                "memory_type": item.metadata.memory_type,
-                                "status": item.metadata.status,
-                                "confidence": item.metadata.confidence,
-                                "tags": item.metadata.tags,
-                                "updated_at": getattr(item.metadata, "updated_at", None)
-                                or getattr(item.metadata, "update_at", None),
-                            }
-                        )
-
-                    for item_data in prepared_update_items_with_original:
-                        item = item_data["new_item"]
-                        key = getattr(item.metadata, "key", None) or transform_name_to_key(
-                            name=item.memory
-                        )
-                        update_content_legacy.append(
-                            {"content": f"{key}: {item.memory}", "ref_id": item.id}
-                        )
-                        update_meta_legacy.append(
-                            {
-                                "ref_id": item.id,
-                                "id": item.id,
-                                "key": item.metadata.key,
-                                "memory": item.memory,
-                                "memory_type": item.metadata.memory_type,
-                                "status": item.metadata.status,
-                                "confidence": item.metadata.confidence,
-                                "tags": item.metadata.tags,
-                                "updated_at": getattr(item.metadata, "updated_at", None)
-                                or getattr(item.metadata, "update_at", None),
-                            }
-                        )
-
-                    events = []
-                    if add_content_legacy:
-                        event = self.create_event_log(
-                            label="addMemory",
-                            from_memory_type=USER_INPUT_TYPE,
-                            to_memory_type=LONG_TERM_MEMORY_TYPE,
-                            user_id=msg.user_id,
-                            mem_cube_id=msg.mem_cube_id,
-                            mem_cube=self.current_mem_cube,
-                            memcube_log_content=add_content_legacy,
-                            metadata=add_meta_legacy,
-                            memory_len=len(add_content_legacy),
-                            memcube_name=self._map_memcube_name(msg.mem_cube_id),
-                        )
-                        event.task_id = msg.task_id
-                        events.append(event)
-                    if update_content_legacy:
-                        event = self.create_event_log(
-                            label="updateMemory",
-                            from_memory_type=LONG_TERM_MEMORY_TYPE,
-                            to_memory_type=LONG_TERM_MEMORY_TYPE,
-                            user_id=msg.user_id,
-                            mem_cube_id=msg.mem_cube_id,
-                            mem_cube=self.current_mem_cube,
-                            memcube_log_content=update_content_legacy,
-                            metadata=update_meta_legacy,
-                            memory_len=len(update_content_legacy),
-                            memcube_name=self._map_memcube_name(msg.mem_cube_id),
-                        )
-                        event.task_id = msg.task_id
-                        events.append(event)
-                    if events:
-                        self._submit_web_logs(events)
+                    self.send_add_log_messages_to_local_env(
+                        msg, prepared_add_items, prepared_update_items_with_original
+                    )

             except Exception as e:
                 logger.error(f"Error: {e}", exc_info=True)

diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py
index 71700bc63..ae900abc7 100644
--- a/src/memos/mem_scheduler/schemas/general_schemas.py
+++ b/src/memos/mem_scheduler/schemas/general_schemas.py
@@ -1,3 +1,5 @@
+import os
+
 from pathlib import Path


@@ -65,3 +67,6 @@

 # task queue
 DEFAULT_STREAM_KEY_PREFIX = "scheduler:messages:stream:v1.3"
+exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME", None)
+if exchange_name is not None:
+    DEFAULT_STREAM_KEY_PREFIX += f":{exchange_name}"
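Note on the `general_schemas.py` hunk above: suffixing `DEFAULT_STREAM_KEY_PREFIX` with the exchange name namespaces the Redis streams per deployment, so two schedulers sharing one Redis instance but bound to different RabbitMQ exchanges cannot consume each other's messages. A minimal standalone sketch of the resulting key shape; the `stream_key` helper is hypothetical (the real key composition lives in `SchedulerRedisQueue`):

```python
import os

# Same logic as the hunk above, shown in isolation.
DEFAULT_STREAM_KEY_PREFIX = "scheduler:messages:stream:v1.3"

exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
if exchange_name is not None:
    DEFAULT_STREAM_KEY_PREFIX += f":{exchange_name}"


def stream_key(user_id: str, mem_cube_id: str, task_label: str) -> str:
    # Hypothetical composition: any per-(user, cube, label) key built under
    # the env-scoped prefix inherits the per-exchange isolation.
    return f"{DEFAULT_STREAM_KEY_PREFIX}:{user_id}:{mem_cube_id}:{task_label}"


# With MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME=prod this prints
# scheduler:messages:stream:v1.3:prod:alice:cube42:add
print(stream_key("alice", "cube42", "add"))
```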
f":{exchange_name}" diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 4570461c5..e96657ca7 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -160,21 +160,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): task_id=task_item.item_id, user_id=task_item.user_id ) self.metrics.task_completed(user_id=m.user_id, task_type=m.label) - - # acknowledge redis messages - if ( - isinstance(self.memos_message_queue, SchedulerRedisQueue) - and self.memos_message_queue is not None - ): - for msg in messages: - redis_message_id = msg.redis_message_id - # Acknowledge message processing - self.memos_message_queue.ack_message( - user_id=msg.user_id, - mem_cube_id=msg.mem_cube_id, - task_label=msg.label, - redis_message_id=redis_message_id, - ) + # Redis ack is handled in finally to cover failure cases # Mark task as completed and remove from tracking with self._task_lock: @@ -199,6 +185,23 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}") raise + finally: + # Ensure Redis messages are acknowledged even if handler fails + if ( + isinstance(self.memos_message_queue, SchedulerRedisQueue) + and self.memos_message_queue is not None + ): + try: + for msg in messages: + redis_message_id = getattr(msg, "redis_message_id", "") + self.memos_message_queue.ack_message( + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + task_label=msg.label, + redis_message_id=redis_message_id, + ) + except Exception as ack_err: + logger.warning(f"Ack in finally failed: {ack_err}") return wrapped_handler