Skip to content

[ISSUE #10179] Fix thread-safe issue in ConsumerManager.topicGroupTable#10180

Open
Senrian wants to merge 1 commit intoapache:developfrom
Senrian:fix/issue-10179-topicGroupTable-threadSafety
Open

[ISSUE #10179] Fix thread-safe issue in ConsumerManager.topicGroupTable#10180
Senrian wants to merge 1 commit intoapache:developfrom
Senrian:fix/issue-10179-topicGroupTable-threadSafety

Conversation

@Senrian
Copy link

@Senrian Senrian commented Mar 20, 2026

Brief Description

The topicGroupTable in ConsumerManager uses ConcurrentHashMap but stores plain HashSet values, which are NOT thread-safe. When multiple consumers concurrently register with the same topic via heartbeat requests, HashSet.add() may lose entries due to race conditions.

Concurrent Scenarios That Trigger This Bug

  • Multiple consumers start up simultaneously and send heartbeat requests
  • Network reconnect triggers batch consumer re-registration
  • Proxy syncs consumer info to broker concurrently

Impact

  • Consumer groups may not be recorded in topicGroupTable, causing message routing failures
  • In extreme cases, HashSet internal structure may be corrupted, leading to infinite loops during iteration

Solution

Replace HashSet with ConcurrentHashMap.newKeySet() which provides thread-safe add/remove operations using CAS + synchronized mechanism.

Additionally, replaced the topicGroupTable.get() + null check + putIfAbsent() pattern with computeIfAbsent() for atomic get-or-create, eliminating a potential TOCTOU race window.

This pattern is already used in other RocketMQ components:

  • LiteSubscriptionRegistryImpl.liteTopic2Group
  • TopicList.topicList
  • LiteSubscription.liteTopicSet

Modified Files

  • broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java

Testing

The fix uses ConcurrentHashMap.newKeySet() which is a well-established thread-safe pattern in the JVM ecosystem. The change is minimal and non-breaking - it only changes the internal synchronization mechanism without altering any public API or behavior visible to callers.


Fixes #10179

…fe topicGroupTable

The topicGroupTable in ConsumerManager uses ConcurrentHashMap but stores
plain HashSet values, which are NOT thread-safe. When multiple consumers
concurrently register with the same topic via heartbeat requests,
HashSet.add() may lose entries due to race conditions.

This fix replaces:
  - new HashSet<>() with ConcurrentHashMap.newKeySet()
  - topicGroupTable.get() + null check + putIfAbsent() pattern with
    computeIfAbsent() for atomic get-or-create

This pattern is already used in other RocketMQ components:
  - LiteSubscriptionRegistryImpl.liteTopic2Group
  - TopicList.topicList  
  - LiteSubscription.liteTopicSet

Fixes thread-safety issue reported in apache#10179
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant