diff --git a/contributing/samples/sos_memory/README.md b/contributing/samples/sos_memory/README.md new file mode 100644 index 0000000..67740ac --- /dev/null +++ b/contributing/samples/sos_memory/README.md @@ -0,0 +1,102 @@ +# SOS Memory Agent Sample + +This sample demonstrates using **SOS (Sovereign Operating System) Mirror** as a memory backend for ADK agents. + +## What is SOS Mirror? + +SOS Mirror is a semantic memory system with unique features: + +- **FRC Physics**: Memories are ranked by Frequency (access count), Recency (time decay), and Context (semantic relevance) +- **Lineage Tracking**: Every memory has a cryptographic hash chain for provenance +- **Multi-Agent Isolation**: Each agent has its own memory namespace +- **Semantic Search**: Vector embeddings for similarity-based retrieval + +## Prerequisites + +1. **SOS Mirror API** running locally or remotely + - See: https://github.com/servathadi/sos + +2. **Environment variables**: + ```bash + export SOS_MIRROR_URL="http://localhost:8844" # or your deployment URL + export SOS_API_KEY="your-api-key" + export GOOGLE_API_KEY="your-gemini-key" + ``` + +## Quick Start + +```bash +# Install dependencies +pip install google-adk google-adk-community + +# Run the agent +cd contributing/samples/sos_memory +python main.py +``` + +## Using with ADK CLI + +```bash +# Start web interface +adk web sos_memory_agent + +# Or run in terminal +adk run sos_memory_agent +``` + +## Configuration + +The `SOSMemoryServiceConfig` allows customization: + +```python +from google.adk_community.memory import SOSMemoryService, SOSMemoryServiceConfig + +config = SOSMemoryServiceConfig( + search_top_k=10, # Max memories per search + timeout=30.0, # Request timeout + user_content_salience=0.8, # Weight for user messages + model_content_salience=0.7,# Weight for model responses + enable_lineage_tracking=True, +) + +memory_service = SOSMemoryService( + base_url="https://mirror.example.com", + api_key="your-key", + agent_id="my-agent", + config=config, +) +``` + +## How FRC Physics Works + +When searching memory, SOS ranks results using: + +``` +score = α·frequency + β·recency + γ·context_similarity +``` + +Where: +- **Frequency**: How often a memory has been accessed (builds importance over time) +- **Recency**: Time decay function (recent memories score higher) +- **Context**: Cosine similarity between query and memory embeddings + +This means frequently-accessed, recent, and semantically-relevant memories surface first. + +## Lineage Tracking + +Every memory gets a lineage hash: + +```python +hash = SHA256(previous_hash + agent_id + content + context)[:16] +``` + +This creates an immutable chain of memory provenance, useful for: +- Auditing agent decisions +- Debugging conversation flows +- Ensuring memory integrity + +## Learn More + +- [SOS GitHub](https://github.com/servathadi/sos) +- [ADK Documentation](https://google.github.io/adk-docs/) +- [ADK Community](https://github.com/google/adk-python-community) diff --git a/contributing/samples/sos_memory/main.py b/contributing/samples/sos_memory/main.py new file mode 100644 index 0000000..f01d639 --- /dev/null +++ b/contributing/samples/sos_memory/main.py @@ -0,0 +1,88 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Main entry point for SOS Memory Agent sample. + +This sample demonstrates how to use SOS Mirror as a memory backend +for ADK agents. SOS provides semantic memory with FRC physics and +lineage tracking for memory provenance. + +Prerequisites: + 1. Set up SOS Mirror API (https://github.com/servathadi/sos) + 2. Set environment variables: + - SOS_MIRROR_URL: URL of SOS Mirror API (default: http://localhost:8844) + - SOS_API_KEY: API key for authentication + - GOOGLE_API_KEY: Gemini API key + +Usage: + # Start the agent server + python main.py + + # Or use ADK CLI + adk web sos_memory_agent +""" + +import os +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.adk_community.memory import SOSMemoryService, SOSMemoryServiceConfig + +from sos_memory_agent import root_agent + + +def create_memory_service() -> SOSMemoryService: + """Create and configure SOS Memory service.""" + base_url = os.environ.get("SOS_MIRROR_URL", "http://localhost:8844") + api_key = os.environ.get("SOS_API_KEY", "") + + if not api_key: + raise ValueError( + "SOS_API_KEY environment variable is required. " + "Get your API key from your SOS Mirror deployment." + ) + + config = SOSMemoryServiceConfig( + search_top_k=10, + timeout=30.0, + user_content_salience=0.8, + model_content_salience=0.7, + enable_lineage_tracking=True, + ) + + return SOSMemoryService( + base_url=base_url, + api_key=api_key, + agent_id="sos_memory_agent", + config=config, + ) + + +def main(): + """Run the SOS Memory Agent.""" + memory_service = create_memory_service() + session_service = InMemorySessionService() + + runner = Runner( + agent=root_agent, + app_name="sos_memory_sample", + session_service=session_service, + memory_service=memory_service, + ) + + # Start the web interface + runner.run() + + +if __name__ == "__main__": + main() diff --git a/contributing/samples/sos_memory/sos_memory_agent/__init__.py b/contributing/samples/sos_memory/sos_memory_agent/__init__.py new file mode 100644 index 0000000..641ccb6 --- /dev/null +++ b/contributing/samples/sos_memory/sos_memory_agent/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""SOS Memory Agent sample package.""" + +from .agent import root_agent + +__all__ = ["root_agent"] diff --git a/contributing/samples/sos_memory/sos_memory_agent/agent.py b/contributing/samples/sos_memory/sos_memory_agent/agent.py new file mode 100644 index 0000000..0158185 --- /dev/null +++ b/contributing/samples/sos_memory/sos_memory_agent/agent.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""SOS Memory Agent - Agent with semantic memory via SOS Mirror API. + +This sample demonstrates using SOS (Sovereign Operating System) Mirror +for agent memory. SOS provides: + +- FRC Physics: Memories ranked by Frequency, Recency, and Context relevance +- Lineage Tracking: Cryptographic provenance for every memory +- Multi-Agent Isolation: Each agent has its own memory namespace +- Semantic Search: Vector-based similarity matching +""" + +from datetime import datetime +from google.adk import Agent +from google.adk.agents.callback_context import CallbackContext +from google.adk.tools import load_memory, preload_memory + + +def update_current_time(callback_context: CallbackContext): + """Update the current time in agent state for temporal awareness.""" + callback_context.state['_time'] = datetime.now().isoformat() + + +root_agent = Agent( + model='gemini-2.0-flash', + name='sos_memory_agent', + description=( + 'Agent with semantic memory powered by SOS Mirror. ' + 'Uses FRC physics for intelligent memory retrieval and ' + 'lineage tracking for memory provenance.' + ), + before_agent_callback=update_current_time, + instruction=( + 'You are a helpful assistant with access to persistent semantic memory.\n\n' + 'Your memory is powered by SOS Mirror, which uses FRC physics to retrieve ' + 'the most relevant memories based on:\n' + '- Frequency: How often a memory has been accessed\n' + '- Recency: How recent the memory is\n' + '- Context: Semantic relevance to the current query\n\n' + 'When the user asks a question:\n' + '1. First, use load_memory to search for relevant past conversations\n' + '2. If the first search yields no results, try different keywords\n' + '3. Use remembered context to provide personalized responses\n' + '4. Reference past interactions when relevant ("As we discussed before...")\n\n' + 'Current time: {_time}' + ), + tools=[preload_memory, load_memory], +) diff --git a/src/google/adk_community/memory/__init__.py b/src/google/adk_community/memory/__init__.py index 1f3442c..56c6cf8 100644 --- a/src/google/adk_community/memory/__init__.py +++ b/src/google/adk_community/memory/__init__.py @@ -16,9 +16,13 @@ from .open_memory_service import OpenMemoryService from .open_memory_service import OpenMemoryServiceConfig +from .sos_memory_service import SOSMemoryService +from .sos_memory_service import SOSMemoryServiceConfig __all__ = [ "OpenMemoryService", "OpenMemoryServiceConfig", + "SOSMemoryService", + "SOSMemoryServiceConfig", ] diff --git a/src/google/adk_community/memory/sos_memory_service.py b/src/google/adk_community/memory/sos_memory_service.py new file mode 100644 index 0000000..7d6e727 --- /dev/null +++ b/src/google/adk_community/memory/sos_memory_service.py @@ -0,0 +1,369 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""SOS (Sovereign Operating System) Memory Service for ADK. + +This module provides integration with SOS Mirror API, a multi-tier semantic +memory system with FRC (Frequency-Recency-Context) physics for intelligent +retrieval and consolidation. + +Features: +- Semantic search with vector embeddings +- FRC-weighted retrieval (frequency, recency, context relevance) +- Lineage tracking for memory provenance +- Multi-agent memory isolation +- Automatic memory consolidation + +See https://github.com/servathadi/sos for more information. +""" + +from __future__ import annotations + +import hashlib +import logging +import re +from datetime import datetime, timezone +from typing import Optional, List, Dict, Any +from typing import TYPE_CHECKING + +import httpx +from google.genai import types +from pydantic import BaseModel +from pydantic import Field +from typing_extensions import override + +from google.adk.memory import _utils +from google.adk.memory.base_memory_service import BaseMemoryService +from google.adk.memory.base_memory_service import SearchMemoryResponse +from google.adk.memory.memory_entry import MemoryEntry + +from .utils import extract_text_from_event + +if TYPE_CHECKING: + from google.adk.sessions.session import Session + +logger = logging.getLogger('google_adk.' + __name__) + + +class SOSMemoryService(BaseMemoryService): + """Memory service implementation using SOS Mirror API. + + SOS Mirror provides semantic memory with FRC physics - memories are + retrieved based on frequency (how often accessed), recency (how recent), + and context (semantic relevance to query). + + Example usage: + ```python + from google.adk_community.memory import SOSMemoryService + + memory_service = SOSMemoryService( + base_url="https://mirror.mumega.com", + api_key="your-api-key", + agent_id="my-agent" + ) + + # Use with ADK agent + agent = Agent( + model='gemini-2.0-flash', + name='my_agent', + memory_service=memory_service, + tools=[load_memory, preload_memory], + ) + ``` + """ + + def __init__( + self, + base_url: str = "http://localhost:8844", + api_key: str = "", + agent_id: str = "adk_agent", + config: Optional[SOSMemoryServiceConfig] = None, + ): + """Initialize the SOS Memory service. + + Args: + base_url: Base URL of the SOS Mirror API. + api_key: API key for authentication. + agent_id: Unique identifier for this agent's memory namespace. + config: SOSMemoryServiceConfig instance for advanced options. + + Raises: + ValueError: If api_key is not provided. + """ + if not api_key: + raise ValueError( + "api_key is required for SOS Mirror. " + "Provide an API key when initializing SOSMemoryService." + ) + self._base_url = base_url.rstrip('/') + self._api_key = api_key + self._agent_id = agent_id + self._config = config or SOSMemoryServiceConfig() + self._lineage_chain: List[str] = [] + + def _compute_lineage_hash(self, content: str, context: str = "") -> str: + """Compute a lineage hash for memory provenance tracking.""" + prev_hash = self._lineage_chain[-1] if self._lineage_chain else "genesis" + data = f"{prev_hash}:{self._agent_id}:{content}:{context}" + return hashlib.sha256(data.encode()).hexdigest()[:16] + + def _determine_salience(self, author: Optional[str]) -> float: + """Determine salience based on content author.""" + if not author: + return self._config.default_salience + + author_lower = author.lower() + if author_lower == "user": + return self._config.user_content_salience + elif author_lower == "model": + return self._config.model_content_salience + else: + return self._config.default_salience + + def _prepare_memory_data( + self, event, content_text: str, session + ) -> Dict[str, Any]: + """Prepare memory data for SOS Mirror API.""" + timestamp_str = None + if event.timestamp: + timestamp_str = _utils.format_timestamp(event.timestamp) + + # Compute lineage hash for provenance + lineage_hash = self._compute_lineage_hash(content_text, session.id) + self._lineage_chain.append(lineage_hash) + + # Build enriched content with metadata prefix + enriched_content = content_text + metadata_parts = [] + if event.author: + metadata_parts.append(f"Author: {event.author}") + if timestamp_str: + metadata_parts.append(f"Time: {timestamp_str}") + + if metadata_parts: + metadata_prefix = "[" + ", ".join(metadata_parts) + "] " + enriched_content = metadata_prefix + content_text + + return { + "text": enriched_content, + "agent": self._agent_id, + "context_id": session.id, + "metadata": { + "app_name": session.app_name, + "user_id": session.user_id, + "session_id": session.id, + "event_id": event.id, + "invocation_id": event.invocation_id, + "author": event.author, + "timestamp": event.timestamp, + "lineage_hash": lineage_hash, + "salience": self._determine_salience(event.author), + "source": "adk_session" + } + } + + @override + async def add_session_to_memory(self, session: Session): + """Add a session's events to SOS Mirror memory.""" + memories_added = 0 + + async with httpx.AsyncClient(timeout=self._config.timeout) as client: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}" + } + + for event in session.events: + content_text = extract_text_from_event(event) + if not content_text: + continue + + memory_data = self._prepare_memory_data(event, content_text, session) + + try: + response = await client.post( + f"{self._base_url}/store", + json=memory_data, + headers=headers + ) + response.raise_for_status() + + memories_added += 1 + logger.debug("Added memory for event %s (lineage: %s)", + event.id, memory_data["metadata"]["lineage_hash"]) + except httpx.HTTPStatusError as e: + logger.error( + "Failed to add memory for event %s: HTTP %s - %s", + event.id, e.response.status_code, e.response.text + ) + except httpx.RequestError as e: + logger.error( + "Failed to add memory for event %s: %s", event.id, e + ) + except Exception as e: + logger.error( + "Unexpected error adding memory for event %s: %s", event.id, e + ) + + logger.info( + "Added %d memories from session %s to agent %s", + memories_added, session.id, self._agent_id + ) + + def _build_search_payload( + self, app_name: str, user_id: str, query: str + ) -> Dict[str, Any]: + """Build search payload for SOS Mirror query API.""" + return { + "query": query, + "agent": self._agent_id, + "limit": self._config.search_top_k, + "filters": { + "user_id": user_id, + "app_name": app_name, + } if self._config.enable_user_filtering else {} + } + + def _convert_to_memory_entry(self, result: Dict[str, Any]) -> Optional[MemoryEntry]: + """Convert SOS Mirror result to ADK MemoryEntry.""" + try: + raw_content = result.get("text", result.get("content", "")) + author = None + timestamp = None + clean_content = raw_content + + # Parse enriched content format: [Author: user, Time: ...] Content + match = re.match(r'^\[([^\]]+)\]\s+(.*)', raw_content, re.DOTALL) + if match: + metadata_str = match.group(1) + clean_content = match.group(2) + + author_match = re.search(r'Author:\s*([^,\]]+)', metadata_str) + if author_match: + author = author_match.group(1).strip() + + time_match = re.search(r'Time:\s*([^,\]]+)', metadata_str) + if time_match: + timestamp = time_match.group(1).strip() + + # Also check metadata dict + metadata = result.get("metadata", {}) + if not author and metadata.get("author"): + author = metadata["author"] + if not timestamp and metadata.get("timestamp"): + timestamp = metadata["timestamp"] + + content = types.Content(parts=[types.Part(text=clean_content)]) + + return MemoryEntry( + content=content, + author=author, + timestamp=timestamp + ) + except (KeyError, ValueError) as e: + logger.debug("Failed to convert result to MemoryEntry: %s", e) + return None + + @override + async def search_memory( + self, *, app_name: str, user_id: str, query: str + ) -> SearchMemoryResponse: + """Search memories using SOS Mirror's semantic search with FRC physics.""" + try: + search_payload = self._build_search_payload(app_name, user_id, query) + memories = [] + + async with httpx.AsyncClient(timeout=self._config.timeout) as client: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}" + } + + logger.debug("SOS search payload: %s", search_payload) + + response = await client.post( + f"{self._base_url}/search", + json=search_payload, + headers=headers + ) + response.raise_for_status() + result = response.json() + + # Handle both list and dict response formats + results = result if isinstance(result, list) else result.get("results", []) + + logger.debug("SOS search returned %d results", len(results)) + + for match in results: + memory_entry = self._convert_to_memory_entry(match) + if memory_entry: + memories.append(memory_entry) + + logger.info( + "Found %d memories for query '%s' (agent: %s)", + len(memories), query[:50], self._agent_id + ) + return SearchMemoryResponse(memories=memories) + + except httpx.HTTPStatusError as e: + logger.error( + "SOS search failed: HTTP %s - %s", + e.response.status_code, e.response.text + ) + return SearchMemoryResponse(memories=[]) + except httpx.RequestError as e: + logger.error("SOS search request failed: %s", e) + return SearchMemoryResponse(memories=[]) + except Exception as e: + logger.error("Unexpected error in SOS search: %s", e) + return SearchMemoryResponse(memories=[]) + + async def get_lineage(self) -> Dict[str, Any]: + """Get the current lineage chain for this agent session. + + Returns: + Dict with lineage information including chain and latest hash. + """ + return { + "agent_id": self._agent_id, + "chain_length": len(self._lineage_chain), + "latest_hash": self._lineage_chain[-1] if self._lineage_chain else None, + "chain": self._lineage_chain[-10:], # Last 10 hashes + } + + async def close(self): + """Close the memory service and cleanup resources.""" + self._lineage_chain.clear() + + +class SOSMemoryServiceConfig(BaseModel): + """Configuration for SOS Memory service behavior. + + Attributes: + search_top_k: Maximum memories to retrieve per search. + timeout: Request timeout in seconds. + user_content_salience: Salience for user content (0.0-1.0). + model_content_salience: Salience for model content (0.0-1.0). + default_salience: Default salience value (0.0-1.0). + enable_user_filtering: Filter results by user_id. + enable_lineage_tracking: Track memory provenance with hashes. + """ + + search_top_k: int = Field(default=10, ge=1, le=100) + timeout: float = Field(default=30.0, gt=0.0) + user_content_salience: float = Field(default=0.8, ge=0.0, le=1.0) + model_content_salience: float = Field(default=0.7, ge=0.0, le=1.0) + default_salience: float = Field(default=0.6, ge=0.0, le=1.0) + enable_user_filtering: bool = Field(default=True) + enable_lineage_tracking: bool = Field(default=True) diff --git a/tests/unittests/memory/test_sos_memory_service.py b/tests/unittests/memory/test_sos_memory_service.py new file mode 100644 index 0000000..a2ad1d6 --- /dev/null +++ b/tests/unittests/memory/test_sos_memory_service.py @@ -0,0 +1,329 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for SOS Memory Service.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +import json + +from google.adk_community.memory.sos_memory_service import ( + SOSMemoryService, + SOSMemoryServiceConfig, +) + + +class TestSOSMemoryServiceConfig: + """Tests for SOSMemoryServiceConfig.""" + + def test_default_config(self): + """Test default configuration values.""" + config = SOSMemoryServiceConfig() + + assert config.search_top_k == 10 + assert config.timeout == 30.0 + assert config.user_content_salience == 0.8 + assert config.model_content_salience == 0.7 + assert config.default_salience == 0.6 + assert config.enable_user_filtering is True + assert config.enable_lineage_tracking is True + + def test_custom_config(self): + """Test custom configuration values.""" + config = SOSMemoryServiceConfig( + search_top_k=20, + timeout=60.0, + user_content_salience=0.9, + enable_lineage_tracking=False, + ) + + assert config.search_top_k == 20 + assert config.timeout == 60.0 + assert config.user_content_salience == 0.9 + assert config.enable_lineage_tracking is False + + +class TestSOSMemoryService: + """Tests for SOSMemoryService.""" + + def test_init_requires_api_key(self): + """Test that initialization requires an API key.""" + with pytest.raises(ValueError) as exc_info: + SOSMemoryService(api_key="") + + assert "api_key is required" in str(exc_info.value) + + def test_init_with_api_key(self): + """Test successful initialization with API key.""" + service = SOSMemoryService( + base_url="http://localhost:8844", + api_key="test-key", + agent_id="test-agent", + ) + + assert service._base_url == "http://localhost:8844" + assert service._api_key == "test-key" + assert service._agent_id == "test-agent" + + def test_init_strips_trailing_slash(self): + """Test that trailing slashes are stripped from base_url.""" + service = SOSMemoryService( + base_url="http://localhost:8844/", + api_key="test-key", + ) + + assert service._base_url == "http://localhost:8844" + + def test_compute_lineage_hash(self): + """Test lineage hash computation.""" + service = SOSMemoryService(api_key="test-key", agent_id="test-agent") + + hash1 = service._compute_lineage_hash("content1", "context1") + hash2 = service._compute_lineage_hash("content2", "context2") + + # Hashes should be 16 characters + assert len(hash1) == 16 + assert len(hash2) == 16 + + # Different content should produce different hashes + assert hash1 != hash2 + + def test_lineage_chain_grows(self): + """Test that lineage chain grows when hashes are appended.""" + service = SOSMemoryService(api_key="test-key", agent_id="test-agent") + + assert len(service._lineage_chain) == 0 + + # Compute and manually append (as _prepare_memory_data does) + hash1 = service._compute_lineage_hash("content1", "") + service._lineage_chain.append(hash1) + assert len(service._lineage_chain) == 1 + + hash2 = service._compute_lineage_hash("content2", "") + service._lineage_chain.append(hash2) + assert len(service._lineage_chain) == 2 + + # Verify hashes are different + assert hash1 != hash2 + + def test_determine_salience_user(self): + """Test salience determination for user content.""" + service = SOSMemoryService(api_key="test-key") + + assert service._determine_salience("user") == 0.8 + assert service._determine_salience("User") == 0.8 + assert service._determine_salience("USER") == 0.8 + + def test_determine_salience_model(self): + """Test salience determination for model content.""" + service = SOSMemoryService(api_key="test-key") + + assert service._determine_salience("model") == 0.7 + assert service._determine_salience("Model") == 0.7 + + def test_determine_salience_default(self): + """Test default salience for unknown authors.""" + service = SOSMemoryService(api_key="test-key") + + assert service._determine_salience(None) == 0.6 + assert service._determine_salience("unknown") == 0.6 + + def test_build_search_payload(self): + """Test search payload construction.""" + service = SOSMemoryService( + api_key="test-key", + agent_id="test-agent", + ) + + payload = service._build_search_payload( + app_name="test-app", + user_id="user-123", + query="test query", + ) + + assert payload["query"] == "test query" + assert payload["agent"] == "test-agent" + assert payload["limit"] == 10 + assert payload["filters"]["user_id"] == "user-123" + assert payload["filters"]["app_name"] == "test-app" + + def test_build_search_payload_no_filtering(self): + """Test search payload without user filtering.""" + config = SOSMemoryServiceConfig(enable_user_filtering=False) + service = SOSMemoryService( + api_key="test-key", + agent_id="test-agent", + config=config, + ) + + payload = service._build_search_payload( + app_name="test-app", + user_id="user-123", + query="test query", + ) + + assert payload["filters"] == {} + + def test_convert_to_memory_entry_simple(self): + """Test converting simple result to MemoryEntry.""" + service = SOSMemoryService(api_key="test-key") + + result = {"text": "Hello world", "metadata": {}} + entry = service._convert_to_memory_entry(result) + + assert entry is not None + assert entry.content.parts[0].text == "Hello world" + + def test_convert_to_memory_entry_with_prefix(self): + """Test converting result with metadata prefix to MemoryEntry.""" + service = SOSMemoryService(api_key="test-key") + + result = { + "text": "[Author: user, Time: 2025-01-15T10:00:00] Hello world", + "metadata": {}, + } + entry = service._convert_to_memory_entry(result) + + assert entry is not None + assert entry.content.parts[0].text == "Hello world" + assert entry.author == "user" + assert entry.timestamp == "2025-01-15T10:00:00" + + def test_convert_to_memory_entry_from_metadata(self): + """Test converting result with metadata dict to MemoryEntry.""" + service = SOSMemoryService(api_key="test-key") + + result = { + "text": "Hello world", + "metadata": { + "author": "model", + "timestamp": "2025-01-15T11:00:00", + }, + } + entry = service._convert_to_memory_entry(result) + + assert entry is not None + assert entry.author == "model" + assert entry.timestamp == "2025-01-15T11:00:00" + + @pytest.mark.asyncio + async def test_get_lineage(self): + """Test getting lineage information.""" + service = SOSMemoryService(api_key="test-key", agent_id="test-agent") + + # Empty lineage + lineage = await service.get_lineage() + assert lineage["agent_id"] == "test-agent" + assert lineage["chain_length"] == 0 + assert lineage["latest_hash"] is None + + # Add some hashes (simulating what _prepare_memory_data does) + hash1 = service._compute_lineage_hash("content1", "") + service._lineage_chain.append(hash1) + hash2 = service._compute_lineage_hash("content2", "") + service._lineage_chain.append(hash2) + + lineage = await service.get_lineage() + assert lineage["chain_length"] == 2 + assert lineage["latest_hash"] == hash2 + assert len(lineage["chain"]) == 2 + + @pytest.mark.asyncio + async def test_close_clears_lineage(self): + """Test that close() clears the lineage chain.""" + service = SOSMemoryService(api_key="test-key") + + hash1 = service._compute_lineage_hash("content", "") + service._lineage_chain.append(hash1) + assert len(service._lineage_chain) == 1 + + await service.close() + assert len(service._lineage_chain) == 0 + + @pytest.mark.asyncio + async def test_search_memory_success(self): + """Test successful memory search.""" + service = SOSMemoryService(api_key="test-key", agent_id="test-agent") + + mock_response = MagicMock() + mock_response.json.return_value = { + "results": [ + {"text": "Memory 1", "metadata": {}}, + {"text": "Memory 2", "metadata": {}}, + ] + } + mock_response.raise_for_status = MagicMock() + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = MagicMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client_class.return_value = mock_client + + response = await service.search_memory( + app_name="test-app", + user_id="user-123", + query="test query", + ) + + assert len(response.memories) == 2 + + @pytest.mark.asyncio + async def test_search_memory_handles_list_response(self): + """Test search handles list response format.""" + service = SOSMemoryService(api_key="test-key", agent_id="test-agent") + + mock_response = MagicMock() + # Some APIs return a list directly instead of {"results": [...]} + mock_response.json.return_value = [ + {"text": "Memory 1", "metadata": {}}, + ] + mock_response.raise_for_status = MagicMock() + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = MagicMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client_class.return_value = mock_client + + response = await service.search_memory( + app_name="test-app", + user_id="user-123", + query="test query", + ) + + assert len(response.memories) == 1 + + @pytest.mark.asyncio + async def test_search_memory_handles_error(self): + """Test search handles HTTP errors gracefully.""" + service = SOSMemoryService(api_key="test-key", agent_id="test-agent") + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = MagicMock() + mock_client.post = AsyncMock(side_effect=Exception("Connection failed")) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client_class.return_value = mock_client + + response = await service.search_memory( + app_name="test-app", + user_id="user-123", + query="test query", + ) + + # Should return empty response on error, not raise + assert len(response.memories) == 0