diff --git a/index.d.ts b/index.d.ts index 270c5e6..32b8d3d 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,7 @@ export interface ProducerConfig { schema?: SchemaInfo; accessMode?: ProducerAccessMode; batchingType?: ProducerBatchType; + messageRouter?: MessageRouter; } export class Producer { @@ -176,6 +177,18 @@ export class MessageId { toString(): string; } +export interface TopicMetadata { + numPartitions: number; +} + +/** + * A custom message router function that can be implemented by the user. + * @param message The message to be routed. + * @param topicMetadata Metadata for the topic. + * @returns The partition index to send the message to. + */ +export type MessageRouter = (message: ProducerMessage, topicMetadata: TopicMetadata) => number; + export interface SchemaInfo { schemaType: SchemaType; name?: string; diff --git a/src/Client.js b/src/Client.js index f51b1b6..1cf9e04 100644 --- a/src/Client.js +++ b/src/Client.js @@ -20,6 +20,7 @@ const fs = require('fs'); const tls = require('tls'); const os = require('os'); const PulsarBinding = require('./pulsar-binding'); +const Producer = require('./Producer'); const certsFilePath = `${__dirname}/cert.pem`; @@ -32,8 +33,9 @@ class Client { this.client = new PulsarBinding.Client(params); } - createProducer(params) { - return this.client.createProducer(params); + async createProducer(params) { + const addonProducer = await this.client.createProducer(params); + return new Producer(this, addonProducer, params); } subscribe(params) { diff --git a/src/Producer.cc b/src/Producer.cc index c827f9f..5874b22 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt auto instanceContext = static_cast(ctx); auto deferred = instanceContext->deferred; auto cClient = instanceContext->cClient; + auto producerConfig = instanceContext->producerConfig; delete instanceContext; if (result != pulsar_result_Ok) { @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt std::shared_ptr cProducer(rawProducer, pulsar_producer_free); - deferred->Resolve([cProducer](const Napi::Env env) { + deferred->Resolve([cProducer, producerConfig](const Napi::Env env) { Napi::Object obj = Producer::constructor.New({}); Producer *producer = Producer::Unwrap(obj); producer->SetCProducer(cProducer); + producer->producerConfig = producerConfig; return obj; }); }, diff --git a/src/Producer.h b/src/Producer.h index 70c2342..1d52a58 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -23,6 +23,7 @@ #include #include #include +#include "ProducerConfig.h" class Producer : public Napi::ObjectWrap { public: @@ -35,6 +36,7 @@ class Producer : public Napi::ObjectWrap { private: std::shared_ptr cProducer; + std::shared_ptr producerConfig; Napi::Value Send(const Napi::CallbackInfo &info); Napi::Value Flush(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); diff --git a/src/Producer.js b/src/Producer.js new file mode 100644 index 0000000..41339c5 --- /dev/null +++ b/src/Producer.js @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * @typedef {import('../index').ProducerMessage} ProducerMessage + * @typedef {import('../index').MessageId} MessageId + * @typedef {import('../index').ProducerConfig} ProducerConfig + * @typedef {import('../index').Client} PulsarClient + * @typedef {import('../index').Producer} AddonProducer + */ + +const PARTITION_PROP_KEY = '__partition__'; +const CACHE_TTL_MS = 60 * 1000; + +class Producer { + /** + * This class is a JavaScript wrapper around the C++ N-API Producer object. + * It should not be instantiated by users directly. + * @param {PulsarClient} client + * @param {Producer.h} addonProducer - The native addon producer instance. + * @param {ProducerConfig} config - The original producer configuration object. + */ + constructor(client, addonProducer, config) { + /** @private */ + this.client = client; + /** @private */ + this.addonProducer = addonProducer; + /** @private */ + this.producerConfig = config; + /** @private */ + this.numPartitions = undefined; + /** @private */ + this.partitionsCacheTimestamp = 0; + } + + /** + * Sends a message. If a custom message router was provided, it is called first + * to determine the partition before passing the message to the C++ addon. + * @param {ProducerMessage} message - The message object to send. + * @returns {Promise} A promise that resolves with the MessageId of the sent message. + */ + async send(message) { + // 1. Create a shallow copy of the message parameter at the beginning. + const finalMessage = { ...message }; + const config = this.producerConfig; + + // Check if custom routing mode is enabled + if (config.messageRoutingMode === 'CustomPartition') { + if (typeof config.messageRouter === 'function') { + const numPartitions = await this.getNumPartitions(); + const topicMetadata = { numPartitions }; + const partitionIndex = config.messageRouter(finalMessage, topicMetadata); + if (typeof partitionIndex === 'number' && partitionIndex >= 0) { + if (!finalMessage.properties) { + finalMessage.properties = {}; + } + finalMessage.properties[PARTITION_PROP_KEY] = String(partitionIndex); + } + } else { + throw new Error("Producer is configured with 'CustomPartition' routing mode, " + + "but a 'messageRouter' function was not provided."); + } + } + + // 3. Pass the modified copy to the C++ addon. + return this.addonProducer.send(finalMessage); + } + + /** + * Gets the number of partitions for the topic, using a cache with a TTL. + * @private + * @returns {Promise} + */ + async getNumPartitions() { + const now = Date.now(); + // Check if cache is missing or expired + if (this.numPartitions === undefined || now > this.partitionsCacheTimestamp + CACHE_TTL_MS) { + const partitions = await this.client.getPartitionsForTopic(this.getTopic()); + this.numPartitions = partitions.length; + this.partitionsCacheTimestamp = now; + } + return this.numPartitions; + } + + /** + * Flushes all the messages buffered in the client. + * @returns {Promise} + */ + async flush() { + return this.addonProducer.flush(); + } + + /** + * Closes the producer. + * @returns {Promise} + */ + async close() { + return this.addonProducer.close(); + } + + /** + * Gets the producer name. + * @returns {string} + */ + getProducerName() { + return this.addonProducer.getProducerName(); + } + + /** + * Gets the topic name. + * @returns {string} + */ + getTopic() { + return this.addonProducer.getTopic(); + } + + /** + * Checks if the producer is connected. + * @returns {boolean} + */ + isConnected() { + return this.addonProducer.isConnected(); + } +} + +module.exports = Producer; diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 2c704bf..610401b 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -82,7 +82,25 @@ static std::map PRODUC {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching}, }; -ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { +// Define a special key that the JS layer will use to pass the partition index. +static const std::string PARTITION_PROP_KEY = "__partition__"; + +static int internalCppMessageRouter(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata, + void *ctx) { + int numPartitions = pulsar_topic_metadata_get_num_partitions(topicMetadata); + if (pulsar_message_has_property(msg, PARTITION_PROP_KEY.c_str())) { + const char *partitionStr = pulsar_message_get_property(msg, PARTITION_PROP_KEY.c_str()); + try { + return std::stoi(partitionStr); + } catch (...) { + return numPartitions; + } + } + // return numPartitions to make cpp client failed callback + return numPartitions; +} + +ProducerConfig::ProducerConfig(const Napi::Object &producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -133,9 +151,14 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); - if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) + if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) { pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(), MESSAGE_ROUTING_MODE.at(messageRoutingMode)); + } + if (messageRoutingMode == "CustomPartition") { + pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), internalCppMessageRouter, + nullptr); + } } if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) { @@ -174,7 +197,7 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { } if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) { - SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject()); + SchemaInfo *schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject()); schemaInfo->SetProducerSchema(this->cProducerConfig); delete schemaInfo; } diff --git a/tests/producer.test.js b/tests/producer.test.js index e6908cb..4261478 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -18,6 +18,9 @@ */ const Pulsar = require('../index'); +const httpRequest = require('./http_utils'); + +const adminUrl = 'http://localhost:8080'; (() => { describe('Producer', () => { @@ -156,5 +159,123 @@ const Pulsar = require('../index'); await producer2.close(); }); }); + describe('Message Routing', () => { + test('Custom Message Router', async () => { + // 1. Define a partitioned topic and a custom router + const partitionedTopicName = `test-custom-router-${Date.now()}`; + const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; + const numPartitions = 10; + + // Use admin client to create a partitioned topic. This is more robust. + // Assuming 'adminUrl' and 'httpRequest' are available from your test setup. + const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; + const createPartitionedTopicRes = await httpRequest( + partitionedTopicAdminURL, { + headers: { + 'Content-Type': 'application/json', // Use application/json for REST API + }, + data: numPartitions, + method: 'PUT', + }, + ); + // 204 No Content is success for PUT create + expect(createPartitionedTopicRes.statusCode).toBe(204); + + const routingKey = 'user-id-12345'; + const simpleHash = (str) => { + let hash = 0; + /* eslint-disable no-bitwise */ + for (let i = 0; i < str.length; i += 1) { + const char = str.charCodeAt(i); + hash = ((hash << 5) - hash) + char; + hash &= hash; + } + /* eslint-disable no-bitwise */ + return Math.abs(hash); + }; + const expectedPartition = simpleHash(routingKey) % numPartitions; + console.log(`Routing key '${routingKey}' will be sent to partition: ${expectedPartition}`); + + // 2. Create a producer with the custom message router + const producer = await client.createProducer({ + topic: partitionedTopic, + messageRouter: (message, topicMetadata) => { + // Get the routingKey from the message properties + const key = message.properties.routingKey; + if (key) { + // Use the metadata to get the number of partitions + const numPartitionsAvailable = topicMetadata.numPartitions; + // Calculate the target partition + return simpleHash(key) % numPartitionsAvailable; + } + // Fallback to a default partition if no key is provided + return 0; + }, + messageRoutingMode: 'CustomPartition', + }); + + // 3. Create a single consumer for the entire partitioned topic + const consumer = await client.subscribe({ + topic: partitionedTopic, + subscription: 'test-sub', + subscriptionInitialPosition: 'Earliest', + }); + + // 4. Send 1000 messages in parallel for efficiency + console.log(`Sending messages to partitioned topic ${partitionedTopic}...`); + const numMessages = 1000; + for (let i = 0; i < numMessages; i += 1) { + const msg = `message-${i}`; + producer.send({ + data: Buffer.from(msg), + properties: { + routingKey, + }, + }); + } + await producer.flush(); + console.log(`Sent ${numMessages} messages.`); + + // 5. Receive messages and assert they all come from the target partition + const receivedMessages = new Set(); + const expectedPartitionName = `${partitionedTopic}-partition-${expectedPartition}`; + + for (let i = 0; i < numMessages; i += 1) { + const msg = await consumer.receive(10000); + // eslint-disable-next-line no-underscore-dangle + expect(msg.getProperties().__partition__).toBe(String(expectedPartition)); + expect(msg.getTopicName()).toBe(expectedPartitionName); + receivedMessages.add(msg.getData().toString()); + await consumer.acknowledge(msg); + } + // Final assertion to ensure all unique messages were received + expect(receivedMessages.size).toBe(numMessages); + console.log(`Successfully received and verified ${receivedMessages.size} messages from ${expectedPartitionName}.`); + await producer.close(); + await consumer.close(); + }, 30000); + + test('Custom Message Router Exception', async () => { + // 1. Define a partitioned topic and a custom router + const partitionedTopicName = `test-custom-router-failed-${Date.now()}`; + const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; + + // 2. Create a producer with the custom message router + const producer = await client.createProducer({ + topic: partitionedTopic, // Note: For producer, use the base topic name + messageRouter: () => { + throw new Error('Custom router error'); + }, + messageRoutingMode: 'CustomPartition', + }); + + // 4. Send 1000 messages in parallel for efficiency + await expect( + producer.send({ data: Buffer.from('test') }), + ).rejects.toThrow('Custom router error'); + + await producer.close(); + }, 30000); + }); }); })();