Skip to content

Commit

Permalink
[ Issue 13479 ] Fixed internal topic effect by InactiveTopicPolicy. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Jan 12, 2022
1 parent 160ed8a commit 5835191
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
public class BrokersBase extends PulsarWebResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10);
public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";

@GET
@Path("/{cluster}")
Expand Down Expand Up @@ -317,7 +318,7 @@ public void healthcheck(@Suspended AsyncResponse asyncResponse,
pulsar().getConfiguration());


topic = String.format("persistent://%s/healthcheck", heartbeatNamespace);
topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX);

LOG.info("Running healthCheck with topic={}", topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerServi
super(topic, ledger, brokerService);
}

@Override
public boolean isDeleteWhileInactive() {
return false;
}

@Override
public boolean isSizeBacklogExceeded() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -206,7 +207,10 @@ static boolean isSystemTopic(TopicName topicName) {
if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
return true;
}

// health check topic
if (StringUtils.endsWith(localName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX)){
return true;
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -571,4 +573,51 @@ public void testInactiveTopicApplied() throws Exception {
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getInactiveTopicPolicies(topic, true), brokerLevelPolicy));
}

@Test(timeOut = 30000)
public void testInternalTopicInactiveNotClean() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
// init topic
final String healthCheckTopic = "persistent://prop/ns-abc/"+ BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();

Producer<byte[]> heathCheckProducer = pulsarClient.newProducer()
.topic(healthCheckTopic)
.create();
Consumer<byte[]> heathCheckConsumer = pulsarClient.newConsumer()
.topic(healthCheckTopic)
.subscriptionName("healthCheck")
.subscribe();

consumer.close();
producer.close();
heathCheckConsumer.close();
heathCheckProducer.close();

Awaitility.await().untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
.contains(topic)));
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(healthCheckTopic));
});

admin.topics().deleteSubscription(topic, "sub");
admin.topics().deleteSubscription(healthCheckTopic, "healthCheck");

Awaitility.await().untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc")
.contains(topic)));
Awaitility.await().pollDelay(2, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
.contains(healthCheckTopic)));
}
}

0 comments on commit 5835191

Please sign in to comment.