From 47671c5125fdb7d75b8293a3058bd365c9dbe2a7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 17:26:48 +0800 Subject: [PATCH 01/10] Support custom message router for partitioned topic producer --- index.d.ts | 18 ++++++++++++++++++ src/Producer.cc | 4 +++- src/Producer.h | 5 +++++ src/ProducerConfig.cc | 31 +++++++++++++++++++++++++++++++ src/ProducerConfig.h | 7 +++++++ 5 files changed, 64 insertions(+), 1 deletion(-) diff --git a/index.d.ts b/index.d.ts index 270c5e67..1ca4be22 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,7 @@ export interface ProducerConfig { schema?: SchemaInfo; accessMode?: ProducerAccessMode; batchingType?: ProducerBatchType; + messageRouter?: MessageRouter; } export class Producer { @@ -176,6 +177,23 @@ export class MessageId { toString(): string; } +export interface TopicMetadata { + numPartitions: number; +} + +/** + * A custom message router interface that can be implemented by the user. + */ +export interface MessageRouter { + /** + * Choose a partition for the given message. + * @param message The message to be routed. + * @param topicMetadata Metadata for the topic. + * @returns The partition index to send the message to. + */ + getPartition(message: ProducerMessage, topicMetadata: TopicMetadata): number; +} + export interface SchemaInfo { schemaType: SchemaType; name?: string; diff --git a/src/Producer.cc b/src/Producer.cc index c827f9f8..5874b220 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt auto instanceContext = static_cast(ctx); auto deferred = instanceContext->deferred; auto cClient = instanceContext->cClient; + auto producerConfig = instanceContext->producerConfig; delete instanceContext; if (result != pulsar_result_Ok) { @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt std::shared_ptr cProducer(rawProducer, pulsar_producer_free); - deferred->Resolve([cProducer](const Napi::Env env) { + deferred->Resolve([cProducer, producerConfig](const Napi::Env env) { Napi::Object obj = Producer::constructor.New({}); Producer *producer = Producer::Unwrap(obj); producer->SetCProducer(cProducer); + producer->producerConfig = producerConfig; return obj; }); }, diff --git a/src/Producer.h b/src/Producer.h index 70c23420..98849edb 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include "ProducerConfig.h" class Producer : public Napi::ObjectWrap { public: @@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap { private: std::shared_ptr cProducer; + // Extend the lifetime of the producer config since it's env and router function could be used when sending + // messages + std::shared_ptr producerConfig; Napi::Value Send(const Napi::CallbackInfo &info); Napi::Value Flush(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 2c704bfd..424a669c 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -18,8 +18,13 @@ */ #include "SchemaInfo.h" #include "ProducerConfig.h" +#include "Message.h" #include +#include "napi-inl.h" +#include "napi.h" #include "pulsar/ProducerConfiguration.h" +#include "pulsar/c/message.h" +#include "pulsar/c/message_router.h" static const std::string CFG_TOPIC = "topic"; static const std::string CFG_PRODUCER_NAME = "producerName"; @@ -42,6 +47,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction"; static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled"; static const std::string CFG_ACCESS_MODE = "accessMode"; static const std::string CFG_BATCHING_TYPE = "batchingType"; +static const std::string CFG_MESSAGE_ROUTER = "messageRouter"; struct _pulsar_producer_configuration { pulsar::ProducerConfiguration conf; @@ -82,6 +88,19 @@ static std::map PRODUC {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching}, }; +static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* metadata, void* ctx) { + auto router = static_cast(ctx); + const auto& env = router->Env(); + auto jsMessage = Message::NewInstance(Napi::Object::New(env), + std::shared_ptr(msg, [](pulsar_message_t*) {})); + int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata); + + Napi::Object jsTopicMetadata = Napi::Object::New(env); + jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions)); + + return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value(); +} + ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -224,6 +243,18 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { if (PRODUCER_BATCHING_TYPE.count(batchingType)) { this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType)); } + + if (producerConfig.Has(CFG_MESSAGE_ROUTER)) { + auto value = producerConfig.Get(CFG_MESSAGE_ROUTER); + if (value.IsFunction()) { + messageRouter = Napi::Persistent(value.As()); + pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), choosePartition, + &messageRouter); + } else { + Napi::TypeError::New(producerConfig.Env(), "messageRouter should be a function") + .ThrowAsJavaScriptException(); + } + } } ProducerConfig::~ProducerConfig() {} diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h index 3d495570..0437768c 100644 --- a/src/ProducerConfig.h +++ b/src/ProducerConfig.h @@ -22,6 +22,11 @@ #include #include +#include + +struct MessageRouterContext { + Napi::FunctionReference messageRouter; +}; class ProducerConfig { public: @@ -33,6 +38,8 @@ class ProducerConfig { private: std::shared_ptr cProducerConfig; std::string topic; + std::unique_ptr routerContext; + Napi::FunctionReference messageRouter; }; #endif From 327f9c58d79b9c082c9c19535cb744b0eb437a9b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 18:54:12 +0800 Subject: [PATCH 02/10] Add test --- tests/producer.test.js | 60 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tests/producer.test.js b/tests/producer.test.js index e6908cb8..177a62d9 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -18,6 +18,9 @@ */ const Pulsar = require('../index'); +const httpRequest = require('./http_utils'); + +const adminUrl = 'http://localhost:8080'; (() => { describe('Producer', () => { @@ -156,5 +159,62 @@ const Pulsar = require('../index'); await producer2.close(); }); }); + describe('Message Routing', () => { + test('Custom Message Router', async () => { + const topic = `test-custom-router-${Date.now()}`; + const numPartitions = 3; + + // Create a partitioned topic via admin REST API + const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${topic}/partitions`; + const response = await httpRequest( + partitionedTopicAdminURL, { + headers: { + 'Content-Type': 'application/json', + }, + data: numPartitions, + method: 'PUT', + }, + ); + expect(response.statusCode).toBe(204); + + const producer = await client.createProducer({ + topic: topic, + batchingMaxMessages: 2, + messageRouter: (message, topicMetadata) => { + console.log(`key: ${message.getPartitionKey()}, partitions: ${topicMetadata.numPartitions}`); + return parseInt(message.getPartitionKey()) % topicMetadata.numPartitions; + }, + messageRoutingMode: 'CustomPartition', + }); + + const promises = []; + const numMessages = 5; + for (let i = 0; i < numMessages; i += 1) { + const sendPromise = producer.send({ + partitionKey: `${i}`, + data: Buffer.from(`msg-${i}`), + }).then(msgId => { + // You can log the result here inside the .then() + console.log(`Message sent: ${msgId}`); + return msgId; // Pass the result along + }); + + await sendPromise; + promises.push(sendPromise); + } + try { + const allMsgIds = await Promise.all(promises); + console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`); + for (let i = 0; i < allMsgIds.length; i += 1) { + // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" + const partition = Number(allMsgIds[i].toString().split(',')[2]) + assert(partition === i % numPartitions); + } + } catch (error) { + console.error("One or more messages failed to send:", error); + } + + }, 30000); + }); }); })(); From bbcf6edaa2a661010758463ba5f79e62961eabb1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 18:59:42 +0800 Subject: [PATCH 03/10] Fix lint --- tests/producer.test.js | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/tests/producer.test.js b/tests/producer.test.js index 177a62d9..4be3dd14 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -178,11 +178,11 @@ const adminUrl = 'http://localhost:8080'; expect(response.statusCode).toBe(204); const producer = await client.createProducer({ - topic: topic, + topic, batchingMaxMessages: 2, messageRouter: (message, topicMetadata) => { - console.log(`key: ${message.getPartitionKey()}, partitions: ${topicMetadata.numPartitions}`); - return parseInt(message.getPartitionKey()) % topicMetadata.numPartitions; + console.log(`key: ${message.getPartitionKey()}, partitions: ${topicMetadata.numPartitions}`); + return parseInt(message.getPartitionKey(), 10) % topicMetadata.numPartitions; }, messageRoutingMode: 'CustomPartition', }); @@ -190,30 +190,24 @@ const adminUrl = 'http://localhost:8080'; const promises = []; const numMessages = 5; for (let i = 0; i < numMessages; i += 1) { - const sendPromise = producer.send({ - partitionKey: `${i}`, - data: Buffer.from(`msg-${i}`), - }).then(msgId => { - // You can log the result here inside the .then() - console.log(`Message sent: ${msgId}`); - return msgId; // Pass the result along - }); - - await sendPromise; - promises.push(sendPromise); + const sendPromise = producer.send({ + partitionKey: `${i}`, + data: Buffer.from(`msg-${i}`), + }); + await sendPromise; + promises.push(sendPromise); } try { - const allMsgIds = await Promise.all(promises); - console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`); - for (let i = 0; i < allMsgIds.length; i += 1) { - // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" - const partition = Number(allMsgIds[i].toString().split(',')[2]) - assert(partition === i % numPartitions); - } + const allMsgIds = await Promise.all(promises); + console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`); + for (let i = 0; i < allMsgIds.length; i += 1) { + // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" + const partition = Number(allMsgIds[i].toString().split(',')[2]); + expect(i % numPartitions).toBe(partition); + } } catch (error) { - console.error("One or more messages failed to send:", error); + console.error('One or more messages failed to send:', error); } - }, 30000); }); }); From b01db3d9a0ca52d6f8e0cd8f924f7be437b0b53e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 19:09:13 +0800 Subject: [PATCH 04/10] simplify code --- tests/http_utils.js | 16 +++++++++++++++- tests/producer.test.js | 15 +-------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/tests/http_utils.js b/tests/http_utils.js index 8fa94f23..81d09de8 100644 --- a/tests/http_utils.js +++ b/tests/http_utils.js @@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new Promise((resolve, r req.end(); }); -module.exports = request; +function createPartitionedTopic(topic, numPartitions) { + const url = `http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`; + return request(url, { + headers: { + 'Content-Type': 'application/json', + }, + data: numPartitions, + method: 'PUT', + }); +} + +module.exports = { + createPartitionedTopic, + request, +}; diff --git a/tests/producer.test.js b/tests/producer.test.js index 4be3dd14..7a474a41 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -20,8 +20,6 @@ const Pulsar = require('../index'); const httpRequest = require('./http_utils'); -const adminUrl = 'http://localhost:8080'; - (() => { describe('Producer', () => { let client; @@ -163,18 +161,7 @@ const adminUrl = 'http://localhost:8080'; test('Custom Message Router', async () => { const topic = `test-custom-router-${Date.now()}`; const numPartitions = 3; - - // Create a partitioned topic via admin REST API - const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${topic}/partitions`; - const response = await httpRequest( - partitionedTopicAdminURL, { - headers: { - 'Content-Type': 'application/json', - }, - data: numPartitions, - method: 'PUT', - }, - ); + const response = await httpRequest.createPartitionedTopic(topic, numPartitions); expect(response.statusCode).toBe(204); const producer = await client.createProducer({ From 04c218a514e4f76d38f1131e7ed7a8abbf4dc8dd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 19:44:20 +0800 Subject: [PATCH 05/10] Add tests for exceptional cases --- src/ProducerConfig.cc | 9 ++++++++- tests/producer.test.js | 26 ++++++++++++++++++++------ tests/reader.test.js | 4 ++-- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 424a669c..5b251379 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -19,6 +19,7 @@ #include "SchemaInfo.h" #include "ProducerConfig.h" #include "Message.h" +#include #include #include "napi-inl.h" #include "napi.h" @@ -98,7 +99,13 @@ static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* metad Napi::Object jsTopicMetadata = Napi::Object::New(env); jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions)); - return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value(); + try { + return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value(); + } catch (const Napi::Error& e) { + // TODO: how to handle the error properly? For now, return an invalid partition to fail the send + fprintf(stderr, "Error when calling messageRouter: %s\n", e.what()); + return numPartitions; + } } ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { diff --git a/tests/producer.test.js b/tests/producer.test.js index 7a474a41..3d90cbdc 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -18,7 +18,7 @@ */ const Pulsar = require('../index'); -const httpRequest = require('./http_utils'); +const httpUtils = require('./http_utils'); (() => { describe('Producer', () => { @@ -161,16 +161,14 @@ const httpRequest = require('./http_utils'); test('Custom Message Router', async () => { const topic = `test-custom-router-${Date.now()}`; const numPartitions = 3; - const response = await httpRequest.createPartitionedTopic(topic, numPartitions); + const response = await httpUtils.createPartitionedTopic(topic, numPartitions); expect(response.statusCode).toBe(204); const producer = await client.createProducer({ topic, batchingMaxMessages: 2, - messageRouter: (message, topicMetadata) => { - console.log(`key: ${message.getPartitionKey()}, partitions: ${topicMetadata.numPartitions}`); - return parseInt(message.getPartitionKey(), 10) % topicMetadata.numPartitions; - }, + messageRouter: (message, topicMetadata) => parseInt(message.getPartitionKey(), 10) + % topicMetadata.numPartitions, messageRoutingMode: 'CustomPartition', }); @@ -196,6 +194,22 @@ const httpRequest = require('./http_utils'); console.error('One or more messages failed to send:', error); } }, 30000); + test('Exception in router', async () => { + const topic = `test-exception-in-router-${Date.now()}`; + const numPartitions = 2; + const response = await httpUtils.createPartitionedTopic(topic, numPartitions); + expect(response.statusCode).toBe(204); + const producer = await client.createProducer({ + topic, + messageRouter: (message, topicMetadata) => { + throw new Error('Custom error in message router'); + }, + messageRoutingMode: 'CustomPartition', + }); + await expect( + producer.send({ data: Buffer.from('test') }), + ).rejects.toThrow('Failed to send message: UnknownError'); + }, 30000); }); }); })(); diff --git a/tests/reader.test.js b/tests/reader.test.js index 56d1b48e..fb0842b8 100644 --- a/tests/reader.test.js +++ b/tests/reader.test.js @@ -19,7 +19,7 @@ const lodash = require('lodash'); const Pulsar = require('../index'); -const httpRequest = require('./http_utils'); +const httpUtils = require('./http_utils'); const baseUrl = 'http://localhost:8080'; @@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080'; const partitionedTopicName = 'test-reader-partitioned-topic'; const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; - const createPartitionedTopicRes = await httpRequest( + const createPartitionedTopicRes = await httpUtils.request( partitionedTopicAdminURL, { headers: { 'Content-Type': 'text/plain', From 11c910b98430759648d703983abdee88db56fdd3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 20:00:29 +0800 Subject: [PATCH 06/10] Fix router signature --- index.d.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.d.ts b/index.d.ts index 1ca4be22..69dbef7f 100644 --- a/index.d.ts +++ b/index.d.ts @@ -191,7 +191,7 @@ export interface MessageRouter { * @param topicMetadata Metadata for the topic. * @returns The partition index to send the message to. */ - getPartition(message: ProducerMessage, topicMetadata: TopicMetadata): number; + getPartition(message: Message, topicMetadata: TopicMetadata): number; } export interface SchemaInfo { From 9737c5a230d97fd5ec1a67957b5b0c7248c4dc07 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 20:04:47 +0800 Subject: [PATCH 07/10] Fix interface --- index.d.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/index.d.ts b/index.d.ts index 69dbef7f..9d07d9d8 100644 --- a/index.d.ts +++ b/index.d.ts @@ -181,18 +181,7 @@ export interface TopicMetadata { numPartitions: number; } -/** - * A custom message router interface that can be implemented by the user. - */ -export interface MessageRouter { - /** - * Choose a partition for the given message. - * @param message The message to be routed. - * @param topicMetadata Metadata for the topic. - * @returns The partition index to send the message to. - */ - getPartition(message: Message, topicMetadata: TopicMetadata): number; -} +export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number; export interface SchemaInfo { schemaType: SchemaType; From 3c91fd291b281049af6dcb99bdf1af0b691ea0ef Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 20:16:04 +0800 Subject: [PATCH 08/10] Fix tests --- tests/client.test.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/client.test.js b/tests/client.test.js index f7bc6d5f..d97763e9 100644 --- a/tests/client.test.js +++ b/tests/client.test.js @@ -17,7 +17,7 @@ * under the License. */ -const httpRequest = require('./http_utils'); +const httpUtils = require('./http_utils'); const Pulsar = require('../index'); const baseUrl = 'http://localhost:8080'; @@ -74,7 +74,7 @@ const baseUrl = 'http://localhost:8080'; const nonPartitionedTopicName = 'test-non-partitioned-topic'; const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`; const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`; - const createNonPartitionedTopicRes = await httpRequest( + const createNonPartitionedTopicRes = await httpUtils.request( nonPartitionedTopicAdminURL, { headers: { 'Content-Type': 'application/json', @@ -91,7 +91,7 @@ const baseUrl = 'http://localhost:8080'; const partitionedTopicName = 'test-partitioned-topic-1'; const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; - const createPartitionedTopicRes = await httpRequest( + const createPartitionedTopicRes = await httpUtils.request( partitionedTopicAdminURL, { headers: { 'Content-Type': 'text/plain', @@ -110,9 +110,9 @@ const baseUrl = 'http://localhost:8080'; 'persistent://public/default/test-partitioned-topic-1-partition-3', ]); - const deleteNonPartitionedTopicRes = await httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' }); + const deleteNonPartitionedTopicRes = await httpUtils.request(nonPartitionedTopicAdminURL, { method: 'DELETE' }); expect(deleteNonPartitionedTopicRes.statusCode).toBe(204); - const deletePartitionedTopicRes = await httpRequest(partitionedTopicAdminURL, { method: 'DELETE' }); + const deletePartitionedTopicRes = await httpUtils.request(partitionedTopicAdminURL, { method: 'DELETE' }); expect(deletePartitionedTopicRes.statusCode).toBe(204); await client.close(); From 47d45f2ea5b22a882742692fe7567d39855530cd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 20:31:41 +0800 Subject: [PATCH 09/10] Add documents --- index.d.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/index.d.ts b/index.d.ts index 9d07d9d8..72c89af9 100644 --- a/index.d.ts +++ b/index.d.ts @@ -181,6 +181,17 @@ export interface TopicMetadata { numPartitions: number; } +/** + * @callback MessageRouter + * @description When producing messages to a partitioned topic, this router is used to select the + * target partition for each message. The router only works when the `messageRoutingMode` is set to + * `CustomPartition`. Please note that `getTopicName()` cannot be called on the `message`, otherwise + * the behavior will be undefined because the topic is unknown before sending the message. + * @param message The message to be routed. + * @param topicMetadata Metadata for the partitioned topic the message is being routed to. + * @returns {number} The index of the target partition (must be a number between 0 and + * topicMetadata.numPartitions - 1). + */ export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number; export interface SchemaInfo { From 610c94c30550bdbf56e17a232a68a61e1f7ffe81 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 26 Sep 2025 21:23:56 +0800 Subject: [PATCH 10/10] Test conflicts of messageRoutingMode and messageRouter --- src/ProducerConfig.cc | 7 +++---- tests/producer.test.js | 30 +++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 5b251379..3889120e 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -157,8 +157,10 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig.get(), blockIfQueueFull); } + bool useCustomPartition = false; if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); + useCustomPartition = (messageRoutingMode == "CustomPartition"); if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(), MESSAGE_ROUTING_MODE.at(messageRoutingMode)); @@ -251,15 +253,12 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType)); } - if (producerConfig.Has(CFG_MESSAGE_ROUTER)) { + if (useCustomPartition && producerConfig.Has(CFG_MESSAGE_ROUTER)) { auto value = producerConfig.Get(CFG_MESSAGE_ROUTER); if (value.IsFunction()) { messageRouter = Napi::Persistent(value.As()); pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), choosePartition, &messageRouter); - } else { - Napi::TypeError::New(producerConfig.Env(), "messageRouter should be a function") - .ThrowAsJavaScriptException(); } } } diff --git a/tests/producer.test.js b/tests/producer.test.js index 3d90cbdc..d0945053 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -20,6 +20,11 @@ const Pulsar = require('../index'); const httpUtils = require('./http_utils'); +function getPartition(msgId) { + // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" + return Number(msgId.toString().split(',')[2]); +} + (() => { describe('Producer', () => { let client; @@ -187,7 +192,7 @@ const httpUtils = require('./http_utils'); console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`); for (let i = 0; i < allMsgIds.length; i += 1) { // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" - const partition = Number(allMsgIds[i].toString().split(',')[2]); + const partition = getPartition(allMsgIds[i]); expect(i % numPartitions).toBe(partition); } } catch (error) { @@ -210,6 +215,29 @@ const httpUtils = require('./http_utils'); producer.send({ data: Buffer.from('test') }), ).rejects.toThrow('Failed to send message: UnknownError'); }, 30000); + test('Not CustomPartition', async () => { + const topic = `test-not-custom-part-${Date.now()}`; + const numPartitions = 2; + const response = await httpUtils.createPartitionedTopic(topic, numPartitions); + expect(response.statusCode).toBe(204); + + let index = 0; + const producer = await client.createProducer({ + topic, + messageRouter: (_, topicMetadata) => { + const result = index % topicMetadata.numPartitions; + index += 1; + return result; + }, + messageRoutingMode: 'UseSinglePartition', + }); + const partitions = new Set(); + for (let i = 0; i < 10; i += 1) { + const msgId = await producer.send({ data: Buffer.from('msg') }); + partitions.add(getPartition(msgId)); + } + expect(partitions.size).toBe(1); + }, 30000); }); }); })();