From ab5580363d3067adf924930e527f8b57c9c3dbe8 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 25 Sep 2025 22:17:31 +0800 Subject: [PATCH 1/7] Support custome router for producer --- index.d.ts | 13 ++++ src/Client.js | 6 +- src/Producer.cc | 4 +- src/Producer.h | 2 + src/Producer.js | 142 +++++++++++++++++++++++++++++++++++++++++ src/ProducerConfig.cc | 25 +++++++- tests/producer.test.js | 97 ++++++++++++++++++++++++++++ 7 files changed, 285 insertions(+), 4 deletions(-) create mode 100644 src/Producer.js diff --git a/index.d.ts b/index.d.ts index 270c5e67..fd8961b6 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,7 @@ export interface ProducerConfig { schema?: SchemaInfo; accessMode?: ProducerAccessMode; batchingType?: ProducerBatchType; + messageRouter?: MessageRouter; } export class Producer { @@ -176,6 +177,18 @@ export class MessageId { toString(): string; } +export interface TopicMetadata { + numPartitions: number; +} + +/** + * A custom message router function that can be implemented by the user. + * @param message The message to be routed. + * @param topicMetadata Metadata for the topic. + * @returns The partition index to send the message to, or a Promise that resolves to it. + */ +export type MessageRouter = (message: ProducerMessage, topicMetadata: TopicMetadata) => number | Promise; + export interface SchemaInfo { schemaType: SchemaType; name?: string; diff --git a/src/Client.js b/src/Client.js index f51b1b6a..1cf9e04c 100644 --- a/src/Client.js +++ b/src/Client.js @@ -20,6 +20,7 @@ const fs = require('fs'); const tls = require('tls'); const os = require('os'); const PulsarBinding = require('./pulsar-binding'); +const Producer = require('./Producer'); const certsFilePath = `${__dirname}/cert.pem`; @@ -32,8 +33,9 @@ class Client { this.client = new PulsarBinding.Client(params); } - createProducer(params) { - return this.client.createProducer(params); + async createProducer(params) { + const addonProducer = await this.client.createProducer(params); + return new Producer(this, addonProducer, params); } subscribe(params) { diff --git a/src/Producer.cc b/src/Producer.cc index c827f9f8..5874b220 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt auto instanceContext = static_cast(ctx); auto deferred = instanceContext->deferred; auto cClient = instanceContext->cClient; + auto producerConfig = instanceContext->producerConfig; delete instanceContext; if (result != pulsar_result_Ok) { @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt std::shared_ptr cProducer(rawProducer, pulsar_producer_free); - deferred->Resolve([cProducer](const Napi::Env env) { + deferred->Resolve([cProducer, producerConfig](const Napi::Env env) { Napi::Object obj = Producer::constructor.New({}); Producer *producer = Producer::Unwrap(obj); producer->SetCProducer(cProducer); + producer->producerConfig = producerConfig; return obj; }); }, diff --git a/src/Producer.h b/src/Producer.h index 70c23420..1d52a58a 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -23,6 +23,7 @@ #include #include #include +#include "ProducerConfig.h" class Producer : public Napi::ObjectWrap { public: @@ -35,6 +36,7 @@ class Producer : public Napi::ObjectWrap { private: std::shared_ptr cProducer; + std::shared_ptr producerConfig; Napi::Value Send(const Napi::CallbackInfo &info); Napi::Value Flush(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); diff --git a/src/Producer.js b/src/Producer.js new file mode 100644 index 00000000..41339c58 --- /dev/null +++ b/src/Producer.js @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @typedef {import('../index').ProducerMessage} ProducerMessage + * @typedef {import('../index').MessageId} MessageId + * @typedef {import('../index').ProducerConfig} ProducerConfig + * @typedef {import('../index').Client} PulsarClient + * @typedef {import('../index').Producer} AddonProducer + */ + +const PARTITION_PROP_KEY = '__partition__'; +const CACHE_TTL_MS = 60 * 1000; + +class Producer { + /** + * This class is a JavaScript wrapper around the C++ N-API Producer object. + * It should not be instantiated by users directly. + * @param {PulsarClient} client + * @param {Producer.h} addonProducer - The native addon producer instance. + * @param {ProducerConfig} config - The original producer configuration object. + */ + constructor(client, addonProducer, config) { + /** @private */ + this.client = client; + /** @private */ + this.addonProducer = addonProducer; + /** @private */ + this.producerConfig = config; + /** @private */ + this.numPartitions = undefined; + /** @private */ + this.partitionsCacheTimestamp = 0; + } + + /** + * Sends a message. If a custom message router was provided, it is called first + * to determine the partition before passing the message to the C++ addon. + * @param {ProducerMessage} message - The message object to send. + * @returns {Promise} A promise that resolves with the MessageId of the sent message. + */ + async send(message) { + // 1. Create a shallow copy of the message parameter at the beginning. + const finalMessage = { ...message }; + const config = this.producerConfig; + + // Check if custom routing mode is enabled + if (config.messageRoutingMode === 'CustomPartition') { + if (typeof config.messageRouter === 'function') { + const numPartitions = await this.getNumPartitions(); + const topicMetadata = { numPartitions }; + const partitionIndex = config.messageRouter(finalMessage, topicMetadata); + if (typeof partitionIndex === 'number' && partitionIndex >= 0) { + if (!finalMessage.properties) { + finalMessage.properties = {}; + } + finalMessage.properties[PARTITION_PROP_KEY] = String(partitionIndex); + } + } else { + throw new Error("Producer is configured with 'CustomPartition' routing mode, " + + "but a 'messageRouter' function was not provided."); + } + } + + // 3. Pass the modified copy to the C++ addon. + return this.addonProducer.send(finalMessage); + } + + /** + * Gets the number of partitions for the topic, using a cache with a TTL. + * @private + * @returns {Promise} + */ + async getNumPartitions() { + const now = Date.now(); + // Check if cache is missing or expired + if (this.numPartitions === undefined || now > this.partitionsCacheTimestamp + CACHE_TTL_MS) { + const partitions = await this.client.getPartitionsForTopic(this.getTopic()); + this.numPartitions = partitions.length; + this.partitionsCacheTimestamp = now; + } + return this.numPartitions; + } + + /** + * Flushes all the messages buffered in the client. + * @returns {Promise} + */ + async flush() { + return this.addonProducer.flush(); + } + + /** + * Closes the producer. + * @returns {Promise} + */ + async close() { + return this.addonProducer.close(); + } + + /** + * Gets the producer name. + * @returns {string} + */ + getProducerName() { + return this.addonProducer.getProducerName(); + } + + /** + * Gets the topic name. + * @returns {string} + */ + getTopic() { + return this.addonProducer.getTopic(); + } + + /** + * Checks if the producer is connected. + * @returns {boolean} + */ + isConnected() { + return this.addonProducer.isConnected(); + } +} + +module.exports = Producer; diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 2c704bfd..d6a6f569 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -82,6 +82,24 @@ static std::map PRODUC {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching}, }; +// Define a special key that the JS layer will use to pass the partition index. +static const std::string PARTITION_PROP_KEY = "__partition__"; + +static int internalCppMessageRouter(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata, + void *ctx) { + int numPartitions = pulsar_topic_metadata_get_num_partitions(topicMetadata); + if (pulsar_message_has_property(msg, PARTITION_PROP_KEY.c_str())) { + const char *partitionStr = pulsar_message_get_property(msg, PARTITION_PROP_KEY.c_str()); + try { + return std::stoi(partitionStr); + } catch (...) { + return numPartitions; + } + } + // return numPartitions to make cpp client failed callback + return numPartitions; +} + ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -133,9 +151,14 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); - if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) + if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) { pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(), MESSAGE_ROUTING_MODE.at(messageRoutingMode)); + } + if (messageRoutingMode == "CustomPartition") { + pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), internalCppMessageRouter, + nullptr); + } } if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) { diff --git a/tests/producer.test.js b/tests/producer.test.js index e6908cb8..b28d67d3 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -18,6 +18,9 @@ */ const Pulsar = require('../index'); +const httpRequest = require('./http_utils'); + +const adminUrl = 'http://localhost:8080'; (() => { describe('Producer', () => { @@ -156,5 +159,99 @@ const Pulsar = require('../index'); await producer2.close(); }); }); + describe('Message Routing', () => { + test('Custom Message Router', async () => { + // 1. Define a partitioned topic and a custom router + const targetPartition = 1; + const partitionedTopicName = `test-custom-router-${Date.now()}`; + const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; + const numPartitions = 10; + + // Use admin client to create a partitioned topic. This is more robust. + // Assuming 'adminUrl' and 'httpRequest' are available from your test setup. + const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; + const createPartitionedTopicRes = await httpRequest( + partitionedTopicAdminURL, { + headers: { + 'Content-Type': 'application/json', // Use application/json for REST API + }, + data: numPartitions, + method: 'PUT', + }, + ); + // 204 No Content is success for PUT create + expect(createPartitionedTopicRes.statusCode).toBe(204); + + // 2. Create a producer with the custom message router + const producer = await client.createProducer({ + topic: partitionedTopic, // Note: For producer, use the base topic name + messageRouter: (message, topicMetadata) => + // Always route to the target partition for this test + targetPartition, + messageRoutingMode: 'CustomPartition', + }); + + // 3. Create a single consumer for the entire partitioned topic + const consumer = await client.subscribe({ + topic: partitionedTopic, + subscription: 'test-sub', + subscriptionInitialPosition: 'Earliest', + }); + + // 4. Send 1000 messages in parallel for efficiency + console.log(`Sending messages to partitioned topic ${partitionedTopic}...`); + const numMessages = 1000; + for (let i = 0; i < numMessages; i += 1) { + const msg = `message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + } + await producer.flush(); + console.log(`Sent ${numMessages} messages.`); + + // 5. Receive messages and assert they all come from the target partition + const receivedMessages = new Set(); + const expectedPartitionName = `${partitionedTopic}-partition-${targetPartition}`; + + for (let i = 0; i < numMessages; i += 1) { + const msg = await consumer.receive(10000); + // eslint-disable-next-line no-underscore-dangle + expect(msg.getProperties().__partition__).toBe(String(targetPartition)); + expect(msg.getTopicName()).toBe(expectedPartitionName); + receivedMessages.add(msg.getData().toString()); + await consumer.acknowledge(msg); + } + // Final assertion to ensure all unique messages were received + expect(receivedMessages.size).toBe(numMessages); + console.log(`Successfully received and verified ${receivedMessages.size} messages from ${expectedPartitionName}.`); + await producer.close(); + await consumer.close(); + await client.close(); + }, 30000); + + test('Custom Message Router Exception', async () => { + // 1. Define a partitioned topic and a custom router + const partitionedTopicName = `test-custom-router-${Date.now()}`; + const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; + + // 2. Create a producer with the custom message router + const producer = await client.createProducer({ + topic: partitionedTopic, // Note: For producer, use the base topic name + messageRouter: (message, topicMetadata) => { + throw new Error('Custom router error'); + }, + messageRoutingMode: 'CustomPartition', + }); + + // 4. Send 1000 messages in parallel for efficiency + await expect( + producer.send({ data: Buffer.from('test') }), + ).rejects.toThrow('Custom router error'); + + await producer.close(); + await client.close(); + }, 30000); + }); }); })(); From 0695244ec1b6ddc4b58205a11a092573d654adff Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 25 Sep 2025 22:45:42 +0800 Subject: [PATCH 2/7] code format --- src/ProducerConfig.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index d6a6f569..610401b7 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -100,7 +100,7 @@ static int internalCppMessageRouter(pulsar_message_t *msg, pulsar_topic_metadata return numPartitions; } -ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { +ProducerConfig::ProducerConfig(const Napi::Object &producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -197,7 +197,7 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { } if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) { - SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject()); + SchemaInfo *schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject()); schemaInfo->SetProducerSchema(this->cProducerConfig); delete schemaInfo; } From a142f94050e21555d6f6d2c3cc7ef66f4f31d873 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 25 Sep 2025 23:13:09 +0800 Subject: [PATCH 3/7] code format --- tests/producer.test.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/producer.test.js b/tests/producer.test.js index b28d67d3..d7832f4b 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -185,9 +185,7 @@ const adminUrl = 'http://localhost:8080'; // 2. Create a producer with the custom message router const producer = await client.createProducer({ topic: partitionedTopic, // Note: For producer, use the base topic name - messageRouter: (message, topicMetadata) => - // Always route to the target partition for this test - targetPartition, + messageRouter: (message, topicMetadata) => targetPartition, messageRoutingMode: 'CustomPartition', }); From 2e39c5ec13eddda8e1af88f8c3ac0fe35c7dd3cf Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 26 Sep 2025 10:58:08 +0800 Subject: [PATCH 4/7] fix unint test --- tests/producer.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/producer.test.js b/tests/producer.test.js index d7832f4b..511b1deb 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -230,7 +230,7 @@ const adminUrl = 'http://localhost:8080'; test('Custom Message Router Exception', async () => { // 1. Define a partitioned topic and a custom router - const partitionedTopicName = `test-custom-router-${Date.now()}`; + const partitionedTopicName = `test-custom-router-failed-${Date.now()}`; const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; // 2. Create a producer with the custom message router From a0de66b05d3392eff9ceb66771aa819d6313742d Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 26 Sep 2025 11:16:14 +0800 Subject: [PATCH 5/7] fix unit test --- tests/producer.test.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/producer.test.js b/tests/producer.test.js index 511b1deb..6ba530f5 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -225,7 +225,6 @@ const adminUrl = 'http://localhost:8080'; console.log(`Successfully received and verified ${receivedMessages.size} messages from ${expectedPartitionName}.`); await producer.close(); await consumer.close(); - await client.close(); }, 30000); test('Custom Message Router Exception', async () => { @@ -248,7 +247,6 @@ const adminUrl = 'http://localhost:8080'; ).rejects.toThrow('Custom router error'); await producer.close(); - await client.close(); }, 30000); }); }); From 2247a399cb79ef7524765c4cea2558f0e6e5f016 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 26 Sep 2025 16:05:32 +0800 Subject: [PATCH 6/7] Improve unit test --- tests/producer.test.js | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/tests/producer.test.js b/tests/producer.test.js index 6ba530f5..42614784 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -162,7 +162,6 @@ const adminUrl = 'http://localhost:8080'; describe('Message Routing', () => { test('Custom Message Router', async () => { // 1. Define a partitioned topic and a custom router - const targetPartition = 1; const partitionedTopicName = `test-custom-router-${Date.now()}`; const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; const numPartitions = 10; @@ -182,10 +181,36 @@ const adminUrl = 'http://localhost:8080'; // 204 No Content is success for PUT create expect(createPartitionedTopicRes.statusCode).toBe(204); + const routingKey = 'user-id-12345'; + const simpleHash = (str) => { + let hash = 0; + /* eslint-disable no-bitwise */ + for (let i = 0; i < str.length; i += 1) { + const char = str.charCodeAt(i); + hash = ((hash << 5) - hash) + char; + hash &= hash; + } + /* eslint-disable no-bitwise */ + return Math.abs(hash); + }; + const expectedPartition = simpleHash(routingKey) % numPartitions; + console.log(`Routing key '${routingKey}' will be sent to partition: ${expectedPartition}`); + // 2. Create a producer with the custom message router const producer = await client.createProducer({ - topic: partitionedTopic, // Note: For producer, use the base topic name - messageRouter: (message, topicMetadata) => targetPartition, + topic: partitionedTopic, + messageRouter: (message, topicMetadata) => { + // Get the routingKey from the message properties + const key = message.properties.routingKey; + if (key) { + // Use the metadata to get the number of partitions + const numPartitionsAvailable = topicMetadata.numPartitions; + // Calculate the target partition + return simpleHash(key) % numPartitionsAvailable; + } + // Fallback to a default partition if no key is provided + return 0; + }, messageRoutingMode: 'CustomPartition', }); @@ -203,6 +228,9 @@ const adminUrl = 'http://localhost:8080'; const msg = `message-${i}`; producer.send({ data: Buffer.from(msg), + properties: { + routingKey, + }, }); } await producer.flush(); @@ -210,12 +238,12 @@ const adminUrl = 'http://localhost:8080'; // 5. Receive messages and assert they all come from the target partition const receivedMessages = new Set(); - const expectedPartitionName = `${partitionedTopic}-partition-${targetPartition}`; + const expectedPartitionName = `${partitionedTopic}-partition-${expectedPartition}`; for (let i = 0; i < numMessages; i += 1) { const msg = await consumer.receive(10000); // eslint-disable-next-line no-underscore-dangle - expect(msg.getProperties().__partition__).toBe(String(targetPartition)); + expect(msg.getProperties().__partition__).toBe(String(expectedPartition)); expect(msg.getTopicName()).toBe(expectedPartitionName); receivedMessages.add(msg.getData().toString()); await consumer.acknowledge(msg); @@ -235,7 +263,7 @@ const adminUrl = 'http://localhost:8080'; // 2. Create a producer with the custom message router const producer = await client.createProducer({ topic: partitionedTopic, // Note: For producer, use the base topic name - messageRouter: (message, topicMetadata) => { + messageRouter: () => { throw new Error('Custom router error'); }, messageRoutingMode: 'CustomPartition', From 856f1449d2a7a7e9ce7c9730c2b401fd5745b9cd Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 26 Sep 2025 16:44:40 +0800 Subject: [PATCH 7/7] change api return type --- index.d.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index.d.ts b/index.d.ts index fd8961b6..32b8d3da 100644 --- a/index.d.ts +++ b/index.d.ts @@ -185,9 +185,9 @@ export interface TopicMetadata { * A custom message router function that can be implemented by the user. * @param message The message to be routed. * @param topicMetadata Metadata for the topic. - * @returns The partition index to send the message to, or a Promise that resolves to it. + * @returns The partition index to send the message to. */ -export type MessageRouter = (message: ProducerMessage, topicMetadata: TopicMetadata) => number | Promise; +export type MessageRouter = (message: ProducerMessage, topicMetadata: TopicMetadata) => number; export interface SchemaInfo { schemaType: SchemaType;