diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d5721d249b23e..2a6b89b413371 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -570,13 +570,13 @@ protected CompletableFuture internalGetPartitionedMeta // is a non-partitioned topic so we shouldn't check if the topic exists. return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) .thenCompose(brokerAllowAutoTopicCreation -> { - if (checkAllowAutoCreation) { + if (checkAllowAutoCreation && brokerAllowAutoTopicCreation) { // Whether it exists or not, auto create a non-partitioned topic by client. return CompletableFuture.completedFuture(metadata); } else { // If it does not exist, response a Not Found error. // Otherwise, response a non-partitioned metadata. - return internalCheckTopicExists(topicName).thenApply(__ -> metadata); + return internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata); } }); } @@ -724,6 +724,17 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) protected CompletableFuture internalCheckTopicExists(TopicName topicName) { return pulsar().getNamespaceService().checkTopicExists(topicName) + .thenAccept(info -> { + boolean exists = info.isExists(); + info.recycle(); + if (!exists) { + throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); + } + }); + } + + protected CompletableFuture internalCheckNonPartitionedTopicExists(TopicName topicName) { + return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName) .thenAccept(exist -> { if (!exist) { throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); @@ -5541,8 +5552,10 @@ protected CompletableFuture validateShadowTopics(List shadowTopics "Only persistent topic can be set as shadow topic")); } futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName) - .thenAccept(isExists -> { - if (!isExists) { + .thenAccept(info -> { + boolean exists = info.isExists(); + info.recycle(); + if (!exists) { throw new RestException(Status.PRECONDITION_FAILED, "Shadow topic [" + shadowTopic + "] not exists."); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index cae7c651ce791..945ad69477c70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -98,8 +98,20 @@ public void getPartitionedMetadata( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Is check configuration required to automatically create topic") @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) { - super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative, - checkAllowAutoCreation); + validateTopicName(tenant, namespace, encodedTopic); + validateTopicOwnershipAsync(topicName, authoritative).whenComplete((__, ex) -> { + if (ex != null) { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (isNot307And404Exception(actEx)) { + log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, actEx); + } else { + // "super.getPartitionedMetadata" will handle error itself. + super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative, + checkAllowAutoCreation); + } + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 7b2c777414884..9a05c3d992aaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -67,16 +67,22 @@ protected CompletableFuture internalLookupTopicAsync(final TopicName .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())) .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null)) .thenCompose(__ -> { + // Case-1: Non-persistent topic. // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists // in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned, - // we'll return the true flag. - CompletableFuture existFuture = (!topicName.isPersistent() && !topicName.isPartitioned()) - ? CompletableFuture.completedFuture(true) - : pulsar().getNamespaceService().checkTopicExists(topicName) - .thenCompose(exists -> exists ? CompletableFuture.completedFuture(true) - : pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)); - - return existFuture; + // we'll return the true flag. So either it is a partitioned topic or not, the result will be true. + if (!topicName.isPersistent()) { + return CompletableFuture.completedFuture(true); + } + // Case-2: Persistent topic. + return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> { + boolean exists = info.isExists(); + info.recycle(); + if (exists) { + return CompletableFuture.completedFuture(true); + } + return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName); + }); }) .thenCompose(exist -> { if (!exist) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 626a187f46388..c62e9c52a768e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -47,6 +47,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; @@ -68,6 +69,7 @@ import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -117,6 +119,7 @@ * * @see org.apache.pulsar.broker.PulsarService */ +@Slf4j public class NamespaceService implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class); @@ -1312,40 +1315,86 @@ public CompletableFuture> getOwnedTopicListForNamespaceBundle(Names }); } - public CompletableFuture checkTopicExists(TopicName topic) { - CompletableFuture future; - // If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger. - if (topic.isPersistent() && topic.isPartitioned()) { - future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); + /*** + * Check topic exists( partitioned or non-partitioned ). + */ + public CompletableFuture checkTopicExists(TopicName topic) { + return pulsar.getBrokerService() + .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString())) + .thenCompose(metadata -> { + if (metadata.partitions > 0) { + return CompletableFuture.completedFuture( + TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); + } + return checkNonPartitionedTopicExists(topic) + .thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() + : TopicExistsInfo.newTopicNotExists()); + }); + } + + /*** + * Check non-partitioned topic exists. + */ + public CompletableFuture checkNonPartitionedTopicExists(TopicName topic) { + if (topic.isPersistent()) { + return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); } else { - future = CompletableFuture.completedFuture(false); + return checkNonPersistentNonPartitionedTopicExists(topic.toString()); } + } - return future.thenCompose(found -> { - if (found != null && found) { - return CompletableFuture.completedFuture(true); + /** + * Regarding non-persistent topic, we do not know whether it exists or not. Redirect the request to the ownership + * broker of this topic. HTTP API has implemented the mechanism that redirect to ownership broker, so just call + * HTTP API here. + */ + public CompletableFuture checkNonPersistentNonPartitionedTopicExists(String topic) { + TopicName topicName = TopicName.get(topic); + // "non-partitioned & non-persistent" topics only exist on the owner broker. + return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned -> { + // The current broker is the owner. + if (isOwned) { + CompletableFuture> nonPersistentTopicFuture = pulsar.getBrokerService() + .getTopic(topic, false); + if (nonPersistentTopicFuture != null) { + return nonPersistentTopicFuture.thenApply(Optional::isPresent); + } else { + return CompletableFuture.completedFuture(false); + } } - return pulsar.getBrokerService() - .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName())) - .thenCompose(metadata -> { - if (metadata.partitions > 0) { - return CompletableFuture.completedFuture(true); - } - - if (topic.isPersistent()) { - return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); - } else { - // The non-partitioned non-persistent topic only exist in the broker topics. - CompletableFuture> nonPersistentTopicFuture = - pulsar.getBrokerService().getTopics().get(topic.toString()); - if (nonPersistentTopicFuture == null) { + // Forward to the owner broker. + PulsarClientImpl pulsarClient; + try { + pulsarClient = (PulsarClientImpl) pulsar.getClient(); + } catch (Exception ex) { + // This error will never occur. + log.error("{} Failed to get partition metadata due to create internal admin client fails", topic, ex); + return FutureUtil.failedFuture(ex); + } + LookupOptions lookupOptions = LookupOptions.builder().readOnly(false).authoritative(true).build(); + return getBrokerServiceUrlAsync(TopicName.get(topic), lookupOptions) + .thenCompose(lookupResult -> { + if (!lookupResult.isPresent()) { + log.error("{} Failed to get partition metadata due can not find the owner broker", topic); + return FutureUtil.failedFuture(new ServiceUnitNotReadyException( + "No broker was available to own " + topicName)); + } + return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl()) + .getPartitionedTopicMetadata(topicName, false) + .thenApply(metadata -> true) + .exceptionallyCompose(ex -> { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarAdminException.NotFoundException) { return CompletableFuture.completedFuture(false); } else { - return nonPersistentTopicFuture.thenApply(Optional::isPresent); + log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); + return CompletableFuture.failedFuture(ex); } - } - }); + }); + }); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java new file mode 100644 index 0000000000000..1c3f117719e8e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.namespace; + +import io.netty.util.Recycler; +import lombok.Getter; +import org.apache.pulsar.common.policies.data.TopicType; + +public class TopicExistsInfo { + + private static final Recycler RECYCLER = new Recycler<>() { + @Override + protected TopicExistsInfo newObject(Handle handle) { + return new TopicExistsInfo(handle); + } + }; + + private static TopicExistsInfo nonPartitionedExists = new TopicExistsInfo(true, 0); + + private static TopicExistsInfo notExists = new TopicExistsInfo(false, 0); + + public static TopicExistsInfo newPartitionedTopicExists(Integer partitions){ + TopicExistsInfo info = RECYCLER.get(); + info.exists = true; + info.partitions = partitions.intValue(); + return info; + } + + public static TopicExistsInfo newNonPartitionedTopicExists(){ + return nonPartitionedExists; + } + + public static TopicExistsInfo newTopicNotExists(){ + return notExists; + } + + private final Recycler.Handle handle; + + @Getter + private int partitions; + @Getter + private boolean exists; + + private TopicExistsInfo(Recycler.Handle handle) { + this.handle = handle; + } + + private TopicExistsInfo(boolean exists, int partitions) { + this.handle = null; + this.partitions = partitions; + this.exists = exists; + } + + public void recycle() { + if (this == notExists || this == nonPartitionedExists || this.handle == null) { + return; + } + this.exists = false; + this.partitions = 0; + this.handle.recycle(this); + } + + public TopicType getTopicType() { + return this.partitions > 0 ? TopicType.PARTITIONED : TopicType.NON_PARTITIONED; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9da21c35c31aa..7ce9898bb38a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3212,65 +3212,66 @@ public CompletableFuture fetchPartitionedTopicMetadata if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } - return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) - .thenCompose(policies -> pulsar.getNamespaceService().checkTopicExists(topicName) - .thenCompose(topicExists -> fetchPartitionedTopicMetadataAsync(topicName) - .thenCompose(metadata -> { - CompletableFuture future = new CompletableFuture<>(); - - // There are a couple of potentially blocking calls, which we cannot make from the - // MetadataStore callback thread. - pulsar.getExecutor().execute(() -> { - // If topic is already exist, creating partitioned topic is not allowed. - - if (metadata.partitions == 0 - && !topicExists - && !topicName.isPartitioned() - && pulsar.getBrokerService() - .isDefaultTopicTypePartitioned(topicName, policies)) { - isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> { - if (allowed) { - pulsar.getBrokerService() - .createDefaultPartitionedTopicAsync(topicName, policies) - .thenAccept(md -> future.complete(md)) - .exceptionally(ex -> { - if (ex.getCause() - instanceof MetadataStoreException - .AlreadyExistsException) { - log.info("[{}] The partitioned topic is already" - + " created, try to refresh the cache and read" - + " again.", topicName); - // The partitioned topic might be created concurrently - fetchPartitionedTopicMetadataAsync(topicName, true) - .whenComplete((metadata2, ex2) -> { - if (ex2 == null) { - future.complete(metadata2); - } else { - future.completeExceptionally(ex2); - } - }); - } else { - log.error("[{}] operation of creating partitioned" - + " topic metadata failed", - topicName, ex); - future.completeExceptionally(ex); - } - return null; - }); - } else { - future.complete(metadata); - } - }).exceptionally(ex -> { - future.completeExceptionally(ex); - return null; - }); - } else { - future.complete(metadata); - } - }); + return pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo -> { + final boolean topicExists = topicExistsInfo.isExists(); + final TopicType topicType = topicExistsInfo.getTopicType(); + final Integer partitions = topicExistsInfo.getPartitions(); + topicExistsInfo.recycle(); + + // Topic exists. + if (topicExists) { + if (topicType.equals(TopicType.PARTITIONED)) { + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(partitions)); + } + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + } - return future; - }))); + // Try created if allowed to create a partitioned topic automatically. + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenComposeAsync(policies -> { + return isAllowAutoTopicCreationAsync(topicName, policies).thenComposeAsync(allowed -> { + // Not Allow auto-creation. + if (!allowed) { + // Do not change the original behavior, or default return a non-partitioned topic. + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + } + + // Allow auto create non-partitioned topic. + boolean autoCreatePartitionedTopic = pulsar.getBrokerService() + .isDefaultTopicTypePartitioned(topicName, policies); + if (!autoCreatePartitionedTopic || topicName.isPartitioned()) { + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + } + + // Create partitioned metadata. + return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, policies) + .exceptionallyCompose(ex -> { + // The partitioned topic might be created concurrently. + if (ex.getCause() instanceof MetadataStoreException.AlreadyExistsException) { + log.info("[{}] The partitioned topic is already created, try to refresh the cache" + + " and read again.", topicName); + CompletableFuture recheckFuture = + fetchPartitionedTopicMetadataAsync(topicName, true); + recheckFuture.exceptionally(ex2 -> { + // Just for printing a log if error occurs. + log.error("[{}] Fetch partitioned topic metadata failed", topicName, ex); + return null; + }); + return recheckFuture; + } else { + log.error("[{}] operation of creating partitioned topic metadata failed", + topicName, ex); + return CompletableFuture.failedFuture(ex); + } + }); + }, pulsar.getExecutor()).exceptionallyCompose(ex -> { + log.error("[{}] operation of get partitioned metadata failed due to calling" + + " isAllowAutoTopicCreationAsync failed", + topicName, ex); + return CompletableFuture.failedFuture(ex); + }); + }, pulsar.getExecutor()); + }, pulsar.getExecutor()); } @SuppressWarnings("deprecation") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 52135163a6ab3..2514be55fffa9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -81,8 +81,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.limiter.ConnectionController; -import org.apache.pulsar.broker.resources.NamespaceResources; -import org.apache.pulsar.broker.resources.TopicResources; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -157,6 +156,7 @@ import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.CommandUtils; @@ -582,58 +582,33 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa if (isAuthorized) { // Get if exists, respond not found error if not exists. getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate -> { - boolean autoCreateIfNotExist = partitionMetadata.isMetadataAutoCreationEnabled(); + boolean autoCreateIfNotExist = partitionMetadata.isMetadataAutoCreationEnabled() + && brokerAllowAutoCreate; if (!autoCreateIfNotExist) { - final NamespaceResources namespaceResources = getBrokerService().pulsar() - .getPulsarResources().getNamespaceResources(); - final TopicResources topicResources = getBrokerService().pulsar().getPulsarResources() - .getTopicResources(); - namespaceResources.getPartitionedTopicResources() - .getPartitionedTopicMetadataAsync(topicName, false) - .handle((metadata, getMetadataEx) -> { - if (getMetadataEx != null) { - log.error("{} {} Failed to get partition metadata", topicName, - ServerCnx.this.toString(), getMetadataEx); - writeAndFlush( - Commands.newPartitionMetadataResponse(ServerError.MetadataError, - "Failed to get partition metadata", - requestId)); - } else if (metadata.isPresent()) { - commandSender.sendPartitionMetadataResponse(metadata.get().partitions, - requestId); - } else if (topicName.isPersistent()) { - topicResources.persistentTopicExists(topicName).thenAccept(exists -> { - if (exists) { - commandSender.sendPartitionMetadataResponse(0, requestId); - return; - } - writeAndFlush(Commands.newPartitionMetadataResponse( - ServerError.TopicNotFound, "", requestId)); - }).exceptionally(ex -> { - log.error("{} {} Failed to get partition metadata", topicName, - ServerCnx.this.toString(), ex); - writeAndFlush( - Commands.newPartitionMetadataResponse(ServerError.MetadataError, - "Failed to check partition metadata", - requestId)); - return null; - }); - } else { - // Regarding non-persistent topic, we do not know whether it exists or not. - // Just return a non-partitioned metadata if partitioned metadata does not - // exist. - // Broker will respond a not found error when doing subscribing or producing if - // broker not allow to auto create topics. - commandSender.sendPartitionMetadataResponse(0, requestId); - } - return null; - }).whenComplete((ignore, ignoreEx) -> { - lookupSemaphore.release(); - if (ignoreEx != null) { - log.error("{} {} Failed to handle partition metadata request", topicName, - ServerCnx.this.toString(), ignoreEx); - } - }); + NamespaceService namespaceService = getBrokerService().getPulsar().getNamespaceService(); + namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> { + lookupSemaphore.release(); + if (!topicExistsInfo.isExists()) { + writeAndFlush(Commands.newPartitionMetadataResponse( + ServerError.TopicNotFound, "", requestId)); + } else if (topicExistsInfo.getTopicType().equals(TopicType.PARTITIONED)) { + commandSender.sendPartitionMetadataResponse(topicExistsInfo.getPartitions(), + requestId); + } else { + commandSender.sendPartitionMetadataResponse(0, requestId); + } + // release resources. + topicExistsInfo.recycle(); + }).exceptionally(ex -> { + lookupSemaphore.release(); + log.error("{} {} Failed to get partition metadata", topicName, + ServerCnx.this.toString(), ex); + writeAndFlush( + Commands.newPartitionMetadataResponse(ServerError.MetadataError, + "Failed to get partition metadata", + requestId)); + return null; + }); } else { // Get if exists, create a new one if not exists. unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java new file mode 100644 index 0000000000000..28cf91ee165e2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.net.URL; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +@Slf4j +public class GetPartitionMetadataMultiBrokerTest extends GetPartitionMetadataTest { + + private PulsarService pulsar2; + private URL url2; + private PulsarAdmin admin2; + private PulsarClientImpl clientWithHttpLookup2; + private PulsarClientImpl clientWitBinaryLookup2; + + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void cleanupBrokers() throws Exception { + // Cleanup broker2. + if (clientWithHttpLookup2 != null) { + clientWithHttpLookup2.close(); + clientWithHttpLookup2 = null; + } + if (clientWitBinaryLookup2 != null) { + clientWitBinaryLookup2.close(); + clientWitBinaryLookup2 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + + // Super cleanup. + super.cleanupBrokers(); + } + + @Override + protected void setupBrokers() throws Exception { + super.setupBrokers(); + doInitConf(); + pulsar2 = new PulsarService(conf); + pulsar2.start(); + url2 = new URL(pulsar2.getWebServiceAddress()); + admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); + clientWithHttpLookup2 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar2.getWebServiceAddress()).build(); + clientWitBinaryLookup2 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build(); + } + + @Override + protected PulsarClientImpl[] getClientsToTest() { + return new PulsarClientImpl[] {clientWithHttpLookup1, clientWitBinaryLookup1, + clientWithHttpLookup2, clientWitBinaryLookup2}; + } + + protected PulsarClientImpl[] getClientsToTest(boolean isUsingHttpLookup) { + if (isUsingHttpLookup) { + return new PulsarClientImpl[]{clientWithHttpLookup1, clientWithHttpLookup2}; + } else { + return new PulsarClientImpl[]{clientWitBinaryLookup1, clientWitBinaryLookup2}; + } + } + + @Override + protected int getLookupRequestPermits() { + return pulsar1.getBrokerService().getLookupRequestSemaphore().availablePermits() + + pulsar2.getBrokerService().getLookupRequestSemaphore().availablePermits(); + } + + protected void verifyPartitionsNeverCreated(String topicNameStr) throws Exception { + TopicName topicName = TopicName.get(topicNameStr); + try { + List topicList = admin1.topics().getList("public/default"); + for (int i = 0; i < 3; i++) { + assertFalse(topicList.contains(topicName.getPartition(i))); + } + } catch (Exception ex) { + // If the namespace bundle has not been loaded yet, it means no non-persistent topic was created. So + // this behavior is also correct. + // This error is not expected, a seperated PR is needed to fix this issue. + assertTrue(ex.getMessage().contains("Failed to find ownership for")); + } + } + + protected void verifyNonPartitionedTopicNeverCreated(String topicNameStr) throws Exception { + TopicName topicName = TopicName.get(topicNameStr); + try { + List topicList = admin1.topics().getList("public/default"); + assertFalse(topicList.contains(topicName.getPartitionedTopicName())); + } catch (Exception ex) { + // If the namespace bundle has not been loaded yet, it means no non-persistent topic was created. So + // this behavior is also correct. + // This error is not expected, a seperated PR is needed to fix this issue. + assertTrue(ex.getMessage().contains("Failed to find ownership for")); + } + } + + protected void modifyTopicAutoCreation(boolean allowAutoTopicCreation, + TopicType allowAutoTopicCreationType, + int defaultNumPartitions) throws Exception { + doModifyTopicAutoCreation(admin1, pulsar1, allowAutoTopicCreation, allowAutoTopicCreationType, + defaultNumPartitions); + doModifyTopicAutoCreation(admin2, pulsar2, allowAutoTopicCreation, allowAutoTopicCreationType, + defaultNumPartitions); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "topicDomains") + public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) throws Exception { + super.testAutoCreatingMetadataWhenCallingOldAPI(topicDomain); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "autoCreationParamsAll", enabled = false) + public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup, + TopicDomain topicDomain) throws Exception { + super.testGetMetadataIfNonPartitionedTopicExists(configAllowAutoTopicCreation, paramMetadataAutoCreationEnabled, + isUsingHttpLookup, topicDomain); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "autoCreationParamsAll") + public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup, + TopicDomain topicDomain) throws Exception { + super.testGetMetadataIfNonPartitionedTopicExists(configAllowAutoTopicCreation, paramMetadataAutoCreationEnabled, + isUsingHttpLookup, topicDomain); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "clients") + public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { + super.testAutoCreatePartitionedTopic(isUsingHttpLookup, topicDomain); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "clients") + public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { + super.testAutoCreateNonPartitionedTopic(isUsingHttpLookup, topicDomain); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "autoCreationParamsNotAllow") + public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + super.testGetMetadataIfNotAllowedCreate(configAllowAutoTopicCreation, paramMetadataAutoCreationEnabled, + isUsingHttpLookup); + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "autoCreationParamsNotAllow") + public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation, + paramMetadataAutoCreationEnabled, isUsingHttpLookup); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 51f643d2b7823..bf99b172829a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -22,70 +22,150 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Sets; +import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.Semaphore; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-admin") @Slf4j -public class GetPartitionMetadataTest extends ProducerConsumerBase { +public class GetPartitionMetadataTest { - private static final String DEFAULT_NS = "public/default"; + protected static final String DEFAULT_NS = "public/default"; - private PulsarClientImpl clientWithHttpLookup; - private PulsarClientImpl clientWitBinaryLookup; + protected String clusterName = "c1"; - @Override + protected LocalBookkeeperEnsemble bkEnsemble; + + protected ServiceConfiguration conf = new ServiceConfiguration(); + + protected PulsarService pulsar1; + protected URL url1; + protected PulsarAdmin admin1; + protected PulsarClientImpl clientWithHttpLookup1; + protected PulsarClientImpl clientWitBinaryLookup1; + + @BeforeClass(alwaysRun = true) protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - clientWithHttpLookup = - (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); - clientWitBinaryLookup = - (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + // Start broker. + setupBrokers(); + // Create default NS. + admin1.clusters().createCluster(clusterName, new ClusterDataImpl()); + admin1.tenants().createTenant(NamespaceName.get(DEFAULT_NS).getTenant(), + new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(clusterName))); + admin1.namespaces().createNamespace(DEFAULT_NS); } - @Override - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - super.internalCleanup(); - if (clientWithHttpLookup != null) { - clientWithHttpLookup.close(); + cleanupBrokers(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } + } + + protected void cleanupBrokers() throws Exception { + // Cleanup broker2. + if (clientWithHttpLookup1 != null) { + clientWithHttpLookup1.close(); + clientWithHttpLookup1 = null; + } + if (clientWitBinaryLookup1 != null) { + clientWitBinaryLookup1.close(); + clientWitBinaryLookup1 = null; } - if (clientWitBinaryLookup != null) { - clientWitBinaryLookup.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } + // Reset configs. + conf = new ServiceConfiguration(); + } + + protected void setupBrokers() throws Exception { + doInitConf(); + // Start broker. + pulsar1 = new PulsarService(conf); + pulsar1.start(); + url1 = new URL(pulsar1.getWebServiceAddress()); + admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); + clientWithHttpLookup1 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build(); + clientWitBinaryLookup1 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build(); } - @Override - protected void doInitConf() throws Exception { - super.doInitConf(); + protected void doInitConf() { + conf.setClusterName(clusterName); + conf.setAdvertisedAddress("localhost"); + conf.setBrokerServicePort(Optional.of(0)); + conf.setWebServicePort(Optional.of(0)); + conf.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + conf.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort() + "/foo"); + conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerSheddingEnabled(false); } - private LookupService getLookupService(boolean isUsingHttpLookup) { + protected PulsarClientImpl[] getClientsToTest() { + return new PulsarClientImpl[] {clientWithHttpLookup1, clientWitBinaryLookup1}; + } + + protected PulsarClientImpl[] getClientsToTest(boolean isUsingHttpLookup) { if (isUsingHttpLookup) { - return clientWithHttpLookup.getLookup(); + return new PulsarClientImpl[] {clientWithHttpLookup1}; } else { - return clientWitBinaryLookup.getLookup(); + return new PulsarClientImpl[] {clientWitBinaryLookup1}; } + + } + + protected int getLookupRequestPermits() { + return pulsar1.getBrokerService().getLookupRequestSemaphore().availablePermits(); + } + + protected void verifyPartitionsNeverCreated(String topicNameStr) throws Exception { + TopicName topicName = TopicName.get(topicNameStr); + List topicList = admin1.topics().getList("public/default"); + for (int i = 0; i < 3; i++) { + assertFalse(topicList.contains(topicName.getPartition(i))); + } + } + + protected void verifyNonPartitionedTopicNeverCreated(String topicNameStr) throws Exception { + TopicName topicName = TopicName.get(topicNameStr); + List topicList = admin1.topics().getList("public/default"); + assertFalse(topicList.contains(topicName.getPartitionedTopicName())); } @DataProvider(name = "topicDomains") @@ -96,43 +176,53 @@ public Object[][] topicDomains() { }; } - @Test(dataProvider = "topicDomains") - public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) throws Exception { - conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - conf.setDefaultNumPartitions(3); - conf.setAllowAutoTopicCreation(true); - setup(); - - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); - - // HTTP client. - final String tp1 = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); - clientWithHttpLookup.getPartitionsForTopic(tp1).join(); - Optional metadata1 = pulsar.getPulsarResources().getNamespaceResources() - .getPartitionedTopicResources() - .getPartitionedTopicMetadataAsync(TopicName.get(tp1), true).join(); - assertTrue(metadata1.isPresent()); - assertEquals(metadata1.get().partitions, 3); - - // Binary client. - final String tp2 = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); - clientWitBinaryLookup.getPartitionsForTopic(tp2).join(); - Optional metadata2 = pulsar.getPulsarResources().getNamespaceResources() - .getPartitionedTopicResources() - .getPartitionedTopicMetadataAsync(TopicName.get(tp2), true).join(); - assertTrue(metadata2.isPresent()); - assertEquals(metadata2.get().partitions, 3); - - // Verify: lookup semaphore has been releases. + protected static void doModifyTopicAutoCreation(PulsarAdmin admin1, PulsarService pulsar1, + boolean allowAutoTopicCreation, TopicType allowAutoTopicCreationType, + int defaultNumPartitions) throws Exception { + admin1.brokers().updateDynamicConfiguration( + "allowAutoTopicCreation", allowAutoTopicCreation + ""); + admin1.brokers().updateDynamicConfiguration( + "allowAutoTopicCreationType", allowAutoTopicCreationType + ""); + admin1.brokers().updateDynamicConfiguration( + "defaultNumPartitions", defaultNumPartitions + ""); Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); + assertEquals(pulsar1.getConfiguration().isAllowAutoTopicCreation(), allowAutoTopicCreation); + assertEquals(pulsar1.getConfiguration().getAllowAutoTopicCreationType(), allowAutoTopicCreationType); + assertEquals(pulsar1.getConfiguration().getDefaultNumPartitions(), defaultNumPartitions); }); + } - // Cleanup. - admin.topics().deletePartitionedTopic(tp1, false); - admin.topics().deletePartitionedTopic(tp2, false); + protected void modifyTopicAutoCreation(boolean allowAutoTopicCreation, + TopicType allowAutoTopicCreationType, + int defaultNumPartitions) throws Exception { + doModifyTopicAutoCreation(admin1, pulsar1, allowAutoTopicCreation, allowAutoTopicCreationType, + defaultNumPartitions); + } + + @Test(dataProvider = "topicDomains") + public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) throws Exception { + modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3); + + int lookupPermitsBefore = getLookupRequestPermits(); + + for (PulsarClientImpl client : getClientsToTest()) { + // Verify: the behavior of topic creation. + final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + client.getPartitionsForTopic(tp).join(); + Optional metadata1 = pulsar1.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join(); + assertTrue(metadata1.isPresent()); + assertEquals(metadata1.get().partitions, 3); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + + // Cleanup. + admin1.topics().deletePartitionedTopic(tp, false); + } } @DataProvider(name = "autoCreationParamsAll") @@ -163,40 +253,32 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { - conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - conf.setDefaultNumPartitions(3); - conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); - setup(); + modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3); - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); + int lookupPermitsBefore = getLookupRequestPermits(); - LookupService lookup = getLookupService(isUsingHttpLookup); // Create topic. - final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); - final TopicName topicName = TopicName.get(topicNameStr); - admin.topics().createNonPartitionedTopic(topicNameStr); - // Verify. - PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); - PartitionedTopicMetadata response = - lookup.getPartitionedTopicMetadata(topicName, paramMetadataAutoCreationEnabled).join(); - assertEquals(response.partitions, 0); - List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); - assertFalse(partitionedTopics.contains(topicNameStr)); - List topicList = admin.topics().getList("public/default"); - for (int i = 0; i < 3; i++) { - assertFalse(topicList.contains(topicName.getPartition(i))); - } + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicNameStr); + + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + // Verify: the result of get partitioned topic metadata. + PartitionedTopicMetadata response = + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join(); + assertEquals(response.partitions, 0); + List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics.contains(topicNameStr)); + verifyPartitionsNeverCreated(topicNameStr); - // Verify: lookup semaphore has been releases. - Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); - }); + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + } // Cleanup. - client.close(); - admin.topics().delete(topicNameStr, false); + admin1.topics().delete(topicNameStr, false); } @Test(dataProvider = "autoCreationParamsAll") @@ -204,36 +286,30 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { - conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - conf.setDefaultNumPartitions(3); - conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); - setup(); + modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3); - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); + int lookupPermitsBefore = getLookupRequestPermits(); - LookupService lookup = getLookupService(isUsingHttpLookup); // Create topic. final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); - final TopicName topicName = TopicName.get(topicNameStr); - admin.topics().createPartitionedTopic(topicNameStr, 3); - // Verify. - PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); - PartitionedTopicMetadata response = - lookup.getPartitionedTopicMetadata(topicName, paramMetadataAutoCreationEnabled).join(); - assertEquals(response.partitions, 3); - List topicList = admin.topics().getList("public/default"); - assertFalse(topicList.contains(topicNameStr)); - - // Verify: lookup semaphore has been releases. - Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); - }); + admin1.topics().createPartitionedTopic(topicNameStr, 3); + + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + // Verify: the result of get partitioned topic metadata. + PartitionedTopicMetadata response = + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join(); + assertEquals(response.partitions, 3); + verifyNonPartitionedTopicNeverCreated(topicNameStr); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + } // Cleanup. - client.close(); - admin.topics().deletePartitionedTopic(topicNameStr, false); + admin1.topics().deletePartitionedTopic(topicNameStr, false); } @DataProvider(name = "clients") @@ -247,76 +323,96 @@ public Object[][] clients(){ @Test(dataProvider = "clients") public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { - conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - conf.setDefaultNumPartitions(3); - conf.setAllowAutoTopicCreation(true); - setup(); - - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); - - LookupService lookup = getLookupService(isUsingHttpLookup); - // Create topic. - final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); - final TopicName topicName = TopicName.get(topicNameStr); - // Verify. - PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); - PartitionedTopicMetadata response = lookup.getPartitionedTopicMetadata(topicName, true).join(); - assertEquals(response.partitions, 3); - List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); - assertTrue(partitionedTopics.contains(topicNameStr)); - List topicList = admin.topics().getList("public/default"); - assertFalse(topicList.contains(topicNameStr)); - for (int i = 0; i < 3; i++) { + modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3); + + int lookupPermitsBefore = getLookupRequestPermits(); + + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + // Case-1: normal topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + // Verify: the result of get partitioned topic metadata. + PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join(); + assertEquals(response.partitions, 3); + // Verify: the behavior of topic creation. + List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); + assertTrue(partitionedTopics.contains(topicNameStr)); + verifyNonPartitionedTopicNeverCreated(topicNameStr); // The API "getPartitionedTopicMetadata" only creates the partitioned metadata, it will not create the // partitions. - assertFalse(topicList.contains(topicName.getPartition(i))); + verifyPartitionsNeverCreated(topicNameStr); + + // Case-2: topic with suffix "-partition-1". + final String topicNameStrWithSuffix = BrokerTestUtil.newUniqueName( + topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1"; + // Verify: the result of get partitioned topic metadata. + PartitionedTopicMetadata response2 = + client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join(); + assertEquals(response2.partitions, 0); + // Verify: the behavior of topic creation. + List partitionedTopics2 = + admin1.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics2.contains(topicNameStrWithSuffix)); + assertFalse(partitionedTopics2.contains( + TopicName.get(topicNameStrWithSuffix).getPartitionedTopicName())); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + // Cleanup. + admin1.topics().deletePartitionedTopic(topicNameStr, false); + try { + admin1.topics().delete(topicNameStrWithSuffix, false); + } catch (Exception ex) {} } - // Verify: lookup semaphore has been releases. - Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); - }); - - // Cleanup. - client.close(); - admin.topics().deletePartitionedTopic(topicNameStr, false); } @Test(dataProvider = "clients") public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { - conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - conf.setAllowAutoTopicCreation(true); - setup(); - - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); - - LookupService lookup = getLookupService(isUsingHttpLookup); - // Create topic. - final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); - final TopicName topicName = TopicName.get(topicNameStr); - // Verify. - PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); - PartitionedTopicMetadata response = lookup.getPartitionedTopicMetadata(topicName, true).join(); - assertEquals(response.partitions, 0); - List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); - assertFalse(partitionedTopics.contains(topicNameStr)); - List topicList = admin.topics().getList("public/default"); - assertFalse(topicList.contains(topicNameStr)); - - // Verify: lookup semaphore has been releases. - Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); - }); - - // Cleanup. - client.close(); - try { - admin.topics().delete(topicNameStr, false); - } catch (Exception ex) {} + modifyTopicAutoCreation(true, TopicType.NON_PARTITIONED, 3); + + int lookupPermitsBefore = getLookupRequestPermits(); + + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + // Case 1: normal topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + // Verify: the result of get partitioned topic metadata. + PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join(); + assertEquals(response.partitions, 0); + // Verify: the behavior of topic creation. + List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics.contains(topicNameStr)); + verifyPartitionsNeverCreated(topicNameStr); + + // Case-2: topic with suffix "-partition-1". + final String topicNameStrWithSuffix = BrokerTestUtil.newUniqueName( + topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1"; + // Verify: the result of get partitioned topic metadata. + PartitionedTopicMetadata response2 = + client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join(); + assertEquals(response2.partitions, 0); + // Verify: the behavior of topic creation. + List partitionedTopics2 = + admin1.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics2.contains(topicNameStrWithSuffix)); + assertFalse(partitionedTopics2.contains( + TopicName.get(topicNameStrWithSuffix).getPartitionedTopicName())); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + // Cleanup. + try { + admin1.topics().delete(topicNameStr, false); + } catch (Exception ex) {} + try { + admin1.topics().delete(topicNameStrWithSuffix, false); + } catch (Exception ex) {} + } } @DataProvider(name = "autoCreationParamsNotAllow") @@ -336,64 +432,38 @@ public Object[][] autoCreationParamsNotAllow(){ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup) throws Exception { - if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { - // These test cases are for the following PR. - // Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206. - return; - } - conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - conf.setDefaultNumPartitions(3); - conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); - setup(); - - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); - - LookupService lookup = getLookupService(isUsingHttpLookup); - // Define topic. - final String topicNameStr = BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp"); - final TopicName topicName = TopicName.get(topicNameStr); - // Verify. - PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); - try { - lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); - fail("Expect a not found exception"); - } catch (Exception e) { - log.warn("", e); - Throwable unwrapEx = FutureUtil.unwrapCompletionException(e); - assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException - || unwrapEx instanceof PulsarClientException.NotFoundException); - } + modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3); - List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); - pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName); - assertFalse(partitionedTopics.contains(topicNameStr)); - List topicList = admin.topics().getList("public/default"); - assertFalse(topicList.contains(topicNameStr)); - for (int i = 0; i < 3; i++) { - assertFalse(topicList.contains(topicName.getPartition(i))); - } + int lookupPermitsBefore = getLookupRequestPermits(); - // Verify: lookup semaphore has been releases. - Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); - }); - - // Cleanup. - client.close(); - } + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + // Define topic. + final String topicNameStr = BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify: the result of get partitioned topic metadata. + try { + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled) + .join(); + fail("Expect a not found exception"); + } catch (Exception e) { + Throwable unwrapEx = FutureUtil.unwrapCompletionException(e); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException + || unwrapEx instanceof PulsarClientException.NotFoundException); + } + // Verify: the behavior of topic creation. + List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); + pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(topicName); + assertFalse(partitionedTopics.contains(topicNameStr)); + verifyNonPartitionedTopicNeverCreated(topicNameStr); + verifyPartitionsNeverCreated(topicNameStr); - @DataProvider(name = "autoCreationParamsForNonPersistentTopic") - public Object[][] autoCreationParamsForNonPersistentTopic(){ - return new Object[][]{ - // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. - {true, true, true}, - {true, true, false}, - {false, true, true}, - {false, true, false}, - {false, false, true} - }; + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + } } /** @@ -408,66 +478,46 @@ public Object[][] autoCreationParamsForNonPersistentTopic(){ * param-auto-create = false * HTTP API: not found error * binary API: not support - * This test only guarantees that the behavior is the same as before. The following separated PR will fix the - * incorrect behavior. + * After PIP-344, the behavior will be the same as persistent topics, which was described in PIP-344. */ - @Test(dataProvider = "autoCreationParamsForNonPersistentTopic") - public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, + @Test(dataProvider = "autoCreationParamsNotAllow") + public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean configAllowAutoTopicCreation, boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup) throws Exception { - conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - conf.setDefaultNumPartitions(3); - conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); - setup(); - - Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); - int lookupPermitsBefore = semaphore.availablePermits(); - - LookupService lookup = getLookupService(isUsingHttpLookup); - // Define topic. - final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp"); - final TopicName topicName = TopicName.get(topicNameStr); - // Verify. - // Regarding non-persistent topic, we do not know whether it exists or not. - // Broker will return a non-partitioned metadata if partitioned metadata does not exist. - PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); - - if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled && isUsingHttpLookup) { + modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3); + + int lookupPermitsBefore = getLookupRequestPermits(); + + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + // Define topic. + final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify: the result of get partitioned topic metadata. try { - lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled) + PartitionedTopicMetadata topicMetadata = client + .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled) .join(); - Assert.fail("Expected a not found ex"); + log.info("Get topic metadata: {}", topicMetadata.partitions); + fail("Expected a not found ex"); } catch (Exception ex) { - // Cleanup. - client.close(); - return; + Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException + || unwrapEx instanceof PulsarClientException.NotFoundException); } - } - PartitionedTopicMetadata metadata = lookup - .getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); - if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { - assertEquals(metadata.partitions, 3); - } else { - assertEquals(metadata.partitions, 0); - } - - List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); - pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(topicName); - if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { - assertTrue(partitionedTopics.contains(topicNameStr)); - } else { + // Verify: the behavior of topic creation. + List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); + pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(topicName); assertFalse(partitionedTopics.contains(topicNameStr)); + verifyNonPartitionedTopicNeverCreated(topicNameStr); + verifyPartitionsNeverCreated(topicNameStr); } // Verify: lookup semaphore has been releases. Awaitility.await().untilAsserted(() -> { - int lookupPermitsAfter = semaphore.availablePermits(); - assertEquals(lookupPermitsAfter, lookupPermitsBefore); + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); }); - - // Cleanup. - client.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index 9aa29f08c5ce8..c9457e1a8883f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -56,6 +56,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.namespace.TopicExistsInfo; import org.apache.pulsar.broker.rest.Topics; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -357,9 +358,12 @@ public void testLookUpWithException() throws Exception { CompletableFuture future = new CompletableFuture(); future.completeExceptionally(new BrokerServiceException("Fake Exception")); CompletableFuture existFuture = new CompletableFuture(); - existFuture.complete(true); + existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists()); doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any()); doReturn(existFuture).when(nameSpaceService).checkTopicExists(any()); + CompletableFuture existBooleanFuture = new CompletableFuture(); + existBooleanFuture.complete(false); + doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any()); doReturn(nameSpaceService).when(pulsar).getNamespaceService(); AsyncResponse asyncResponse = mock(AsyncResponse.class); ProducerMessages producerMessages = new ProducerMessages(); @@ -370,7 +374,7 @@ public void testLookUpWithException() throws Exception { topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(RestException.class); verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertEquals(responseCaptor.getValue().getMessage(), "Can't find owner of given topic."); + Assert.assertTrue(responseCaptor.getValue().getMessage().contains(topicName + " not found")); } @Test @@ -378,8 +382,11 @@ public void testLookUpTopicNotExist() throws Exception { String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName; NamespaceService nameSpaceService = mock(NamespaceService.class); CompletableFuture existFuture = new CompletableFuture(); - existFuture.complete(false); + existFuture.complete(TopicExistsInfo.newTopicNotExists()); + CompletableFuture existBooleanFuture = new CompletableFuture(); + existBooleanFuture.complete(false); doReturn(existFuture).when(nameSpaceService).checkTopicExists(any()); + doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any()); doReturn(nameSpaceService).when(pulsar).getNamespaceService(); AsyncResponse asyncResponse = mock(AsyncResponse.class); ProducerMessages producerMessages = new ProducerMessages(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index 6a6065bc289f6..783a92b485ed8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -44,6 +44,7 @@ import org.apache.pulsar.broker.lookup.RedirectData; import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.namespace.TopicExistsInfo; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; @@ -149,9 +150,12 @@ public void testLookupTopicNotExist() throws Exception { config.setAuthorizationEnabled(true); NamespaceService namespaceService = pulsar.getNamespaceService(); - CompletableFuture future = new CompletableFuture<>(); - future.complete(false); + CompletableFuture future = new CompletableFuture<>(); + future.complete(TopicExistsInfo.newTopicNotExists()); doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class)); + CompletableFuture booleanFuture = new CompletableFuture<>(); + booleanFuture.complete(false); + doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class)); AsyncResponse asyncResponse1 = mock(AsyncResponse.class); destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false, null, null); @@ -260,9 +264,12 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception { policies3Future.complete(Optional.of(policies3)); doReturn(policies3Future).when(namespaceResources).getPoliciesAsync(namespaceName2); NamespaceService namespaceService = pulsar.getNamespaceService(); - CompletableFuture future = new CompletableFuture<>(); - future.complete(false); + CompletableFuture future = new CompletableFuture<>(); + future.complete(TopicExistsInfo.newTopicNotExists()); doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class)); + CompletableFuture booleanFuture = new CompletableFuture<>(); + booleanFuture.complete(false); + doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class)); destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2, "invalid-localCluster", false, null, null); verify(asyncResponse).resume(arg.capture()); @@ -278,4 +285,35 @@ public void testDataPojo() { assertEquals(data2.getRedirectLookupAddress(), url); } + @Test + public void topicNotFound() throws Exception { + MockTopicLookup destLookup = spy(MockTopicLookup.class); + doReturn(false).when(destLookup).isRequestHttps(); + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(new Semaphore(1000,true)).when(brokerService).getLookupRequestSemaphore(); + destLookup.setPulsar(pulsar); + doReturn("null").when(destLookup).clientAppId(); + Field uriField = PulsarWebResource.class.getDeclaredField("uri"); + uriField.setAccessible(true); + UriInfo uriInfo = mock(UriInfo.class); + uriField.set(destLookup, uriInfo); + URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1"); + doReturn(uri).when(uriInfo).getRequestUri(); + config.setAuthorizationEnabled(true); + NamespaceService namespaceService = pulsar.getNamespaceService(); + CompletableFuture future = new CompletableFuture<>(); + future.complete(TopicExistsInfo.newTopicNotExists()); + doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class)); + + // Get the current semaphore first + Integer state1 = pulsar.getBrokerService().getLookupRequestSemaphore().availablePermits(); + AsyncResponse asyncResponse1 = mock(AsyncResponse.class); + // We used a nonexistent topic to test + destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic2", false, null, null); + // Gets semaphore status + Integer state2 = pulsar.getBrokerService().getLookupRequestSemaphore().availablePermits(); + // If it is successfully released, it should be equal + assertEquals(state1, state2); + + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 2a8a849ef9c06..38a60165d5606 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -816,14 +816,15 @@ public void testCheckTopicExists(String topicDomain) throws Exception { String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); Awaitility.await().untilAsserted(() -> { - assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get()); + assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get().isExists()); }); String partitionedTopic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID(); admin.topics().createPartitionedTopic(partitionedTopic, 5); Awaitility.await().untilAsserted(() -> { - assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get()); - assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic + "-partition-2")).get()); + assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get().isExists()); + assertTrue(pulsar.getNamespaceService() + .checkTopicExists(TopicName.get(partitionedTopic + "-partition-2")).get().isExists()); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java index 7790940c1327f..8fdf0723ea8d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -99,6 +100,7 @@ public void testCreateConsumerAfterOnePartDeleted() throws Exception { Consumer consumerAllPartition = pulsarClient.newConsumer(Schema.STRING).topic(topic) .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe(); Message msg = consumerAllPartition.receive(2, TimeUnit.SECONDS); + assertNotNull(msg); String receivedMsgValue = msg.getValue(); log.info("received msg: {}", receivedMsgValue); consumerAllPartition.acknowledge(msg); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 7735f66e7838a..4d6cf96a01068 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -32,6 +32,7 @@ import lombok.Getter; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -58,7 +59,6 @@ import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.FutureUtil; @Getter(AccessLevel.PUBLIC) @@ -104,6 +104,31 @@ public Consumer subscribe() throws PulsarClientException { } } + private CompletableFuture checkDlqAlreadyExists(String topic) { + CompletableFuture existsFuture = new CompletableFuture<>(); + client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata -> { + TopicName topicName = TopicName.get(topic); + if (topicName.isPersistent()) { + // Either partitioned or non-partitioned, it exists. + existsFuture.complete(true); + } else { + // If it is a non-persistent topic, return true only it is a partitioned topic. + existsFuture.complete(metadata != null && metadata.partitions > 0); + } + }).exceptionally(ex -> { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarAdminException.NotFoundException) { + existsFuture.complete(false); + } else { + existsFuture.completeExceptionally(ex); + } + return null; + }); + return existsFuture; + } + @Override public CompletableFuture> subscribeAsync() { if (conf.getTopicNames().isEmpty() && conf.getTopicsPattern() == null) { @@ -135,20 +160,18 @@ public CompletableFuture> subscribeAsync() { DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { - CompletableFuture retryLetterTopicMetadata = - client.getPartitionedTopicMetadata(oldRetryLetterTopic, true); - CompletableFuture deadLetterTopicMetadata = - client.getPartitionedTopicMetadata(oldDeadLetterTopic, true); + CompletableFuture retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); + CompletableFuture deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; - if (retryLetterTopicMetadata.join().partitions > 0) { + if (retryLetterTopicMetadata.join()) { retryLetterTopic = oldRetryLetterTopic; } - if (deadLetterTopicMetadata.join().partitions > 0) { + if (deadLetterTopicMetadata.join()) { deadLetterTopic = oldDeadLetterTopic; } if (deadLetterPolicy == null) {