Skip to content

Commit

Permalink
Changing the topic creation flow and optimize heartbeat topic not tri…
Browse files Browse the repository at this point in the history
…gger compaction. (apache#14643)

### Motivation

When create persistent topic, create compaction subscription(line-1395) is before topic `initialize`(line-1397), it's better to do this after `initialize`:
https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1391-L1398
If we change this part, we can optimize the heartbeat topic not trigger the compaction.
apache#13611 has made the heartbeat topic as a system topic to avoid deleting by GC. However, system topic is compacted by default. But the heartbeat topic sends msg without a key, so trigger compaction is meaningful.  So it's better to skip the heartbeat topic to do the compaction.

### Modification
- Move `preCreateSubscriptionForCompactionIfNeeded` after `initialize`.
- Add heartbeat topic not trigger compaction.
  • Loading branch information
Technoboy- authored and nicklixinyang committed Apr 20, 2022
1 parent a9b216b commit 4dba87c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public enum State {
private volatile CompletableFuture<Void> closeFuture;
// key is listener name, value is pulsar address and pulsar ssl address
private Map<String, AdvertisedListener> advertisedListeners;
private NamespaceName heartbeatNamespaceV1;
private NamespaceName heartbeatNamespaceV2;

public PulsarService(ServiceConfiguration config) {
Expand Down Expand Up @@ -708,6 +709,7 @@ public void start() throws PulsarServerException {

this.addWebServerHandlers(webService, metricsServlet, this.config);
this.webService.start();
heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);

// Refresh addresses and update configuration, since the port might have been dynamically assigned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1391,44 +1391,40 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: new PersistentTopic(topic, ledger, BrokerService.this);
CompletableFuture<Void> preCreateSubForCompaction =
persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
CompletableFuture<Void> replicationFuture = persistentTopic
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.checkReplication());

CompletableFuture.allOf(preCreateSubForCompaction, replicationFuture)
.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
}).thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> persistentTopic.checkReplication())
.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
})
.thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing the topic",
topic, FutureUtil.getException(topicFuture));
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
})
.exceptionally((ex) -> {
log.warn("Replication or dedup check failed."
+ " Removing topic from topics list {}, {}", topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
});
return null;
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check failed."
+ " Removing topic from topics list {}, {}",
topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
});

return null;
});
} catch (PulsarServerException e) {
log.warn("Failed to create topic {}-{}", topic, e.getMessage());
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1415,34 +1415,16 @@ public void checkMessageDeduplicationInfo() {
messageDeduplication.purgeInactiveProducers();
}

public CompletableFuture<Boolean> isCompactionEnabled() {
Optional<Long> topicCompactionThreshold = getTopicPolicies()
.map(TopicPolicies::getCompactionThreshold);
if (topicCompactionThreshold.isPresent() && topicCompactionThreshold.get() > 0) {
return CompletableFuture.completedFuture(true);
}

TopicName topicName = TopicName.get(topic);
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenApply(policies -> {
if (policies.isPresent()) {
return policies.get().compaction_threshold != null
&& policies.get().compaction_threshold > 0;
} else {
// Check broker default
return brokerService.pulsar().getConfiguration()
.getBrokerServiceCompactionThresholdInBytes() > 0;
}
});
public boolean isCompactionEnabled() {
Long compactionThreshold = topicPolicies.getCompactionThreshold().get();
return compactionThreshold != null && compactionThreshold > 0;
}

public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
long compactionThreshold = topicPolicies.getCompactionThreshold().get();
if (isSystemTopic() || compactionThreshold != 0
&& currentCompaction.isDone()) {
if (isCompactionEnabled() && currentCompaction.isDone()) {

long backlogEstimate = 0;

Expand Down Expand Up @@ -1480,17 +1462,11 @@ public CompletableFuture<Void> preCreateSubscriptionForCompactionIfNeeded() {
}

return isCompactionEnabled()
.thenCompose(enabled -> {
if (enabled) {
// If a topic has a compaction policy setup, we must make sure that the compaction cursor
// is pre-created, in order to ensure all the data will be seen by the compactor.
return createSubscription(COMPACTION_SUBSCRIPTION,
CommandSubscribe.InitialPosition.Earliest, false)
.thenCompose(__ -> CompletableFuture.completedFuture(null));
} else {
return CompletableFuture.completedFuture(null);
}
});
// If a topic has a compaction policy setup, we must make sure that the compaction cursor
// is pre-created, in order to ensure all the data will be seen by the compactor.
? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false)
.thenCompose(__ -> CompletableFuture.completedFuture(null))
: CompletableFuture.completedFuture(null);
}

CompletableFuture<Void> startReplicator(String remoteCluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

public class SystemTopic extends PersistentTopic {

Expand Down Expand Up @@ -70,8 +72,13 @@ public CompletableFuture<Void> checkReplication() {
}

@Override
public CompletableFuture<Boolean> isCompactionEnabled() {
// All system topics are using compaction, even though is not explicitly set in the policies.
return CompletableFuture.completedFuture(true);
public boolean isCompactionEnabled() {
// All system topics are using compaction except `HealthCheck`,
// even though is not explicitly set in the policies.
TopicName name = TopicName.get(topic);
NamespaceName heartbeatNamespaceV1 = brokerService.pulsar().getHeartbeatNamespaceV1();
NamespaceName heartbeatNamespaceV2 = brokerService.pulsar().getHeartbeatNamespaceV2();
return !name.getNamespaceObject().equals(heartbeatNamespaceV1)
&& !name.getNamespaceObject().equals(heartbeatNamespaceV2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
Expand All @@ -43,12 +44,16 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
Expand All @@ -62,6 +67,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -1655,4 +1661,21 @@ public void testReadUnCompacted(boolean batchEnabled) throws PulsarClientExcepti
assertNull(none);
}
}

@SneakyThrows
@Test
public void testHealthCheckTopicNotCompacted() {
NamespaceName heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration());
String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck";
NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration());
String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck";
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicV1).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicV2).create();
Optional<Topic> topicReferenceV1 = pulsar.getBrokerService().getTopic(topicV1, false).join();
Optional<Topic> topicReferenceV2 = pulsar.getBrokerService().getTopic(topicV2, false).join();
assertFalse(((SystemTopic)topicReferenceV1.get()).isCompactionEnabled());
assertFalse(((SystemTopic)topicReferenceV2.get()).isCompactionEnabled());
producer1.close();
producer2.close();
}
}

0 comments on commit 4dba87c

Please sign in to comment.