Skip to content

Commit

Permalink
Only close active consumer for Failover subscription when seek(). (#7141
Browse files Browse the repository at this point in the history
)

Related to #5278 

### Motivation

Only close active consumer for Failover subscription when seek().

### Verifying this change

Unit tests added

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)
  • Loading branch information
codelipenghui authored Jun 3, 2020
1 parent 842996e commit dbe5903
Show file tree
Hide file tree
Showing 18 changed files with 126 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-cli.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-function-state.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-messaging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-standalone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-tiered-filesystem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-tiered-jcloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isRes
return closeFuture;
}

public synchronized CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (activeConsumer != null) {
activeConsumer.disconnect(isResetCursor);
}
closeFuture.complete(null);
return closeFuture;
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public interface Dispatcher {

boolean isClosed();

/**
* Disconnect active consumers
*/
CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor);

/**
* disconnect all consumers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isRes
return closeFuture;
}

@Override
public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
return disconnectAllConsumers(isResetCursor);
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isRes
return closeFuture;
}

@Override
public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
return disconnectAllConsumers(isResetCursor);
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ private void resetCursor(Position finalPosition, CompletableFuture<Void> future)
// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
synchronized (this) {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
disconnectFuture = dispatcher.disconnectAllConsumers(true);
disconnectFuture = dispatcher.disconnectActiveConsumers(true);
} else {
disconnectFuture = CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
// disable precis topic publish rate limiting
conf.setPreciseTopicPublishRateLimiterEnable(false);
conf.setMaxPendingPublishdRequestsPerConnection(0);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishdRequestsPerConnection(0);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -116,7 +116,7 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti
public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception {
PublishRate publishRate = new PublishRate(1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishdRequestsPerConnection(0);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -199,4 +204,82 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
assertEquals(backlogs, 10);
}

@Test
public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.subscribe();

pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getSubscriptions().size(), 1);
List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);
Set<String> connectedSinceSet = new HashSet<>();
for (Consumer consumer : consumers) {
connectedSinceSet.add(consumer.getStats().getConnectedSince());
}
assertEquals(connectedSinceSet.size(), 2);
consumer1.seek(MessageId.earliest);
// Wait for consumer to reconnect
Thread.sleep(1000);

consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);
for (Consumer consumer : consumers) {
assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
}
}

@Test
public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-subscription")
.subscribe();

pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-subscription")
.subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getSubscriptions().size(), 1);
List<Consumer> consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);
Set<String> connectedSinceSet = new HashSet<>();
for (Consumer consumer : consumers) {
connectedSinceSet.add(consumer.getStats().getConnectedSince());
}
assertEquals(connectedSinceSet.size(), 2);
consumer1.seek(MessageId.earliest);
// Wait for consumer to reconnect
Thread.sleep(1000);

consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers();
assertEquals(consumers.size(), 2);

boolean hasConsumerNotDisconnected = false;
for (Consumer consumer : consumers) {
if (connectedSinceSet.contains(consumer.getStats().getConnectedSince())) {
hasConsumerNotDisconnected = true;
}
}
assertTrue(hasConsumerNotDisconnected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,14 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top
TopicPolicies initPolicy = TopicPolicies.builder()
.maxConsumerPerTopic(10)
.build();
systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy);

Assert.assertNull(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));

Thread.sleep(1000);

systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));

// Wait for all topic policies updated.
Thread.sleep(3000);

// Assert broker is cache all topic policies
Assert.assertEquals(10, systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue());
Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);

// Update policy for TOPIC1
TopicPolicies policies1 = TopicPolicies.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.*;

Expand Down

0 comments on commit dbe5903

Please sign in to comment.