def add(
    self,
    memories: list[TextualMemoryItem],
    user_name: str | None = None,
    mode: str = "sync",
    use_batch: bool = True,
) -> list[str]:
    """
    Add new memories to different memory types.

    Args:
        memories: List of memory items to add.
        user_name: Optional user name for the memories.
        mode: "sync" to clean up working memory and refresh sizes after
            adding, "async" to skip that post-processing.
        use_batch: If True, use batch database operations (more efficient for
            large batches). If False, use parallel single-node operations
            (original behavior).

    Returns:
        List of added memory IDs.
    """
    # No dead pre-initialization: both branches assign added_ids.
    if use_batch:
        added_ids = self._add_memories_batch(memories, user_name)
    else:
        added_ids = self._add_memories_parallel(memories, user_name)

    if mode == "sync":
        self._cleanup_working_memory(user_name)
        self._refresh_memory_size(user_name=user_name)

    return added_ids

def _add_memories_parallel(
    self, memories: list[TextualMemoryItem], user_name: str | None = None
) -> list[str]:
    """
    Add memories using parallel single-node operations (original behavior).

    Each item is handed to ``self._process_memory`` on a worker thread.
    Per-item failures are logged and skipped so one bad memory does not
    abort the whole batch.
    """
    added_ids: list[str] = []
    with ContextThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(self._process_memory, m, user_name): m for m in memories}
        # NOTE: the timeout bounds the whole as_completed iteration,
        # not each individual future.
        for future in as_completed(futures, timeout=500):
            try:
                ids = future.result()
                added_ids.extend(ids)
            except Exception as e:
                logger.exception("Memory processing error: ", exc_info=e)
    return added_ids

def _add_memories_batch(
    self, memories: list[TextualMemoryItem], user_name: str | None = None
) -> list[str]:
    """
    Add memories using batch database operations (more efficient for large batches).

    Builds all WorkingMemory and graph-memory node payloads in one pass, then
    performs two bulk inserts via ``graph_store.add_nodes_batch``.

    Returns:
        IDs of the inserted graph-memory nodes only; WorkingMemory copies are
        not reported in the result.
    """
    if not memories:
        return []

    added_ids: list[str] = []
    working_nodes: list[dict] = []
    graph_nodes: list[dict] = []
    graph_node_ids: list[str] = []

    # One timestamp for the whole batch keeps all inserted nodes consistent.
    now_iso = datetime.now().isoformat()

    for memory in memories:
        # WorkingMemory copies are skipped for tool-related memory types,
        # so only mint a working id when a copy will actually be inserted.
        has_working_copy = memory.metadata.memory_type not in (
            "ToolSchemaMemory",
            "ToolTrajectoryMemory",
        )
        working_id = str(uuid.uuid4()) if has_working_copy else None

        if has_working_copy:
            working_metadata = memory.metadata.model_copy(
                update={"memory_type": "WorkingMemory"}
            ).model_dump(exclude_none=True)
            working_metadata["updated_at"] = now_iso
            working_nodes.append(
                {
                    "id": working_id,
                    "memory": memory.memory,
                    "metadata": working_metadata,
                }
            )

        # Graph memory node (LongTermMemory/UserMemory/ToolSchemaMemory/ToolTrajectoryMemory)
        if memory.metadata.memory_type in (
            "LongTermMemory",
            "UserMemory",
            "ToolSchemaMemory",
            "ToolTrajectoryMemory",
        ):
            graph_node_id = str(uuid.uuid4())
            metadata_dict = memory.metadata.model_dump(exclude_none=True)
            metadata_dict["updated_at"] = now_iso

            # Fast-mode nodes record which WorkingMemory copy they were built
            # from. Only emit the binding when that copy exists — otherwise a
            # tool memory tagged "mode:fast" would reference a working_id that
            # was never inserted.
            tags = metadata_dict.get("tags") or []
            if "mode:fast" in tags and working_id is not None:
                prev_bg = metadata_dict.get("background", "") or ""
                binding_line = f"[working_binding:{working_id}] direct built from raw inputs"
                metadata_dict["background"] = (
                    f"{prev_bg} || {binding_line}" if prev_bg else binding_line
                )

            graph_nodes.append(
                {
                    "id": graph_node_id,
                    "memory": memory.memory,
                    "metadata": metadata_dict,
                }
            )
            graph_node_ids.append(graph_node_id)
            added_ids.append(graph_node_id)

    # Bulk inserts are best-effort: failures are logged, not raised,
    # mirroring the error handling of the parallel path.
    if working_nodes:
        try:
            self.graph_store.add_nodes_batch(working_nodes, user_name=user_name)
        except Exception as e:
            logger.exception("Batch add WorkingMemory nodes error: ", exc_info=e)

    if graph_nodes:
        try:
            self.graph_store.add_nodes_batch(graph_nodes, user_name=user_name)
        except Exception as e:
            logger.exception("Batch add graph memory nodes error: ", exc_info=e)

    # Notify reorganizer (only if enabled)
    if graph_node_ids and self.is_reorganize:
        self.reorganizer.add_message(QueueMessage(op="add", after_node=graph_node_ids))

    return added_ids

def _cleanup_working_memory(self, user_name: str | None = None) -> None:
    """
    Remove oldest WorkingMemory nodes to keep within size limit.

    Best-effort: failures are logged as warnings and never propagated.
    """
    try:
        self.graph_store.remove_oldest_memory(
            memory_type="WorkingMemory",
            keep_latest=self.memory_size["WorkingMemory"],
            user_name=user_name,
        )
    except Exception:
        logger.warning(f"Remove WorkingMemory error: {traceback.format_exc()}")