Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions src/agents/memory/openai_conversations_session.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import annotations

from typing import Any

from openai import AsyncOpenAI

from agents.models._openai_shared import get_default_openai_client

from ..items import TResponseInputItem
from ..run_context import RunContextWrapper
from .session import SessionABC
from .session_settings import SessionSettings, resolve_session_limit

Expand Down Expand Up @@ -68,7 +71,18 @@ async def _get_session_id(self) -> str:
async def _clear_session_id(self) -> None:
self._session_id = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, wrapper: RunContextWrapper[Any] | None = None
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
wrapper: Optional RunContextWrapper for accessing context data during retrieval.

Returns:
List of input items representing the conversation history
"""
session_id = await self._get_session_id()

session_limit = resolve_session_limit(limit, self.session_settings)
Expand All @@ -95,7 +109,15 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:

return all_items # type: ignore

async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self, items: list[TResponseInputItem], wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional RunContextWrapper for accessing context data during addition
"""
session_id = await self._get_session_id()
if not items:
return
Expand All @@ -105,7 +127,17 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
items=items,
)

async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self, wrapper: RunContextWrapper[Any] | None = None
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during pop

Returns:
The most recent item if it exists, None if the session is empty
"""
session_id = await self._get_session_id()
items = await self.get_items(limit=1)
if not items:
Expand All @@ -116,7 +148,14 @@ async def pop_item(self) -> TResponseInputItem | None:
)
return items[0]

async def clear_session(self) -> None:
async def clear_session(
self, wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during clear
"""
session_id = await self._get_session_id()
await self._openai_client.conversations.delete(
conversation_id=session_id,
Expand Down
67 changes: 60 additions & 7 deletions src/agents/memory/openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from openai import AsyncOpenAI

from ..models._openai_shared import get_default_openai_client
from ..run_context import RunContextWrapper
from .openai_conversations_session import OpenAIConversationsSession
from .session import (
OpenAIResponsesCompactionArgs,
Expand Down Expand Up @@ -236,7 +237,22 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
f"candidates={len(self._compaction_candidate_items)})"
)

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, wrapper: RunContextWrapper[Any] | None = None
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve.
wrapper: Optional RunContextWrapper for accessing context data during retrieval.

Returns:
List of input items representing the conversation history
"""
# Only pass wrapper if provided, for backward compatibility with sessions
# that haven't updated their signatures yet
if wrapper is not None:
return await self.underlying_session.get_items(limit, wrapper)
return await self.underlying_session.get_items(limit)

async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None:
Expand Down Expand Up @@ -265,24 +281,61 @@ def _get_deferred_compaction_response_id(self) -> str | None:
def _clear_deferred_compaction(self) -> None:
self._deferred_response_id = None

async def add_items(self, items: list[TResponseInputItem]) -> None:
await self.underlying_session.add_items(items)
async def add_items(
self, items: list[TResponseInputItem], wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional RunContextWrapper for accessing context data during addition
"""
# Only pass wrapper if provided, for backward compatibility
if wrapper is not None:
await self.underlying_session.add_items(items, wrapper)
else:
await self.underlying_session.add_items(items)
if self._compaction_candidate_items is not None:
new_candidates = select_compaction_candidate_items(items)
if new_candidates:
self._compaction_candidate_items.extend(new_candidates)
if self._session_items is not None:
self._session_items.extend(items)

async def pop_item(self) -> TResponseInputItem | None:
popped = await self.underlying_session.pop_item()
async def pop_item(
self, wrapper: RunContextWrapper[Any] | None = None
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during pop

Returns:
The most recent item if it exists, None if the session is empty
"""
# Only pass wrapper if provided, for backward compatibility
if wrapper is not None:
popped = await self.underlying_session.pop_item(wrapper)
else:
popped = await self.underlying_session.pop_item()
if popped:
self._compaction_candidate_items = None
self._session_items = None
return popped

async def clear_session(self) -> None:
await self.underlying_session.clear_session()
async def clear_session(
self, wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during clear
"""
# Only pass wrapper if provided, for backward compatibility
if wrapper is not None:
await self.underlying_session.clear_session(wrapper)
else:
await self.underlying_session.clear_session()
self._compaction_candidate_items = []
self._session_items = []
self._deferred_response_id = None
Expand Down
48 changes: 39 additions & 9 deletions src/agents/memory/session.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable
from typing import TYPE_CHECKING, Any, Literal, Protocol, runtime_checkable

from typing_extensions import TypedDict, TypeGuard

if TYPE_CHECKING:
from ..items import TResponseInputItem
from ..run_context import RunContextWrapper
from .session_settings import SessionSettings
from .session_settings import SessionSettings


Expand All @@ -21,36 +23,53 @@ class Session(Protocol):
session_id: str
session_settings: SessionSettings | None = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, wrapper: RunContextWrapper[Any] | None = None
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
wrapper: Optional RunContextWrapper for accessing context data during retrieval.

Returns:
List of input items representing the conversation history
"""
...

async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self, items: list[TResponseInputItem], wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional RunContextWrapper for accessing context data during addition
"""
...

async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self, wrapper: RunContextWrapper[Any] | None = None
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during pop

Returns:
The most recent item if it exists, None if the session is empty
"""
...

async def clear_session(self) -> None:
"""Clear all items for this session."""
async def clear_session(
self, wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during clear
"""
...


Expand All @@ -68,31 +87,42 @@ class SessionABC(ABC):
session_settings: SessionSettings | None = None

@abstractmethod
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, wrapper: RunContextWrapper[Any] | None = None
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
wrapper: Optional RunContextWrapper for accessing context data during retrieval.

Returns:
List of input items representing the conversation history
"""
...

@abstractmethod
async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self, items: list[TResponseInputItem], wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional RunContextWrapper for accessing context data during addition
"""
...

@abstractmethod
async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self, wrapper: RunContextWrapper[Any] | None = None
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during pop

Returns:
The most recent item if it exists, None if the session is empty
"""
Expand Down
29 changes: 24 additions & 5 deletions src/agents/memory/sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import sqlite3
import threading
from pathlib import Path
from typing import Any

from ..items import TResponseInputItem
from ..run_context import RunContextWrapper
from .session import SessionABC
from .session_settings import SessionSettings, resolve_session_limit

Expand Down Expand Up @@ -112,12 +114,15 @@ def _init_db_for_connection(self, conn: sqlite3.Connection) -> None:

conn.commit()

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, wrapper: RunContextWrapper[Any] | None = None
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
wrapper: Optional RunContextWrapper for accessing context data during retrieval.

Returns:
List of input items representing the conversation history
Expand Down Expand Up @@ -168,11 +173,14 @@ def _get_items_sync():

return await asyncio.to_thread(_get_items_sync)

async def add_items(self, items: list[TResponseInputItem]) -> None:
async def add_items(
self, items: list[TResponseInputItem], wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Add new items to the conversation history.

Args:
items: List of input items to add to the history
wrapper: Optional RunContextWrapper for accessing context data during addition
"""
if not items:
return
Expand Down Expand Up @@ -212,9 +220,14 @@ def _add_items_sync():

await asyncio.to_thread(_add_items_sync)

async def pop_item(self) -> TResponseInputItem | None:
async def pop_item(
self, wrapper: RunContextWrapper[Any] | None = None
) -> TResponseInputItem | None:
"""Remove and return the most recent item from the session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during pop

Returns:
The most recent item if it exists, None if the session is empty
"""
Expand Down Expand Up @@ -253,8 +266,14 @@ def _pop_item_sync():

return await asyncio.to_thread(_pop_item_sync)

async def clear_session(self) -> None:
"""Clear all items for this session."""
async def clear_session(
self, wrapper: RunContextWrapper[Any] | None = None
) -> None:
"""Clear all items for this session.

Args:
wrapper: Optional RunContextWrapper for accessing context data during clear
"""

def _clear_session_sync():
conn = self._get_connection()
Expand Down
Loading