Skip to content

Commit 86d66bd

Browse files
authored
[fix] Consumer batch receive will cause data loss. (#228)
* [fix] Consumer batch receive will cause data loss. * Support popIf on queue. * Fix code review.
1 parent d0d7e39 commit 86d66bd

File tree

7 files changed

+131
-28
lines changed

7 files changed

+131
-28
lines changed

lib/BlockingQueue.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,37 @@ class BlockingQueue {
120120
return true;
121121
}
122122

123+
/**
124+
* First peek data to the condition judgment, if true then pop it.
125+
*
126+
* @param value A reference to the value assigned after pop
127+
* @param condition A function that returns true if the value should be popped
128+
* @return true if the value was popped, false otherwise.
129+
*/
130+
bool popIf(T& value, std::function<bool(const T& peekValue)> condition) {
131+
Lock lock(mutex_);
132+
133+
if (isEmptyNoMutex() || isClosedNoMutex()) {
134+
return false;
135+
}
136+
137+
bool wasFull = isFullNoMutex();
138+
139+
auto peekValue = queue_.front();
140+
if (condition(peekValue)) {
141+
value = peekValue;
142+
queue_.pop_front();
143+
lock.unlock();
144+
if (wasFull) {
145+
// Notify that an element is popped
146+
queueFullCondition.notify_all();
147+
}
148+
return true;
149+
} else {
150+
return false;
151+
}
152+
}
153+
123154
// Check the 1st element of the queue
124155
bool peek(T& value) {
125156
Lock lock(mutex_);

lib/ConsumerImpl.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -665,10 +665,11 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
665665
void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
666666
auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
667667
batchReceivePolicy_.getMaxNumBytes());
668-
Message peekMsg;
669-
while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
670-
messageProcessed(peekMsg);
671-
Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), peekMsg);
668+
Message msg;
669+
while (incomingMessages_.popIf(
670+
msg, [&messages](const Message& peekMsg) { return messages->canAdd(peekMsg); })) {
671+
messageProcessed(msg);
672+
Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
672673
messages->add(interceptMsg);
673674
}
674675
auto self = get_shared_this_ptr();

lib/MultiTopicsConsumerImpl.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,10 +1054,11 @@ bool MultiTopicsConsumerImpl::hasEnoughMessagesForBatchReceive() const {
10541054
void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
10551055
auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
10561056
batchReceivePolicy_.getMaxNumBytes());
1057-
Message peekMsg;
1058-
while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
1059-
messageProcessed(peekMsg);
1060-
messages->add(peekMsg);
1057+
Message msg;
1058+
while (incomingMessages_.popIf(
1059+
msg, [&messages](const Message& peekMsg) { return messages->canAdd(peekMsg); })) {
1060+
messageProcessed(msg);
1061+
messages->add(msg);
10611062
}
10621063
auto weakSelf = weak_from_this();
10631064
listenerExecutor_->postWork([weakSelf, callback, messages]() {

lib/UnboundedBlockingQueue.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,30 @@ class UnboundedBlockingQueue {
8787
return true;
8888
}
8989

90+
/**
91+
* First peek data to the condition judgment, if true then pop it.
92+
*
93+
* @param value A reference to the value assigned after pop
94+
* @param condition A function that returns true if the value should be popped
95+
* @return true if the value was popped, false otherwise.
96+
*/
97+
bool popIf(T& value, std::function<bool(const T& peekValue)> condition) {
98+
Lock lock(mutex_);
99+
100+
if (isEmptyNoMutex() || isClosedNoMutex()) {
101+
return false;
102+
}
103+
104+
auto peekValue = queue_.front();
105+
if (condition(peekValue)) {
106+
value = peekValue;
107+
queue_.pop_front();
108+
return true;
109+
} else {
110+
return false;
111+
}
112+
}
113+
90114
// Check the 1st element of the queue
91115
bool peek(T& value) {
92116
Lock lock(mutex_);

tests/BasicEndToEndTest.cc

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4084,12 +4084,11 @@ void testBatchReceive(bool multiConsumer) {
40844084
Client client(lookupUrl);
40854085

40864086
std::string uniqueChunk = unique_str();
4087-
std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
4087+
std::string topicName = "test-batch-receive" + uniqueChunk + std::to_string(multiConsumer);
40884088

40894089
if (multiConsumer) {
40904090
// call admin api to make it partitioned
4091-
std::string url =
4092-
adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
4091+
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
40934092
int res = makePutRequest(url, "5");
40944093
LOG_INFO("res = " << res);
40954094
ASSERT_FALSE(res != 204 && res != 409);
@@ -4107,8 +4106,9 @@ void testBatchReceive(bool multiConsumer) {
41074106
Consumer consumer;
41084107
ConsumerConfiguration consumerConfig;
41094108
// when receiver queue size > maxNumMessages, use receiver queue size.
4109+
const int batchReceiveMaxNumMessages = 10;
41104110
consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
4111-
consumerConfig.setReceiverQueueSize(10);
4111+
consumerConfig.setReceiverQueueSize(batchReceiveMaxNumMessages);
41124112
consumerConfig.setProperty("consumer-name", "test-consumer-name");
41134113
consumerConfig.setProperty("consumer-id", "test-consumer-id");
41144114
Promise<Result, Consumer> consumerPromise;
@@ -4120,32 +4120,41 @@ void testBatchReceive(bool multiConsumer) {
41204120

41214121
// sync batch receive test
41224122
std::string prefix = "batch-receive-msg";
4123-
int numOfMessages = 10;
4123+
int numOfMessages = 100;
41244124
for (int i = 0; i < numOfMessages; i++) {
41254125
std::string messageContent = prefix + std::to_string(i);
41264126
Message msg = MessageBuilder().setContent(messageContent).build();
4127-
producer.send(msg);
4127+
producer.sendAsync(msg, NULL);
4128+
}
4129+
ASSERT_EQ(ResultOk, producer.flush());
4130+
for (int i = 0; i < numOfMessages / batchReceiveMaxNumMessages; i++) {
4131+
Messages messages;
4132+
Result receive = consumer.batchReceive(messages);
4133+
ASSERT_EQ(receive, ResultOk);
4134+
ASSERT_EQ(messages.size(), batchReceiveMaxNumMessages);
4135+
for (const auto &item : messages) {
4136+
consumer.acknowledge(item);
4137+
}
41284138
}
41294139

4130-
Messages messages;
4131-
Result receive = consumer.batchReceive(messages);
4132-
ASSERT_EQ(receive, ResultOk);
4133-
ASSERT_EQ(messages.size(), numOfMessages);
4134-
41354140
// async batch receive test
4136-
Latch latch(1);
4137-
BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
4138-
ASSERT_EQ(result, ResultOk);
4139-
ASSERT_EQ(messages.size(), numOfMessages);
4140-
latch.countdown();
4141-
};
4142-
consumer.batchReceiveAsync(batchReceiveCallback);
41434141
for (int i = 0; i < numOfMessages; i++) {
41444142
std::string messageContent = prefix + std::to_string(i);
41454143
Message msg = MessageBuilder().setContent(messageContent).build();
4146-
producer.send(msg);
4144+
producer.sendAsync(msg, NULL);
4145+
}
4146+
ASSERT_EQ(ResultOk, producer.flush());
4147+
for (int i = 0; i < numOfMessages / batchReceiveMaxNumMessages; i++) {
4148+
Latch latch(1);
4149+
BatchReceiveCallback batchReceiveCallback = [&latch, batchReceiveMaxNumMessages](Result result,
4150+
Messages messages) {
4151+
ASSERT_EQ(result, ResultOk);
4152+
ASSERT_EQ(messages.size(), batchReceiveMaxNumMessages);
4153+
latch.countdown();
4154+
};
4155+
consumer.batchReceiveAsync(batchReceiveCallback);
4156+
ASSERT_TRUE(latch.wait(std::chrono::seconds(1)));
41474157
}
4148-
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
41494158

41504159
producer.close();
41514160
consumer.close();

tests/BlockingQueueTest.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ TEST(BlockingQueueTest, testBlockingProducer) {
126126
ASSERT_EQ(three, queue.size());
127127
}
128128

129+
TEST(BlockingQueueTest, testPopIf) {
130+
size_t size = 5;
131+
BlockingQueue<int> queue(size);
132+
133+
for (int i = 1; i <= size; ++i) {
134+
queue.push(i);
135+
}
136+
137+
// Use producer worker to assert popIf will notify queueFull thread.
138+
ProducerWorker producerWorker(queue);
139+
producerWorker.produce(3);
140+
141+
int value;
142+
for (int i = 1; i <= size; ++i) {
143+
ASSERT_TRUE(queue.popIf(value, [&i](const int& peekValue) { return peekValue == i; }));
144+
}
145+
146+
producerWorker.join();
147+
ASSERT_EQ(3, queue.size());
148+
}
149+
129150
TEST(BlockingQueueTest, testBlockingConsumer) {
130151
size_t size = 5;
131152
BlockingQueue<int> queue(size);

tests/UnboundedBlockingQueueTest.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,22 @@ TEST(UnboundedBlockingQueueTest, testQueueOperations) {
112112
ASSERT_FALSE(queue.peek(poppedElement));
113113
}
114114

115+
TEST(UnboundedBlockingQueueTest, testPopIf) {
116+
size_t size = 5;
117+
UnboundedBlockingQueue<int> queue(size);
118+
119+
for (int i = 1; i <= size * 2; ++i) {
120+
queue.push(i);
121+
}
122+
123+
int value;
124+
for (int i = 1; i <= size; ++i) {
125+
ASSERT_TRUE(queue.popIf(value, [&i](const int& peekValue) { return peekValue == i; }));
126+
}
127+
128+
ASSERT_EQ(size, queue.size());
129+
}
130+
115131
TEST(UnboundedBlockingQueueTest, testBlockingProducer) {
116132
size_t size = 5;
117133
UnboundedBlockingQueue<int> queue(size);

0 commit comments

Comments
 (0)