From 5f55c7d1eccf40c5c3886b0ebc2898bc0da8b2e6 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Sun, 12 Apr 2020 20:04:43 +0800 Subject: [PATCH] Add consumer distribution test for Failover subscription when update partitions. (#6699) ### Motivation Add consumer distribution test for Failover subscription when update partitions. This test is related to #6610 ### Modifications Add unit test --- .../client/impl/TopicsConsumerImplTest.java | 116 ++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 1 + 2 files changed, 117 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 1bbd758dff7f9..5fd2d5b382985 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -28,6 +28,7 @@ import io.netty.util.Timeout; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.apache.pulsar.broker.service.Topic; @@ -45,13 +47,19 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -768,6 +776,114 @@ public void testTopicAutoUpdatePartitions() throws Exception { consumer.close(); } + @Test(timeOut = testTimeout) + public void testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions"; + final String subName = "failover-test"; + TenantInfo tenantInfo = createDefaultTenantInfo(); + admin.tenants().createTenant("prop", tenantInfo); + admin.topics().createPartitionedTopic(topicName, 2); + assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 2); + Consumer consumer_1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Failover) + .subscriptionName(subName) + .subscribe(); + assertTrue(consumer_1 instanceof MultiTopicsConsumerImpl); + + assertEquals(((MultiTopicsConsumerImpl) consumer_1).allTopicPartitionsNumber.get(), 2); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .messageRouter(new MessageRouter() { + @Override + public int choosePartition(Message msg, TopicMetadata metadata) { + return Integer.parseInt(msg.getKey()) % metadata.numPartitions(); + } + }) + .create(); + + final int messages = 20; + for (int i = 0; i < messages; i++) { + producer.newMessage().key(String.valueOf(i)).value("message - " + i).send(); + } + + int received = 0; + Message lastMessage = null; + for (int i = 0; i < messages; i++) { + lastMessage = consumer_1.receive(); + received++; + } + assertEquals(received, messages); + consumer_1.acknowledgeCumulative(lastMessage); + + // 1.Update partition and check message consumption + admin.topics().updatePartitionedTopic(topicName, 4); + log.info("trigger partitionsAutoUpdateTimerTask"); + Timeout timeout = ((MultiTopicsConsumerImpl) consumer_1).getPartitionsAutoUpdateTimeout(); + timeout.task().run(timeout); + Thread.sleep(200); + + assertEquals(((MultiTopicsConsumerImpl) consumer_1).allTopicPartitionsNumber.get(), 4); + for (int i = 0; i < messages; i++) { + producer.newMessage().key(String.valueOf(i)).value("message - " + i).send(); + } + + received = 0; + lastMessage = null; + for (int i = 0; i < messages; i++) { + lastMessage = consumer_1.receive(); + received++; + } + assertEquals(received, messages); + consumer_1.acknowledgeCumulative(lastMessage); + + // 2.Create a new consumer and check active consumer changed + Consumer consumer_2 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Failover) + .subscriptionName(subName) + .subscribe(); + assertTrue(consumer_2 instanceof MultiTopicsConsumerImpl); + assertEquals(((MultiTopicsConsumerImpl) consumer_1).allTopicPartitionsNumber.get(), 4); + + for (int i = 0; i < messages; i++) { + producer.newMessage().key(String.valueOf(i)).value("message - " + i).send(); + } + + Map activeConsumers = new HashMap<>(); + PartitionedTopicStats stats = admin.topics().getPartitionedStats(topicName, true); + for (TopicStats value : stats.partitions.values()) { + for (SubscriptionStats subscriptionStats : value.subscriptions.values()) { + assertTrue(subscriptionStats.activeConsumerName.equals(consumer_1.getConsumerName()) + || subscriptionStats.activeConsumerName.equals(consumer_2.getConsumerName())); + activeConsumers.putIfAbsent(subscriptionStats.activeConsumerName, new AtomicInteger(0)); + activeConsumers.get(subscriptionStats.activeConsumerName).incrementAndGet(); + } + } + assertEquals(activeConsumers.get(consumer_1.getConsumerName()).get(), 2); + assertEquals(activeConsumers.get(consumer_2.getConsumerName()).get(), 2); + + // 4.Check new consumer can receive half of total messages + received = 0; + lastMessage = null; + for (int i = 0; i < messages / 2; i++) { + lastMessage = consumer_1.receive(); + received++; + } + assertEquals(received, messages / 2); + consumer_1.acknowledgeCumulative(lastMessage); + + received = 0; + lastMessage = null; + for (int i = 0; i < messages / 2; i++) { + lastMessage = consumer_2.receive(); + received++; + } + assertEquals(received, messages / 2); + consumer_2.acknowledgeCumulative(lastMessage); + } + @Test(timeOut = testTimeout) public void testDefaultBacklogTTL() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e2016a95317d1..18ada348232b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1135,6 +1135,7 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa future.complete(null); return future; } else if (oldPartitionNumber < currentPartitionNumber) { + allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber); List newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber); // subscribe new added partitions List>> futureList = newPartitions