[ISSUE #xxxx] Fix thread-safe issue in ConsumerManager.topicGroupTable#10179
Open
cherryMJY wants to merge 1 commit intoapache:developfrom
Open
[ISSUE #xxxx] Fix thread-safe issue in ConsumerManager.topicGroupTable#10179cherryMJY wants to merge 1 commit intoapache:developfrom
cherryMJY wants to merge 1 commit intoapache:developfrom
Conversation
Replace HashSet with ConcurrentHashMap.newKeySet() to prevent data loss when multiple consumers concurrently register with the same topic. HashSet is not thread-safe and may lose entries under concurrent add() operations. ConcurrentHashMap.newKeySet() provides thread-safe mutations and is already used in other RocketMQ components.
Senrian
added a commit
to Senrian/rocketmq
that referenced
this pull request
Mar 20, 2026
…fe topicGroupTable
The topicGroupTable in ConsumerManager uses ConcurrentHashMap but stores
plain HashSet values, which are NOT thread-safe. When multiple consumers
concurrently register with the same topic via heartbeat requests,
HashSet.add() may lose entries due to race conditions.
This fix replaces:
- new HashSet<>() with ConcurrentHashMap.newKeySet()
- topicGroupTable.get() + null check + putIfAbsent() pattern with
computeIfAbsent() for atomic get-or-create
This pattern is already used in other RocketMQ components:
- LiteSubscriptionRegistryImpl.liteTopic2Group
- TopicList.topicList
- LiteSubscription.liteTopicSet
Fixes thread-safety issue reported in apache#10179
lizhimins
approved these changes
Mar 21, 2026
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## develop #10179 +/- ##
=============================================
- Coverage 48.94% 48.86% -0.08%
+ Complexity 13389 13367 -22
=============================================
Files 1373 1373
Lines 99924 99920 -4
Branches 12908 12904 -4
=============================================
- Hits 48904 48827 -77
- Misses 45097 45151 +54
- Partials 5923 5942 +19 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Brief Description
The topicGroupTable in ConsumerManager uses HashSet to store consumer groups, which is not thread-safe. When multiple consumers concurrently register with the same topic via heartbeat requests, HashSet.add() may lose entries due to race conditions.
Concurrent scenarios that trigger this bug:
Multiple consumers start up simultaneously and send heartbeat requests
Network reconnect triggers batch consumer re-registration
Proxy syncs consumer info to broker concurrently
Impact:
Consumer groups may not be recorded in topicGroupTable, causing message routing failures
In extreme cases, HashSet internal structure may be corrupted, leading to infinite loops during iteration
Solution:
Replace HashSet with ConcurrentHashMap.newKeySet() which provides thread-safe add/remove operations using CAS + synchronized mechanism. This pattern is already used in other RocketMQ components:
LiteSubscriptionRegistryImpl.liteTopic2Group
TopicList.topicList
LiteSubscription.liteTopicSet
Modified files:
broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java