Add message concurrency controls to AIChatAgent #1192
Conversation
🦋 Changeset detected. Latest commit: ede9abb. The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package.
Add `enqueueTurn()` to `AIChatAgent` so subclasses can queue programmatic turns on the same serialized executor as websocket submits. Add workers tests and docs covering queued programmatic turns. User request: extend Cloudflare PR cloudflare#1192 so websocket and programmatic turns can share the same queue system, then commit, push, and open a draft PR against the parent repo.
@dctanner if this works for you, I will ask @threepointone to review

Just thinking through the strategies, we could leave out debounce for now. It's a bit of an advanced use case, and may make sense to do as part of first-class messaging app support. Other than that, this looks great. We'll need to make sure it works correctly with HITL tool approvals.

I'll review this soon. I'll aim to land this early/mid next week.
(don't rebase or merge, I'm working on this pr) |
Add `AIChatAgent.messageConcurrency` with queue, latest, merge, drop,
and debounce strategies for overlapping sendMessage() submits.
Enhance `saveMessages()` to accept a functional form for deriving
messages from the latest transcript, and return { requestId, status }
so callers can detect skipped turns.
Test coverage for all strategies, onChatResponse interaction with each
strategy, queued programmatic turns, and clear/epoch behavior.
Co-authored-by: whoiskatrin <kreznykova@cloudflare.com>
Made-with: Cursor
Force-pushed from b309f45 to ede9abb.
Changes from the original PR: rebased onto main (after #1228 landed) and made the following changes:

- API refinements
- Edge case fixes
- Integration with #1228
- Docs
- Test coverage
this is feeling good to me, so I merged it
## Summary

Add `AIChatAgent.messageConcurrency` to control how overlapping `sendMessage()` submits behave while another chat turn is active. Five strategies: `queue` (default), `latest`, `merge`, `drop`, and `debounce`.

Also enhance `saveMessages()` to accept a functional form for deriving messages from the latest transcript, and to return `{ requestId, status }` so callers can detect skipped turns.

Rebased on top of #1228 (`onChatResponse` hook). The two features share the same turn executor and integrate cleanly — `onChatResponse` fires only for turns that actually call `_reply()`, so superseded/dropped turns correctly do not trigger the hook.

## Usage
### Configuring a strategy
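A minimal configuration sketch. The import path and subclass shape are assumptions based on the agents SDK; only the `messageConcurrency` field and its values come from this PR's description:

```typescript
// Sketch only (not from the PR diff). Import path and class shape are
// assumptions; `messageConcurrency` is the field this PR adds, with
// values "queue" | "latest" | "merge" | "drop" or
// { strategy: "debounce", debounceMs }.
import { AIChatAgent } from "agents/ai-chat";

export class SupportAgent extends AIChatAgent {
  // Only the newest overlapping submit runs onChatMessage(); older
  // overlapping submits complete with an empty `done: true` response.
  messageConcurrency = "latest";

  // Or wait for a quiet window before responding:
  // messageConcurrency = { strategy: "debounce", debounceMs: 750 };
}
```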
### Choosing a strategy

- `"latest"` — user can correct themselves mid-stream; only the last message gets a response
- `"queue"` (default), or `"merge"` to collapse rapid-fire messages into one turn
- `"drop"` — overlapping submits are rejected and the client rolls back
- `{ strategy: "debounce", debounceMs: 750 }` — wait for a quiet window before responding
"queue""latest""merge""drop""debounce""latest", but the response waits for a quiet period before startingEnhanced
saveMessages()Pass a function to derive from the latest transcript at execution time — useful for schedule callbacks and webhook handlers where the message list may have changed since the call was made:
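For example (a sketch: the message shape and the `"skipped"` status literal are assumptions; only the functional form and the `{ requestId, status }` return value are from this PR):

```typescript
// Sketch: derive the turn's messages from the transcript as it exists
// when the queued turn actually runs, not when the call was made.
// The message shape and the "skipped" literal are assumptions.
const { requestId, status } = await this.saveMessages((messages) => [
  ...messages,
  {
    id: crypto.randomUUID(),
    role: "user",
    parts: [{ type: "text", text: "Daily summary, please." }],
  },
]);

if (status === "skipped") {
  // The transcript was cleared (epoch advanced) before this turn ran,
  // so no response was generated for this request.
  console.log(`turn ${requestId} was skipped`);
}
```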
Check `status` to detect whether the turn was skipped.

## Strategy details
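As a self-contained illustration (not the PR's code) of the `merge` rewrite described below, using a deliberately simplified message shape:

```typescript
// Simplified sketch of the merge rewrite: user messages queued behind
// the active turn are combined into a single user message before the
// latest queued turn starts. Message shape is illustrative only.
type Msg = { role: "user" | "assistant"; text: string };

function mergeQueuedUserMessages(messages: Msg[], queuedStartIndex: number): Msg[] {
  const head = messages.slice(0, queuedStartIndex);
  const queuedUsers = messages.slice(queuedStartIndex).filter((m) => m.role === "user");
  if (queuedUsers.length <= 1) return messages; // nothing to merge
  const combined: Msg = {
    role: "user",
    text: queuedUsers.map((m) => m.text).join("\n"),
  };
  return [...head, combined];
}
```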
"queue"— default. Every submit is persisted and every submit gets its own turn once earlier work finishes."latest"— persists overlapping submits, but only the newest queued overlap turn actually runsonChatMessage(). Older queued overlap turns receive an emptydone: trueresponse and complete without calling the model."merge"— persists overlapping submits while the active turn runs, then rewrites the queued tail of user messages into one combined user message before the latest queued turn starts."drop"— rejects overlapping submits before persistence and broadcasts the server transcript back so the client rolls back its optimistic state.{ strategy: "debounce", debounceMs }— behaves like"latest", but waits for a quiet window before starting the surviving queued overlap turn. Falls back to 750ms whendebounceMsis missing or invalid.Scope and exemptions
`messageConcurrency` applies only to overlapping `sendMessage()` / `trigger: "submit-message"` requests. The following are exempt and always serialize normally:

- `regenerate()` / `trigger: "regenerate-message"`
- tool approvals (`CF_AGENT_TOOL_APPROVAL`)
- `saveMessages()` (programmatic turns)
- `CF_AGENT_CHAT_CLEAR`

This means HITL tool approval flows work correctly with all strategies. Approvals enter the turn queue via `_queueAutoContinuation`, which bypasses `_getSubmitConcurrencyDecision` entirely. In `"drop"` mode, a user cannot send a new message while an approval continuation is queued — consistent with drop semantics ("one thing at a time").

## Architecture
The implementation keeps a single turn executor (`_chatTurnQueue` / `_runExclusiveChatTurn`). All concurrency state is epoch-scoped and cleaned up in `resetTurnState()`:

- `_queuedChatTurnCountsByEpoch` — tracks active/queued turns per epoch to detect overlaps
- `_mergeQueuedUserStartIndexByEpoch` — marks where overlapping user messages begin for merge
- `_submitSequence` / `_latestOverlappingSubmitSequence` — monotonic counters for superseding
- `_activeDebounceTimer` / `_activeDebounceResolve` — cancellable debounce timer, cleared on `resetTurnState()`

User messages are persisted and broadcast before entering the turn lock (so other tabs see them immediately). The concurrency decision is computed eagerly at submit time; the turn body checks epoch + superseded status when it finally runs.
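The eager decision plus late supersede check can be sketched as a self-contained toy (not the PR's code), using monotonic counters like the ones named above:

```typescript
// Toy "latest wins" queue: every submit bumps a monotonic sequence and
// records itself as the latest overlap; a queued turn checks, when it
// finally runs, whether a newer submit superseded it.
class LatestWinsQueue {
  private submitSequence = 0;
  private latestOverlappingSubmitSequence = 0;
  private tail: Promise<unknown> = Promise.resolve();

  submit(work: () => Promise<void>): Promise<{ seq: number; ran: boolean }> {
    const seq = ++this.submitSequence;
    this.latestOverlappingSubmitSequence = seq; // supersede older overlaps
    const result = this.tail.then(async () => {
      if (seq < this.latestOverlappingSubmitSequence) {
        // Superseded: complete without doing the real work,
        // mirroring the empty done:true response in "latest" mode.
        return { seq, ran: false };
      }
      await work();
      return { seq, ran: true };
    });
    this.tail = result;
    return result;
  }
}

async function demo() {
  const q = new LatestWinsQueue();
  const ran: number[] = [];
  // Three back-to-back submits: in this toy model all three overlap,
  // so only the newest (seq 3) does real work.
  const results = await Promise.all([
    q.submit(async () => { ran.push(1); }),
    q.submit(async () => { ran.push(2); }),
    q.submit(async () => { ran.push(3); }),
  ]);
  return { ran, results };
}
```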
The `onChatResponse` hook (from #1228) integrates naturally: it fires only when `_reply()` pushes a result to `_pendingChatResponseResults`. Superseded turns never call `_reply`, so the hook correctly fires only for turns that produced a response.

## What changed
- `packages/ai-chat/src/index.ts` — `saveMessages()`, `agentContext.run()` for programmatic turns
- `packages/ai-chat/src/tests/message-concurrency.test.ts`
- `packages/ai-chat/src/tests/programmatic-turns.test.ts` — `saveMessages()` queuing, functional form, skipped-after-clear
- `packages/ai-chat/src/tests/worker.ts` — `onChatResponse` tracking on `SlowStreamAgent`
- `packages/ai-chat/src/tests/wrangler.jsonc`
- `docs/chat-agents.md` — `saveMessages()` functional form
- `packages/ai-chat/README.md`
- `examples/ai-chat/src/server.ts` — `sendProactiveMessage` uses the `saveMessages()` functional form
- `.changeset/nasty-pumpkins-juggle.md`

## Reviewer notes
- `packages/ai-chat/src/index.ts` — the submit-message handler (line ~625) is where concurrency decisions happen, and `_runExclusiveChatTurn` (line ~1800) is where epoch tracking and the `onChatResponse` drain coexist.
- `saveMessages()` replaces the old `enqueueTurn()` API from earlier iterations. One method, two forms (array or function), returning `{ requestId, status }`. No new methods to learn.
- `_mergeQueuedUserMessages` is called twice for merge mode — once outside the lock (line ~662) and once inside (line ~701). The comment at line ~699 explains why: more overlapping submits may arrive while the turn is queued.
- `_cancelActiveDebounce()` in `resetTurnState()` prevents the DO from staying alive unnecessarily after a clear.
- `_runProgrammaticChatTurn` wraps its work in `agentContext.run()`, so `getCurrentAgent()` works inside programmatic turns, matching the WS and auto-continuation paths.
- `CF_AGENT_USE_CHAT_RESPONSE` frames with `done: true`.

## Testing
All passing.