Skip to content

Commit

Permalink
Fixed increasing number of partitions with attached readers (apache#7077
Browse files Browse the repository at this point in the history
)

When we're increasing the number of partitions in a topic, we typically also pre-create the subscriptions on those topics. In this case, we're also pre-creating the non-durable subscriptions of the readers.

These are getting created as durable subscriptions on the new partitions and will be leaked and causing backlogs.

Instead, we don't need to pre-create anything for readers (non-durable subscriptions).

* Fixed increasing number of partitions with attached readers

* Fixed test expectation after merge
  • Loading branch information
merlimat authored and Huanli-Meng committed Jun 1, 2020
1 parent 458cb80 commit a0e07de
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -2404,28 +2405,32 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
}

admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
if (stats.subscriptions.size() == 0) {
result.complete(null);
} else {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();

subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}
stats.subscriptions.entrySet().forEach(e -> {
String subscription = e.getKey();
SubscriptionStats ss = e.getValue();
if (!ss.isDurable) {
// We must not re-create non-durable subscriptions on the new partitions
return;
}

FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
});
});
}
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();

subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}
});

FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
// The first partition doesn't exist, so there are currently to subscriptions to recreate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.isReplicated = isReplicated();
subStats.isDurable = cursor.isDurable();
return subStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -35,6 +39,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.util.Collections;

import lombok.Cleanup;

public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -123,4 +129,32 @@ public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
}

@Test
public void testIncrementPartitionsWithReaders() throws Exception {
TopicName partitionedTopicName = TopicName.get("persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime());

admin.topics().createPartitionedTopic(partitionedTopicName.toString(), 1);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName.toString()).partitions, 1);

@Cleanup
Producer<String> consumer = pulsarClient.newProducer(Schema.STRING)
.topic(partitionedTopicName.toString())
.create();

@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(partitionedTopicName.getPartition(0).toString())
.startMessageId(MessageId.earliest)
.create();

admin.topics().updatePartitionedTopic(partitionedTopicName.toString(), 2);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName.toString()).partitions, 2);

assertEquals(admin.topics().getSubscriptions(partitionedTopicName.getPartition(0).toString()).size(), 1);

// Partition-1 should not have subscriptions
assertEquals(admin.topics().getSubscriptions(partitionedTopicName.getPartition(1).toString()),
Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class SubscriptionStats {
/** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;

/** Tells whether this subscription is durable or ephemeral (eg.: from a reader). */
public boolean isDurable;

/** Mark that the subscription state is kept in sync across different regions. */
public boolean isReplicated;

Expand Down Expand Up @@ -117,6 +120,7 @@ public SubscriptionStats add(SubscriptionStats stats) {
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
this.isReplicated |= stats.isReplicated;
this.isDurable |= stats.isDurable;
if (this.consumers.size() != stats.consumers.size()) {
for (int i = 0; i < stats.consumers.size(); i++) {
ConsumerStats consumerStats = new ConsumerStats();
Expand Down

0 comments on commit a0e07de

Please sign in to comment.