Skip to content

Commit

Permalink
Add consumer distribution test for Failover subscription when update …
Browse files Browse the repository at this point in the history
…partitions. (apache#6699)

### Motivation

Add consumer distribution test for Failover subscription when update partitions. This test is related to apache#6610

### Modifications

Add unit test
  • Loading branch information
codelipenghui authored Apr 12, 2020
1 parent d72e383 commit 5f55c7d
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.netty.util.Timeout;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -38,20 +39,27 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -768,6 +776,114 @@ public void testTopicAutoUpdatePartitions() throws Exception {
consumer.close();
}

@Test(timeOut = testTimeout)
public void testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
final String subName = "failover-test";
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, 2);
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 2);
Consumer<String> consumer_1 = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName(subName)
.subscribe();
assertTrue(consumer_1 instanceof MultiTopicsConsumerImpl);

assertEquals(((MultiTopicsConsumerImpl) consumer_1).allTopicPartitionsNumber.get(), 2);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
}
})
.create();

final int messages = 20;
for (int i = 0; i < messages; i++) {
producer.newMessage().key(String.valueOf(i)).value("message - " + i).send();
}

int received = 0;
Message lastMessage = null;
for (int i = 0; i < messages; i++) {
lastMessage = consumer_1.receive();
received++;
}
assertEquals(received, messages);
consumer_1.acknowledgeCumulative(lastMessage);

// 1.Update partition and check message consumption
admin.topics().updatePartitionedTopic(topicName, 4);
log.info("trigger partitionsAutoUpdateTimerTask");
Timeout timeout = ((MultiTopicsConsumerImpl) consumer_1).getPartitionsAutoUpdateTimeout();
timeout.task().run(timeout);
Thread.sleep(200);

assertEquals(((MultiTopicsConsumerImpl) consumer_1).allTopicPartitionsNumber.get(), 4);
for (int i = 0; i < messages; i++) {
producer.newMessage().key(String.valueOf(i)).value("message - " + i).send();
}

received = 0;
lastMessage = null;
for (int i = 0; i < messages; i++) {
lastMessage = consumer_1.receive();
received++;
}
assertEquals(received, messages);
consumer_1.acknowledgeCumulative(lastMessage);

// 2.Create a new consumer and check active consumer changed
Consumer<String> consumer_2 = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName(subName)
.subscribe();
assertTrue(consumer_2 instanceof MultiTopicsConsumerImpl);
assertEquals(((MultiTopicsConsumerImpl) consumer_1).allTopicPartitionsNumber.get(), 4);

for (int i = 0; i < messages; i++) {
producer.newMessage().key(String.valueOf(i)).value("message - " + i).send();
}

Map<String, AtomicInteger> activeConsumers = new HashMap<>();
PartitionedTopicStats stats = admin.topics().getPartitionedStats(topicName, true);
for (TopicStats value : stats.partitions.values()) {
for (SubscriptionStats subscriptionStats : value.subscriptions.values()) {
assertTrue(subscriptionStats.activeConsumerName.equals(consumer_1.getConsumerName())
|| subscriptionStats.activeConsumerName.equals(consumer_2.getConsumerName()));
activeConsumers.putIfAbsent(subscriptionStats.activeConsumerName, new AtomicInteger(0));
activeConsumers.get(subscriptionStats.activeConsumerName).incrementAndGet();
}
}
assertEquals(activeConsumers.get(consumer_1.getConsumerName()).get(), 2);
assertEquals(activeConsumers.get(consumer_2.getConsumerName()).get(), 2);

// 4.Check new consumer can receive half of total messages
received = 0;
lastMessage = null;
for (int i = 0; i < messages / 2; i++) {
lastMessage = consumer_1.receive();
received++;
}
assertEquals(received, messages / 2);
consumer_1.acknowledgeCumulative(lastMessage);

received = 0;
lastMessage = null;
for (int i = 0; i < messages / 2; i++) {
lastMessage = consumer_2.receive();
received++;
}
assertEquals(received, messages / 2);
consumer_2.acknowledgeCumulative(lastMessage);
}

@Test(timeOut = testTimeout)
public void testDefaultBacklogTTL() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,7 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
future.complete(null);
return future;
} else if (oldPartitionNumber < currentPartitionNumber) {
allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber);
List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
// subscribe new added partitions
List<CompletableFuture<Consumer<T>>> futureList = newPartitions
Expand Down

0 comments on commit 5f55c7d

Please sign in to comment.