Skip to content

Commit

Permalink
optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Jun 13, 2024
1 parent 1352e1f commit 40ce065
Showing 1 changed file with 37 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1003,11 +1003,12 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c

/**
* Retrieves or creates a topic based on the specified parameters.
* 0. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
* 1. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
* 2. If the topic metadata not exists, and {@code createIfMissing} is false,
* 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException.
* 1. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
* 2. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
* 3. If the topic metadata not exists, and {@code createIfMissing} is false,
* returns an empty Optional in a CompletableFuture. And this empty future not be added to the map.
* 3. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
* 4. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
* Any exceptions will remove the topicFuture from the map.
*
* @param topicName The name of the topic, potentially including partition information.
Expand All @@ -1025,6 +1026,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
Expand All @@ -1039,36 +1047,40 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
CompletableFuture<Optional<Topic>> topicFuture = topics.computeIfAbsent(topicName.toString(),
(tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies));
} else {
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
// Tips: Do not convert the `topicFuture` here; we need to ensure that the future
// placed in the map is consistent with the one returned by `loadOrCreatePersistentTopic`.
// Otherwise, if any exceptions occur, it will not be correctly removed from the topics.
});
return topicFuture;
}
});
} else {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies));
}
});
});
} else {
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
if (!topics.containsKey(topicName.toString())) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
}
Expand Down Expand Up @@ -1253,14 +1265,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
topicFuture.completeExceptionally(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
return topicFuture;
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic;
try {
Expand Down Expand Up @@ -1533,15 +1537,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new NotAllowedException(
"Broker is unable to load persistent topic"));
return topicFuture;
}

checkTopicNsOwnership(topic)
.thenRun(() -> {
Expand Down

0 comments on commit 40ce065

Please sign in to comment.