feat: add TurnInterceptor for between-turn message injection#3463
Draft
0xSudoSSH wants to merge 1 commit into
Draft
feat: add TurnInterceptor for between-turn message injection#34630xSudoSSH wants to merge 1 commit into
0xSudoSSH wants to merge 1 commit into
Conversation
Member
|
Thanks for sharing this interesting idea. While we're aware that intercepting during a turn is a common need, we may not have this in the short term. |
Add TurnInterceptor — a mechanism for injecting user messages into an active streaming run between turns. Users call inject() (sync, thread-safe) from anywhere, and the framework drains the queue at NextStepRunAgain, running input guardrails before injecting accepted items into the model's next turn. Closes openai#2671
a937d5b to
4869114
Compare
Contributor
Author
@seratch totally understand. If you need this feature in the future, just let me know and I'm willing to make changes accordingly based on any feedbacks. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Add
TurnInterceptor— a mechanism for injecting user messages into an active streaming run between turns. Users callinterceptor.inject(item)(sync, thread-safe) from anywhere, and the framework drains the queue atNextStepRunAgain, running input guardrails before injecting accepted items into the model's next turn.Usage
Types
TurnActiontells the framework what to do after draining. Extensible to new action types in the future, giving developers more granular control.TurnInterceptor class
Key design decisions
UserErrorif passed toRunner.run()inject()is sync — returns immediately with aninjection_id. No async context needed.on_consumed/on_rejectedwrapped in try/except, never crash the agent loopTest plan
tests/test_turn_interceptor.pycovering:__call__()drain behavior with guardrails (pass/trip/mixed), callback exceptions, async callbacksexamples/basic/turn_interceptor.pyexercises all 4 NextStep paths end-to-end with assertionsmake format,make lint,make typecheckall pass (zero errors)Issue number
Closes #2671
Checks
make lintandmake formatImplementation Details
How It Works (Lifecycle)
1. Run starts →
reset()(fresh runs only)Called before the while loop on fresh runs (not on resume from interruption). Rejects leftover items, sets context/agent, bumps version. Skipped on resume so messages injected during interruption survive.
2. Each loop iteration →
update_agent()Called at the top of every iteration. Bumps version on agent change (handoff).
3. At
NextStepRunAgain→__call__()drains the queueThe only injection point. Cancellation is checked first — if
_cancel_mode == "after_turn", skip drain.4. At other NextSteps:
on_rejectedfiresupdate_agent()on_rejectedat next drainreset()skipped on resume. Items consumed at next drain after resume.on_rejectedat run end5. Run ends →
_reject_all_pending()Called after the while loop exits (unless interrupted). Drains everything remaining and fires
on_rejected.Design Choices
Version-based staleness (no locks) — Each queued item tagged with
_versionat enqueue time. Version increments onreset()(new run) andupdate_agent()(handoff). Stale items rejected at drain. Handles: agent changes during guardrail execution, items queued between runs, items for previous agent after handoff.Guardrails at drain time —
inject()stays sync/thread-safe. Stale items discarded before guardrails run (no wasted validation). Guardrails run against whoever will actually process the message. Failed items go toon_rejected.Streaming-only —
UserErrorif passed toRunner.run(). Non-streaming blocks until completion; user can't interact mid-run.Turn counter not reset —
max_turnsis a hard safety limit. Agent uses remaining budget.Cancellation — Interceptor drain skipped when
_cancel_mode == "after_turn". Items stay in queue, rejected at run end via_reject_all_pending().Callback safety —
on_consumed/on_rejectedwrapped in try/except withlogger.warning. A broken callback never crashes the agent loop. Different from other SDK hooks (which propagate) because these are observability notifications, not control-flow gates.