From 656d267c75ae02f84d3c523b28b77ea40a7f7ff7 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Sun, 28 Sep 2025 10:18:22 +0800 Subject: [PATCH] Support set batchingMaxAllowedSizeInBytes on producer batch configuration --- index.d.ts | 1 + src/ProducerConfig.cc | 11 +++++++ tests/producer.test.js | 74 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+) diff --git a/index.d.ts b/index.d.ts index 72c89af..e9bf8e8 100644 --- a/index.d.ts +++ b/index.d.ts @@ -69,6 +69,7 @@ export interface ProducerConfig { accessMode?: ProducerAccessMode; batchingType?: ProducerBatchType; messageRouter?: MessageRouter; + batchingMaxAllowedSizeInBytes?: number; } export class Producer { diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 3889120..83afb9c 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -40,6 +40,7 @@ static const std::string CFG_COMPRESS_TYPE = "compressionType"; static const std::string CFG_BATCH_ENABLED = "batchingEnabled"; static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs"; static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages"; +static const std::string CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES = "batchingMaxAllowedSizeInBytes"; static const std::string CFG_SCHEMA = "schema"; static const std::string CFG_PROPS = "properties"; static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath"; @@ -201,6 +202,16 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { } } + if (producerConfig.Has(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES) && + producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).IsNumber()) { + int64_t batchingMaxAllowedSizeInBytes = + producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).ToNumber().Int64Value(); + if (batchingMaxAllowedSizeInBytes > 0) { + pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes( + this->cProducerConfig.get(), (unsigned long)batchingMaxAllowedSizeInBytes); + } + } + if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) { SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject()); schemaInfo->SetProducerSchema(this->cProducerConfig); diff --git a/tests/producer.test.js b/tests/producer.test.js index d094505..061d827 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -239,5 +239,79 @@ function getPartition(msgId) { expect(partitions.size).toBe(1); }, 30000); }); + describe('Batching', () => { + function getBatchIndex(msgId) { + const parts = msgId.toString().split(':'); + if (parts.length > 3) { + return Number(parts[3]); + } + return -1; + } + + test('should batch messages based on max allowed size in bytes', async () => { + const topicName = `persistent://public/default/test-batch-size-in-bytes-${Date.now()}`; + const subName = 'subscription-name'; + const numOfMessages = 30; + const prefix = '12345678'; // 8 bytes message prefix + + let producer; + let consumer; + + try { + // 1. Setup Producer with batching enabled and size limit + producer = await client.createProducer({ + topic: topicName, + compressionType: 'LZ4', + batchingEnabled: true, + batchingMaxMessages: 10000, + batchingMaxAllowedSizeInBytes: 20, + }); + + // 2. Setup Consumer + consumer = await client.subscribe({ + topic: topicName, + subscription: subName, + }); + + // 3. Send messages asynchronously + const sendPromises = []; + for (let i = 0; i < numOfMessages; i += 1) { + const messageContent = prefix + i; + const msg = { + data: Buffer.from(messageContent), + properties: { msgIndex: String(i) }, + }; + sendPromises.push(producer.send(msg)); + } + await producer.flush(); + await Promise.all(sendPromises); + + // 4. Receive messages and run assertions + let receivedCount = 0; + for (let i = 0; i < numOfMessages; i += 1) { + const receivedMsg = await consumer.receive(5000); + const expectedMessageContent = prefix + i; + + // Assert that batchIndex is 0 or 1, since batch size should be 2 + const batchIndex = getBatchIndex(receivedMsg.getMessageId()); + expect(batchIndex).toBeLessThan(2); + + // Assert message properties and content + expect(receivedMsg.getProperties().msgIndex).toBe(String(i)); + expect(receivedMsg.getData().toString()).toBe(expectedMessageContent); + + await consumer.acknowledge(receivedMsg); + receivedCount += 1; + } + + // 5. Final check on the number of consumed messages + expect(receivedCount).toBe(numOfMessages); + } finally { + // 6. Cleanup + if (producer) await producer.close(); + if (consumer) await consumer.close(); + } + }, 30000); + }); }); })();