Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface ProducerConfig {
schema?: SchemaInfo;
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
}

export class Producer {
Expand Down Expand Up @@ -176,6 +177,23 @@ export class MessageId {
toString(): string;
}

export interface TopicMetadata {
numPartitions: number;
}

/**
* @callback MessageRouter
* @description When producing messages to a partitioned topic, this router is used to select the
* target partition for each message. The router only works when the `messageRoutingMode` is set to
* `CustomPartition`. Please note that `getTopicName()` cannot be called on the `message`, otherwise
* the behavior will be undefined because the topic is unknown before sending the message.
* @param message The message to be routed.
* @param topicMetadata Metadata for the partitioned topic the message is being routed to.
* @returns {number} The index of the target partition (must be a number between 0 and
* topicMetadata.numPartitions - 1).
*/
export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number;

export interface SchemaInfo {
schemaType: SchemaType;
name?: string;
Expand Down
4 changes: 3 additions & 1 deletion src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
auto producerConfig = instanceContext->producerConfig;
delete instanceContext;

if (result != pulsar_result_Ok) {
Expand All @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt

std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);

deferred->Resolve([cProducer](const Napi::Env env) {
deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
producer->producerConfig = producerConfig;
return obj;
});
},
Expand Down
5 changes: 5 additions & 0 deletions src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <napi.h>
#include <pulsar/c/client.h>
#include <pulsar/c/producer.h>
#include <memory>
#include "ProducerConfig.h"

class Producer : public Napi::ObjectWrap<Producer> {
public:
Expand All @@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap<Producer> {

private:
std::shared_ptr<pulsar_producer_t> cProducer;
// Extend the lifetime of the producer config since it's env and router function could be used when sending
// messages
std::shared_ptr<ProducerConfig> producerConfig;
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Expand Down
37 changes: 37 additions & 0 deletions src/ProducerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
*/
#include "SchemaInfo.h"
#include "ProducerConfig.h"
#include "Message.h"
#include <cstdio>
#include <map>
#include "napi-inl.h"
#include "napi.h"
#include "pulsar/ProducerConfiguration.h"
#include "pulsar/c/message.h"
#include "pulsar/c/message_router.h"

static const std::string CFG_TOPIC = "topic";
static const std::string CFG_PRODUCER_NAME = "producerName";
Expand All @@ -42,6 +48,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
static const std::string CFG_ACCESS_MODE = "accessMode";
static const std::string CFG_BATCHING_TYPE = "batchingType";
static const std::string CFG_MESSAGE_ROUTER = "messageRouter";

struct _pulsar_producer_configuration {
pulsar::ProducerConfiguration conf;
Expand Down Expand Up @@ -82,6 +89,25 @@ static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUC
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
};

static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* metadata, void* ctx) {
auto router = static_cast<Napi::FunctionReference*>(ctx);
const auto& env = router->Env();
auto jsMessage = Message::NewInstance(Napi::Object::New(env),
std::shared_ptr<pulsar_message_t>(msg, [](pulsar_message_t*) {}));
int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata);

Napi::Object jsTopicMetadata = Napi::Object::New(env);
jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));

try {
return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value();
} catch (const Napi::Error& e) {
// TODO: how to handle the error properly? For now, return an invalid partition to fail the send
fprintf(stderr, "Error when calling messageRouter: %s\n", e.what());
return numPartitions;
}
}

ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
Expand Down Expand Up @@ -131,8 +157,10 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig.get(), blockIfQueueFull);
}

bool useCustomPartition = false;
if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
useCustomPartition = (messageRoutingMode == "CustomPartition");
if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(),
MESSAGE_ROUTING_MODE.at(messageRoutingMode));
Expand Down Expand Up @@ -224,6 +252,15 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
}

if (useCustomPartition && producerConfig.Has(CFG_MESSAGE_ROUTER)) {
auto value = producerConfig.Get(CFG_MESSAGE_ROUTER);
if (value.IsFunction()) {
messageRouter = Napi::Persistent(value.As<Napi::Function>());
pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), choosePartition,
&messageRouter);
}
}
}

ProducerConfig::~ProducerConfig() {}
Expand Down
7 changes: 7 additions & 0 deletions src/ProducerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

#include <napi.h>
#include <pulsar/c/producer_configuration.h>
#include <memory>

struct MessageRouterContext {
Napi::FunctionReference messageRouter;
};

class ProducerConfig {
public:
Expand All @@ -33,6 +38,8 @@ class ProducerConfig {
private:
std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
std::string topic;
std::unique_ptr<MessageRouterContext> routerContext;
Napi::FunctionReference messageRouter;
};

#endif
10 changes: 5 additions & 5 deletions tests/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

const httpRequest = require('./http_utils');
const httpUtils = require('./http_utils');
const Pulsar = require('../index');

const baseUrl = 'http://localhost:8080';
Expand Down Expand Up @@ -74,7 +74,7 @@ const baseUrl = 'http://localhost:8080';
const nonPartitionedTopicName = 'test-non-partitioned-topic';
const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`;
const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
const createNonPartitionedTopicRes = await httpRequest(
const createNonPartitionedTopicRes = await httpUtils.request(
nonPartitionedTopicAdminURL, {
headers: {
'Content-Type': 'application/json',
Expand All @@ -91,7 +91,7 @@ const baseUrl = 'http://localhost:8080';
const partitionedTopicName = 'test-partitioned-topic-1';
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
const createPartitionedTopicRes = await httpRequest(
const createPartitionedTopicRes = await httpUtils.request(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
Expand All @@ -110,9 +110,9 @@ const baseUrl = 'http://localhost:8080';
'persistent://public/default/test-partitioned-topic-1-partition-3',
]);

const deleteNonPartitionedTopicRes = await httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' });
const deleteNonPartitionedTopicRes = await httpUtils.request(nonPartitionedTopicAdminURL, { method: 'DELETE' });
expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
const deletePartitionedTopicRes = await httpRequest(partitionedTopicAdminURL, { method: 'DELETE' });
const deletePartitionedTopicRes = await httpUtils.request(partitionedTopicAdminURL, { method: 'DELETE' });
expect(deletePartitionedTopicRes.statusCode).toBe(204);

await client.close();
Expand Down
16 changes: 15 additions & 1 deletion tests/http_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new Promise((resolve, r
req.end();
});

module.exports = request;
function createPartitionedTopic(topic, numPartitions) {
const url = `http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`;
return request(url, {
headers: {
'Content-Type': 'application/json',
},
data: numPartitions,
method: 'PUT',
});
}

module.exports = {
createPartitionedTopic,
request,
};
83 changes: 83 additions & 0 deletions tests/producer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/

const Pulsar = require('../index');
const httpUtils = require('./http_utils');

function getPartition(msgId) {
// The message id string is in the format of "entryId,ledgerId,partition,batchIndex"
return Number(msgId.toString().split(',')[2]);
}

(() => {
describe('Producer', () => {
Expand Down Expand Up @@ -156,5 +162,82 @@
await producer2.close();
});
});
describe('Message Routing', () => {
test('Custom Message Router', async () => {
const topic = `test-custom-router-${Date.now()}`;
const numPartitions = 3;
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
expect(response.statusCode).toBe(204);

const producer = await client.createProducer({
topic,
batchingMaxMessages: 2,
messageRouter: (message, topicMetadata) => parseInt(message.getPartitionKey(), 10)
% topicMetadata.numPartitions,
messageRoutingMode: 'CustomPartition',
});

const promises = [];
const numMessages = 5;
for (let i = 0; i < numMessages; i += 1) {
const sendPromise = producer.send({
partitionKey: `${i}`,
data: Buffer.from(`msg-${i}`),
});
await sendPromise;
promises.push(sendPromise);
}
try {
const allMsgIds = await Promise.all(promises);
console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`);
for (let i = 0; i < allMsgIds.length; i += 1) {
// The message id string is in the format of "entryId,ledgerId,partition,batchIndex"
const partition = getPartition(allMsgIds[i]);
expect(i % numPartitions).toBe(partition);
}
} catch (error) {
console.error('One or more messages failed to send:', error);
}
}, 30000);
test('Exception in router', async () => {
const topic = `test-exception-in-router-${Date.now()}`;
const numPartitions = 2;
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
expect(response.statusCode).toBe(204);
const producer = await client.createProducer({
topic,
messageRouter: (message, topicMetadata) => {

Check warning on line 209 in tests/producer.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'topicMetadata' is defined but never used

Check warning on line 209 in tests/producer.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('Custom error in message router');
},
messageRoutingMode: 'CustomPartition',
});
await expect(
producer.send({ data: Buffer.from('test') }),
).rejects.toThrow('Failed to send message: UnknownError');
}, 30000);
test('Not CustomPartition', async () => {
const topic = `test-not-custom-part-${Date.now()}`;
const numPartitions = 2;
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
expect(response.statusCode).toBe(204);

let index = 0;
const producer = await client.createProducer({
topic,
messageRouter: (_, topicMetadata) => {
const result = index % topicMetadata.numPartitions;
index += 1;
return result;
},
messageRoutingMode: 'UseSinglePartition',
});
const partitions = new Set();
for (let i = 0; i < 10; i += 1) {
const msgId = await producer.send({ data: Buffer.from('msg') });
partitions.add(getPartition(msgId));
}
expect(partitions.size).toBe(1);
}, 30000);
});
});
})();
4 changes: 2 additions & 2 deletions tests/reader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

const lodash = require('lodash');
const Pulsar = require('../index');
const httpRequest = require('./http_utils');
const httpUtils = require('./http_utils');

const baseUrl = 'http://localhost:8080';

Expand Down Expand Up @@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080';
const partitionedTopicName = 'test-reader-partitioned-topic';
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
const createPartitionedTopicRes = await httpRequest(
const createPartitionedTopicRes = await httpUtils.request(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
Expand Down
Loading