Skip to content

Commit

Permalink
[fix] [broker] response not-found error if topic does not exist when …
Browse files Browse the repository at this point in the history
…calling getPartitionedTopicMetadata (apache#22838)

(cherry picked from commit 9aed736)
(cherry picked from commit 1d2959b)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Jul 1, 2024
1 parent 87013b4 commit b9c9930
Show file tree
Hide file tree
Showing 14 changed files with 928 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,13 +570,13 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
// is a non-partitioned topic so we shouldn't check if the topic exists.
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(brokerAllowAutoTopicCreation -> {
if (checkAllowAutoCreation) {
if (checkAllowAutoCreation && brokerAllowAutoTopicCreation) {
// Whether it exists or not, auto create a non-partitioned topic by client.
return CompletableFuture.completedFuture(metadata);
} else {
// If it does not exist, response a Not Found error.
// Otherwise, response a non-partitioned metadata.
return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
return internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata);
}
});
}
Expand Down Expand Up @@ -724,6 +724,17 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
if (!exists) {
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
}
});
}

protected CompletableFuture<Void> internalCheckNonPartitionedTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
.thenAccept(exist -> {
if (!exist) {
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
Expand Down Expand Up @@ -5541,8 +5552,10 @@ protected CompletableFuture<Void> validateShadowTopics(List<String> shadowTopics
"Only persistent topic can be set as shadow topic"));
}
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
.thenAccept(isExists -> {
if (!isExists) {
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
if (!exists) {
throw new RestException(Status.PRECONDITION_FAILED,
"Shadow topic [" + shadowTopic + "] not exists.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,20 @@ public void getPartitionedMetadata(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is check configuration required to automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
checkAllowAutoCreation);
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOwnershipAsync(topicName, authoritative).whenComplete((__, ex) -> {
if (ex != null) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (isNot307And404Exception(actEx)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, actEx);
} else {
// "super.getPartitionedMetadata" will handle error itself.
super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
checkAllowAutoCreation);
}
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,22 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null))
.thenCompose(__ -> {
// Case-1: Non-persistent topic.
// Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
// in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
// we'll return the true flag.
CompletableFuture<Boolean> existFuture = (!topicName.isPersistent() && !topicName.isPartitioned())
? CompletableFuture.completedFuture(true)
: pulsar().getNamespaceService().checkTopicExists(topicName)
.thenCompose(exists -> exists ? CompletableFuture.completedFuture(true)
: pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));

return existFuture;
// we'll return the true flag. So either it is a partitioned topic or not, the result will be true.
if (!topicName.isPersistent()) {
return CompletableFuture.completedFuture(true);
}
// Case-2: Persistent topic.
return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
boolean exists = info.isExists();
info.recycle();
if (exists) {
return CompletableFuture.completedFuture(true);
}
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName);
});
})
.thenCompose(exist -> {
if (!exist) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -68,6 +69,7 @@
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -117,6 +119,7 @@
*
* @see org.apache.pulsar.broker.PulsarService
*/
@Slf4j
public class NamespaceService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);

Expand Down Expand Up @@ -1312,40 +1315,86 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
});
}

public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
CompletableFuture<Boolean> future;
// If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger.
if (topic.isPersistent() && topic.isPartitioned()) {
future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
/***
* Check topic exists( partitioned or non-partitioned ).
*/
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
}
return checkNonPartitionedTopicExists(topic)
.thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists()
: TopicExistsInfo.newTopicNotExists());
});
}

/***
* Check non-partitioned topic exists.
*/
public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName topic) {
if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
future = CompletableFuture.completedFuture(false);
return checkNonPersistentNonPartitionedTopicExists(topic.toString());
}
}

return future.thenCompose(found -> {
if (found != null && found) {
return CompletableFuture.completedFuture(true);
/**
* Regarding non-persistent topic, we do not know whether it exists or not. Redirect the request to the ownership
* broker of this topic. HTTP API has implemented the mechanism that redirect to ownership broker, so just call
* HTTP API here.
*/
public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(String topic) {
TopicName topicName = TopicName.get(topic);
// "non-partitioned & non-persistent" topics only exist on the owner broker.
return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned -> {
// The current broker is the owner.
if (isOwned) {
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = pulsar.getBrokerService()
.getTopic(topic, false);
if (nonPersistentTopicFuture != null) {
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
} else {
return CompletableFuture.completedFuture(false);
}
}

return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(true);
}

if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
// The non-partitioned non-persistent topic only exist in the broker topics.
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (nonPersistentTopicFuture == null) {
// Forward to the owner broker.
PulsarClientImpl pulsarClient;
try {
pulsarClient = (PulsarClientImpl) pulsar.getClient();
} catch (Exception ex) {
// This error will never occur.
log.error("{} Failed to get partition metadata due to create internal admin client fails", topic, ex);
return FutureUtil.failedFuture(ex);
}
LookupOptions lookupOptions = LookupOptions.builder().readOnly(false).authoritative(true).build();
return getBrokerServiceUrlAsync(TopicName.get(topic), lookupOptions)
.thenCompose(lookupResult -> {
if (!lookupResult.isPresent()) {
log.error("{} Failed to get partition metadata due can not find the owner broker", topic);
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(
"No broker was available to own " + topicName));
}
return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
.getPartitionedTopicMetadata(topicName, false)
.thenApply(metadata -> true)
.exceptionallyCompose(ex -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof PulsarClientException.NotFoundException
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(false);
} else {
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
return CompletableFuture.failedFuture(ex);
}
}
});
});
});
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;

import io.netty.util.Recycler;
import lombok.Getter;
import org.apache.pulsar.common.policies.data.TopicType;

public class TopicExistsInfo {

private static final Recycler<TopicExistsInfo> RECYCLER = new Recycler<>() {
@Override
protected TopicExistsInfo newObject(Handle<TopicExistsInfo> handle) {
return new TopicExistsInfo(handle);
}
};

private static TopicExistsInfo nonPartitionedExists = new TopicExistsInfo(true, 0);

private static TopicExistsInfo notExists = new TopicExistsInfo(false, 0);

public static TopicExistsInfo newPartitionedTopicExists(Integer partitions){
TopicExistsInfo info = RECYCLER.get();
info.exists = true;
info.partitions = partitions.intValue();
return info;
}

public static TopicExistsInfo newNonPartitionedTopicExists(){
return nonPartitionedExists;
}

public static TopicExistsInfo newTopicNotExists(){
return notExists;
}

private final Recycler.Handle<TopicExistsInfo> handle;

@Getter
private int partitions;
@Getter
private boolean exists;

private TopicExistsInfo(Recycler.Handle<TopicExistsInfo> handle) {
this.handle = handle;
}

private TopicExistsInfo(boolean exists, int partitions) {
this.handle = null;
this.partitions = partitions;
this.exists = exists;
}

public void recycle() {
if (this == notExists || this == nonPartitionedExists || this.handle == null) {
return;
}
this.exists = false;
this.partitions = 0;
this.handle.recycle(this);
}

public TopicType getTopicType() {
return this.partitions > 0 ? TopicType.PARTITIONED : TopicType.NON_PARTITIONED;
}
}
Loading

0 comments on commit b9c9930

Please sign in to comment.