diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 06a8a638d..0561d178e 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -134,10 +134,18 @@ def _add_memories_parallel( return added_ids def _add_memories_batch( - self, memories: list[TextualMemoryItem], user_name: str | None = None + self, memories: list[TextualMemoryItem], user_name: str | None = None, batch_size: int = 10 ) -> list[str]: """ Add memories using batch database operations (more efficient for large batches). + + Args: + memories: List of memory items to add. + user_name: Optional user name for the memories. + batch_size: Number of nodes to insert per batch. + + Returns: + List of added graph memory node IDs. """ if not memories: return [] @@ -150,7 +158,6 @@ def _add_memories_batch( for memory in memories: working_id = str(uuid.uuid4()) - # Prepare WorkingMemory node (skip for ToolSchemaMemory and ToolTrajectoryMemory) if memory.metadata.memory_type not in ("ToolSchemaMemory", "ToolTrajectoryMemory"): working_metadata = memory.metadata.model_copy( update={"memory_type": "WorkingMemory"} @@ -163,8 +170,6 @@ def _add_memories_batch( "metadata": working_metadata, } ) - - # Prepare graph memory node (LongTermMemory/UserMemory/ToolSchemaMemory/ToolTrajectoryMemory) if memory.metadata.memory_type in ( "LongTermMemory", "UserMemory", @@ -194,20 +199,26 @@ def _add_memories_batch( graph_node_ids.append(graph_node_id) added_ids.append(graph_node_id) - # Batch insert nodes - if working_nodes: + for i in range(0, len(working_nodes), batch_size): + batch = working_nodes[i : i + batch_size] try: - self.graph_store.add_nodes_batch(working_nodes, user_name=user_name) + self.graph_store.add_nodes_batch(batch, user_name=user_name) except Exception as e: - logger.exception("Batch add WorkingMemory nodes error: ", exc_info=e) + logger.exception( + f"Batch add WorkingMemory nodes error (batch {i // batch_size + 1}): ", + exc_info=e, + ) - if graph_nodes: + for i in range(0, len(graph_nodes), batch_size): + batch = graph_nodes[i : i + batch_size] try: - self.graph_store.add_nodes_batch(graph_nodes, user_name=user_name) + self.graph_store.add_nodes_batch(batch, user_name=user_name) except Exception as e: - logger.exception("Batch add graph memory nodes error: ", exc_info=e) + logger.exception( + f"Batch add graph memory nodes error (batch {i // batch_size + 1}): ", + exc_info=e, + ) - # Notify reorganizer (only if enabled) if graph_node_ids and self.is_reorganize: self.reorganizer.add_message(QueueMessage(op="add", after_node=graph_node_ids))