Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only close active consumer for Failover subscription when seek(). #7141

Merged
merged 4 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isRes
return closeFuture;
}

public synchronized CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (activeConsumer != null) {
activeConsumer.disconnect(isResetCursor);
}
closeFuture.complete(null);
return closeFuture;
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public interface Dispatcher {

boolean isClosed();

/**
* Disconnect active consumers
*/
CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor);

/**
* disconnect all consumers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isRes
return closeFuture;
}

@Override
public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
return disconnectAllConsumers(isResetCursor);
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isRes
return closeFuture;
}

@Override
public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
return disconnectAllConsumers(isResetCursor);
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ private void resetCursor(Position finalPosition, CompletableFuture<Void> future)
// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
synchronized (this) {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
disconnectFuture = dispatcher.disconnectAllConsumers(true);
disconnectFuture = dispatcher.disconnectActiveConsumers(true);
} else {
disconnectFuture = CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
// disable precis topic publish rate limiting
conf.setPreciseTopicPublishRateLimiterEnable(false);
conf.setMaxPendingPublishdRequestsPerConnection(0);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishdRequestsPerConnection(0);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -116,7 +116,7 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti
public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishdRequestsPerConnection(0);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -199,4 +204,82 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
assertEquals(backlogs, 10);
}

@Test
public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.subscribe();

pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getSubscriptions().size(), 1);
List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);
Set<String> connectedSinceSet = new HashSet<>();
for (Consumer consumer : consumers) {
connectedSinceSet.add(consumer.getStats().getConnectedSince());
}
assertEquals(connectedSinceSet.size(), 2);
consumer1.seek(MessageId.earliest);
// Wait for consumer to reconnect
Thread.sleep(1000);

consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);
for (Consumer consumer : consumers) {
assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
}
}

@Test
public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-subscription")
.subscribe();

pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-subscription")
.subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getSubscriptions().size(), 1);
List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);
Set<String> connectedSinceSet = new HashSet<>();
for (Consumer consumer : consumers) {
connectedSinceSet.add(consumer.getStats().getConnectedSince());
}
assertEquals(connectedSinceSet.size(), 2);
consumer1.seek(MessageId.earliest);
// Wait for consumer to reconnect
Thread.sleep(1000);

consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);

boolean hasConsumerNotDisconnected = false;
for (Consumer consumer : consumers) {
if (connectedSinceSet.contains(consumer.getStats().getConnectedSince())) {
hasConsumerNotDisconnected = true;
}
}
assertTrue(hasConsumerNotDisconnected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.*;

Expand Down