Skip to content

Commit 2606df9

Browse files
Fix multi-topics consumer crash when one internal consumer fails getBrokerConsumerStatsAsync (apache#538)
1 parent b3edc60 commit 2606df9

7 files changed

Lines changed: 81 additions & 43 deletions

lib/ClientConnection.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -997,9 +997,14 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
997997
lock.unlock();
998998
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
999999
promise.setFailed(ResultNotConnected);
1000+
return promise.getFuture();
10001001
}
10011002
pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise));
10021003
lock.unlock();
1004+
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
1005+
mockServer_->sendRequest("CONSUMER_STATS", requestId)) {
1006+
return promise.getFuture();
1007+
}
10031008
sendCommand(Commands::newConsumerStats(consumerId, requestId));
10041009
return promise.getFuture();
10051010
}

lib/ClientConnection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
219219
mockingRequests_.store(true, std::memory_order_release);
220220
}
221221

222+
void handleKeepAliveTimeout();
223+
222224
private:
223225
struct PendingRequestData {
224226
Promise<Result, ResponseData> promise;
@@ -284,8 +286,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
284286

285287
void handleGetLastMessageIdTimeout(const ASIO_ERROR&, const LastMessageIdRequestData& data);
286288

287-
void handleKeepAliveTimeout();
288-
289289
template <typename Handler>
290290
inline AllocHandler<Handler> customAllocReadHandler(Handler h) {
291291
return AllocHandler<Handler>(readHandlerAllocator_, h);

lib/MockServer.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,18 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
7575
}
7676
});
7777
}
78-
schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] {
79-
proto::CommandSuccess success;
80-
success.set_request_id(requestId);
81-
connection->handleSuccess(success);
82-
});
78+
schedule(connection, request + std::to_string(requestId), iter->second,
79+
[connection, request, requestId] {
80+
if (request == "CONSUMER_STATS") {
81+
proto::CommandConsumerStatsResponse response;
82+
response.set_request_id(requestId);
83+
connection->handleConsumerStatsResponse(response);
84+
} else {
85+
proto::CommandSuccess success;
86+
success.set_request_id(requestId);
87+
connection->handleSuccess(success);
88+
}
89+
});
8390
return true;
8491
} else {
8592
return false;

lib/MultiTopicsConsumerImpl.cc

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -847,48 +847,47 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(const BrokerConsumerSt
847847
Lock lock(mutex_);
848848
MultiTopicsBrokerConsumerStatsPtr statsPtr =
849849
std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
850-
LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
850+
auto latchPtr = std::make_shared<std::atomic_size_t>(numberTopicPartitions_->load());
851851
lock.unlock();
852852

853853
size_t i = 0;
854-
consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
855-
size_t index = i++;
856-
auto weakSelf = weak_from_this();
857-
consumer->getBrokerConsumerStatsAsync([this, weakSelf, latchPtr, statsPtr, index, callback](
858-
Result result, const BrokerConsumerStats& stats) {
859-
auto self = weakSelf.lock();
860-
if (self) {
861-
handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
862-
}
854+
auto failedResult = std::make_shared<std::atomic<Result>>(ResultOk);
855+
consumers_.forEachValue(
856+
[this, &latchPtr, &statsPtr, &i, callback, &failedResult](const ConsumerImplPtr& consumer) {
857+
size_t index = i++;
858+
auto weakSelf = weak_from_this();
859+
consumer->getBrokerConsumerStatsAsync(
860+
[this, weakSelf, latchPtr, statsPtr, index, callback, failedResult](
861+
Result result, const BrokerConsumerStats& stats) {
862+
auto self = weakSelf.lock();
863+
if (!self) {
864+
return;
865+
}
866+
if (result == ResultOk) {
867+
std::lock_guard<std::mutex> lock{mutex_};
868+
statsPtr->add(stats, index);
869+
} else {
870+
// Store the first failed result as the final failed result
871+
auto expected = ResultOk;
872+
failedResult->compare_exchange_strong(expected, result);
873+
}
874+
if (--*latchPtr == 0) {
875+
if (auto firstFailedResult = failedResult->load(std::memory_order_acquire);
876+
firstFailedResult == ResultOk) {
877+
callback(ResultOk, BrokerConsumerStats{statsPtr});
878+
} else {
879+
// Fail the whole operation if any of the consumers failed
880+
callback(firstFailedResult, {});
881+
}
882+
}
883+
});
863884
});
864-
});
865885
}
866886

867887
// Immediately invokes `callback` with ResultOperationNotSupported and a
// default-constructed GetLastMessageIdResponse; no request is issued here.
void MultiTopicsConsumerImpl::getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) {
868888
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
869889
}
870890

871-
void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res,
872-
const BrokerConsumerStats& brokerConsumerStats,
873-
const LatchPtr& latchPtr,
874-
const MultiTopicsBrokerConsumerStatsPtr& statsPtr,
875-
size_t index,
876-
const BrokerConsumerStatsCallback& callback) {
877-
Lock lock(mutex_);
878-
if (res == ResultOk) {
879-
latchPtr->countdown();
880-
statsPtr->add(brokerConsumerStats, index);
881-
} else {
882-
lock.unlock();
883-
callback(res, BrokerConsumerStats());
884-
return;
885-
}
886-
if (latchPtr->getCount() == 0) {
887-
lock.unlock();
888-
callback(ResultOk, BrokerConsumerStats(statsPtr));
889-
}
890-
}
891-
892891
std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::vector<std::string>& topics) {
893892
TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>();
894893

lib/MultiTopicsConsumerImpl.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include "ConsumerImpl.h"
2929
#include "ConsumerInterceptors.h"
3030
#include "Future.h"
31-
#include "Latch.h"
3231
#include "LookupDataResult.h"
3332
#include "SynchronizedHashMap.h"
3433
#include "TestUtil.h"
@@ -100,9 +99,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
10099
uint64_t getNumberOfConnectedConsumer() override;
101100
void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) override;
102101

103-
void handleGetConsumerStats(Result, const BrokerConsumerStats&, const LatchPtr&,
104-
const MultiTopicsBrokerConsumerStatsPtr&, size_t,
105-
const BrokerConsumerStatsCallback&);
106102
// return first topic name when all topics name valid, or return null pointer
107103
static std::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);
108104
void unsubscribeOneTopicAsync(const std::string& topic, const ResultCallback& callback);

tests/ConsumerTest.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "WaitUtils.h"
4141
#include "lib/ClientConnection.h"
4242
#include "lib/Future.h"
43+
#include "lib/Latch.h"
4344
#include "lib/LogUtils.h"
4445
#include "lib/MessageIdUtil.h"
4546
#include "lib/MultiTopicsConsumerImpl.h"

tests/MultiTopicsConsumerTest.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020
#include <pulsar/Client.h>
2121

2222
#include <chrono>
23+
#include <future>
24+
#include <thread>
2325

2426
#include "ThreadSafeMessages.h"
2527
#include "lib/LogUtils.h"
28+
#include "lib/MockServer.h"
29+
#include "tests/PulsarFriend.h"
2630

2731
static const std::string lookupUrl = "pulsar://localhost:6650";
2832

@@ -142,3 +146,29 @@ TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {
142146

143147
client.close();
144148
}
149+
150+
// Checks that `getBrokerConsumerStats` on a consumer subscribed to two topics
// returns ResultDisconnected when `handleKeepAliveTimeout()` is invoked on the
// connection while the CONSUMER_STATS requests are being delayed by a mock server.
TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
151+
Client client{lookupUrl};
152+
std::vector<std::string> topics{"testGetConsumerStatsFail0", "testGetConsumerStatsFail1"};
153+
Consumer consumer;
154+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));
155+
156+
// Takes the first connection reported for the client.
// NOTE(review): assumes both per-topic consumers share this connection — confirm.
auto connection = *PulsarFriend::getConnections(client).begin();
157+
auto mockServer = std::make_shared<MockServer>(connection);
158+
connection->attachMockServer(mockServer);
159+
160+
// Delay CONSUMER_STATS handling (3000 is presumably milliseconds — confirm against
// MockServer) so the requests are still outstanding when the timeout is simulated below.
mockServer->setRequestDelay({{"CONSUMER_STATS", 3000}});
161+
// Run the blocking `getBrokerConsumerStats` call on a separate thread and keep its Result.
auto future = std::async(std::launch::async, [&consumer]() {
162+
BrokerConsumerStats stats;
163+
return consumer.getBrokerConsumerStats(stats);
164+
});
165+
// Give the background thread time to call `getBrokerConsumerStats` before the
// keep-alive timeout is simulated
166+
future.wait_for(std::chrono::milliseconds(100));
167+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
168+
169+
// Simulate a keep-alive timeout on the connection; the pending stats call is then
// expected to complete with ResultDisconnected.
connection->handleKeepAliveTimeout();
170+
ASSERT_EQ(ResultDisconnected, future.get());
171+
172+
mockServer->close();
173+
client.close();
174+
}

0 commit comments

Comments
 (0)