diff --git a/index.d.ts b/index.d.ts index 270c5e6..72c89af 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,7 @@ export interface ProducerConfig { schema?: SchemaInfo; accessMode?: ProducerAccessMode; batchingType?: ProducerBatchType; + messageRouter?: MessageRouter; } export class Producer { @@ -176,6 +177,23 @@ export class MessageId { toString(): string; } +export interface TopicMetadata { + numPartitions: number; +} + +/** + * @callback MessageRouter + * @description When producing messages to a partitioned topic, this router is used to select the + * target partition for each message. The router only works when the `messageRoutingMode` is set to + * `CustomPartition`. Please note that `getTopicName()` cannot be called on the `message`, otherwise + * the behavior will be undefined because the topic is unknown before sending the message. + * @param message The message to be routed. + * @param topicMetadata Metadata for the partitioned topic the message is being routed to. + * @returns {number} The index of the target partition (must be a number between 0 and + * topicMetadata.numPartitions - 1). + */ +export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number; + export interface SchemaInfo { schemaType: SchemaType; name?: string; diff --git a/src/Producer.cc b/src/Producer.cc index c827f9f..5874b22 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt auto instanceContext = static_cast(ctx); auto deferred = instanceContext->deferred; auto cClient = instanceContext->cClient; + auto producerConfig = instanceContext->producerConfig; delete instanceContext; if (result != pulsar_result_Ok) { @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt std::shared_ptr cProducer(rawProducer, pulsar_producer_free); - deferred->Resolve([cProducer](const Napi::Env env) { + deferred->Resolve([cProducer, producerConfig](const Napi::Env env) { Napi::Object obj = Producer::constructor.New({}); Producer *producer = Producer::Unwrap(obj); producer->SetCProducer(cProducer); + producer->producerConfig = producerConfig; return obj; }); }, diff --git a/src/Producer.h b/src/Producer.h index 70c2342..98849ed 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include "ProducerConfig.h" class Producer : public Napi::ObjectWrap { public: @@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap { private: std::shared_ptr cProducer; + // Extend the lifetime of the producer config since it's env and router function could be used when sending + // messages + std::shared_ptr producerConfig; Napi::Value Send(const Napi::CallbackInfo &info); Napi::Value Flush(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 2c704bf..3889120 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -18,8 +18,14 @@ */ #include "SchemaInfo.h" #include "ProducerConfig.h" +#include "Message.h" +#include #include +#include "napi-inl.h" +#include "napi.h" #include "pulsar/ProducerConfiguration.h" +#include "pulsar/c/message.h" +#include "pulsar/c/message_router.h" static const std::string CFG_TOPIC = "topic"; static const std::string CFG_PRODUCER_NAME = "producerName"; @@ -42,6 +48,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction"; static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled"; static const std::string CFG_ACCESS_MODE = "accessMode"; static const std::string CFG_BATCHING_TYPE = "batchingType"; +static const std::string CFG_MESSAGE_ROUTER = "messageRouter"; struct _pulsar_producer_configuration { pulsar::ProducerConfiguration conf; @@ -82,6 +89,25 @@ static std::map PRODUC {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching}, }; +static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* metadata, void* ctx) { + auto router = static_cast(ctx); + const auto& env = router->Env(); + auto jsMessage = Message::NewInstance(Napi::Object::New(env), + std::shared_ptr(msg, [](pulsar_message_t*) {})); + int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata); + + Napi::Object jsTopicMetadata = Napi::Object::New(env); + jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions)); + + try { + return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value(); + } catch (const Napi::Error& e) { + // TODO: how to handle the error properly? For now, return an invalid partition to fail the send + fprintf(stderr, "Error when calling messageRouter: %s\n", e.what()); + return numPartitions; + } +} + ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -131,8 +157,10 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig.get(), blockIfQueueFull); } + bool useCustomPartition = false; if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); + useCustomPartition = (messageRoutingMode == "CustomPartition"); if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(), MESSAGE_ROUTING_MODE.at(messageRoutingMode)); @@ -224,6 +252,15 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { if (PRODUCER_BATCHING_TYPE.count(batchingType)) { this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType)); } + + if (useCustomPartition && producerConfig.Has(CFG_MESSAGE_ROUTER)) { + auto value = producerConfig.Get(CFG_MESSAGE_ROUTER); + if (value.IsFunction()) { + messageRouter = Napi::Persistent(value.As()); + pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), choosePartition, + &messageRouter); + } + } } ProducerConfig::~ProducerConfig() {} diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h index 3d49557..0437768 100644 --- a/src/ProducerConfig.h +++ b/src/ProducerConfig.h @@ -22,6 +22,11 @@ #include #include +#include + +struct MessageRouterContext { + Napi::FunctionReference messageRouter; +}; class ProducerConfig { public: @@ -33,6 +38,8 @@ class ProducerConfig { private: std::shared_ptr cProducerConfig; std::string topic; + std::unique_ptr routerContext; + Napi::FunctionReference messageRouter; }; #endif diff --git a/tests/client.test.js b/tests/client.test.js index f7bc6d5..d97763e 100644 --- a/tests/client.test.js +++ b/tests/client.test.js @@ -17,7 +17,7 @@ * under the License. */ -const httpRequest = require('./http_utils'); +const httpUtils = require('./http_utils'); const Pulsar = require('../index'); const baseUrl = 'http://localhost:8080'; @@ -74,7 +74,7 @@ const baseUrl = 'http://localhost:8080'; const nonPartitionedTopicName = 'test-non-partitioned-topic'; const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`; const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`; - const createNonPartitionedTopicRes = await httpRequest( + const createNonPartitionedTopicRes = await httpUtils.request( nonPartitionedTopicAdminURL, { headers: { 'Content-Type': 'application/json', @@ -91,7 +91,7 @@ const baseUrl = 'http://localhost:8080'; const partitionedTopicName = 'test-partitioned-topic-1'; const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; - const createPartitionedTopicRes = await httpRequest( + const createPartitionedTopicRes = await httpUtils.request( partitionedTopicAdminURL, { headers: { 'Content-Type': 'text/plain', @@ -110,9 +110,9 @@ const baseUrl = 'http://localhost:8080'; 'persistent://public/default/test-partitioned-topic-1-partition-3', ]); - const deleteNonPartitionedTopicRes = await httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' }); + const deleteNonPartitionedTopicRes = await httpUtils.request(nonPartitionedTopicAdminURL, { method: 'DELETE' }); expect(deleteNonPartitionedTopicRes.statusCode).toBe(204); - const deletePartitionedTopicRes = await httpRequest(partitionedTopicAdminURL, { method: 'DELETE' }); + const deletePartitionedTopicRes = await httpUtils.request(partitionedTopicAdminURL, { method: 'DELETE' }); expect(deletePartitionedTopicRes.statusCode).toBe(204); await client.close(); diff --git a/tests/http_utils.js b/tests/http_utils.js index 8fa94f2..81d09de 100644 --- a/tests/http_utils.js +++ b/tests/http_utils.js @@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new Promise((resolve, r req.end(); }); -module.exports = request; +function createPartitionedTopic(topic, numPartitions) { + const url = `http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`; + return request(url, { + headers: { + 'Content-Type': 'application/json', + }, + data: numPartitions, + method: 'PUT', + }); +} + +module.exports = { + createPartitionedTopic, + request, +}; diff --git a/tests/producer.test.js b/tests/producer.test.js index e6908cb..d094505 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -18,6 +18,12 @@ */ const Pulsar = require('../index'); +const httpUtils = require('./http_utils'); + +function getPartition(msgId) { + // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" + return Number(msgId.toString().split(',')[2]); +} (() => { describe('Producer', () => { @@ -156,5 +162,82 @@ const Pulsar = require('../index'); await producer2.close(); }); }); + describe('Message Routing', () => { + test('Custom Message Router', async () => { + const topic = `test-custom-router-${Date.now()}`; + const numPartitions = 3; + const response = await httpUtils.createPartitionedTopic(topic, numPartitions); + expect(response.statusCode).toBe(204); + + const producer = await client.createProducer({ + topic, + batchingMaxMessages: 2, + messageRouter: (message, topicMetadata) => parseInt(message.getPartitionKey(), 10) + % topicMetadata.numPartitions, + messageRoutingMode: 'CustomPartition', + }); + + const promises = []; + const numMessages = 5; + for (let i = 0; i < numMessages; i += 1) { + const sendPromise = producer.send({ + partitionKey: `${i}`, + data: Buffer.from(`msg-${i}`), + }); + await sendPromise; + promises.push(sendPromise); + } + try { + const allMsgIds = await Promise.all(promises); + console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`); + for (let i = 0; i < allMsgIds.length; i += 1) { + // The message id string is in the format of "entryId,ledgerId,partition,batchIndex" + const partition = getPartition(allMsgIds[i]); + expect(i % numPartitions).toBe(partition); + } + } catch (error) { + console.error('One or more messages failed to send:', error); + } + }, 30000); + test('Exception in router', async () => { + const topic = `test-exception-in-router-${Date.now()}`; + const numPartitions = 2; + const response = await httpUtils.createPartitionedTopic(topic, numPartitions); + expect(response.statusCode).toBe(204); + const producer = await client.createProducer({ + topic, + messageRouter: (message, topicMetadata) => { + throw new Error('Custom error in message router'); + }, + messageRoutingMode: 'CustomPartition', + }); + await expect( + producer.send({ data: Buffer.from('test') }), + ).rejects.toThrow('Failed to send message: UnknownError'); + }, 30000); + test('Not CustomPartition', async () => { + const topic = `test-not-custom-part-${Date.now()}`; + const numPartitions = 2; + const response = await httpUtils.createPartitionedTopic(topic, numPartitions); + expect(response.statusCode).toBe(204); + + let index = 0; + const producer = await client.createProducer({ + topic, + messageRouter: (_, topicMetadata) => { + const result = index % topicMetadata.numPartitions; + index += 1; + return result; + }, + messageRoutingMode: 'UseSinglePartition', + }); + const partitions = new Set(); + for (let i = 0; i < 10; i += 1) { + const msgId = await producer.send({ data: Buffer.from('msg') }); + partitions.add(getPartition(msgId)); + } + expect(partitions.size).toBe(1); + }, 30000); + }); }); })(); diff --git a/tests/reader.test.js b/tests/reader.test.js index 56d1b48..fb0842b 100644 --- a/tests/reader.test.js +++ b/tests/reader.test.js @@ -19,7 +19,7 @@ const lodash = require('lodash'); const Pulsar = require('../index'); -const httpRequest = require('./http_utils'); +const httpUtils = require('./http_utils'); const baseUrl = 'http://localhost:8080'; @@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080'; const partitionedTopicName = 'test-reader-partitioned-topic'; const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; - const createPartitionedTopicRes = await httpRequest( + const createPartitionedTopicRes = await httpUtils.request( partitionedTopicAdminURL, { headers: { 'Content-Type': 'text/plain',