Skip to content

[ISSUE #xxxx] Fix thread-safe issue in ConsumerManager.topicGroupTable#10179

Open
cherryMJY wants to merge 1 commit intoapache:developfrom
cherryMJY:develop
Open

[ISSUE #xxxx] Fix thread-safe issue in ConsumerManager.topicGroupTable#10179
cherryMJY wants to merge 1 commit intoapache:developfrom
cherryMJY:develop

Conversation

@cherryMJY
Copy link

Brief Description
The topicGroupTable in ConsumerManager uses HashSet to store consumer groups, which is not thread-safe. When multiple consumers concurrently register with the same topic via heartbeat requests, HashSet.add() may lose entries due to race conditions.

Concurrent scenarios that trigger this bug:

Multiple consumers start up simultaneously and send heartbeat requests
Network reconnect triggers batch consumer re-registration
Proxy syncs consumer info to broker concurrently
Impact:

Consumer groups may not be recorded in topicGroupTable, causing message routing failures
In extreme cases, HashSet internal structure may be corrupted, leading to infinite loops during iteration
Solution:
Replace HashSet with ConcurrentHashMap.newKeySet() which provides thread-safe add/remove operations using CAS + synchronized mechanism. This pattern is already used in other RocketMQ components:

LiteSubscriptionRegistryImpl.liteTopic2Group
TopicList.topicList
LiteSubscription.liteTopicSet
Modified files:

broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java

Replace HashSet with ConcurrentHashMap.newKeySet() to prevent data loss
when multiple consumers concurrently register with the same topic.

HashSet is not thread-safe and may lose entries under concurrent add()
operations. ConcurrentHashMap.newKeySet() provides thread-safe mutations
and is already used in other RocketMQ components.
Senrian added a commit to Senrian/rocketmq that referenced this pull request Mar 20, 2026
…fe topicGroupTable

The topicGroupTable in ConsumerManager uses ConcurrentHashMap but stores
plain HashSet values, which are NOT thread-safe. When multiple consumers
concurrently register with the same topic via heartbeat requests,
HashSet.add() may lose entries due to race conditions.

This fix replaces:
  - new HashSet<>() with ConcurrentHashMap.newKeySet()
  - topicGroupTable.get() + null check + putIfAbsent() pattern with
    computeIfAbsent() for atomic get-or-create

This pattern is already used in other RocketMQ components:
  - LiteSubscriptionRegistryImpl.liteTopic2Group
  - TopicList.topicList  
  - LiteSubscription.liteTopicSet

Fixes thread-safety issue reported in apache#10179
@codecov-commenter
Copy link

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 48.86%. Comparing base (ebf1595) to head (38b71b8).
⚠️ Report is 1 commits behind head on develop.

Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10179      +/-   ##
=============================================
- Coverage      48.94%   48.86%   -0.08%     
+ Complexity     13389    13367      -22     
=============================================
  Files           1373     1373              
  Lines          99924    99920       -4     
  Branches       12908    12904       -4     
=============================================
- Hits           48904    48827      -77     
- Misses         45097    45151      +54     
- Partials        5923     5942      +19     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants