[ISSUE #10179] Fix thread-safe issue in ConsumerManager.topicGroupTable#10180
Open
Senrian wants to merge 1 commit intoapache:developfrom
Open
[ISSUE #10179] Fix thread-safe issue in ConsumerManager.topicGroupTable#10180Senrian wants to merge 1 commit intoapache:developfrom
Senrian wants to merge 1 commit intoapache:developfrom
Conversation
…fe topicGroupTable
The topicGroupTable in ConsumerManager uses ConcurrentHashMap but stores
plain HashSet values, which are NOT thread-safe. When multiple consumers
concurrently register with the same topic via heartbeat requests,
HashSet.add() may lose entries due to race conditions.
This fix replaces:
- new HashSet<>() with ConcurrentHashMap.newKeySet()
- topicGroupTable.get() + null check + putIfAbsent() pattern with
computeIfAbsent() for atomic get-or-create
This pattern is already used in other RocketMQ components:
- LiteSubscriptionRegistryImpl.liteTopic2Group
- TopicList.topicList
- LiteSubscription.liteTopicSet
Fixes thread-safety issue reported in apache#10179
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Brief Description
The
topicGroupTableinConsumerManagerusesConcurrentHashMapbut stores plainHashSetvalues, which are NOT thread-safe. When multiple consumers concurrently register with the same topic via heartbeat requests,HashSet.add()may lose entries due to race conditions.Concurrent Scenarios That Trigger This Bug
Impact
topicGroupTable, causing message routing failuresHashSetinternal structure may be corrupted, leading to infinite loops during iterationSolution
Replace
HashSetwithConcurrentHashMap.newKeySet()which provides thread-safe add/remove operations using CAS + synchronized mechanism.Additionally, replaced the
topicGroupTable.get() + null check + putIfAbsent()pattern withcomputeIfAbsent()for atomic get-or-create, eliminating a potential TOCTOU race window.This pattern is already used in other RocketMQ components:
LiteSubscriptionRegistryImpl.liteTopic2GroupTopicList.topicListLiteSubscription.liteTopicSetModified Files
broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.javaTesting
The fix uses
ConcurrentHashMap.newKeySet()which is a well-established thread-safe pattern in the JVM ecosystem. The change is minimal and non-breaking - it only changes the internal synchronization mechanism without altering any public API or behavior visible to callers.Fixes #10179