Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d6b8993
feat: #636 Add human-in-the-loop (HITL) support to the SDK
seratch Dec 23, 2025
76b312e
split _run_impl.py into run_internal/
seratch Dec 25, 2025
ee0491f
Simplify the run state data
seratch Jan 7, 2026
514f705
Fix reported issues
seratch Jan 9, 2026
30b6f08
Fix HITL resume for computer actions and avoid duplicate rejections
seratch Jan 10, 2026
8d96622
Fix automatic Responses compaction trigger during session persistence
seratch Jan 10, 2026
2b6d07b
Preserve guardrail history when resuming from RunState
seratch Jan 10, 2026
c08dcaf
fix python 3.9 errors
seratch Jan 10, 2026
1b269f0
Preserve conversation tracking when resuming runs
seratch Jan 10, 2026
549efbb
Include CompactionItem in RunItem union for type safety
seratch Jan 10, 2026
04ebd0c
fix: skip re-executing function tools on HITL resume when outputs exist
seratch Jan 11, 2026
c556a03
fix: deserialize compaction_item in RunState
seratch Jan 11, 2026
f70ef25
fix: scope tool rejection to call ids
seratch Jan 11, 2026
95b8b9c
fix: keep approved tool outputs on HITL resume with pending approvals
seratch Jan 11, 2026
26fa8a3
feat: add RunState context serialization hooks and metadata
seratch Jan 11, 2026
ccd7798
fix: rebuild HITL function runs from object approvals
seratch Jan 11, 2026
e011f81
fix: normalize tool-call dedupe keys for unhashable arguments
seratch Jan 11, 2026
54d1af0
fix: harden HITL resume dedupe and nested tool approvals
seratch Jan 11, 2026
332f8f0
fix: make RunState serialization tolerant of non-JSON outputs
seratch Jan 11, 2026
add52d2
feat: add HITL session scenario example and tests
seratch Jan 11, 2026
e99d343
fix: surface needs_approval errors on HITL resume
seratch Jan 11, 2026
16ba1e1
fix: dedupe tool calls by call_id or id and centralize MCP approval p…
seratch Jan 11, 2026
5b523a7
Centralize tool call deduplication logic
seratch Jan 11, 2026
e276c28
fix: honor filtered inputs in conversation tracking and preserve dupl…
seratch Jan 12, 2026
dbc86a3
refactor; add comments
seratch Jan 12, 2026
d611b0e
fix: ignore fake response ids in dedupe and strip provider_data for O…
seratch Jan 12, 2026
103df85
Fix session persistence counter handling and add regression test
seratch Jan 12, 2026
3213000
fix: ignore fake response ids in conversation tracking
seratch Jan 12, 2026
2328dc3
fix: align OpenAI conversation persistence counts with sanitized items
seratch Jan 12, 2026
d31747a
Refine run loop cleanup and streaming retry removal
seratch Jan 13, 2026
f03c9a7
fix: align streaming input tracking, session persistence count, and t…
seratch Jan 13, 2026
8d26b9b
fix: honor max_output_length in run_internal shell tools
seratch Jan 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,5 @@ cython_debug/

# Redis database files
dump.rdb

tmp/
37 changes: 33 additions & 4 deletions examples/agent_patterns/agents_as_tools_conditional.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from pydantic import BaseModel

from agents import Agent, AgentBase, RunContextWrapper, Runner, trace
from agents import Agent, AgentBase, ModelSettings, RunContextWrapper, Runner, trace
from agents.tool import function_tool
from examples.auto_mode import input_with_fallback

"""
Expand All @@ -26,10 +27,18 @@ def european_enabled(ctx: RunContextWrapper[AppContext], agent: AgentBase) -> bo
return ctx.context.language_preference == "european"


@function_tool(needs_approval=True)
async def get_user_name() -> str:
print("Getting the user's name...")
return "Kaz"


# Create specialized agents
spanish_agent = Agent(
name="spanish_agent",
instructions="You respond in Spanish. Always reply to the user's question in Spanish.",
instructions="You respond in Spanish. Always reply to the user's question in Spanish. You must call all the tools to best answer the user's question.",
model_settings=ModelSettings(tool_choice="required"),
tools=[get_user_name],
)

french_agent = Agent(
Expand All @@ -55,6 +64,7 @@ def european_enabled(ctx: RunContextWrapper[AppContext], agent: AgentBase) -> bo
tool_name="respond_spanish",
tool_description="Respond to the user's question in Spanish",
is_enabled=True, # Always enabled
needs_approval=True, # HITL
),
french_agent.as_tool(
tool_name="respond_french",
Expand Down Expand Up @@ -109,8 +119,27 @@ async def main():
input=user_request,
context=context.context,
)

print(f"\nResponse:\n{result.final_output}")
while result.interruptions:

async def confirm(question: str) -> bool:
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
normalized = answer.strip().lower()
return normalized in ("y", "yes")

state = result.to_state()
for interruption in result.interruptions:
prompt = f"\nDo you approve this tool call: {interruption.name} with arguments {interruption.arguments}?"
confirmed = await confirm(prompt)
if confirmed:
state.approve(interruption)
print(f"✓ Approved: {interruption.name}")
else:
state.reject(interruption)
print(f"✗ Rejected: {interruption.name}")
result = await Runner.run(orchestrator, state)

print(f"\nResponse:\n{result.final_output}")


if __name__ == "__main__":
Expand Down
141 changes: 141 additions & 0 deletions examples/agent_patterns/human_in_the_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Human-in-the-loop example with tool approval.

This example demonstrates how to:
1. Define tools that require approval before execution
2. Handle interruptions when tool approval is needed
3. Serialize/deserialize run state to continue execution later
4. Approve or reject tool calls based on user input
"""

import asyncio
import json
from pathlib import Path

from agents import Agent, Runner, RunState, function_tool


@function_tool
async def get_weather(city: str) -> str:
"""Get the weather for a given city.

Args:
city: The city to get weather for.

Returns:
Weather information for the city.
"""
return f"The weather in {city} is sunny"


async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
"""Check if temperature tool needs approval."""
return "Oakland" in params.get("city", "")


@function_tool(
# Dynamic approval: only require approval for Oakland
needs_approval=_needs_temperature_approval
)
async def get_temperature(city: str) -> str:
"""Get the temperature for a given city.

Args:
city: The city to get temperature for.

Returns:
Temperature information for the city.
"""
return f"The temperature in {city} is 20° Celsius"


# Main agent with tool that requires approval
agent = Agent(
name="Weather Assistant",
instructions=(
"You are a helpful weather assistant. "
"Answer questions about weather and temperature using the available tools."
),
tools=[get_weather, get_temperature],
)

RESULT_PATH = Path(".cache/agent_patterns/human_in_the_loop/result.json")


async def confirm(question: str) -> bool:
"""Prompt user for yes/no confirmation.

Args:
question: The question to ask.

Returns:
True if user confirms, False otherwise.
"""
# Note: In a real application, you would use proper async input
# For now, using synchronous input with run_in_executor
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
normalized = answer.strip().lower()
return normalized in ("y", "yes")


async def main():
"""Run the human-in-the-loop example."""
result = await Runner.run(
agent,
"What is the weather and temperature in Oakland?",
)

has_interruptions = len(result.interruptions) > 0

while has_interruptions:
print("\n" + "=" * 80)
print("Run interrupted - tool approval required")
print("=" * 80)

# Storing state to file (demonstrating serialization)
state = result.to_state()
state_json = state.to_json()
RESULT_PATH.parent.mkdir(parents=True, exist_ok=True)
with RESULT_PATH.open("w") as f:
json.dump(state_json, f, indent=2)

print(f"State saved to {RESULT_PATH}")

# From here on you could run things on a different thread/process

# Reading state from file (demonstrating deserialization)
print(f"Loading state from {RESULT_PATH}")
with RESULT_PATH.open() as f:
stored_state_json = json.load(f)

state = await RunState.from_json(agent, stored_state_json)

# Process each interruption
for interruption in result.interruptions:
print("\nTool call details:")
print(f" Agent: {interruption.agent.name}")
print(f" Tool: {interruption.name}")
print(f" Arguments: {interruption.arguments}")

confirmed = await confirm("\nDo you approve this tool call?")

if confirmed:
print(f"✓ Approved: {interruption.name}")
state.approve(interruption)
else:
print(f"✗ Rejected: {interruption.name}")
state.reject(interruption)

# Resume execution with the updated state
print("\nResuming agent execution...")
result = await Runner.run(agent, state)
has_interruptions = len(result.interruptions) > 0

print("\n" + "=" * 80)
print("Final Output:")
print("=" * 80)
print(result.final_output)


if __name__ == "__main__":
asyncio.run(main())
120 changes: 120 additions & 0 deletions examples/agent_patterns/human_in_the_loop_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Human-in-the-loop example with streaming.

This example demonstrates the human-in-the-loop (HITL) pattern with streaming.
The agent will pause execution when a tool requiring approval is called,
allowing you to approve or reject the tool call before continuing.

The streaming version provides real-time feedback as the agent processes
the request, then pauses for approval when needed.
"""

import asyncio

from agents import Agent, Runner, function_tool


async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
"""Check if temperature tool needs approval."""
return "Oakland" in params.get("city", "")


@function_tool(
# Dynamic approval: only require approval for Oakland
needs_approval=_needs_temperature_approval
)
async def get_temperature(city: str) -> str:
"""Get the temperature for a given city.

Args:
city: The city to get temperature for.

Returns:
Temperature information for the city.
"""
return f"The temperature in {city} is 20° Celsius"


@function_tool
async def get_weather(city: str) -> str:
"""Get the weather for a given city.

Args:
city: The city to get weather for.

Returns:
Weather information for the city.
"""
return f"The weather in {city} is sunny."


async def confirm(question: str) -> bool:
"""Prompt user for yes/no confirmation.

Args:
question: The question to ask.

Returns:
True if user confirms, False otherwise.
"""
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
return answer.strip().lower() in ["y", "yes"]


async def main():
"""Run the human-in-the-loop example."""
main_agent = Agent(
name="Weather Assistant",
instructions=(
"You are a helpful weather assistant. "
"Answer questions about weather and temperature using the available tools."
),
tools=[get_temperature, get_weather],
)

# Run the agent with streaming
result = Runner.run_streamed(
main_agent,
"What is the weather and temperature in Oakland?",
)
async for _ in result.stream_events():
pass # Process streaming events silently or could print them

# Handle interruptions
while len(result.interruptions) > 0:
print("\n" + "=" * 80)
print("Human-in-the-loop: approval required for the following tool calls:")
print("=" * 80)

state = result.to_state()

for interruption in result.interruptions:
print("\nTool call details:")
print(f" Agent: {interruption.agent.name}")
print(f" Tool: {interruption.name}")
print(f" Arguments: {interruption.arguments}")

confirmed = await confirm("\nDo you approve this tool call?")

if confirmed:
print(f"✓ Approved: {interruption.name}")
state.approve(interruption)
else:
print(f"✗ Rejected: {interruption.name}")
state.reject(interruption)

# Resume execution with streaming
print("\nResuming agent execution...")
result = Runner.run_streamed(main_agent, state)
async for _ in result.stream_events():
pass # Process streaming events silently or could print them

print("\n" + "=" * 80)
print("Final Output:")
print("=" * 80)
print(result.final_output)
print("\nDone!")


if __name__ == "__main__":
asyncio.run(main())
Loading