Add MessageBufferConfig to allow custom back-pressure config for message.new events#6472
Conversation
…nfig for message.new events Co-Authored-By: Claude <noreply@anthropic.com>
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
SDK Size Comparison 📏
|
WalkthroughThis PR introduces configurable bounded buffering for socket ChangesMessage Buffering and Overflow Handling
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`:
- Around line 133-136: Revert MessageLimitConfig to keep the original
primary-constructor shape (single parameter channelMessageLimits) and make
messageBufferConfig an additive property declared in the class body (public val
messageBufferConfig: MessageBufferConfig = MessageBufferConfig()) instead of
adding it to the primary constructor; this preserves the existing generated
one-arg constructor, copy and componentN signatures for binary compatibility
while still exposing the new messageBufferConfig field.
- Around line 218-221: Add an API-level guard to reject non-positive buffer
sizes: in the MessageBufferConfig data class (symbol MessageBufferConfig)
validate the capacity property on construction and throw an
IllegalArgumentException if capacity <= 0 (except when using sentinel values you
intentionally allow), since capacity is used as extraBufferCapacity for
MutableSharedFlow with onBufferOverflow (MessageBufferOverflow) and
MutableSharedFlow requires extraBufferCapacity > 0 for DROP_* policies; ensure
the error message clearly names capacity and MessageBufferConfig so callers see
what's wrong.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: c56d6018-28d0-4d38-a129-e44964a672b9
📒 Files selected for processing (7)
stream-chat-android-client/api/stream-chat-android-client.apistream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.ktstream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.ktstream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/plugin/factory/StreamStatePluginFactory.ktstream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/TotalUnreadCountTest.ktstream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialTest.ktstream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt
MessageBufferConfig to allow custom back-pressure config for message.new events
|
|
🚀 Available in v7.3.0 |


Goal
Ports V6 PR #6406 (commit
3b6c6546418d010ba4dc29847459bece845bd878) todevelop.Linear issue: AND-1166
Manual port (not a clean cherry-pick) — v7 restructured
stream-chat-android-stateintostream-chat-android-clientand mergedStatePluginConfigfields intoChatClientConfig. As a consequence the file paths, packages, and theMessageLimitConfiglocation all moved:stream-chat-android-statemodulestream-chat-android-clientmoduleio.getstream.chat.android.state.plugin.config.MessageLimitConfigio.getstream.chat.android.client.api.MessageLimitConfigStatePluginConfig.messageLimitConfigChatClientConfig.messageLimitConfigSame intent as the V6 PR — high-traffic channel types (e.g. livestreams) can produce a flood of
message.newevents that arrive faster than the sequential event-handling pipeline can process them. The current implementation funnels every socket event through a singleMutableSharedFlowwithextraBufferCapacity = Int.MAX_VALUE, which means there is no back-pressure: a burst of new-message events queues up unbounded memory and starves more important signals (reads, bans, member updates) of timely processing.This PR introduces a
MessageBufferConfig(underMessageLimitConfig.messageBufferConfig) that lets integrators opt specific channel types into a bounded buffer forNewMessageEvents, with a configurable overflow strategy (DROP_OLDEST/DROP_LATEST). Signal-critical events and events for non-opted-in channel types are unaffected. Default is a no-op.Implementation
MessageBufferConfig(underMessageLimitConfig.messageBufferConfig) exposing:channelTypes: Set<String>— channel types whoseNewMessageEvents go through the bounded buffer (empty by default → feature is a no-op).capacity: Int— buffer capacity (defaults toInt.MAX_VALUE).overflow: MessageBufferOverflow— overflow strategy (DROP_OLDEST/DROP_LATEST, defaults toDROP_OLDEST).EventHandlerSequentialnow allocates a secondaryMutableSharedFlow(bufferedNewMessageEvents) lazily, only when buffering is enabled, so the default configuration pays no cost for it.defaultSocketEventListener— the existing unbuffered path; used when no channel types are opted in.bufferedSocketEventListener— routesNewMessageEvents for opted-in channel types to the bounded flow, and everything else (including non-opted-inNewMessageEvents and all other event types) to the unbuffered flow.startListening()picks the listener based onbufferConfig.channelTypes.isNotEmpty()and only collects frombufferedNewMessageEventswhen buffering is enabled.StreamStatePluginFactorywires the config fromChatClientConfig.messageLimitConfig.messageBufferConfigintoEventHandlerSequential.The bounded flow shares the same downstream pipeline (
socketEventCollector→handleBatchEvent) as the unbuffered flow, so ordering inside each flow is preserved and back-pressure is applied independently per flow.Files changed (7)
stream-chat-android-client/api/stream-chat-android-client.api— auto-regenerated viaapiDumpstream-chat-android-client/.../api/ChatClientConfig.ktstream-chat-android-client/.../internal/state/event/handler/internal/EventHandlerSequential.ktstream-chat-android-client/.../internal/state/plugin/factory/StreamStatePluginFactory.ktstream-chat-android-client/.../test/.../internal/state/event/TotalUnreadCountTest.ktstream-chat-android-client/.../test/.../internal/state/event/handler/internal/EventHandlerSequentialTest.ktstream-chat-android-client/.../test/.../internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.ktStats match V6 exactly: +392 / −26.
UI Changes
No UI changes.
Testing
EventHandlerSequentialTestcovering:NewMessageEvents for opted-in channel types are routed through the bounded buffer.NewMessageEvents for non-opted-in channel types and all non-NewMessageEventevents keep using the unbuffered path.DROP_OLDEST/DROP_LATESToverflow strategies behave as expected when the buffer is full.TotalUnreadCountTest,EventHandlerSequentialUserMessagesDeletedTest) updated to pass the newbufferConfigargument.Verification on develop:
EventHandlerSequentialTest,EventHandlerSequentialUserMessagesDeletedTest,TotalUnreadCountTest,StateRegistryTest)Summary by CodeRabbit