From 5835191295371b3a5f49ed1019a8ad197554424e Mon Sep 17 00:00:00 2001 From: Qiang Zhao <74767115+mattisonchao@users.noreply.github.com> Date: Thu, 13 Jan 2022 00:52:38 +0800 Subject: [PATCH] [ Issue 13479 ] Fixed internal topic effect by InactiveTopicPolicy. (#13611) --- .../pulsar/broker/admin/impl/BrokersBase.java | 3 +- .../service/persistent/SystemTopic.java | 5 ++ .../broker/systopic/SystemTopicClient.java | 6 ++- .../service/InactiveTopicDeleteTest.java | 49 +++++++++++++++++++ 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 756d141fab444..bb9453e09bfdd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -70,6 +70,7 @@ public class BrokersBase extends PulsarWebResource { private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class); private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10); + public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck"; @GET @Path("/{cluster}") @@ -317,7 +318,7 @@ public void healthcheck(@Suspended AsyncResponse asyncResponse, pulsar().getConfiguration()); - topic = String.format("persistent://%s/healthcheck", heartbeatNamespace); + topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX); LOG.info("Running healthCheck with topic={}", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 3b11de6dbe77a..37f2dde7a7c11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -31,6 +31,11 @@ public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerServi super(topic, ledger, brokerService); } + @Override + public boolean isDeleteWhileInactive() { + return false; + } + @Override public boolean isSizeBacklogExceeded() { return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java index c967e7d8ae8f1..fa39f42d0e920 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -206,7 +207,10 @@ static boolean isSystemTopic(TopicName topicName) { if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) { return true; } - + // health check topic + if (StringUtils.endsWith(localName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX)){ + return true; + } return false; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 2acb24b1527c5..853d5ace2e0f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; @@ -571,4 +573,51 @@ public void testInactiveTopicApplied() throws Exception { Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInactiveTopicPolicies(topic, true), brokerLevelPolicy)); } + + @Test(timeOut = 30000) + public void testInternalTopicInactiveNotClean() throws Exception { + conf.setSystemTopicEnabled(true); + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + super.baseSetup(); + // init topic + final String healthCheckTopic = "persistent://prop/ns-abc/"+ BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX; + final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"; + + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + + Producer heathCheckProducer = pulsarClient.newProducer() + .topic(healthCheckTopic) + .create(); + Consumer heathCheckConsumer = pulsarClient.newConsumer() + .topic(healthCheckTopic) + .subscriptionName("healthCheck") + .subscribe(); + + consumer.close(); + producer.close(); + heathCheckConsumer.close(); + heathCheckProducer.close(); + + Awaitility.await().untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(topic))); + Awaitility.await().untilAsserted(() -> { + Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(healthCheckTopic)); + }); + + admin.topics().deleteSubscription(topic, "sub"); + admin.topics().deleteSubscription(healthCheckTopic, "healthCheck"); + + Awaitility.await().untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc") + .contains(topic))); + Awaitility.await().pollDelay(2, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(healthCheckTopic))); + } }