From b34444245ce116fc9571615ded449f0fee523e78 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 9 Oct 2025 10:58:50 +0800 Subject: [PATCH 1/5] [improve] change RoutingMode default from UseSinglePartition to RoundRobinDistribution --- include/pulsar/ProducerConfiguration.h | 2 +- lib/ProducerConfigurationImpl.h | 2 +- tests/ProducerConfigurationTest.cc | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 1724fff4..9e2a06d3 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -237,7 +237,7 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * Set the message routing modes for partitioned topics. * - * Default: UseSinglePartition + * Default: RoundRobinDistribution * * @param PartitionsRoutingMode partition routing mode. * @return diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index c635c48f..c3240209 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -34,7 +34,7 @@ struct ProducerConfigurationImpl { CompressionType compressionType{CompressionNone}; int maxPendingMessages{1000}; int maxPendingMessagesAcrossPartitions{50000}; - ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition}; + ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution}; MessageRoutingPolicyPtr messageRouter; ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash}; bool useLazyStartPartitionedProducers{false}; diff --git a/tests/ProducerConfigurationTest.cc b/tests/ProducerConfigurationTest.cc index df5867c1..b6abea3b 100644 --- a/tests/ProducerConfigurationTest.cc +++ b/tests/ProducerConfigurationTest.cc @@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) { ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone); ASSERT_EQ(conf.getMaxPendingMessages(), 1000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000); - ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{}); ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash); ASSERT_EQ(conf.getBlockIfQueueFull(), false); @@ -88,8 +88,8 @@ TEST(ProducerConfigurationTest, testCustomConfig) { conf.setMaxPendingMessagesAcrossPartitions(100000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000); - conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); - ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); + conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); const auto router = std::make_shared(); conf.setMessageRouter(router); From bbc96b3f8899aa49567b53675a0c54bea0c718cf Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 9 Oct 2025 15:59:29 +0800 Subject: [PATCH 2/5] fix tests --- tests/BasicEndToEndTest.cc | 4 +++- tests/ReaderTest.cc | 17 +++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index c269538b..e24034e8 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -1697,9 +1697,11 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) { std::string subName = "sub-testSeekOnPartitionedTopic"; Producer producer; + ProducerConfiguration conf; + conf.setRoutingMode(ProducerRoutingMode::UseSinglePartition); Promise producerPromise; - client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + client.createProducerAsync(topicName, conf, WaitForCallbackValue(producerPromise)); Future producerFuture = producerPromise.getFuture(); Result result = producerFuture.get(producer); ASSERT_EQ(ResultOk, result); diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 0371bac5..4b1ae540 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -67,7 +67,9 @@ TEST_P(ReaderTest, testSimpleReader) { ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -142,7 +144,9 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -176,7 +180,9 @@ TEST_P(ReaderTest, testMultipleReaders) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -263,7 +269,9 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -459,6 +467,7 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { Producer producer; ProducerConfiguration producerConf; producerConf.setBatchingEnabled(false); + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); // 2. create reader, and expect hasMessageAvailable return false since no message produced. From 18d75bdf33e9ebe0e802cadd1aae727dc0ae4e3d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 9 Oct 2025 16:25:54 +0800 Subject: [PATCH 3/5] fix build --- tests/BasicEndToEndTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index e24034e8..e3c65fdc 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -1698,7 +1698,7 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) { std::string subName = "sub-testSeekOnPartitionedTopic"; Producer producer; ProducerConfiguration conf; - conf.setRoutingMode(ProducerRoutingMode::UseSinglePartition); + conf.setRoutingMode(ProducerConfiguration::UseSinglePartition); Promise producerPromise; client.createProducerAsync(topicName, conf, WaitForCallbackValue(producerPromise)); From d9480428b3a1781f2dd79d7ab87d7242439a0905 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 9 Oct 2025 19:11:11 +0800 Subject: [PATCH 4/5] Fix build --- tests/BasicEndToEndTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index e3c65fdc..43306099 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -1698,7 +1698,7 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) { std::string subName = "sub-testSeekOnPartitionedTopic"; Producer producer; ProducerConfiguration conf; - conf.setRoutingMode(ProducerConfiguration::UseSinglePartition); + conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); Promise producerPromise; client.createProducerAsync(topicName, conf, WaitForCallbackValue(producerPromise)); From cc13d08cbeae5213eb319b4407bddad1caf6955c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 10 Oct 2025 09:46:44 +0800 Subject: [PATCH 5/5] fix test --- tests/ReaderTest.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 4b1ae540..3da25e9b 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -229,7 +229,9 @@ TEST_P(ReaderTest, testReaderOnLastMessage) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i);