diff --git a/python/packages/core/agent_framework/_agents.py b/python/packages/core/agent_framework/_agents.py index 249ee9ecfb..e5a9db4089 100644 --- a/python/packages/core/agent_framework/_agents.py +++ b/python/packages/core/agent_framework/_agents.py @@ -878,8 +878,9 @@ async def run( user=user, additional_properties=merged_additional_options, # type: ignore[arg-type] ) - # Filter chat_options from kwargs to prevent duplicate keyword argument - filtered_kwargs = {k: v for k, v in kwargs.items() if k != "chat_options"} + # Filter chat_options and filters from kwargs to prevent duplicate keyword argument + # filters is for context providers only, not for chat client + filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ("chat_options", "filters")} response = await self.chat_client.get_response( messages=thread_messages, chat_options=co, diff --git a/python/packages/mem0/agent_framework_mem0/_provider.py b/python/packages/mem0/agent_framework_mem0/_provider.py index 6d726f3a0f..a426ec1740 100644 --- a/python/packages/mem0/agent_framework_mem0/_provider.py +++ b/python/packages/mem0/agent_framework_mem0/_provider.py @@ -55,6 +55,25 @@ def __init__( user_id: The user ID for scoping memories or None. scope_to_per_operation_thread_id: Whether to scope memories to per-operation thread ID. context_prompt: The prompt to prepend to retrieved memories. + + Note: + For advanced filtering (OR logic, date ranges, comparisons, etc.), pass a `filters` + parameter to the `invoking()` through `agent.run()` method. + The filters will be merged with these init params. + + Example: + ```python + provider = Mem0Provider(user_id="user123") + agent = client.create_agent(context_providers=provider) + + # Advanced filtering with OR logic + await agent.run( + "Show memories from Bob or recent memories", + filters={"OR": [{"user_id": "bob"}, {"created_at": {"gte": "2024-12-01"}}]}, + ) + ``` + + See mem0 docs: https://docs.mem0.ai/api-reference/memory/search-memories """ should_close_client = False if mem0_client is None: @@ -137,7 +156,11 @@ async def invoking(self, messages: ChatMessage | MutableSequence[ChatMessage], * messages: List of new messages in the thread. Keyword Args: - **kwargs: not used at present. + **kwargs: Optional keyword arguments. + filters (dict, optional): Optional dictionary for this specific search. + Merged with init parameters (user_id, agent_id, etc.). + Supports mem0's full filter syntax including logical operators (AND, OR, NOT), + comparison operators (in, gte, lte, gt, lt, ne, icontains), and wildcards (*). Returns: Context: Context object containing instructions with memories. @@ -150,11 +173,13 @@ async def invoking(self, messages: ChatMessage | MutableSequence[ChatMessage], * if not input_text.strip(): return Context(messages=None) + # Extract filters from kwargs if provided + invocation_filters = kwargs.get("filters") + filters = self._build_filters(invocation_filters) + search_response: MemorySearchResponse_v1_1 | MemorySearchResponse_v2 = await self.mem0_client.search( # type: ignore[misc] query=input_text, - user_id=self.user_id, - agent_id=self.agent_id, - run_id=self._per_operation_thread_id if self.scope_to_per_operation_thread_id else self.thread_id, + filters=filters, ) # Depending on the API version, the response schema varies slightly @@ -185,6 +210,37 @@ def _validate_filters(self) -> None: "At least one of the filters: agent_id, user_id, application_id, or thread_id is required." ) + def _build_filters(self, invocation_filters: dict[str, Any] | None = None) -> dict[str, Any]: + """Build search filters from init parameters and optional per-invocation filters. + + Args: + invocation_filters: Optional filters passed to invoking() for this specific search. + + Returns: + Filter dictionary for mem0 search API. Merges init parameters with invocation filters. + Init parameters provide the base scope (user_id, agent_id, etc.). + Invocation filters can add or override for advanced queries. + """ + # Build base filters from init parameters (flat dictionary = implicit AND) + filters: dict[str, Any] = {} + + if self.user_id: + filters["user_id"] = self.user_id + if self.agent_id: + filters["agent_id"] = self.agent_id + if self.scope_to_per_operation_thread_id and self._per_operation_thread_id: + filters["run_id"] = self._per_operation_thread_id + elif self.thread_id: + filters["run_id"] = self.thread_id + if self.application_id: + filters["app_id"] = self.application_id + + # Merge with per-invocation filters (invocation filters take precedence) + if invocation_filters: + filters.update(invocation_filters) + + return filters + def _validate_per_operation_thread_id(self, thread_id: str | None) -> None: """Validates that a new thread ID doesn't conflict with an existing one when scoped. diff --git a/python/packages/mem0/tests/test_mem0_context_provider.py b/python/packages/mem0/tests/test_mem0_context_provider.py index 4c1be141dc..3ae6f3f48c 100644 --- a/python/packages/mem0/tests/test_mem0_context_provider.py +++ b/python/packages/mem0/tests/test_mem0_context_provider.py @@ -338,7 +338,7 @@ async def test_model_invoking_single_message(self, mock_mem0_client: AsyncMock) mock_mem0_client.search.assert_called_once() call_args = mock_mem0_client.search.call_args assert call_args.kwargs["query"] == "What's the weather?" - assert call_args.kwargs["user_id"] == "user123" + assert call_args.kwargs["filters"] == {"user_id": "user123"} assert isinstance(context, Context) expected_instructions = ( @@ -373,8 +373,7 @@ async def test_model_invoking_with_agent_id(self, mock_mem0_client: AsyncMock) - await provider.invoking(message) call_args = mock_mem0_client.search.call_args - assert call_args.kwargs["agent_id"] == "agent123" - assert call_args.kwargs["user_id"] is None + assert call_args.kwargs["filters"] == {"agent_id": "agent123"} async def test_model_invoking_with_scope_to_per_operation_thread_id(self, mock_mem0_client: AsyncMock) -> None: """Test invoking with scope_to_per_operation_thread_id enabled.""" @@ -392,7 +391,7 @@ async def test_model_invoking_with_scope_to_per_operation_thread_id(self, mock_m await provider.invoking(message) call_args = mock_mem0_client.search.call_args - assert call_args.kwargs["run_id"] == "operation_thread" + assert call_args.kwargs["filters"] == {"user_id": "user123", "run_id": "operation_thread"} async def test_model_invoking_no_memories_returns_none_instructions(self, mock_mem0_client: AsyncMock) -> None: """Test that no memories returns context with None instructions.""" @@ -510,3 +509,83 @@ def test_validate_per_operation_thread_id_disabled_scope(self, mock_mem0_client: # Should not raise exception even with different thread ID provider._validate_per_operation_thread_id("different_thread") + + +class TestMem0ProviderBuildFilters: + """Test the _build_filters method.""" + + def test_build_filters_with_invocation_filters(self, mock_mem0_client: AsyncMock) -> None: + """Test that invocation filters are merged with init parameters.""" + provider = Mem0Provider( + user_id="user123", + agent_id="agent123", + mem0_client=mock_mem0_client, + ) + + invocation_filters = {"created_at": {"gte": "2024-12-01"}} + filters = provider._build_filters(invocation_filters) + assert filters == { + "user_id": "user123", + "agent_id": "agent123", + "created_at": {"gte": "2024-12-01"}, + } + + def test_build_filters_invocation_overrides_init(self, mock_mem0_client: AsyncMock) -> None: + """Test that invocation filters can override init parameters.""" + provider = Mem0Provider( + user_id="user123", + mem0_client=mock_mem0_client, + ) + + invocation_filters = {"user_id": "alice"} # Override + filters = provider._build_filters(invocation_filters) + assert filters == {"user_id": "alice"} + + def test_build_filters_with_all_simple_parameters(self, mock_mem0_client: AsyncMock) -> None: + """Test building filters with all simple parameters combined.""" + provider = Mem0Provider( + user_id="user123", + agent_id="agent456", + thread_id="thread999", + application_id="app789", + mem0_client=mock_mem0_client, + ) + + filters = provider._build_filters() + assert filters == { + "user_id": "user123", + "agent_id": "agent456", + "run_id": "thread999", + "app_id": "app789", + } + + def test_build_filters_excludes_none_values(self, mock_mem0_client: AsyncMock) -> None: + """Test that None values are excluded from filters.""" + provider = Mem0Provider( + user_id="user123", + agent_id=None, + thread_id=None, + application_id=None, + mem0_client=mock_mem0_client, + ) + + filters = provider._build_filters() + assert filters == {"user_id": "user123"} + assert "agent_id" not in filters + assert "run_id" not in filters + assert "app_id" not in filters + + async def test_model_invoking_with_filters_kwarg(self, mock_mem0_client: AsyncMock) -> None: + """Test invoking with filters passed via kwargs.""" + provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client) + message = ChatMessage(role=Role.USER, text="Hello") + + mock_mem0_client.search.return_value = [] + + await provider.invoking(message, filters={"created_at": {"gte": "2024-12-01"}}) + + call_args = mock_mem0_client.search.call_args + assert call_args.kwargs["filters"] == { + "user_id": "user123", + "created_at": {"gte": "2024-12-01"}, + } diff --git a/python/samples/getting_started/context_providers/mem0/README.md b/python/samples/getting_started/context_providers/mem0/README.md index 61d8bbd51f..2f8865ca0e 100644 --- a/python/samples/getting_started/context_providers/mem0/README.md +++ b/python/samples/getting_started/context_providers/mem0/README.md @@ -9,6 +9,7 @@ This folder contains examples demonstrating how to use the Mem0 context provider | File | Description | |------|-------------| | [`mem0_basic.py`](mem0_basic.py) | Basic example of using Mem0 context provider to store and retrieve user preferences across different conversation threads. | +| [`mem0_filters.py`](mem0_filters.py) | Example demonstrating advanced filtering capabilities with Mem0, including OR logic and date-based filters for precise memory retrieval. | | [`mem0_threads.py`](mem0_threads.py) | Advanced example demonstrating different thread scoping strategies with Mem0. Covers global thread scope (memories shared across all operations), per-operation thread scope (memories isolated per thread), and multiple agents with different memory configurations for personal vs. work contexts. | | [`mem0_oss.py`](mem0_oss.py) | Example of using the Mem0 Open Source self-hosted version as the context provider. Demonstrates setup and configuration for local deployment. | diff --git a/python/samples/getting_started/context_providers/mem0/mem0_basic.py b/python/samples/getting_started/context_providers/mem0/mem0_basic.py index 0c2252d66a..5fef82d390 100644 --- a/python/samples/getting_started/context_providers/mem0/mem0_basic.py +++ b/python/samples/getting_started/context_providers/mem0/mem0_basic.py @@ -54,6 +54,13 @@ async def main() -> None: result = await agent.run(query) print(f"Agent: {result}\n") + # Mem0 processes and indexes memories asynchronously. + # Wait for memories to be indexed before querying in a new thread. + # In production, consider implementing retry logic or using Mem0's + # eventual consistency handling instead of a fixed delay. + print("Waiting for memories to be processed...") + await asyncio.sleep(12) # Empirically determined delay for Mem0 indexing + print("\nRequest within a new thread:") # Create a new thread for the agent. # The new thread has no context of the previous conversation. diff --git a/python/samples/getting_started/context_providers/mem0/mem0_filters.py b/python/samples/getting_started/context_providers/mem0/mem0_filters.py new file mode 100644 index 0000000000..87e14ecb59 --- /dev/null +++ b/python/samples/getting_started/context_providers/mem0/mem0_filters.py @@ -0,0 +1,75 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from datetime import datetime, timedelta + +from agent_framework.azure import AzureAIAgentClient +from agent_framework.mem0 import Mem0Provider +from azure.identity.aio import AzureCliCredential + + +async def main() -> None: + """Example demonstrating advanced filtering with Mem0 context provider.""" + print("=== Mem0 Advanced Filtering Example ===\n") + + # For Azure authentication, run `az login` command in terminal or replace AzureCliCredential with preferred + # authentication option. + # For Mem0 authentication, set Mem0 API key via "api_key" parameter or MEM0_API_KEY environment variable. + async with ( + AzureCliCredential() as credential, + AzureAIAgentClient(credential=credential).create_agent( + name="FilterAssistant", + instructions="You are a helpful assistant that retrieves and summarizes memories.", + context_providers=Mem0Provider(user_id="demo_user"), + ) as agent, + ): + # Store some memories with different timestamps + print("Storing memories...") + await agent.run("I love Python programming.") + await agent.run("My favorite color is blue.") + await agent.run("I work as a software engineer.") + + # Wait for memories to be indexed + print("Waiting for memories to be processed...") + await asyncio.sleep(12) # Empirically determined delay for Mem0 indexing + + # Calculate a date from a week ago for filtering + week_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") + + print("\n=== Using OR Filter ===") + # Use OR logic to find memories matching either condition + query = "What do you know about me?" + print(f"User: {query}") + print(f"Filter: Memories from another user OR created after {week_ago}") + + result = await agent.run( + query, + filters={ + "OR": [ + {"user_id": "another_user"}, # This won't match + {"created_at": {"gte": week_ago}}, # This will match our recent memories + ] + }, + ) + print(f"Agent: {result}\n") + + print("\n=== Using Complex Filter ===") + # Demonstrate combining multiple filter conditions + query = "Tell me about my preferences" + print(f"User: {query}") + print("Filter: Recent memories with specific keywords") + + result = await agent.run( + query, + filters={ + "AND": [ + {"user_id": "demo_user"}, + {"created_at": {"gte": week_ago}}, + ] + }, + ) + print(f"Agent: {result}\n") + + +if __name__ == "__main__": + asyncio.run(main())