Skip to content

feat: add TurnInterceptor for between-turn message injection#3463

Draft
0xSudoSSH wants to merge 1 commit into
openai:mainfrom
0xSudoSSH:feat/turn-interceptor
Draft

feat: add TurnInterceptor for between-turn message injection#3463
0xSudoSSH wants to merge 1 commit into
openai:mainfrom
0xSudoSSH:feat/turn-interceptor

Conversation

@0xSudoSSH
Copy link
Copy Markdown
Contributor

@0xSudoSSH 0xSudoSSH commented May 19, 2026

Summary

Add TurnInterceptor — a mechanism for injecting user messages into an active streaming run between turns. Users call interceptor.inject(item) (sync, thread-safe) from anywhere, and the framework drains the queue at NextStepRunAgain, running input guardrails before injecting accepted items into the model's next turn.

Usage

from agents import Agent, Runner, TurnInterceptor

agent = Agent(name="Researcher", tools=[web_search], input_guardrails=[my_guardrail])

interceptor = TurnInterceptor(
    on_consumed=lambda items: print(f"Injected: {[id for id, _ in items]}"),
    on_rejected=lambda items: print(f"Rejected: {[id for id, _ in items]}"),
)

# Start a streaming run
result = Runner.run_streamed(agent, "Research quantum computing", turn_interceptor=interceptor)

# From anywhere — sync, thread-safe, no event loop needed:
injection_id = interceptor.inject("also check recent news")

# Stream as usual — injected messages appear to the model on the next turn
async for event in result.stream_events():
    ...

Types

TurnAction tells the framework what to do after draining. Extensible to new action types in the future, giving developers more granular control.

class TurnActionType(str, Enum):
    PROCEED = "proceed"
    INJECT_INPUT = "inject_input"

@dataclass(frozen=True)
class TurnAction:
    action: TurnActionType
    data: list[TResponseInputItem] | None = None

    @classmethod
    def proceed(cls) -> TurnAction: ...

    @classmethod
    def inject_input(cls, data: list[TResponseInputItem]) -> TurnAction: ...

InjectionRecord = tuple[str, TResponseInputItem]  # (injection_id, item)

TurnInterceptor class

class TurnInterceptor:
    """Manages mid-run message injection with version-based staleness detection."""

    def __init__(
        self,
        on_consumed: Callable[[list[InjectionRecord]], MaybeAwaitable[None]] | None = None,
        on_rejected: Callable[[list[InjectionRecord]], MaybeAwaitable[None]] | None = None,
    ):
        self._queue: queue.Queue[tuple[int, str, TResponseInputItem]]  # (version, injection_id, item)
        self._current_agent: Agent | None      # Set by framework via reset()/update_agent()
        self._context: Any                     # Set once at run start via reset()
        self._version: int                     # Incremented on agent change or new run

    # Framework lifecycle — called by run_loop.py to keep interceptor in sync
    async def reset(self, agent: Agent, context: Any) -> None:
        # Reject any leftover items from previous run (fires on_rejected)
        # Set _context, _current_agent
        # Increment _version (makes old queued items stale)

    def update_agent(self, agent: Agent) -> None:
        # If agent changed: update _current_agent, increment _version
        # If same agent: no-op

    # User-facing API — the only method SDK users call directly
    def inject(self, item: str | TResponseInputItem) -> str:
        # Raise UserError if _current_agent is None (run not started)
        # Wrap string as {"role": "user", "content": item}
        # Generate injection_id (uuid4)
        # Enqueue (current_version, injection_id, item)
        # Return injection_id

    # Framework drain interface — called at NextStepRunAgain to flush the queue
    async def __call__(self) -> TurnAction:
        # Drain queue → split into (valid, stale) by version
        # Fire on_rejected for stale items
        # Run input guardrails on valid items → split into (accepted, failed)
        # Fire on_rejected for guardrail-failed items
        # Fire on_consumed for accepted items
        # Return TurnAction.inject_input(accepted) or TurnAction.proceed()

Key design decisions

  • Streaming-only — raises UserError if passed to Runner.run()
  • inject() is sync — returns immediately with an injection_id. No async context needed.
  • Guardrails at drain time — run inside the run loop against the current agent
  • Version-based staleness — no locks; items tagged with version at enqueue time, rejected if version mismatches at drain
  • Callback safetyon_consumed/on_rejected wrapped in try/except, never crash the agent loop

Test plan

  • 36 automated tests in tests/test_turn_interceptor.py covering:
    • Unit: inject basics, drain splitting, version management, reset lifecycle
    • Unit: __call__() drain behavior with guardrails (pass/trip/mixed), callback exceptions, async callbacks
    • Integration: inject at NextStepRunAgain, reject at FinalOutput, handoff rejection, cancellation, multiple injects batched
  • Interactive example at examples/basic/turn_interceptor.py exercises all 4 NextStep paths end-to-end with assertions
  • make format, make lint, make typecheck all pass (zero errors)
  • Full test suite passes (no regressions; pre-existing tracing failures unrelated)

Issue number

Closes #2671

Checks

  • I've added new tests (if relevant)
  • I've added/updated the relevant documentation
  • I've run make lint and make format
  • I've made sure tests pass

Implementation Details

How It Works (Lifecycle)

1. Run starts → reset() (fresh runs only)

Called before the while loop on fresh runs (not on resume from interruption). Rejects leftover items, sets context/agent, bumps version. Skipped on resume so messages injected during interruption survive.

if turn_interceptor is not None and not is_resumed_state:
    await turn_interceptor.reset(starting_agent, context_wrapper.context)

2. Each loop iteration → update_agent()

Called at the top of every iteration. Bumps version on agent change (handoff).

if turn_interceptor is not None:
    turn_interceptor.update_agent(current_agent)

3. At NextStepRunAgain__call__() drains the queue

The only injection point. Cancellation is checked first — if _cancel_mode == "after_turn", skip drain.

elif isinstance(turn_result.next_step, NextStepRunAgain):
    # Drain interceptor if not cancelled.
    if turn_interceptor is not None and streamed_result._cancel_mode != "after_turn":
        action = await turn_interceptor()
        if action.action == TurnActionType.INJECT_INPUT:
            for item in action.data:
                injected = InjectedInputItem(raw_item=item, agent=current_agent)
                streamed_result._model_input_items.append(injected)
                streamed_result.new_items.append(injected)
                turn_session_items.append(injected)
                if pending_server_items is not None:
                    pending_server_items.append(injected)

    await _save_stream_items_with_count(turn_session_items, ...)

    if streamed_result._cancel_mode == "after_turn":
        break

4. At other NextSteps:

NextStep Queue behavior Version Callback Rationale
FinalOutput All items rejected Unchanged on_rejected fires No more model calls. Caller can start a new run with rejected items.
Handoff Items become stale Bumped by update_agent() on_rejected at next drain Messages were for Agent A; Agent B has different context/guardrails. Caller can re-inject.
Interruption Items stay in queue Unchanged None (yet) Run will resume. reset() skipped on resume. Items consumed at next drain after resume.
Cancellation Items rejected Unchanged on_rejected at run end Cancel checked before drain. Items never consumed.

5. Run ends → _reject_all_pending()

Called after the while loop exits (unless interrupted). Drains everything remaining and fires on_rejected.

if turn_interceptor is not None and not streamed_result.interruptions:
    await turn_interceptor._reject_all_pending()

Design Choices

Version-based staleness (no locks) — Each queued item tagged with _version at enqueue time. Version increments on reset() (new run) and update_agent() (handoff). Stale items rejected at drain. Handles: agent changes during guardrail execution, items queued between runs, items for previous agent after handoff.

Guardrails at drain timeinject() stays sync/thread-safe. Stale items discarded before guardrails run (no wasted validation). Guardrails run against whoever will actually process the message. Failed items go to on_rejected.

Streaming-onlyUserError if passed to Runner.run(). Non-streaming blocks until completion; user can't interact mid-run.

Turn counter not resetmax_turns is a hard safety limit. Agent uses remaining budget.

Cancellation — Interceptor drain skipped when _cancel_mode == "after_turn". Items stay in queue, rejected at run end via _reject_all_pending().

Callback safetyon_consumed/on_rejected wrapped in try/except with logger.warning. A broken callback never crashes the agent loop. Different from other SDK hooks (which propagate) because these are observability notifications, not control-flow gates.

@seratch
Copy link
Copy Markdown
Member

seratch commented May 19, 2026

Thanks for sharing this interesting idea. While we're aware that intercepting during a turn is a common need, we may not have this in the short term.

@seratch seratch marked this pull request as draft May 19, 2026 23:43
Add TurnInterceptor — a mechanism for injecting user messages into an
active streaming run between turns. Users call inject() (sync, thread-safe)
from anywhere, and the framework drains the queue at NextStepRunAgain,
running input guardrails before injecting accepted items into the model's
next turn.

Closes openai#2671
@0xSudoSSH 0xSudoSSH force-pushed the feat/turn-interceptor branch from a937d5b to 4869114 Compare May 20, 2026 00:05
@0xSudoSSH
Copy link
Copy Markdown
Contributor Author

Thanks for sharing this interesting idea. While we're aware that intercepting during a turn is a common need, we may not have this in the short term.

@seratch totally understand. If you need this feature in the future, just let me know and I'm willing to make changes accordingly based on any feedbacks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: better support for agent state changes between turns

2 participants