Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class PULSAR_PUBLIC ProducerConfiguration {
/**
* Set the message routing modes for partitioned topics.
*
* Default: UseSinglePartition
* Default: RoundRobinDistribution
*
* @param PartitionsRoutingMode partition routing mode.
* @return
Expand Down
2 changes: 1 addition & 1 deletion lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct ProducerConfigurationImpl {
CompressionType compressionType{CompressionNone};
int maxPendingMessages{1000};
int maxPendingMessagesAcrossPartitions{50000};
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
Expand Down
4 changes: 3 additions & 1 deletion tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1697,9 +1697,11 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {

std::string subName = "sub-testSeekOnPartitionedTopic";
Producer producer;
ProducerConfiguration conf;
conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);

Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Expand Down
6 changes: 3 additions & 3 deletions tests/ProducerConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone);
ASSERT_EQ(conf.getMaxPendingMessages(), 1000);
ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000);
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{});
ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash);
ASSERT_EQ(conf.getBlockIfQueueFull(), false);
Expand Down Expand Up @@ -88,8 +88,8 @@ TEST(ProducerConfigurationTest, testCustomConfig) {
conf.setMaxPendingMessagesAcrossPartitions(100000);
ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000);

conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);

const auto router = std::make_shared<MockMessageRoutingPolicy>();
conf.setMessageRouter(router);
Expand Down
17 changes: 13 additions & 4 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ TEST_P(ReaderTest, testSimpleReader) {
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ProducerConfiguration producerConf;
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -142,7 +144,9 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ProducerConfiguration producerConf;
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -176,7 +180,9 @@ TEST_P(ReaderTest, testMultipleReaders) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ProducerConfiguration producerConf;
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -263,7 +269,9 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ProducerConfiguration producerConf;
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -459,6 +467,7 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
Producer producer;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

// 2. create reader, and expect hasMessageAvailable return false since no message produced.
Expand Down
Loading