Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions src/memos/graph_dbs/neo4j.py
Original file line number Diff line number Diff line change
Expand Up @@ -1415,9 +1415,9 @@ def build_filter_condition(condition_dict: dict, param_counter: list) -> tuple[s
params = {}

for key, value in condition_dict.items():
# Check if value is a dict with comparison operators (gt, lt, gte, lte)
# Check if value is a dict with comparison operators (gt, lt, gte, lte, contains, in, like)
if isinstance(value, dict):
# Handle comparison operators: gt (greater than), lt (less than), gte (greater than or equal), lte (less than or equal)
# Handle comparison operators: gt, lt, gte, lte, contains, in, like
for op, op_value in value.items():
if op in ("gt", "lt", "gte", "lte"):
# Map operator to Cypher operator
Expand All @@ -1440,24 +1440,28 @@ def build_filter_condition(condition_dict: dict, param_counter: list) -> tuple[s
f"{node_alias}.{key} {cypher_op} ${param_name}"
)
elif op == "contains":
# Handle contains operator (for array fields)
# Only supports array format: {"field": {"contains": ["value1", "value2"]}}
# Single string values are not supported, use array format instead: {"field": {"contains": ["value"]}}
# Handle contains operator
# For arrays: use IN to check if array contains value (value IN array_field)
# For strings: also use IN syntax to check if string value is in array field
# Note: In Neo4j, for array fields, we use "value IN field" syntax
param_name = f"filter_{key}_{op}_{param_counter[0]}"
param_counter[0] += 1
params[param_name] = op_value
# Use IN syntax: value IN array_field (works for both string and array values)
condition_parts.append(f"${param_name} IN {node_alias}.{key}")
elif op == "in":
# Handle in operator (for checking if field value is in a list)
# Supports array format: {"field": {"in": ["value1", "value2"]}}
if not isinstance(op_value, list):
raise ValueError(
f"contains operator only supports array format. "
f"Use {{'{key}': {{'contains': ['{op_value}']}}}} instead of {{'{key}': {{'contains': '{op_value}'}}}}"
f"in operator only supports array format. "
f"Use {{'{key}': {{'in': ['{op_value}']}}}} instead of {{'{key}': {{'in': '{op_value}'}}}}"
)
# Handle array of values: generate AND conditions for each value (all must be present)
and_conditions = []
for item in op_value:
param_name = f"filter_{key}_{op}_{param_counter[0]}"
param_counter[0] += 1
params[param_name] = item
# For array fields, check if element is in array
and_conditions.append(f"${param_name} IN {node_alias}.{key}")
if and_conditions:
condition_parts.append(f"({' AND '.join(and_conditions)})")
# Build IN clause
param_name = f"filter_{key}_{op}_{param_counter[0]}"
param_counter[0] += 1
params[param_name] = op_value
condition_parts.append(f"{node_alias}.{key} IN ${param_name}")
elif op == "like":
# Handle like operator (for fuzzy matching, similar to SQL LIKE '%value%')
# Neo4j uses CONTAINS for string matching
Expand Down
148 changes: 93 additions & 55 deletions src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ def clean_properties(props):
return {k: v for k, v in props.items() if k not in vector_keys}


def escape_sql_string(value: str) -> str:
    """Escape single quotes so *value* can sit inside a single-quoted SQL literal.

    Each ``'`` is doubled to ``''``, which is the standard SQL way of writing a
    literal quote inside a ``'...'`` string. No other character is modified:
    backslashes and double quotes pass through unchanged.

    Args:
        value: Raw string to embed in a SQL string literal.

    Returns:
        A copy of ``value`` with every single quote doubled.

    Note:
        This is only quote-doubling, not a general sanitizer. Prefer bound
        query parameters wherever the driver supports them.
    """
    return value.replace("'", "''")


class PolarDBGraphDB(BaseGraphDB):
"""PolarDB-based implementation using Apache AGE graph database extension."""

Expand Down Expand Up @@ -3438,9 +3443,11 @@ def build_cypher_filter_condition(condition_dict: dict) -> str:
"""Build a Cypher WHERE condition for a single filter item."""
condition_parts = []
for key, value in condition_dict.items():
# Check if value is a dict with comparison operators (gt, lt, gte, lte, =, contains)
# Check if value is a dict with comparison operators (gt, lt, gte, lte, =, contains, in, like)
if isinstance(value, dict):
# Handle comparison operators: gt, lt, gte, lte, =, contains
# Handle comparison operators: gt, lt, gte, lte, =, contains, in, like
# Supports multiple operators for the same field, e.g.:
#   {"created_at": {"gte": "2025-09-19", "lte": "2025-12-31"}}
# will generate: n.created_at >= '2025-09-19' AND n.created_at <= '2025-12-31'
for op, op_value in value.items():
if op in ("gt", "lt", "gte", "lte"):
# Map operator to Cypher operator
Expand Down Expand Up @@ -3540,40 +3547,90 @@ def build_cypher_filter_condition(condition_dict: dict) -> str:
condition_parts.append(f"n.{key} = {op_value}")
elif op == "contains":
# Handle contains operator (for array fields)
# Only supports array format: {"field": {"contains": ["value1", "value2"]}}
# Single string values are not supported, use array format instead: {"field": {"contains": ["value"]}}
# Check if key starts with "info." prefix
if key.startswith("info."):
info_field = key[5:] # Remove "info." prefix
if isinstance(op_value, str):
escaped_value = escape_cypher_string(op_value)
condition_parts.append(
f"'{escaped_value}' IN n.info.{info_field}"
)
else:
condition_parts.append(f"{op_value} IN n.info.{info_field}")
else:
# Direct property access
if isinstance(op_value, str):
escaped_value = escape_cypher_string(op_value)
condition_parts.append(f"'{escaped_value}' IN n.{key}")
else:
condition_parts.append(f"{op_value} IN n.{key}")
elif op == "in":
# Handle in operator (for checking if field value is in a list)
# Supports array format: {"field": {"in": ["value1", "value2"]}}
# Generates: n.field IN ['value1', 'value2'] or (n.field = 'value1' OR n.field = 'value2')
if not isinstance(op_value, list):
raise ValueError(
f"contains operator only supports array format. "
f"Use {{'{key}': {{'contains': ['{op_value}']}}}} instead of {{'{key}': {{'contains': '{op_value}'}}}}"
f"in operator only supports array format. "
f"Use {{'{key}': {{'in': ['{op_value}']}}}} instead of {{'{key}': {{'in': '{op_value}'}}}}"
)
# Check if key starts with "info." prefix
if key.startswith("info."):
info_field = key[5:] # Remove "info." prefix
# Handle array of values: generate AND conditions for each value (all must be present)
and_conditions = []
for item in op_value:
# Build OR conditions for nested properties (Apache AGE compatibility)
if len(op_value) == 0:
# Empty list means no match
condition_parts.append("false")
elif len(op_value) == 1:
# Single value, use equality
item = op_value[0]
if isinstance(item, str):
escaped_value = escape_cypher_string(item)
and_conditions.append(
f"'{escaped_value}' IN n.info.{info_field}"
condition_parts.append(
f"n.info.{info_field} = '{escaped_value}'"
)
else:
and_conditions.append(f"{item} IN n.info.{info_field}")
if and_conditions:
condition_parts.append(f"({' AND '.join(and_conditions)})")
condition_parts.append(f"n.info.{info_field} = {item}")
else:
# Multiple values, use OR conditions instead of IN (Apache AGE compatibility)
or_conditions = []
for item in op_value:
if isinstance(item, str):
escaped_value = escape_cypher_string(item)
or_conditions.append(
f"n.info.{info_field} = '{escaped_value}'"
)
else:
or_conditions.append(
f"n.info.{info_field} = {item}"
)
if or_conditions:
condition_parts.append(
f"({' OR '.join(or_conditions)})"
)
else:
# Direct property access
# Handle array of values: generate AND conditions for each value (all must be present)
and_conditions = []
for item in op_value:
# Build array for IN clause or OR conditions
if len(op_value) == 0:
# Empty list means no match
condition_parts.append("false")
elif len(op_value) == 1:
# Single value, use equality
item = op_value[0]
if isinstance(item, str):
escaped_value = escape_cypher_string(item)
and_conditions.append(f"'{escaped_value}' IN n.{key}")
condition_parts.append(f"n.{key} = '{escaped_value}'")
else:
and_conditions.append(f"{item} IN n.{key}")
if and_conditions:
condition_parts.append(f"({' AND '.join(and_conditions)})")
condition_parts.append(f"n.{key} = {item}")
else:
# Multiple values, use IN clause
escaped_items = [
f"'{escape_cypher_string(str(item))}'"
if isinstance(item, str)
else str(item)
for item in op_value
]
array_str = "[" + ", ".join(escaped_items) + "]"
condition_parts.append(f"n.{key} IN {array_str}")
elif op == "like":
# Handle like operator (for fuzzy matching, similar to SQL LIKE '%value%')
# Check if key starts with "info." prefix
Expand Down Expand Up @@ -3781,47 +3838,28 @@ def build_filter_condition(condition_dict: dict) -> str:
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {op_value}::agtype"
)
elif op == "contains":
# Handle contains operator (for array fields) - use @> operator
# Only supports array format: {"field": {"contains": ["value1", "value2"]}}
# Single string values are not supported, use array format instead: {"field": {"contains": ["value"]}}
if not isinstance(op_value, list):
# Handle contains operator (for string fields only)
# Check if agtype contains value (using @> operator)
if not isinstance(op_value, str):
raise ValueError(
f"contains operator only supports array format. "
f"Use {{'{key}': {{'contains': ['{op_value}']}}}} instead of {{'{key}': {{'contains': '{op_value}'}}}}"
f"contains operator only supports string format. "
f"Use {{'{key}': {{'contains': '{op_value}'}}}} instead of {{'{key}': {{'contains': {op_value}}}}}"
)
# Check if key starts with "info." prefix
if key.startswith("info."):
info_field = key[5:] # Remove "info." prefix
# Handle array of values: generate AND conditions for each value (all must be present)
and_conditions = []
for item in op_value:
if isinstance(item, str):
escaped_value = escape_sql_string(item)
and_conditions.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '\"{escaped_value}\"'::agtype"
)
else:
and_conditions.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> {item}::agtype"
)
if and_conditions:
condition_parts.append(f"({' AND '.join(and_conditions)})")
# String contains: use @> operator for agtype contains
escaped_value = escape_sql_string(op_value)
condition_parts.append(
f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '\"{escaped_value}\"'::agtype"
)
else:
# Direct property access
# Handle array of values: generate AND conditions for each value (all must be present)
and_conditions = []
for item in op_value:
if isinstance(item, str):
escaped_value = escape_sql_string(item)
and_conditions.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '\"{escaped_value}\"'::agtype"
)
else:
and_conditions.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> {item}::agtype"
)
if and_conditions:
condition_parts.append(f"({' AND '.join(and_conditions)})")
# String contains: use @> operator for agtype contains
escaped_value = escape_sql_string(op_value)
condition_parts.append(
f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '\"{escaped_value}\"'::agtype"
)
elif op == "like":
# Handle like operator (for fuzzy matching, similar to SQL LIKE '%value%')
# Check if key starts with "info." prefix
Expand Down
Loading