Skip to content

Commit 058d099

Browse files
shibdBewareMyPower
andauthored
Complete batch receive other feature of C client (#254)
* Complete batch receive other feture * Fix code reviews. * Change notes. * Fix typo --------- Co-authored-by: Yunze Xu <xyzinfernity@163.com>
1 parent bc57b4d commit 058d099

6 files changed

Lines changed: 129 additions & 5 deletions

File tree

include/pulsar/c/consumer.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *);
3535

3636
typedef void (*pulsar_receive_callback)(pulsar_result result, pulsar_message_t *msg, void *ctx);
3737

38+
typedef void (*pulsar_batch_receive_callback)(pulsar_result result, pulsar_messages_t *msgs, void *ctx);
39+
3840
/**
3941
* @return the topic this consumer is subscribed to
4042
*/
@@ -119,6 +121,17 @@ PULSAR_PUBLIC void pulsar_consumer_receive_async(pulsar_consumer_t *consumer,
119121
PULSAR_PUBLIC pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer,
120122
pulsar_messages_t **msgs);
121123

124+
/**
125+
* Async batch receiving messages.
126+
*
127+
* @param callback
128+
* 1. When the result in the callback is `ResultOk`, `msgs` in the callback will point to the memory that
129+
* is allocated internally. You have to call `pulsar_messages_free` to free it.
130+
* 2. If the result in the callback is not `ResultOk`, `msgs` in the callback will be nullptr.
131+
*/
132+
PULSAR_PUBLIC void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer,
133+
pulsar_batch_receive_callback callback, void *ctx);
134+
122135
/**
123136
* Acknowledge the reception of a single message.
124137
*

include/pulsar/c/consumer_configuration.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ typedef enum
8989
pulsar_consumer_regex_sub_mode_AllTopics = 2
9090
} pulsar_consumer_regex_subscription_mode;
9191

92+
typedef struct {
93+
// Max num message, if less than 0, it means no limit.
94+
int maxNumMessage;
95+
// Max num bytes, if less than 0, it means no limit.
96+
long maxNumBytes;
97+
// If less than 0, it means no limit.
98+
long timeoutMs;
99+
} pulsar_consumer_batch_receive_policy_t;
100+
92101
/// Callback definition for MessageListener
93102
typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);
94103

@@ -328,6 +337,22 @@ PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode
328337
pulsar_consumer_configuration_get_regex_subscription_mode(
329338
pulsar_consumer_configuration_t *consumer_configuration);
330339

340+
/**
341+
* Set batch receive policy.
342+
*
343+
* The default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
344+
* @param consumer_configuration the consumer conf object.
345+
* @param maxNumMessage default: Max num message, if less than 0, it means no limit.
346+
* @param maxNumBytes Max num bytes, if less than 0, it means no limit.
347+
* @param timeoutMs If less than 0, it means no limit.
348+
*/
349+
PULSAR_PUBLIC void pulsar_consumer_configuration_set_batch_receive_policy(
350+
pulsar_consumer_configuration_t *consumer_configuration,
351+
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy);
352+
353+
PULSAR_PUBLIC pulsar_consumer_batch_receive_policy_t pulsar_consumer_configuration_get_batch_receive_policy(
354+
pulsar_consumer_configuration_t *consumer_configuration);
355+
331356
// const CryptoKeyReaderPtr getCryptoKeyReader()
332357
//
333358
// const;

lib/c/c_Consumer.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer, pulsar_
7373
return (pulsar_result)res;
7474
}
7575

76+
void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback,
77+
void *ctx) {
78+
consumer->consumer.batchReceiveAsync([callback, ctx](pulsar::Result result, pulsar::Messages messages) {
79+
if (callback) {
80+
pulsar_messages_t *msgs = nullptr;
81+
if (result == pulsar::ResultOk) {
82+
msgs = new pulsar_messages_t;
83+
msgs->messages.resize(messages.size());
84+
for (size_t i = 0; i < messages.size(); i++) {
85+
msgs->messages[i].message = messages[i];
86+
}
87+
}
88+
callback((pulsar_result)result, msgs, ctx);
89+
}
90+
});
91+
}
92+
7693
static void handle_receive_callback(pulsar::Result result, pulsar::Message message,
7794
pulsar_receive_callback callback, void *ctx) {
7895
if (callback) {

lib/c/c_ConsumerConfiguration.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,3 +250,20 @@ pulsar_consumer_regex_subscription_mode pulsar_consumer_configuration_get_regex_
250250
return (pulsar_consumer_regex_subscription_mode)
251251
consumer_configuration->consumerConfiguration.getRegexSubscriptionMode();
252252
}
253+
254+
void pulsar_consumer_configuration_set_batch_receive_policy(
255+
pulsar_consumer_configuration_t *consumer_configuration,
256+
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy_t) {
257+
pulsar::BatchReceivePolicy batchReceivePolicy(batch_receive_policy_t->maxNumMessage,
258+
batch_receive_policy_t->maxNumBytes,
259+
batch_receive_policy_t->timeoutMs);
260+
consumer_configuration->consumerConfiguration.setBatchReceivePolicy(batchReceivePolicy);
261+
}
262+
263+
pulsar_consumer_batch_receive_policy_t pulsar_consumer_configuration_get_batch_receive_policy(
264+
pulsar_consumer_configuration_t *consumer_configuration) {
265+
pulsar::BatchReceivePolicy batchReceivePolicy =
266+
consumer_configuration->consumerConfiguration.getBatchReceivePolicy();
267+
return {batchReceivePolicy.getMaxNumMessages(), batchReceivePolicy.getMaxNumBytes(),
268+
batchReceivePolicy.getTimeoutMs()};
269+
}

tests/c/c_ConsumerConfigurationTest.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,14 @@ TEST(C_ConsumerConfigurationTest, testCApiConfig) {
4242
consumer_conf, pulsar_consumer_regex_sub_mode_NonPersistentOnly);
4343
ASSERT_EQ(pulsar_consumer_configuration_get_regex_subscription_mode(consumer_conf),
4444
pulsar_consumer_regex_sub_mode_NonPersistentOnly);
45+
46+
pulsar_consumer_batch_receive_policy_t batch_receive_policy{10, 1000, 1000};
47+
pulsar_consumer_configuration_set_batch_receive_policy(consumer_conf, &batch_receive_policy);
48+
pulsar_consumer_batch_receive_policy_t get_batch_receive_policy =
49+
pulsar_consumer_configuration_get_batch_receive_policy(consumer_conf);
50+
ASSERT_EQ(get_batch_receive_policy.maxNumMessage, 10);
51+
ASSERT_EQ(get_batch_receive_policy.maxNumBytes, 1000);
52+
ASSERT_EQ(get_batch_receive_policy.timeoutMs, 1000);
53+
54+
pulsar_consumer_configuration_free(consumer_conf);
4555
}

tests/c/c_ConsumerTest.cc

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,38 @@
2323
#include <string.h>
2424
#include <time.h>
2525

26+
#include <future>
27+
2628
static const char *lookup_url = "pulsar://localhost:6650";
2729

30+
struct batch_receive_ctx {
31+
pulsar_consumer_t *consumer;
32+
std::promise<pulsar_result> *promise;
33+
int expect_receive_num;
34+
};
35+
36+
static void batch_receive_callback(pulsar_result async_result, pulsar_messages_t *msgs, void *ctx) {
37+
struct batch_receive_ctx *receive_ctx = (struct batch_receive_ctx *)ctx;
38+
receive_ctx->promise->set_value(async_result);
39+
if (async_result == pulsar_result_Ok) {
40+
ASSERT_EQ(pulsar_messages_size(msgs), receive_ctx->expect_receive_num);
41+
for (int i = 0; i < pulsar_messages_size(msgs); i++) {
42+
pulsar_message_t *msg = pulsar_messages_get(msgs, i);
43+
size_t length = pulsar_message_get_length(msg);
44+
char *str = (char *)malloc(pulsar_message_get_length(msg));
45+
strncpy(str, (const char *)pulsar_message_get_data(msg), length);
46+
47+
char expected_str[128];
48+
snprintf(expected_str, sizeof(expected_str), "msg-%d", 10 + i);
49+
printf("%d received: %s (%zd), expected: %s (%zd)\n", i, str, strlen(str), expected_str,
50+
strlen(expected_str));
51+
ASSERT_EQ(strcmp(str, expected_str), 0);
52+
free(str);
53+
}
54+
pulsar_messages_free(msgs);
55+
}
56+
}
57+
2858
TEST(c_ConsumerTest, testBatchReceive) {
2959
pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
3060
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
@@ -39,11 +69,16 @@ TEST(c_ConsumerTest, testBatchReceive) {
3969

4070
pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
4171
pulsar_consumer_t *consumer;
72+
73+
const int batch_receive_max_size = 10;
74+
pulsar_consumer_batch_receive_policy_t batch_receive_policy{batch_receive_max_size, -1, -1};
75+
pulsar_consumer_configuration_set_batch_receive_policy(consumer_conf, &batch_receive_policy);
76+
4277
result = pulsar_client_subscribe(client, topic, "sub", consumer_conf, &consumer);
4378
ASSERT_EQ(pulsar_result_Ok, result);
4479

45-
const int num_messages = 10;
46-
for (int i = 0; i < num_messages; i++) {
80+
// Sending two more messages proves that the batch_receive_policy works.
81+
for (int i = 0; i < batch_receive_max_size * 2; i++) {
4782
pulsar_message_t *msg = pulsar_message_create();
4883
char buf[128];
4984
snprintf(buf, sizeof(buf), "msg-%d", i);
@@ -54,8 +89,8 @@ TEST(c_ConsumerTest, testBatchReceive) {
5489

5590
pulsar_messages_t *msgs = NULL;
5691
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_batch_receive(consumer, &msgs));
57-
ASSERT_EQ(pulsar_messages_size(msgs), num_messages);
58-
for (int i = 0; i < num_messages; i++) {
92+
ASSERT_EQ(pulsar_messages_size(msgs), batch_receive_max_size);
93+
for (int i = 0; i < batch_receive_max_size; i++) {
5994
pulsar_message_t *msg = pulsar_messages_get(msgs, i);
6095
size_t length = pulsar_message_get_length(msg);
6196
char *str = (char *)malloc(pulsar_message_get_length(msg));
@@ -69,9 +104,16 @@ TEST(c_ConsumerTest, testBatchReceive) {
69104

70105
free(str);
71106
}
107+
pulsar_messages_free(msgs);
108+
109+
std::promise<pulsar_result> receive_promise;
110+
std::future<pulsar_result> receive_future = receive_promise.get_future();
111+
struct batch_receive_ctx batch_receive_ctx = {consumer, &receive_promise, batch_receive_max_size};
112+
pulsar_consumer_batch_receive_async(consumer, batch_receive_callback, &batch_receive_ctx);
113+
pulsar_client_close(client);
114+
ASSERT_EQ(pulsar_result_Ok, receive_future.get());
72115

73116
pulsar_client_close(client);
74-
pulsar_messages_free(msgs);
75117
pulsar_consumer_free(consumer);
76118
pulsar_consumer_configuration_free(consumer_conf);
77119
pulsar_producer_free(producer);

0 commit comments

Comments
 (0)