Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions agent_memory_server/long_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
# Track pending extraction tasks to prevent garbage collection
# This is only used when running without Docket (asyncio mode)
_pending_extraction_tasks: set = set()

# Hard cap on how many semantically similar neighbors may be merged with an
# incoming memory in a single deduplication pass.
SEMANTIC_DEDUP_SEARCH_LIMIT = 10
# Query one extra slot so that, when the anchor memory is already indexed and
# appears in its own search results, filtering it out locally does not shrink
# the usable candidate count below SEMANTIC_DEDUP_SEARCH_LIMIT.
SEMANTIC_DEDUP_QUERY_LIMIT = SEMANTIC_DEDUP_SEARCH_LIMIT + 1


def _parse_extraction_response_with_fallback(content: str, logger) -> dict:
Expand Down Expand Up @@ -1411,6 +1413,69 @@ async def deduplicate_by_id(
return memory, False


async def _semantic_merge_group_is_cohesive(
*,
db: Any,
memory: MemoryRecord,
candidate_memories: list[MemoryRecordResult],
namespace_filter: Namespace | None,
user_id_filter: UserId | None,
session_id_filter: SessionId | None,
vector_distance_threshold: float,
) -> bool:
"""Reject bridge memories and non-cohesive semantic merge groups."""

if not candidate_memories:
return True

candidate_ids = {candidate.id for candidate in candidate_memories if candidate.id}
# Once the merge group is already at the hard cap, extra neighbors can
# simply mean "there are more same-topic memories than we are willing to
# merge in one pass" rather than "this group is ambiguous".
merge_group_is_capped = len(candidate_memories) >= SEMANTIC_DEDUP_SEARCH_LIMIT

for candidate_memory in candidate_memories:
if not candidate_memory.text:
return False

search_result = await db.search_memories(
query=candidate_memory.text,
namespace=namespace_filter,
user_id=user_id_filter,
session_id=session_id_filter,
distance_threshold=vector_distance_threshold,
# During compaction the anchor memory is already indexed and can
# consume one search slot before we filter it out locally.
limit=SEMANTIC_DEDUP_QUERY_LIMIT,
)
Comment thread
cursor[bot] marked this conversation as resolved.
related_ids = {
result.id
for result in (search_result.memories if search_result else [])
if result.id not in {candidate_memory.id, memory.id}
}
extra_ids = related_ids - candidate_ids
Comment thread
abrookins marked this conversation as resolved.
if extra_ids and not merge_group_is_capped:
logger.info(
"Skipping ambiguous semantic merge group for %s via %s; extra neighbors=%s",
memory.id,
candidate_memory.id,
sorted(extra_ids),
)
return False
Comment thread
cursor[bot] marked this conversation as resolved.

missing_ids = candidate_ids - {candidate_memory.id} - related_ids
Comment thread
abrookins marked this conversation as resolved.
if missing_ids:
logger.info(
"Skipping non-cohesive semantic merge group for %s via %s; missing neighbors=%s",
memory.id,
candidate_memory.id,
sorted(missing_ids),
)
return False

return True


async def deduplicate_by_semantic_search(
memory: MemoryRecord,
redis_client: Redis | None = None,
Expand Down Expand Up @@ -1491,21 +1556,37 @@ async def deduplicate_by_semantic_search(
user_id=user_id_filter,
session_id=session_id_filter,
distance_threshold=vector_distance_threshold,
limit=10,
# Keep one extra slot so an already-indexed anchor memory does not
# reduce the number of semantic candidates we can evaluate.
limit=SEMANTIC_DEDUP_QUERY_LIMIT,
)

vector_search_result = search_result.memories if search_result else []

# Filter out the memory itself from the search results (avoid self-duplication)
vector_search_result = [m for m in vector_search_result if m.id != memory.id]
vector_search_result = [m for m in vector_search_result if m.id != memory.id][
:SEMANTIC_DEDUP_SEARCH_LIMIT
]

if vector_search_result and len(vector_search_result) > 0:
merge_group = [memory] + vector_search_result
if not await _semantic_merge_group_is_cohesive(
db=db,
memory=memory,
candidate_memories=vector_search_result,
namespace_filter=namespace_filter,
user_id_filter=user_id_filter,
session_id_filter=session_id_filter,
vector_distance_threshold=vector_distance_threshold,
):
return memory, False

# Found semantically similar memories
similar_memory_ids = [memory.id for memory in vector_search_result]

# Merge the memories
merged_memory = await merge_memories_with_llm(
[memory] + vector_search_result,
merge_group,
)

# Delete the similar memories using the database
Expand Down
48 changes: 48 additions & 0 deletions tests/integration/test_deduplication_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,54 @@ async def test_distinct_memories_are_not_merged(
"Distinct memories were incorrectly merged."
)

async def test_real_embeddings_do_not_merge_diet_and_basketball_memories(
    self, use_test_redis_connection, unique_namespace, unique_user_id
):
    """
    Regression check for a reported false positive, using real embeddings.

    Two unrelated facts are stored:

    - "User eats a healthy diet with vegetables and lean protein"
    - "User played basketball in high school"

    Semantic deduplication must not select them as duplicates.
    """
    diet_memory = MemoryRecord(
        id=str(ulid.ULID()),
        text="User eats a healthy diet with vegetables and lean protein",
        namespace=unique_namespace,
        user_id=unique_user_id,
        memory_type="semantic",
    )
    basketball_memory = MemoryRecord(
        id=str(ulid.ULID()),
        text="User played basketball in high school",
        namespace=unique_namespace,
        user_id=unique_user_id,
        memory_type="semantic",
    )

    # Index only the diet memory, then give the index a moment to settle
    # before running deduplication against it.
    await index_long_term_memories(
        [diet_memory],
        redis_client=use_test_redis_connection,
        deduplicate=False,
    )
    await asyncio.sleep(1)

    _, was_merged = await deduplicate_by_semantic_search(
        memory=basketball_memory,
        redis_client=use_test_redis_connection,
        namespace=unique_namespace,
        user_id=unique_user_id,
        vector_distance_threshold=0.35,
    )

    assert not was_merged, (
        "Healthy-diet and high-school basketball memories should not be "
        "selected as semantic duplicates at threshold 0.35."
    )

async def test_vector_search_catches_paraphrased_duplicates(
self, use_test_redis_connection, unique_namespace, unique_user_id
):
Expand Down
Loading
Loading