diff --git a/src/agents/run.py b/src/agents/run.py index e772b254e..519ddde9b 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -298,6 +298,9 @@ class RunOptions(TypedDict, Generic[TContext]): session: NotRequired[Session | None] """The session for the run.""" + session_limit: NotRequired[int | None] + """The maximum number of items to generate.""" + class Runner: @classmethod @@ -314,6 +317,7 @@ async def run( auto_previous_response_id: bool = False, conversation_id: str | None = None, session: Session | None = None, + session_limit: int | None = None, ) -> RunResult: """ Run a workflow starting at the given agent. @@ -356,6 +360,10 @@ async def run( other model providers don't write to the Conversation object, so you'll end up having partial conversations stored. session: A session for automatic conversation history management. + session_limit: Maximum number of conversation history items to retrieve + from the session. Only applicable when using a session. If None, + retrieves the entire history. Useful for managing context window size + and controlling token costs. Returns: A run result containing all the inputs, guardrail results and the output of @@ -375,6 +383,7 @@ async def run( auto_previous_response_id=auto_previous_response_id, conversation_id=conversation_id, session=session, + session_limit=session_limit, ) @classmethod @@ -391,6 +400,7 @@ def run_sync( auto_previous_response_id: bool = False, conversation_id: str | None = None, session: Session | None = None, + session_limit: int | None = None, ) -> RunResult: """ Run a workflow synchronously, starting at the given agent. @@ -431,6 +441,10 @@ def run_sync( from the previous turn. conversation_id: The ID of the stored conversation, if any. session: A session for automatic conversation history management. + session_limit: Maximum number of conversation history items to retrieve + from the session. Only applicable when using a session. If None, + retrieves the entire history. Useful for managing context window size + and controlling token costs. Returns: A run result containing all the inputs, guardrail results and the output of @@ -450,6 +464,7 @@ def run_sync( conversation_id=conversation_id, session=session, auto_previous_response_id=auto_previous_response_id, + session_limit=session_limit, ) @classmethod @@ -465,6 +480,7 @@ def run_streamed( auto_previous_response_id: bool = False, conversation_id: str | None = None, session: Session | None = None, + session_limit: int | None = None, ) -> RunResultStreaming: """ Run a workflow starting at the given agent in streaming mode. @@ -503,6 +519,10 @@ def run_streamed( from the previous turn. conversation_id: The ID of the stored conversation, if any. session: A session for automatic conversation history management. + session_limit: Maximum number of conversation history items to retrieve + from the session. Only applicable when using a session. If None, + retrieves the entire history. Useful for managing context window size + and controlling token costs. Returns: A result object that contains data about the run, as well as a method to @@ -521,6 +541,7 @@ def run_streamed( auto_previous_response_id=auto_previous_response_id, conversation_id=conversation_id, session=session, + session_limit=session_limit, ) @@ -544,6 +565,7 @@ async def run( auto_previous_response_id = kwargs.get("auto_previous_response_id", False) conversation_id = kwargs.get("conversation_id") session = kwargs.get("session") + session_limit = kwargs.get("session_limit") if run_config is None: run_config = RunConfig() @@ -565,7 +587,10 @@ async def run( # Keep original user input separate from session-prepared input original_user_input = input prepared_input = await self._prepare_input_with_session( - input, session, run_config.session_input_callback + input, + session, + run_config.session_input_callback, + session_limit=session_limit, ) tool_use_tracker = AgentToolUseTracker() @@ -799,6 +824,7 @@ def run_sync( auto_previous_response_id = kwargs.get("auto_previous_response_id", False) conversation_id = kwargs.get("conversation_id") session = kwargs.get("session") + session_limit = kwargs.get("session_limit") # Python 3.14 stopped implicitly wiring up a default event loop # when synchronous code touches asyncio APIs for the first time. @@ -845,6 +871,7 @@ def run_sync( previous_response_id=previous_response_id, auto_previous_response_id=auto_previous_response_id, conversation_id=conversation_id, + session_limit=session_limit, ) ) @@ -880,6 +907,7 @@ def run_streamed( auto_previous_response_id = kwargs.get("auto_previous_response_id", False) conversation_id = kwargs.get("conversation_id") session = kwargs.get("session") + session_limit = kwargs.get("session_limit") if run_config is None: run_config = RunConfig() @@ -936,6 +964,7 @@ def run_streamed( auto_previous_response_id=auto_previous_response_id, conversation_id=conversation_id, session=session, + session_limit=session_limit, ) ) return streamed_result @@ -1065,6 +1094,7 @@ async def _start_streaming( auto_previous_response_id: bool, conversation_id: str | None, session: Session | None, + session_limit: int | None = None, ): if streamed_result.trace: streamed_result.trace.start(mark_as_current=True) @@ -1094,7 +1124,10 @@ async def _start_streaming( try: # Prepare input with session if enabled prepared_input = await AgentRunner._prepare_input_with_session( - starting_input, session, run_config.session_input_callback + starting_input, + session, + run_config.session_input_callback, + session_limit, ) # Update the streamed result with the prepared input @@ -1942,6 +1975,7 @@ async def _prepare_input_with_session( input: str | list[TResponseInputItem], session: Session | None, session_input_callback: SessionInputCallback | None, + session_limit: int | None = None, ) -> str | list[TResponseInputItem]: """Prepare input by combining it with session history if enabled.""" if session is None: @@ -1958,7 +1992,9 @@ async def _prepare_input_with_session( ) # Get previous conversation history - history = await session.get_items() + history = await session.get_items( + limit=session_limit, + ) # Convert input to list format new_input_list = ItemHelpers.input_to_new_input_list(input) diff --git a/tests/test_session_limit.py b/tests/test_session_limit.py new file mode 100644 index 000000000..3218f52cf --- /dev/null +++ b/tests/test_session_limit.py @@ -0,0 +1,175 @@ +"""Test session_limit parameter functionality.""" + +import tempfile +from pathlib import Path + +import pytest + +from agents import Agent, SQLiteSession +from tests.fake_model import FakeModel +from tests.test_responses import get_text_message +from tests.test_session import run_agent_async + + +@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"]) +@pytest.mark.asyncio +async def test_session_limit_parameter(runner_method): + """Test that session_limit parameter correctly limits conversation history + retrieved from session across all Runner methods.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_limit.db" + session_id = "limit_test" + session = SQLiteSession(session_id, db_path) + + model = FakeModel() + agent = Agent(name="test", model=model) + + # Build up a longer conversation history + model.set_next_output([get_text_message("Reply 1")]) + await run_agent_async(runner_method, agent, "Message 1", session=session) + + model.set_next_output([get_text_message("Reply 2")]) + await run_agent_async(runner_method, agent, "Message 2", session=session) + + model.set_next_output([get_text_message("Reply 3")]) + await run_agent_async(runner_method, agent, "Message 3", session=session) + + # Verify we have 6 items in total (3 user + 3 assistant) + all_items = await session.get_items() + assert len(all_items) == 6 + + # Now test session_limit parameter - should only get last 2 history items + new input + model.set_next_output([get_text_message("Reply 4")]) + await run_agent_async( + runner_method, + agent, + "Message 4", + session=session, + session_limit=2, # Only get last 2 history items + ) + + # Verify model received limited history + last_input = model.last_turn_args["input"] + # Should have: 2 history items + 1 new message = 3 total + assert len(last_input) == 3 + # First item should be "Message 3" (not Message 1 or 2) + assert last_input[0].get("content") == "Message 3" + # Assistant message has content as a list + assert last_input[1].get("content")[0]["text"] == "Reply 3" + assert last_input[2].get("content") == "Message 4" + + session.close() + + +@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"]) +@pytest.mark.asyncio +async def test_session_limit_zero(runner_method): + """Test that session_limit=0 provides no history, only new message.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_limit_zero.db" + session_id = "limit_zero_test" + session = SQLiteSession(session_id, db_path) + + model = FakeModel() + agent = Agent(name="test", model=model) + + # Build conversation history + model.set_next_output([get_text_message("Reply 1")]) + await run_agent_async(runner_method, agent, "Message 1", session=session) + + model.set_next_output([get_text_message("Reply 2")]) + await run_agent_async(runner_method, agent, "Message 2", session=session) + + # Test with limit=0 - should get NO history, just new message + model.set_next_output([get_text_message("Reply 3")]) + await run_agent_async( + runner_method, + agent, + "Message 3", + session=session, + session_limit=0, + ) + + # Verify model received only the new message + last_input = model.last_turn_args["input"] + assert len(last_input) == 1 + assert last_input[0].get("content") == "Message 3" + + session.close() + + +@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"]) +@pytest.mark.asyncio +async def test_session_limit_none_gets_all_history(runner_method): + """Test that session_limit=None retrieves entire history (default behavior).""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_limit_none.db" + session_id = "limit_none_test" + session = SQLiteSession(session_id, db_path) + + model = FakeModel() + agent = Agent(name="test", model=model) + + # Build longer conversation + for i in range(1, 6): + model.set_next_output([get_text_message(f"Reply {i}")]) + await run_agent_async(runner_method, agent, f"Message {i}", session=session) + + # Verify 10 items in session (5 user + 5 assistant) + all_items = await session.get_items() + assert len(all_items) == 10 + + # Test with session_limit=None (default) - should get all history + model.set_next_output([get_text_message("Reply 6")]) + await run_agent_async( + runner_method, + agent, + "Message 6", + session=session, + session_limit=None, # Explicit None = get all + ) + + # Verify model received all history + new message + last_input = model.last_turn_args["input"] + assert len(last_input) == 11 # 10 history + 1 new + assert last_input[0].get("content") == "Message 1" + assert last_input[-1].get("content") == "Message 6" + + session.close() + + +@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"]) +@pytest.mark.asyncio +async def test_session_limit_larger_than_history(runner_method): + """Test that session_limit larger than history size returns all items.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_limit_large.db" + session_id = "limit_large_test" + session = SQLiteSession(session_id, db_path) + + model = FakeModel() + agent = Agent(name="test", model=model) + + # Build small conversation + model.set_next_output([get_text_message("Reply 1")]) + await run_agent_async(runner_method, agent, "Message 1", session=session) + + # Test with limit=100 (much larger than actual history) + model.set_next_output([get_text_message("Reply 2")]) + await run_agent_async( + runner_method, + agent, + "Message 2", + session=session, + session_limit=100, + ) + + # Verify model received all available history + new message + last_input = model.last_turn_args["input"] + assert len(last_input) == 3 # 2 history + 1 new + assert last_input[0].get("content") == "Message 1" + # Assistant message has content as a list + assert last_input[1].get("content")[0]["text"] == "Reply 1" + assert last_input[2].get("content") == "Message 2" + + session.close()