Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 101 additions & 39 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3348,58 +3348,120 @@ def add_nodes_batch(
with conn.cursor() as cursor:
# Process each group separately
for embedding_column, nodes_group in nodes_by_embedding_column.items():
# Delete existing records first (batch delete)
for node in nodes_group:
# Batch delete existing records using IN clause
ids_to_delete = [node["id"] for node in nodes_group]
if ids_to_delete:
delete_query = f"""
DELETE FROM {self.db_name}_graph."Memory"
WHERE id = ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring)
WHERE id IN (
SELECT ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, unnest(%s::text[])::cstring)
)
"""
cursor.execute(delete_query, (node["id"],))
cursor.execute(delete_query, (ids_to_delete,))

# Batch get graph_ids for all nodes
get_graph_ids_query = f"""
SELECT
id_val,
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, id_val::text::cstring) as graph_id
FROM unnest(%s::text[]) as id_val
"""
cursor.execute(get_graph_ids_query, (ids_to_delete,))
graph_id_map = {row[0]: row[1] for row in cursor.fetchall()}

# Insert nodes (batch insert using executemany for better performance)
# Add graph_id to properties
for node in nodes_group:
# Get graph_id for this node
get_graph_id_query = f"""
SELECT ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring)
"""
cursor.execute(get_graph_id_query, (node["id"],))
graph_id = cursor.fetchone()[0]
node["properties"]["graph_id"] = str(graph_id)

# Insert node
if node["embedding_vector"]:
insert_query = f"""
INSERT INTO {self.db_name}_graph."Memory"(id, properties, {embedding_column})
VALUES (
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
%s,
%s
graph_id = graph_id_map.get(node["id"])
if graph_id:
node["properties"]["graph_id"] = str(graph_id)

# Batch insert using VALUES with multiple rows
# Use psycopg2.extras.execute_values for efficient batch insert
from psycopg2.extras import execute_values

if embedding_column and any(node["embedding_vector"] for node in nodes_group):
# Prepare data tuples for batch insert with embedding
data_tuples = []
for node in nodes_group:
# Each tuple: (id, properties_json, embedding_json)
data_tuples.append(
(
node["id"],
json.dumps(node["properties"]),
json.dumps(node["embedding_vector"])
if node["embedding_vector"]
else None,
)
"""
logger.info(
f"[add_nodes_batch] Inserting node insert_query={insert_query}"
)
cursor.execute(
insert_query,

# Build the INSERT query template
insert_query = f"""
INSERT INTO {self.db_name}_graph."Memory"(id, properties, {embedding_column})
VALUES %s
"""

# Build the VALUES template for execute_values
# Each row: (graph_id_function, agtype, vector)
# Note: properties column is agtype, not jsonb
template = f"""
(
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
%s::text::agtype,
%s::vector
)
"""
logger.info(
f"[add_nodes_batch] embedding_column Inserting insert_query:{insert_query}"
)
logger.info(
f"[add_nodes_batch] embedding_column Inserting data_tuples:{data_tuples}"
)

# Execute batch insert
execute_values(
cursor,
insert_query,
data_tuples,
template=template,
page_size=100, # Insert in batches of 100
)
else:
# Prepare data tuples for batch insert without embedding
data_tuples = []
for node in nodes_group:
# Each tuple: (id, properties_json)
data_tuples.append(
(
node["id"],
json.dumps(node["properties"]),
json.dumps(node["embedding_vector"]),
),
)
else:
insert_query = f"""
INSERT INTO {self.db_name}_graph."Memory"(id, properties)
VALUES (
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
%s
)
"""
cursor.execute(
insert_query,
(node["id"], json.dumps(node["properties"])),
)

# Build the INSERT query template
insert_query = f"""
INSERT INTO {self.db_name}_graph."Memory"(id, properties)
VALUES %s
"""

# Build the VALUES template for execute_values
# Note: properties column is agtype, not jsonb
template = f"""
(
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
%s::text::agtype
)
"""
logger.info(f"[add_nodes_batch] Inserting insert_query:{insert_query}")
logger.info(f"[add_nodes_batch] Inserting data_tuples:{data_tuples}")
# Execute batch insert
execute_values(
cursor,
insert_query,
data_tuples,
template=template,
page_size=100, # Insert in batches of 100
)

logger.info(
f"[add_nodes_batch] Inserted {len(nodes_group)} nodes with embedding_column={embedding_column}"
)
Expand Down