Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix compaction subscription delete by inactive subscription check. #20983

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2731,7 +2731,9 @@ public void checkInactiveSubscriptions() {
.toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()
|| sub.isReplicated()
|| isCompactionSubscription(subName)) {
return;
}
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.http.HttpResponse;
Expand All @@ -77,6 +78,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
Expand Down Expand Up @@ -109,6 +111,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -1218,6 +1221,69 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex
}
}

@Test
public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception {
String namespace = "prop/test";

// set up broker set compaction threshold.
cleanup();
conf.setBrokerServiceCompactionThresholdInBytes(8);
setup();

try {
admin.namespaces().createNamespace(namespace);
} catch (PulsarAdminException.ConflictException e) {
// Ok.. (if test fails intermittently and namespace is already created)
}

// set enable subscription expiration.
admin.namespaces().setSubscriptionExpirationTime(namespace, 1);

String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete";

admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic);

CompletableFuture<Optional<Topic>> topicCf =
pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true);

Optional<Topic> topicOptional = topicCf.get();
assertTrue(topicOptional.isPresent());

PersistentTopic topic = (PersistentTopic) topicOptional.get();

PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION);
assertNotNull(sub);

topic.checkCompaction();

Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction");
currentCompaction.setAccessible(true);
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)currentCompaction.get(topic);

compactionFuture.get();

ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();

// make cursor last active time to very small to check if it will be deleted
Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive");
cursorLastActiveField.setAccessible(true);
cursorLastActiveField.set(cursor, 0);

// replace origin object. so we can check if subscription is deleted.
PersistentSubscription spySubscription = Mockito.spy(sub);
topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription);

// trigger inactive check.
topic.checkInactiveSubscriptions();

// Compaction subscription should not call delete method.
Mockito.verify(spySubscription, Mockito.never()).delete();

// check if the subscription exist.
assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));

}

/**
* Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.
Expand Down