Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions src/bedrock_agentcore/memory/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
Role,
StrategyType,
)
from .models.filters import EventMetadataFilter, MetadataValue
from .models.filters import EventMetadataFilter, IndexedKey, MemoryMetadataFilter, MetadataValue

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -160,8 +160,25 @@ def create_memory(
event_expiry_days: int = 90,
memory_execution_role_arn: Optional[str] = None,
stream_delivery_resources: Optional[Dict[str, Any]] = None,
indexed_keys: Optional[List[IndexedKey]] = None,
) -> Dict[str, Any]:
"""Create a memory with simplified configuration."""
"""Create a memory with simplified configuration.

Args:
name: Name for the memory resource
strategies: Optional list of strategy configurations
description: Optional description
event_expiry_days: How long to retain events (default: 90 days)
memory_execution_role_arn: IAM role ARN for memory execution
stream_delivery_resources: Optional delivery configuration for streaming memory records
indexed_keys: Optional list of metadata keys to index for filtering.
Each entry should have 'key' (str) and 'type' ('STRING', 'STRINGLIST', or 'NUMBER').
Once declared, indexed keys cannot be removed.
Example: [{"key": "priority", "type": "NUMBER"}, {"key": "agent_type", "type": "STRING"}]

Returns:
Created memory object
"""
if strategies is None:
strategies = []

Expand All @@ -184,6 +201,9 @@ def create_memory(
if stream_delivery_resources is not None:
params["streamDeliveryResources"] = stream_delivery_resources

if indexed_keys is not None:
params["indexedKeys"] = indexed_keys

response = self.gmcp_client.create_memory(**params)

memory = response["memory"]
Expand All @@ -205,9 +225,21 @@ def create_or_get_memory(
event_expiry_days: int = 90,
memory_execution_role_arn: Optional[str] = None,
stream_delivery_resources: Optional[Dict[str, Any]] = None,
indexed_keys: Optional[List[IndexedKey]] = None,
) -> Dict[str, Any]:
"""Create a memory resource or fetch the existing memory details if it already exists.

Args:
name: Name for the memory resource
strategies: Optional list of strategy configurations
description: Optional description
event_expiry_days: How long to retain events (default: 90 days)
memory_execution_role_arn: IAM role ARN for memory execution
stream_delivery_resources: Optional delivery configuration for streaming memory records
indexed_keys: Optional list of metadata keys to index for filtering.
Once declared, indexed keys cannot be removed; new keys can be added
via `update_memory(addIndexedKeys=...)`.

Returns:
Memory object, either newly created or existing
"""
Expand All @@ -219,6 +251,7 @@ def create_or_get_memory(
event_expiry_days=event_expiry_days,
memory_execution_role_arn=memory_execution_role_arn,
stream_delivery_resources=stream_delivery_resources,
indexed_keys=indexed_keys,
)
return memory
except ClientError as e:
Expand All @@ -243,6 +276,7 @@ def create_memory_and_wait(
stream_delivery_resources: Optional[Dict[str, Any]] = None,
max_wait: int = 300,
poll_interval: int = 10,
indexed_keys: Optional[List[IndexedKey]] = None,
) -> Dict[str, Any]:
"""Create a memory and wait for it to become ACTIVE.

Expand All @@ -256,6 +290,10 @@ def create_memory_and_wait(
event_expiry_days: How long to retain events (default: 90 days)
memory_execution_role_arn: IAM role ARN for memory execution
stream_delivery_resources: Optional delivery configuration for streaming memory records
indexed_keys: Optional list of metadata keys to index for filtering.
Each entry should have 'key' (str) and 'type' ('STRING', 'STRINGLIST', or 'NUMBER').
Once declared, indexed keys cannot be removed; new keys can be added
via `update_memory(addIndexedKeys=...)`.
max_wait: Maximum seconds to wait (default: 300)
poll_interval: Seconds between status checks (default: 10)

Expand All @@ -274,6 +312,7 @@ def create_memory_and_wait(
event_expiry_days=event_expiry_days,
memory_execution_role_arn=memory_execution_role_arn,
stream_delivery_resources=stream_delivery_resources,
indexed_keys=indexed_keys,
)

memory_id = memory.get("memoryId", memory.get("id")) # Handle both field names
Expand Down Expand Up @@ -318,6 +357,7 @@ def retrieve_memories(
actor_id: Optional[str] = None,
top_k: int = 3,
namespace_path: Optional[str] = None,
metadata_filters: Optional[List[MemoryMetadataFilter]] = None,
) -> List[Dict[str, Any]]:
"""Retrieve relevant memories using exact match or hierarchical path prefix.

Expand All @@ -330,15 +370,29 @@ def retrieve_memories(
actor_id: Optional actor ID (deprecated, use namespace)
top_k: Number of results to return
namespace_path: Hierarchical path prefix (e.g., "/org/team/")
metadata_filters: Optional list of metadata filter expressions to scope results.
Use MemoryMetadataFilter.build_expression() to construct filters.
The service accepts 1-5 filters. An empty list is treated as no filter.
Example: [MemoryMetadataFilter.build_expression(
MemoryRecordLeftExpression.build("priority"),
MemoryRecordOperatorType.EQUALS_TO,
MemoryRecordRightExpression.build_string("high"),
)]

Returns:
List of memory records. Returns an empty list if the namespace
arguments are invalid (both provided, neither provided, or contain
wildcards) or if the service call fails.

Raises:
ValueError: If `metadata_filters` exceeds the service maximum of 5.
"""
if query is None:
raise TypeError("retrieve_memories() missing required argument: 'query'")

if metadata_filters is not None and len(metadata_filters) > 5:
raise ValueError(f"metadata_filters supports a maximum of 5 expressions; received {len(metadata_filters)}.")

try:
ns_params = build_namespace_params(namespace, namespace_path)
except ValueError as e:
Expand All @@ -348,8 +402,13 @@ def retrieve_memories(
ns_value = namespace or namespace_path

try:
search_criteria = {"searchQuery": query, "topK": top_k}
if metadata_filters:
search_criteria["metadataFilters"] = metadata_filters
logger.debug("Applying %d metadata filter(s)", len(metadata_filters))

response = self.gmdp_client.retrieve_memory_records(
memoryId=memory_id, searchCriteria={"searchQuery": query, "topK": top_k}, **ns_params
memoryId=memory_id, searchCriteria=search_criteria, **ns_params
)
memories = response.get("memoryRecordSummaries", [])
logger.info("Retrieved %d memories from namespace: %s", len(memories), ns_value)
Expand Down
12 changes: 12 additions & 0 deletions src/bedrock_agentcore/memory/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
from .DictWrapper import DictWrapper
from .filters import (
EventMetadataFilter,
IndexedKey,
LeftExpression,
MemoryMetadataFilter,
MemoryRecordLeftExpression,
MemoryRecordOperatorType,
MemoryRecordRightExpression,
MetadataKey,
MetadataValue,
MetadataValueType,
OperatorType,
RightExpression,
StringValue,
Expand Down Expand Up @@ -101,4 +107,10 @@ def __init__(self, session_summary: Dict[str, Any]):
"OperatorType",
"RightExpression",
"EventMetadataFilter",
"MemoryRecordOperatorType",
"MemoryRecordLeftExpression",
"MemoryRecordRightExpression",
"MemoryMetadataFilter",
"MetadataValueType",
"IndexedKey",
]
Loading
Loading