Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ class RunOptions(TypedDict, Generic[TContext]):
session: NotRequired[Session | None]
"""The session for the run."""

session_limit: NotRequired[int | None]
"""The maximum number of items to generate."""


class Runner:
@classmethod
Expand All @@ -314,6 +317,7 @@ async def run(
auto_previous_response_id: bool = False,
conversation_id: str | None = None,
session: Session | None = None,
session_limit: int | None = None,
) -> RunResult:
"""
Run a workflow starting at the given agent.
Expand Down Expand Up @@ -356,6 +360,10 @@ async def run(
other model providers don't write to the Conversation object,
so you'll end up having partial conversations stored.
session: A session for automatic conversation history management.
session_limit: Maximum number of conversation history items to retrieve
from the session. Only applicable when using a session. If None,
retrieves the entire history. Useful for managing context window size
and controlling token costs.

Returns:
A run result containing all the inputs, guardrail results and the output of
Expand All @@ -375,6 +383,7 @@ async def run(
auto_previous_response_id=auto_previous_response_id,
conversation_id=conversation_id,
session=session,
session_limit=session_limit,
)

@classmethod
Expand All @@ -391,6 +400,7 @@ def run_sync(
auto_previous_response_id: bool = False,
conversation_id: str | None = None,
session: Session | None = None,
session_limit: int | None = None,
) -> RunResult:
"""
Run a workflow synchronously, starting at the given agent.
Expand Down Expand Up @@ -431,6 +441,10 @@ def run_sync(
from the previous turn.
conversation_id: The ID of the stored conversation, if any.
session: A session for automatic conversation history management.
session_limit: Maximum number of conversation history items to retrieve
from the session. Only applicable when using a session. If None,
retrieves the entire history. Useful for managing context window size
and controlling token costs.

Returns:
A run result containing all the inputs, guardrail results and the output of
Expand All @@ -450,6 +464,7 @@ def run_sync(
conversation_id=conversation_id,
session=session,
auto_previous_response_id=auto_previous_response_id,
session_limit=session_limit,
)

@classmethod
Expand All @@ -465,6 +480,7 @@ def run_streamed(
auto_previous_response_id: bool = False,
conversation_id: str | None = None,
session: Session | None = None,
session_limit: int | None = None,
) -> RunResultStreaming:
"""
Run a workflow starting at the given agent in streaming mode.
Expand Down Expand Up @@ -503,6 +519,10 @@ def run_streamed(
from the previous turn.
conversation_id: The ID of the stored conversation, if any.
session: A session for automatic conversation history management.
session_limit: Maximum number of conversation history items to retrieve
from the session. Only applicable when using a session. If None,
retrieves the entire history. Useful for managing context window size
and controlling token costs.

Returns:
A result object that contains data about the run, as well as a method to
Expand All @@ -521,6 +541,7 @@ def run_streamed(
auto_previous_response_id=auto_previous_response_id,
conversation_id=conversation_id,
session=session,
session_limit=session_limit,
)


Expand All @@ -544,6 +565,7 @@ async def run(
auto_previous_response_id = kwargs.get("auto_previous_response_id", False)
conversation_id = kwargs.get("conversation_id")
session = kwargs.get("session")
session_limit = kwargs.get("session_limit")

if run_config is None:
run_config = RunConfig()
Expand All @@ -565,7 +587,10 @@ async def run(
# Keep original user input separate from session-prepared input
original_user_input = input
prepared_input = await self._prepare_input_with_session(
input, session, run_config.session_input_callback
input,
session,
run_config.session_input_callback,
session_limit=session_limit,
)

tool_use_tracker = AgentToolUseTracker()
Expand Down Expand Up @@ -799,6 +824,7 @@ def run_sync(
auto_previous_response_id = kwargs.get("auto_previous_response_id", False)
conversation_id = kwargs.get("conversation_id")
session = kwargs.get("session")
session_limit = kwargs.get("session_limit")

# Python 3.14 stopped implicitly wiring up a default event loop
# when synchronous code touches asyncio APIs for the first time.
Expand Down Expand Up @@ -845,6 +871,7 @@ def run_sync(
previous_response_id=previous_response_id,
auto_previous_response_id=auto_previous_response_id,
conversation_id=conversation_id,
session_limit=session_limit,
)
)

Expand Down Expand Up @@ -880,6 +907,7 @@ def run_streamed(
auto_previous_response_id = kwargs.get("auto_previous_response_id", False)
conversation_id = kwargs.get("conversation_id")
session = kwargs.get("session")
session_limit = kwargs.get("session_limit")

if run_config is None:
run_config = RunConfig()
Expand Down Expand Up @@ -936,6 +964,7 @@ def run_streamed(
auto_previous_response_id=auto_previous_response_id,
conversation_id=conversation_id,
session=session,
session_limit=session_limit,
)
)
return streamed_result
Expand Down Expand Up @@ -1065,6 +1094,7 @@ async def _start_streaming(
auto_previous_response_id: bool,
conversation_id: str | None,
session: Session | None,
session_limit: int | None = None,
):
if streamed_result.trace:
streamed_result.trace.start(mark_as_current=True)
Expand Down Expand Up @@ -1094,7 +1124,10 @@ async def _start_streaming(
try:
# Prepare input with session if enabled
prepared_input = await AgentRunner._prepare_input_with_session(
starting_input, session, run_config.session_input_callback
starting_input,
session,
run_config.session_input_callback,
session_limit,
)

# Update the streamed result with the prepared input
Expand Down Expand Up @@ -1942,6 +1975,7 @@ async def _prepare_input_with_session(
input: str | list[TResponseInputItem],
session: Session | None,
session_input_callback: SessionInputCallback | None,
session_limit: int | None = None,
) -> str | list[TResponseInputItem]:
"""Prepare input by combining it with session history if enabled."""
if session is None:
Expand All @@ -1958,7 +1992,9 @@ async def _prepare_input_with_session(
)

# Get previous conversation history
history = await session.get_items()
history = await session.get_items(
limit=session_limit,
)

# Convert input to list format
new_input_list = ItemHelpers.input_to_new_input_list(input)
Expand Down
175 changes: 175 additions & 0 deletions tests/test_session_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Test session_limit parameter functionality."""

import tempfile
from pathlib import Path

import pytest

from agents import Agent, SQLiteSession
from tests.fake_model import FakeModel
from tests.test_responses import get_text_message
from tests.test_session import run_agent_async


@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"])
@pytest.mark.asyncio
async def test_session_limit_parameter(runner_method):
"""Test that session_limit parameter correctly limits conversation history
retrieved from session across all Runner methods."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test_limit.db"
session_id = "limit_test"
session = SQLiteSession(session_id, db_path)

model = FakeModel()
agent = Agent(name="test", model=model)

# Build up a longer conversation history
model.set_next_output([get_text_message("Reply 1")])
await run_agent_async(runner_method, agent, "Message 1", session=session)

model.set_next_output([get_text_message("Reply 2")])
await run_agent_async(runner_method, agent, "Message 2", session=session)

model.set_next_output([get_text_message("Reply 3")])
await run_agent_async(runner_method, agent, "Message 3", session=session)

# Verify we have 6 items in total (3 user + 3 assistant)
all_items = await session.get_items()
assert len(all_items) == 6

# Now test session_limit parameter - should only get last 2 history items + new input
model.set_next_output([get_text_message("Reply 4")])
await run_agent_async(
runner_method,
agent,
"Message 4",
session=session,
session_limit=2, # Only get last 2 history items
)

# Verify model received limited history
last_input = model.last_turn_args["input"]
# Should have: 2 history items + 1 new message = 3 total
assert len(last_input) == 3
# First item should be "Message 3" (not Message 1 or 2)
assert last_input[0].get("content") == "Message 3"
# Assistant message has content as a list
assert last_input[1].get("content")[0]["text"] == "Reply 3"
assert last_input[2].get("content") == "Message 4"

session.close()


@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"])
@pytest.mark.asyncio
async def test_session_limit_zero(runner_method):
"""Test that session_limit=0 provides no history, only new message."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test_limit_zero.db"
session_id = "limit_zero_test"
session = SQLiteSession(session_id, db_path)

model = FakeModel()
agent = Agent(name="test", model=model)

# Build conversation history
model.set_next_output([get_text_message("Reply 1")])
await run_agent_async(runner_method, agent, "Message 1", session=session)

model.set_next_output([get_text_message("Reply 2")])
await run_agent_async(runner_method, agent, "Message 2", session=session)

# Test with limit=0 - should get NO history, just new message
model.set_next_output([get_text_message("Reply 3")])
await run_agent_async(
runner_method,
agent,
"Message 3",
session=session,
session_limit=0,
)

# Verify model received only the new message
last_input = model.last_turn_args["input"]
assert len(last_input) == 1
assert last_input[0].get("content") == "Message 3"

session.close()


@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"])
@pytest.mark.asyncio
async def test_session_limit_none_gets_all_history(runner_method):
"""Test that session_limit=None retrieves entire history (default behavior)."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test_limit_none.db"
session_id = "limit_none_test"
session = SQLiteSession(session_id, db_path)

model = FakeModel()
agent = Agent(name="test", model=model)

# Build longer conversation
for i in range(1, 6):
model.set_next_output([get_text_message(f"Reply {i}")])
await run_agent_async(runner_method, agent, f"Message {i}", session=session)

# Verify 10 items in session (5 user + 5 assistant)
all_items = await session.get_items()
assert len(all_items) == 10

# Test with session_limit=None (default) - should get all history
model.set_next_output([get_text_message("Reply 6")])
await run_agent_async(
runner_method,
agent,
"Message 6",
session=session,
session_limit=None, # Explicit None = get all
)

# Verify model received all history + new message
last_input = model.last_turn_args["input"]
assert len(last_input) == 11 # 10 history + 1 new
assert last_input[0].get("content") == "Message 1"
assert last_input[-1].get("content") == "Message 6"

session.close()


@pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"])
@pytest.mark.asyncio
async def test_session_limit_larger_than_history(runner_method):
"""Test that session_limit larger than history size returns all items."""
with tempfile.TemporaryDirectory() as temp_dir:
db_path = Path(temp_dir) / "test_limit_large.db"
session_id = "limit_large_test"
session = SQLiteSession(session_id, db_path)

model = FakeModel()
agent = Agent(name="test", model=model)

# Build small conversation
model.set_next_output([get_text_message("Reply 1")])
await run_agent_async(runner_method, agent, "Message 1", session=session)

# Test with limit=100 (much larger than actual history)
model.set_next_output([get_text_message("Reply 2")])
await run_agent_async(
runner_method,
agent,
"Message 2",
session=session,
session_limit=100,
)

# Verify model received all available history + new message
last_input = model.last_turn_args["input"]
assert len(last_input) == 3 # 2 history + 1 new
assert last_input[0].get("content") == "Message 1"
# Assistant message has content as a list
assert last_input[1].get("content")[0]["text"] == "Reply 1"
assert last_input[2].get("content") == "Message 2"

session.close()