Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions python/packages/core/agent_framework/_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,9 @@ async def run(
user=user,
additional_properties=merged_additional_options, # type: ignore[arg-type]
)
# Filter chat_options from kwargs to prevent duplicate keyword argument
filtered_kwargs = {k: v for k, v in kwargs.items() if k != "chat_options"}
# Filter chat_options and filters from kwargs to prevent duplicate keyword argument
# filters is for context providers only, not for chat client
filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ("chat_options", "filters")}
response = await self.chat_client.get_response(
messages=thread_messages,
chat_options=co,
Expand Down
64 changes: 60 additions & 4 deletions python/packages/mem0/agent_framework_mem0/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ def __init__(
user_id: The user ID for scoping memories or None.
scope_to_per_operation_thread_id: Whether to scope memories to per-operation thread ID.
context_prompt: The prompt to prepend to retrieved memories.

Note:
For advanced filtering (OR logic, date ranges, comparisons, etc.), pass a `filters`
parameter to the `invoking()` through `agent.run()` method.
Comment on lines +60 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to add an example how it looks like from user perspective?

The filters will be merged with these init params.

Example:
```python
provider = Mem0Provider(user_id="user123")
agent = client.create_agent(context_providers=provider)

# Advanced filtering with OR logic
await agent.run(
"Show memories from Bob or recent memories",
filters={"OR": [{"user_id": "bob"}, {"created_at": {"gte": "2024-12-01"}}]},
)
```

See mem0 docs: https://docs.mem0.ai/api-reference/memory/search-memories
"""
should_close_client = False
if mem0_client is None:
Expand Down Expand Up @@ -137,7 +156,11 @@ async def invoking(self, messages: ChatMessage | MutableSequence[ChatMessage], *
messages: List of new messages in the thread.

Keyword Args:
**kwargs: not used at present.
**kwargs: Optional keyword arguments.
filters (dict, optional): Optional dictionary for this specific search.
Merged with init parameters (user_id, agent_id, etc.).
Supports mem0's full filter syntax including logical operators (AND, OR, NOT),
comparison operators (in, gte, lte, gt, lt, ne, icontains), and wildcards (*).

Returns:
Context: Context object containing instructions with memories.
Expand All @@ -150,11 +173,13 @@ async def invoking(self, messages: ChatMessage | MutableSequence[ChatMessage], *
if not input_text.strip():
return Context(messages=None)

# Extract filters from kwargs if provided
invocation_filters = kwargs.get("filters")
filters = self._build_filters(invocation_filters)

search_response: MemorySearchResponse_v1_1 | MemorySearchResponse_v2 = await self.mem0_client.search( # type: ignore[misc]
query=input_text,
user_id=self.user_id,
agent_id=self.agent_id,
run_id=self._per_operation_thread_id if self.scope_to_per_operation_thread_id else self.thread_id,
filters=filters,
)

# Depending on the API version, the response schema varies slightly
Expand Down Expand Up @@ -185,6 +210,37 @@ def _validate_filters(self) -> None:
"At least one of the filters: agent_id, user_id, application_id, or thread_id is required."
)

def _build_filters(self, invocation_filters: dict[str, Any] | None = None) -> dict[str, Any]:
"""Build search filters from init parameters and optional per-invocation filters.

Args:
invocation_filters: Optional filters passed to invoking() for this specific search.

Returns:
Filter dictionary for mem0 search API. Merges init parameters with invocation filters.
Init parameters provide the base scope (user_id, agent_id, etc.).
Invocation filters can add or override for advanced queries.
"""
# Build base filters from init parameters (flat dictionary = implicit AND)
filters: dict[str, Any] = {}

if self.user_id:
filters["user_id"] = self.user_id
if self.agent_id:
filters["agent_id"] = self.agent_id
if self.scope_to_per_operation_thread_id and self._per_operation_thread_id:
filters["run_id"] = self._per_operation_thread_id
elif self.thread_id:
filters["run_id"] = self.thread_id
if self.application_id:
filters["app_id"] = self.application_id

# Merge with per-invocation filters (invocation filters take precedence)
if invocation_filters:
filters.update(invocation_filters)

return filters

def _validate_per_operation_thread_id(self, thread_id: str | None) -> None:
"""Validates that a new thread ID doesn't conflict with an existing one when scoped.

Expand Down
87 changes: 83 additions & 4 deletions python/packages/mem0/tests/test_mem0_context_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def test_model_invoking_single_message(self, mock_mem0_client: AsyncMock)
mock_mem0_client.search.assert_called_once()
call_args = mock_mem0_client.search.call_args
assert call_args.kwargs["query"] == "What's the weather?"
assert call_args.kwargs["user_id"] == "user123"
assert call_args.kwargs["filters"] == {"user_id": "user123"}

assert isinstance(context, Context)
expected_instructions = (
Expand Down Expand Up @@ -373,8 +373,7 @@ async def test_model_invoking_with_agent_id(self, mock_mem0_client: AsyncMock) -
await provider.invoking(message)

call_args = mock_mem0_client.search.call_args
assert call_args.kwargs["agent_id"] == "agent123"
assert call_args.kwargs["user_id"] is None
assert call_args.kwargs["filters"] == {"agent_id": "agent123"}

async def test_model_invoking_with_scope_to_per_operation_thread_id(self, mock_mem0_client: AsyncMock) -> None:
"""Test invoking with scope_to_per_operation_thread_id enabled."""
Expand All @@ -392,7 +391,7 @@ async def test_model_invoking_with_scope_to_per_operation_thread_id(self, mock_m
await provider.invoking(message)

call_args = mock_mem0_client.search.call_args
assert call_args.kwargs["run_id"] == "operation_thread"
assert call_args.kwargs["filters"] == {"user_id": "user123", "run_id": "operation_thread"}

async def test_model_invoking_no_memories_returns_none_instructions(self, mock_mem0_client: AsyncMock) -> None:
"""Test that no memories returns context with None instructions."""
Expand Down Expand Up @@ -510,3 +509,83 @@ def test_validate_per_operation_thread_id_disabled_scope(self, mock_mem0_client:

# Should not raise exception even with different thread ID
provider._validate_per_operation_thread_id("different_thread")


class TestMem0ProviderBuildFilters:
"""Test the _build_filters method."""

def test_build_filters_with_invocation_filters(self, mock_mem0_client: AsyncMock) -> None:
"""Test that invocation filters are merged with init parameters."""
provider = Mem0Provider(
user_id="user123",
agent_id="agent123",
mem0_client=mock_mem0_client,
)

invocation_filters = {"created_at": {"gte": "2024-12-01"}}
filters = provider._build_filters(invocation_filters)
assert filters == {
"user_id": "user123",
"agent_id": "agent123",
"created_at": {"gte": "2024-12-01"},
}

def test_build_filters_invocation_overrides_init(self, mock_mem0_client: AsyncMock) -> None:
"""Test that invocation filters can override init parameters."""
provider = Mem0Provider(
user_id="user123",
mem0_client=mock_mem0_client,
)

invocation_filters = {"user_id": "alice"} # Override
filters = provider._build_filters(invocation_filters)
assert filters == {"user_id": "alice"}

def test_build_filters_with_all_simple_parameters(self, mock_mem0_client: AsyncMock) -> None:
"""Test building filters with all simple parameters combined."""
provider = Mem0Provider(
user_id="user123",
agent_id="agent456",
thread_id="thread999",
application_id="app789",
mem0_client=mock_mem0_client,
)

filters = provider._build_filters()
assert filters == {
"user_id": "user123",
"agent_id": "agent456",
"run_id": "thread999",
"app_id": "app789",
}

def test_build_filters_excludes_none_values(self, mock_mem0_client: AsyncMock) -> None:
"""Test that None values are excluded from filters."""
provider = Mem0Provider(
user_id="user123",
agent_id=None,
thread_id=None,
application_id=None,
mem0_client=mock_mem0_client,
)

filters = provider._build_filters()
assert filters == {"user_id": "user123"}
assert "agent_id" not in filters
assert "run_id" not in filters
assert "app_id" not in filters

async def test_model_invoking_with_filters_kwarg(self, mock_mem0_client: AsyncMock) -> None:
"""Test invoking with filters passed via kwargs."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="Hello")

mock_mem0_client.search.return_value = []

await provider.invoking(message, filters={"created_at": {"gte": "2024-12-01"}})

call_args = mock_mem0_client.search.call_args
assert call_args.kwargs["filters"] == {
"user_id": "user123",
"created_at": {"gte": "2024-12-01"},
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This folder contains examples demonstrating how to use the Mem0 context provider
| File | Description |
|------|-------------|
| [`mem0_basic.py`](mem0_basic.py) | Basic example of using Mem0 context provider to store and retrieve user preferences across different conversation threads. |
| [`mem0_filters.py`](mem0_filters.py) | Example demonstrating advanced filtering capabilities with Mem0, including OR logic and date-based filters for precise memory retrieval. |
| [`mem0_threads.py`](mem0_threads.py) | Advanced example demonstrating different thread scoping strategies with Mem0. Covers global thread scope (memories shared across all operations), per-operation thread scope (memories isolated per thread), and multiple agents with different memory configurations for personal vs. work contexts. |
| [`mem0_oss.py`](mem0_oss.py) | Example of using the Mem0 Open Source self-hosted version as the context provider. Demonstrates setup and configuration for local deployment. |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ async def main() -> None:
result = await agent.run(query)
print(f"Agent: {result}\n")

# Mem0 processes and indexes memories asynchronously.
# Wait for memories to be indexed before querying in a new thread.
# In production, consider implementing retry logic or using Mem0's
# eventual consistency handling instead of a fixed delay.
print("Waiting for memories to be processed...")
await asyncio.sleep(12) # Empirically determined delay for Mem0 indexing

print("\nRequest within a new thread:")
# Create a new thread for the agent.
# The new thread has no context of the previous conversation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
from datetime import datetime, timedelta

from agent_framework.azure import AzureAIAgentClient
from agent_framework.mem0 import Mem0Provider
from azure.identity.aio import AzureCliCredential


async def main() -> None:
"""Example demonstrating advanced filtering with Mem0 context provider."""
print("=== Mem0 Advanced Filtering Example ===\n")

# For Azure authentication, run `az login` command in terminal or replace AzureCliCredential with preferred
# authentication option.
# For Mem0 authentication, set Mem0 API key via "api_key" parameter or MEM0_API_KEY environment variable.
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(credential=credential).create_agent(
name="FilterAssistant",
instructions="You are a helpful assistant that retrieves and summarizes memories.",
context_providers=Mem0Provider(user_id="demo_user"),
) as agent,
):
# Store some memories with different timestamps
print("Storing memories...")
await agent.run("I love Python programming.")
await agent.run("My favorite color is blue.")
await agent.run("I work as a software engineer.")

# Wait for memories to be indexed
print("Waiting for memories to be processed...")
await asyncio.sleep(12) # Empirically determined delay for Mem0 indexing

# Calculate a date from a week ago for filtering
week_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")

print("\n=== Using OR Filter ===")
# Use OR logic to find memories matching either condition
query = "What do you know about me?"
print(f"User: {query}")
print(f"Filter: Memories from another user OR created after {week_ago}")

result = await agent.run(
query,
filters={
"OR": [
{"user_id": "another_user"}, # This won't match
{"created_at": {"gte": week_ago}}, # This will match our recent memories
]
},
)
print(f"Agent: {result}\n")

print("\n=== Using Complex Filter ===")
# Demonstrate combining multiple filter conditions
query = "Tell me about my preferences"
print(f"User: {query}")
print("Filter: Recent memories with specific keywords")

result = await agent.run(
query,
filters={
"AND": [
{"user_id": "demo_user"},
{"created_at": {"gte": week_ago}},
]
},
)
print(f"Agent: {result}\n")


if __name__ == "__main__":
asyncio.run(main())
Loading