
Add message concurrency controls to AIChatAgent #1192

Merged
threepointone merged 1 commit into main from feat/ai-chat-message-concurrency on Mar 29, 2026

Conversation

@whoiskatrin
Contributor

@whoiskatrin whoiskatrin commented Mar 25, 2026

Summary

Add AIChatAgent.messageConcurrency to control how overlapping sendMessage() submits behave while another chat turn is active. Five strategies: queue (default), latest, merge, drop, and debounce.

Also enhance saveMessages() to accept a functional form for deriving messages from the latest transcript, and return { requestId, status } so callers can detect skipped turns.

Rebased on top of #1228 (onChatResponse hook). The two features share the same turn executor and integrate cleanly — onChatResponse fires only for turns that actually call _reply(), so superseded/dropped turns correctly do not trigger the hook.

Usage

Configuring a strategy

```ts
import { AIChatAgent } from "@cloudflare/ai-chat";

export class ChatAgent extends AIChatAgent {
  // Keep only the latest overlapping submit
  messageConcurrency = "latest";

  async onChatMessage() {
    // ...
    return result.toUIMessageStreamResponse();
  }
}
```

Choosing a strategy

  • Focused assistant (one question at a time): "latest" — user can correct themselves mid-stream, only the last message gets a response
  • Messaging/chat app (every message matters): "queue" (default) or "merge" to collapse rapid-fire messages into one turn
  • Prevent double-sends: "drop" — overlapping submits are rejected and the client rolls back
  • Burst messages: { strategy: "debounce", debounceMs: 750 } — wait for a quiet window before responding

What the user sees

| Strategy | Client behavior |
| --- | --- |
| `"queue"` | Every message gets its own assistant response, in order |
| `"latest"` | All messages appear in the transcript, but only the last overlapping one gets a response |
| `"merge"` | Overlapping messages are collapsed into one combined message, which gets a single response |
| `"drop"` | The overlapping message briefly appears (optimistic), then disappears (server rollback) |
| `"debounce"` | Same as `"latest"`, but the response waits for a quiet period before starting |
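The per-strategy behavior above can be sketched as a pure function. This is an illustrative model only, not the library's implementation; the function name and return shape are invented for the example:

```typescript
// Given user submits that arrive while a turn is active (oldest → newest),
// which are persisted and which get a model response? Illustrative only.
type Strategy = "queue" | "latest" | "merge" | "drop" | "debounce";

function resolveOverlap(strategy: Strategy, submits: string[]) {
  switch (strategy) {
    case "queue":
      // every submit persists and gets its own turn, in order
      return { persisted: submits, responded: submits };
    case "latest":
    case "debounce":
      // all persist, but only the newest overlapping submit gets a response
      // (debounce additionally waits for a quiet window before starting)
      return { persisted: submits, responded: submits.slice(-1) };
    case "merge": {
      // the overlapping tail is collapsed into one combined user message
      const merged = submits.join("\n");
      return { persisted: [merged], responded: [merged] };
    }
    case "drop":
      // rejected before persistence; the client rolls back optimistic state
      return { persisted: [], responded: [] };
  }
}
```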

Enhanced saveMessages()

Pass a function to derive from the latest transcript at execution time — useful for schedule callbacks and webhook handlers where the message list may have changed since the call was made:

```ts
// From a schedule() callback or webhook handler:
await this.saveMessages((messages) => [
  ...messages,
  {
    id: nanoid(),
    role: "user",
    parts: [{ type: "text", text: `Task result: ${result}` }]
  }
]);
```

Check status to detect whether the turn was skipped:

```ts
const { status } = await this.saveMessages((messages) => [...messages, msg]);
if (status === "skipped") {
  // Chat was cleared while we were queued — retry or log
}
```

Strategy details

  • "queue" — default. Every submit is persisted and every submit gets its own turn once earlier work finishes.
  • "latest" — persists overlapping submits, but only the newest queued overlap turn actually runs onChatMessage(). Older queued overlap turns receive an empty done: true response and complete without calling the model.
  • "merge" — persists overlapping submits while the active turn runs, then rewrites the queued tail of user messages into one combined user message before the latest queued turn starts.
  • "drop" — rejects overlapping submits before persistence and broadcasts the server transcript back so the client rolls back its optimistic state.
  • { strategy: "debounce", debounceMs } — behaves like "latest", but waits for a quiet window before starting the surviving queued overlap turn. Falls back to 750ms when debounceMs is missing or invalid.
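The debounce behavior described above can be illustrated with a small cancellable timer. This is a hedged sketch assuming a setTimeout-based quiet window; `makeDebouncer` and its shape are invented for the example, though the 750ms fallback mirrors the behavior the PR describes:

```typescript
// Every overlapping submit restarts the timer, so the surviving turn starts
// only after `ms` of silence. Illustrative model, not the agent's internals.
function makeDebouncer(debounceMs: number, run: () => void) {
  // fall back to 750ms when debounceMs is missing or invalid, per the PR
  const ms = Number.isFinite(debounceMs) && debounceMs > 0 ? debounceMs : 750;
  let timer: ReturnType<typeof setTimeout> | undefined;
  return {
    ms,
    submit() {
      if (timer !== undefined) clearTimeout(timer); // newer submit supersedes
      timer = setTimeout(run, ms);
    },
    cancel() {
      // e.g. on chat clear: don't keep the Durable Object alive for the window
      if (timer !== undefined) clearTimeout(timer);
      timer = undefined;
    }
  };
}
```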

Scope and exemptions

messageConcurrency applies only to overlapping sendMessage() / trigger: "submit-message" requests. The following are exempt and always serialize normally:

  • regenerate() / trigger: "regenerate-message"
  • Tool continuations (auto-continue after tool results)
  • Tool approvals (HITL CF_AGENT_TOOL_APPROVAL)
  • saveMessages() (programmatic turns)
  • CF_AGENT_CHAT_CLEAR

This means HITL tool approval flows work correctly with all strategies. Approvals enter the turn queue via _queueAutoContinuation, which bypasses _getSubmitConcurrencyDecision entirely. In "drop" mode, a user cannot send a new message while an approval continuation is queued — consistent with drop semantics ("one thing at a time").

Architecture

The implementation keeps a single turn executor (_chatTurnQueue / _runExclusiveChatTurn). All concurrency state is epoch-scoped and cleaned up in resetTurnState():

  • _queuedChatTurnCountsByEpoch — tracks active/queued turns per epoch to detect overlaps
  • _mergeQueuedUserStartIndexByEpoch — marks where overlapping user messages begin for merge
  • _submitSequence / _latestOverlappingSubmitSequence — monotonic counters for superseding
  • _activeDebounceTimer / _activeDebounceResolve — cancellable debounce timer, cleared on resetTurnState()

User messages are persisted and broadcast before entering the turn lock (so other tabs see them immediately). The concurrency decision is computed eagerly at submit time; the turn body checks epoch + superseded status when it finally runs.
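A hedged sketch of that turn-body check: the identifiers echo the description above, but the function itself is a simplified model, not the library's code.

```typescript
// When a queued turn finally runs it re-checks two things: did a clear bump
// the epoch while it waited, and has a newer overlapping submit superseded it?
interface ConcurrencyState {
  epoch: number;                           // bumped on CF_AGENT_CHAT_CLEAR
  latestOverlappingSubmitSequence: number; // monotonic submit counter
}

function turnOutcome(
  state: ConcurrencyState,
  turnEpoch: number,
  turnSequence: number,
  supersedes: boolean // true for "latest"/"debounce"-style strategies
): "run" | "skipped" | "superseded" {
  if (turnEpoch !== state.epoch) {
    // chat was cleared while queued: saveMessages() reports status "skipped"
    return "skipped";
  }
  if (supersedes && turnSequence < state.latestOverlappingSubmitSequence) {
    // an older overlap turn: completes with an empty done: true response
    return "superseded";
  }
  return "run";
}
```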

The onChatResponse hook (from #1228) integrates naturally: it fires only when _reply() pushes a result to _pendingChatResponseResults. Superseded turns never call _reply, so the hook correctly fires only for turns that produced a response.

What changed

| File | Change |
| --- | --- |
| packages/ai-chat/src/index.ts | Core implementation: concurrency decision, epoch tracking, merge helpers, debounce timer, enhanced saveMessages(), agentContext.run() for programmatic turns |
| packages/ai-chat/src/tests/message-concurrency.test.ts | 13 tests: all strategies, onChatResponse interaction, regenerate exempt, clear/epoch behavior |
| packages/ai-chat/src/tests/programmatic-turns.test.ts | 3 tests: saveMessages() queuing, functional form, skipped-after-clear |
| packages/ai-chat/src/tests/worker.ts | Test agents for each strategy, onChatResponse tracking on SlowStreamAgent |
| packages/ai-chat/src/tests/wrangler.jsonc | DO bindings and migrations for test agents |
| docs/chat-agents.md | Strategy docs, decision guide, UX descriptions, saveMessages() functional form |
| packages/ai-chat/README.md | Overlapping Messages section, API table updates |
| examples/ai-chat/src/server.ts | sendProactiveMessage uses saveMessages() functional form |
| .changeset/nasty-pumpkins-juggle.md | Minor changeset |

Reviewer notes

  • Start with packages/ai-chat/src/index.ts — the submit-message handler (line ~625) is where concurrency decisions happen, and _runExclusiveChatTurn (line ~1800) is where epoch tracking and onChatResponse drain coexist.
  • saveMessages() replaces the old enqueueTurn() API from earlier iterations. One method, two forms (array or function), returns { requestId, status }. No new methods to learn.
  • Persistence happens outside the turn lock — this is intentional. The comment at line ~646 explains why. Other tabs see user messages immediately; the turn queue only serializes the model call.
  • _mergeQueuedUserMessages is called twice for merge mode — once outside the lock (line ~662) and once inside (line ~701). The comment at line ~699 explains: more overlapping submits may arrive while the turn is queued.
  • Debounce timer is cancelled on clear: _cancelActiveDebounce() in resetTurnState() prevents the DO from staying alive unnecessarily after a clear.
  • _runProgrammaticChatTurn wraps in agentContext.run() — so getCurrentAgent() works inside programmatic turns, matching the WS and auto-continuation paths.
  • Hibernation safe — all new state is ephemeral and only meaningful during active turn processing. After hibernation, epoch counts reset to 0, first submit bypasses concurrency (no active turns to overlap with). User messages are in SQLite.
  • No transport protocol changes — skipped or collapsed work completes through existing CF_AGENT_USE_CHAT_RESPONSE frames with done: true.
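The "one method, two forms" note above can be modeled with a small resolver. This is a sketch under the assumption that the functional form is applied to the transcript as it exists when the queued turn actually runs; `Msg`, `SaveInput`, and `resolveMessages` are simplified stand-ins, not the library's types:

```typescript
// Illustrative model of array-vs-functional saveMessages() input resolution.
type Msg = { id: string; role: "user" | "assistant"; text: string };
type SaveInput = Msg[] | ((current: Msg[]) => Msg[]);

function resolveMessages(input: SaveInput, current: Msg[]): Msg[] {
  // The array form is used as-is; the functional form derives from the
  // latest transcript at execution time, so it stays correct even if
  // messages changed while the programmatic turn was queued.
  return typeof input === "function" ? input(current) : input;
}
```

Under this model, a webhook handler passing `(messages) => [...messages, msg]` appends to whatever the transcript contains at execution time rather than at call time.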

Testing

```sh
# Concurrency + programmatic turn tests
npx vitest --project workers src/tests/message-concurrency.test.ts src/tests/programmatic-turns.test.ts

# Full ai-chat test suite (340 tests)
npx vitest --project workers

# Full repo checks (all 69 projects typecheck)
npm run check
```

All passing.

@changeset-bot

changeset-bot bot commented Mar 25, 2026

🦋 Changeset detected

Latest commit: ede9abb

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
| Name | Type |
| --- | --- |
| @cloudflare/ai-chat | Minor |


Contributor

@devin-ai-integration devin-ai-integration bot left a comment


✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no bugs or issues to report.


@pkg-pr-new

pkg-pr-new bot commented Mar 25, 2026


agents

npm i https://pkg.pr.new/agents@1192

@cloudflare/ai-chat

npm i https://pkg.pr.new/@cloudflare/ai-chat@1192

@cloudflare/codemode

npm i https://pkg.pr.new/@cloudflare/codemode@1192

hono-agents

npm i https://pkg.pr.new/hono-agents@1192

@cloudflare/shell

npm i https://pkg.pr.new/@cloudflare/shell@1192

@cloudflare/think

npm i https://pkg.pr.new/@cloudflare/think@1192

@cloudflare/voice

npm i https://pkg.pr.new/@cloudflare/voice@1192

@cloudflare/worker-bundler

npm i https://pkg.pr.new/@cloudflare/worker-bundler@1192

commit: ede9abb

dctanner added a commit to dctanner/agents that referenced this pull request Mar 26, 2026
Add enqueueTurn() to AIChatAgent so subclasses can queue
programmatic turns on the same serialized executor as websocket submits.
Add workers tests and docs covering queued programmatic turns.

User request: extend Cloudflare PR cloudflare#1192 so websocket and programmatic
turns can share the same queue system, then commit, push, and open a
draft PR against the parent repo.
@whoiskatrin whoiskatrin marked this pull request as ready for review March 27, 2026 10:49
@whoiskatrin
Contributor Author

@dctanner if this works for you, I will ask @threepointone to review

@dctanner

> @dctanner if this works for you, I will ask @threepointone to review

Just thinking through the strategies, we could leave out debounce for now. It's a bit of an advanced use case, and may make sense to do as part of first class messaging app support.

Other than that, this looks great.

We'll need to make sure it works correctly with HITL tool approvals.

@threepointone
Contributor

I'll review this soon. I'll aim to land this early/mid next week.

@threepointone
Contributor

(don't rebase or merge, I'm working on this pr)

Add `AIChatAgent.messageConcurrency` with queue, latest, merge, drop,
and debounce strategies for overlapping sendMessage() submits.

Enhance `saveMessages()` to accept a functional form for deriving
messages from the latest transcript, and return { requestId, status }
so callers can detect skipped turns.

Test coverage for all strategies, onChatResponse interaction with each
strategy, queued programmatic turns, and clear/epoch behavior.

Co-authored-by: whoiskatrin <kreznykova@cloudflare.com>
Made-with: Cursor
@threepointone threepointone force-pushed the feat/ai-chat-message-concurrency branch from b309f45 to ede9abb on March 29, 2026 18:15
@threepointone
Contributor

Changes from the original PR

Rebased onto main (after #1228 landed) and made the following changes:

API refinements

  • Removed enqueueTurn() from the public API. Its functionality was merged into saveMessages(), which now accepts a functional form: await this.saveMessages((messages) => [...messages, newMsg]). One method instead of two — no new API to learn.
  • saveMessages() now returns { requestId, status } (SaveMessagesResult) so callers can detect whether the turn ran or was skipped after a clear. Backwards-compatible: existing callers that ignore the return value still work.
  • EnqueueTurnInput / EnqueueTurnResult types removed from exports, replaced by SaveMessagesResult.
  • Kept messageConcurrency name (considered messageOverlap but reverted — "concurrency" correctly describes concurrent message arrivals, and messageOverlap = "queue" doesn't make sense since queue mode has no overlap).

Edge case fixes

  • _runProgrammaticChatTurn now wraps in agentContext.run() so getCurrentAgent() works inside programmatic turns, matching the WebSocket and auto-continuation paths.
  • Debounce timer is cancelled on clear via _cancelActiveDebounce() in resetTurnState() — prevents the DO from staying alive for the debounce window after the user clears the chat.
  • Merge walker logs a warning if it encounters a non-user message in the expected merge window — defensive diagnostic for an edge case that shouldn't happen under normal use.
  • Comments added explaining why persistence happens outside the turn lock and why _mergeQueuedUserMessages is called twice (once outside, once inside the lock).

Integration with #1228

  • onChatResponse works correctly with all concurrency strategies: fires only for turns that call _reply(), so superseded/dropped turns do not trigger the hook. Four new tests verify this for queue, latest, drop, and merge.
  • _runExclusiveChatTurn finally block cleanly combines epoch cleanup (before releaseTurn()) with the onChatResponse drain loop (after releaseTurn()).
  • resetTurnState clears _pendingChatResponseResults alongside the new merge/debounce state.

Docs

  • Added "Choosing a strategy" decision guide and "What the user sees" UX descriptions for each strategy.
  • Updated all docs and examples to use saveMessages() instead of enqueueTurn().
  • Updated examples/ai-chat to use the functional saveMessages() form for proactive messages.
  • Added messageConcurrency, onChatResponse, and SaveMessagesResult to the README API table.

Test coverage

  • 16 new tests (13 concurrency + 3 programmatic turns), bringing the total to 340.
  • onChatResponse interaction tests for each strategy (latest, drop, merge, queue).

@threepointone threepointone merged commit 28925b6 into main Mar 29, 2026
2 checks passed
@threepointone threepointone deleted the feat/ai-chat-message-concurrency branch March 29, 2026 18:48
@github-actions github-actions bot mentioned this pull request Mar 29, 2026
@threepointone
Contributor

this is feeling good to me, so I merged it
