diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 1724fff4..9e2a06d3 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -237,7 +237,7 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * Set the message routing modes for partitioned topics. * - * Default: UseSinglePartition + * Default: RoundRobinDistribution * * @param PartitionsRoutingMode partition routing mode. * @return diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index c635c48f..c3240209 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -34,7 +34,7 @@ struct ProducerConfigurationImpl { CompressionType compressionType{CompressionNone}; int maxPendingMessages{1000}; int maxPendingMessagesAcrossPartitions{50000}; - ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition}; + ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution}; MessageRoutingPolicyPtr messageRouter; ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash}; bool useLazyStartPartitionedProducers{false}; diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index c269538b..43306099 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -1697,9 +1697,11 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) { std::string subName = "sub-testSeekOnPartitionedTopic"; Producer producer; + ProducerConfiguration conf; + conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); Promise producerPromise; - client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + client.createProducerAsync(topicName, conf, WaitForCallbackValue(producerPromise)); Future producerFuture = producerPromise.getFuture(); Result result = producerFuture.get(producer); ASSERT_EQ(ResultOk, result); diff --git a/tests/ProducerConfigurationTest.cc b/tests/ProducerConfigurationTest.cc index df5867c1..b6abea3b 100644 --- a/tests/ProducerConfigurationTest.cc +++ b/tests/ProducerConfigurationTest.cc @@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) { ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone); ASSERT_EQ(conf.getMaxPendingMessages(), 1000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000); - ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{}); ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash); ASSERT_EQ(conf.getBlockIfQueueFull(), false); @@ -88,8 +88,8 @@ TEST(ProducerConfigurationTest, testCustomConfig) { conf.setMaxPendingMessagesAcrossPartitions(100000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000); - conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); - ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); + conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); const auto router = std::make_shared(); conf.setMessageRouter(router); diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 0371bac5..3da25e9b 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -67,7 +67,9 @@ TEST_P(ReaderTest, testSimpleReader) { ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -142,7 +144,9 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -176,7 +180,9 @@ TEST_P(ReaderTest, testMultipleReaders) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -223,7 +229,9 @@ TEST_P(ReaderTest, testReaderOnLastMessage) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -263,7 +271,9 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) { initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + ProducerConfiguration producerConf; + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); for (int i = 0; i < 10; i++) { std::string content = "my-message-" + std::to_string(i); @@ -459,6 +469,7 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { Producer producer; ProducerConfiguration producerConf; producerConf.setBatchingEnabled(false); + producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); // 2. create reader, and expect hasMessageAvailable return false since no message produced.