From 2649035f6bd08069cc8f57fc5453401e0ea18701 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com>
Date: Tue, 9 Dec 2025 13:51:54 +0800
Subject: [PATCH] optimize add batch

---
 src/memos/graph_dbs/polardb.py | 140 ++++++++++++++++++++++++---------
 1 file changed, 101 insertions(+), 39 deletions(-)

diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py
index a5599643e..bbf62cc34 100644
--- a/src/memos/graph_dbs/polardb.py
+++ b/src/memos/graph_dbs/polardb.py
@@ -3348,58 +3348,120 @@ def add_nodes_batch(
         with conn.cursor() as cursor:
             # Process each group separately
             for embedding_column, nodes_group in nodes_by_embedding_column.items():
-                # Delete existing records first (batch delete)
-                for node in nodes_group:
+                # Batch delete existing records using IN clause
+                ids_to_delete = [node["id"] for node in nodes_group]
+                if ids_to_delete:
                     delete_query = f"""
                     DELETE FROM {self.db_name}_graph."Memory"
-                    WHERE id = ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring)
+                    WHERE id IN (
+                        SELECT ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, unnest(%s::text[])::cstring)
+                    )
                     """
-                    cursor.execute(delete_query, (node["id"],))
+                    cursor.execute(delete_query, (ids_to_delete,))
+
+                # Batch get graph_ids for all nodes
+                get_graph_ids_query = f"""
+                SELECT
+                    id_val,
+                    ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, id_val::text::cstring) as graph_id
+                FROM unnest(%s::text[]) as id_val
+                """
+                cursor.execute(get_graph_ids_query, (ids_to_delete,))
+                graph_id_map = {row[0]: row[1] for row in cursor.fetchall()}

-                # Insert nodes (batch insert using executemany for better performance)
+                # Add graph_id to properties
                 for node in nodes_group:
-                    # Get graph_id for this node
-                    get_graph_id_query = f"""
-                    SELECT ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring)
-                    """
-                    cursor.execute(get_graph_id_query, (node["id"],))
-                    graph_id = cursor.fetchone()[0]
-                    node["properties"]["graph_id"] = str(graph_id)
-
-                    # Insert node
-                    if node["embedding_vector"]:
-                        insert_query = f"""
-                        INSERT INTO {self.db_name}_graph."Memory"(id, properties, {embedding_column})
-                        VALUES (
-                            ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
-                            %s,
-                            %s
+                    graph_id = graph_id_map.get(node["id"])
+                    if graph_id:
+                        node["properties"]["graph_id"] = str(graph_id)
+
+                # Batch insert using VALUES with multiple rows
+                # Use psycopg2.extras.execute_values for efficient batch insert
+                from psycopg2.extras import execute_values
+
+                if embedding_column and any(node["embedding_vector"] for node in nodes_group):
+                    # Prepare data tuples for batch insert with embedding
+                    data_tuples = []
+                    for node in nodes_group:
+                        # Each tuple: (id, properties_json, embedding_json)
+                        data_tuples.append(
+                            (
+                                node["id"],
+                                json.dumps(node["properties"]),
+                                json.dumps(node["embedding_vector"])
+                                if node["embedding_vector"]
+                                else None,
                             )
-                        """
-                        logger.info(
-                            f"[add_nodes_batch] Inserting node insert_query={insert_query}"
                         )
-                        cursor.execute(
-                            insert_query,
+
+                    # Build the INSERT query template
+                    insert_query = f"""
+                    INSERT INTO {self.db_name}_graph."Memory"(id, properties, {embedding_column})
+                    VALUES %s
+                    """
+
+                    # Build the VALUES template for execute_values
+                    # Each row: (graph_id_function, agtype, vector)
+                    # Note: properties column is agtype, not jsonb
+                    template = f"""
+                    (
+                        ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name,
+                        %s::text::cstring),
+                        %s::text::agtype,
+                        %s::vector
+                    )
+                    """
+                    logger.info(
+                        f"[add_nodes_batch] embedding_column Inserting insert_query:{insert_query}"
+                    )
+                    logger.info(
+                        f"[add_nodes_batch] embedding_column Inserting data_tuples:{data_tuples}"
+                    )
+
+                    # Execute batch insert
+                    execute_values(
+                        cursor,
+                        insert_query,
+                        data_tuples,
+                        template=template,
+                        page_size=100,  # Insert in batches of 100
+                    )
+                else:
+                    # Prepare data tuples for batch insert without embedding
+                    data_tuples = []
+                    for node in nodes_group:
+                        # Each tuple: (id, properties_json)
+                        data_tuples.append(
                             (
                                 node["id"],
                                 json.dumps(node["properties"]),
-                                json.dumps(node["embedding_vector"]),
-                            ),
-                        )
-                    else:
-                        insert_query = f"""
-                        INSERT INTO {self.db_name}_graph."Memory"(id, properties)
-                        VALUES (
-                            ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
-                            %s
                             )
-                        """
-                        cursor.execute(
-                            insert_query,
-                            (node["id"], json.dumps(node["properties"])),
                         )
+                    # Build the INSERT query template
+                    insert_query = f"""
+                    INSERT INTO {self.db_name}_graph."Memory"(id, properties)
+                    VALUES %s
+                    """
+
+                    # Build the VALUES template for execute_values
+                    # Note: properties column is agtype, not jsonb
+                    template = f"""
+                    (
+                        ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
+                        %s::text::agtype
                     )
+                    """
+                    logger.info(f"[add_nodes_batch] Inserting insert_query:{insert_query}")
+                    logger.info(f"[add_nodes_batch] Inserting data_tuples:{data_tuples}")
+                    # Execute batch insert
+                    execute_values(
+                        cursor,
+                        insert_query,
+                        data_tuples,
+                        template=template,
+                        page_size=100,  # Insert in batches of 100
+                    )
+
                 logger.info(
                     f"[add_nodes_batch] Inserted {len(nodes_group)} nodes with embedding_column={embedding_column}"
                 )
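
For reference, the insert path above relies on psycopg2's execute_values helper, which expands a single "VALUES %s" placeholder into many rows and applies a per-row template, while the delete path passes the whole id list once and unnests it server-side. The following is a minimal standalone sketch of those two patterns only; the table name demo_memory, the DSN, and the sample rows are illustrative assumptions and are not part of this repository, and the Apache AGE specifics (ag_catalog._make_graph_id, agtype columns) are deliberately left out.

    # Sketch of the batch delete + execute_values batch insert pattern (assumed demo table).
    import json

    import psycopg2
    from psycopg2.extras import execute_values

    conn = psycopg2.connect("dbname=demo")  # hypothetical DSN
    with conn, conn.cursor() as cur:
        # Hypothetical plain table standing in for the graph vertex table.
        cur.execute(
            "CREATE TABLE IF NOT EXISTS demo_memory (id text PRIMARY KEY, properties jsonb)"
        )

        rows = [
            ("node-1", json.dumps({"memory": "alpha"})),
            ("node-2", json.dumps({"memory": "beta"})),
        ]

        # Batch delete: bind the whole Python list as a text[] and unnest it,
        # instead of issuing one DELETE per id.
        cur.execute(
            "DELETE FROM demo_memory WHERE id IN (SELECT unnest(%s::text[]))",
            ([r[0] for r in rows],),
        )

        # Batch insert: execute_values expands VALUES %s into multiple rows and
        # applies the per-row template, sending up to page_size rows per statement.
        execute_values(
            cur,
            "INSERT INTO demo_memory (id, properties) VALUES %s",
            rows,
            template="(%s, %s::jsonb)",
            page_size=100,
        )
    conn.close()

With page_size=100, a group of 1,000 nodes is written in roughly 10 statements rather than 1,000 individual round trips, which is the effect the patch is after.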