diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f20897634d685..d5721d249b23e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -557,19 +557,29 @@ protected CompletableFuture internalGetPartitionedMeta boolean checkAllowAutoCreation) { return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation) .thenCompose(metadata -> { - CompletableFuture ret; - if (metadata.partitions == 0 && !checkAllowAutoCreation) { + if (metadata.partitions > 1) { + // Some clients does not support partitioned topic. + return internalValidateClientVersionAsync().thenApply(__ -> metadata); + } else if (metadata.partitions == 1) { + return CompletableFuture.completedFuture(metadata); + } else { + // metadata.partitions == 0 // The topic may be a non-partitioned topic, so check if it exists here. // However, when checkAllowAutoCreation is true, the client will create the topic if // it doesn't exist. In this case, `partitions == 0` means the automatically created topic // is a non-partitioned topic so we shouldn't check if the topic exists. - ret = internalCheckTopicExists(topicName); - } else if (metadata.partitions > 1) { - ret = internalValidateClientVersionAsync(); - } else { - ret = CompletableFuture.completedFuture(null); + return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(brokerAllowAutoTopicCreation -> { + if (checkAllowAutoCreation) { + // Whether it exists or not, auto create a non-partitioned topic by client. + return CompletableFuture.completedFuture(metadata); + } else { + // If it does not exist, response a Not Found error. + // Otherwise, response a non-partitioned metadata. + return internalCheckTopicExists(topicName).thenApply(__ -> metadata); + } + }); } - return ret.thenApply(__ -> metadata); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 55e1d96b594e3..52135163a6ab3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -81,6 +81,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.limiter.ConnectionController; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -578,35 +580,93 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { - unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) - .handle((metadata, ex) -> { - if (ex == null) { - int partitions = metadata.partitions; - commandSender.sendPartitionMetadataResponse(partitions, requestId); - } else { - if (ex instanceof PulsarClientException) { - log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), - remoteAddress, topicName, ex.getMessage()); - commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, - ex.getMessage(), requestId); + // Get if exists, respond not found error if not exists. + getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate -> { + boolean autoCreateIfNotExist = partitionMetadata.isMetadataAutoCreationEnabled(); + if (!autoCreateIfNotExist) { + final NamespaceResources namespaceResources = getBrokerService().pulsar() + .getPulsarResources().getNamespaceResources(); + final TopicResources topicResources = getBrokerService().pulsar().getPulsarResources() + .getTopicResources(); + namespaceResources.getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(topicName, false) + .handle((metadata, getMetadataEx) -> { + if (getMetadataEx != null) { + log.error("{} {} Failed to get partition metadata", topicName, + ServerCnx.this.toString(), getMetadataEx); + writeAndFlush( + Commands.newPartitionMetadataResponse(ServerError.MetadataError, + "Failed to get partition metadata", + requestId)); + } else if (metadata.isPresent()) { + commandSender.sendPartitionMetadataResponse(metadata.get().partitions, + requestId); + } else if (topicName.isPersistent()) { + topicResources.persistentTopicExists(topicName).thenAccept(exists -> { + if (exists) { + commandSender.sendPartitionMetadataResponse(0, requestId); + return; + } + writeAndFlush(Commands.newPartitionMetadataResponse( + ServerError.TopicNotFound, "", requestId)); + }).exceptionally(ex -> { + log.error("{} {} Failed to get partition metadata", topicName, + ServerCnx.this.toString(), ex); + writeAndFlush( + Commands.newPartitionMetadataResponse(ServerError.MetadataError, + "Failed to check partition metadata", + requestId)); + return null; + }); + } else { + // Regarding non-persistent topic, we do not know whether it exists or not. + // Just return a non-partitioned metadata if partitioned metadata does not + // exist. + // Broker will respond a not found error when doing subscribing or producing if + // broker not allow to auto create topics. + commandSender.sendPartitionMetadataResponse(0, requestId); + } + return null; + }).whenComplete((ignore, ignoreEx) -> { + lookupSemaphore.release(); + if (ignoreEx != null) { + log.error("{} {} Failed to handle partition metadata request", topicName, + ServerCnx.this.toString(), ignoreEx); + } + }); + } else { + // Get if exists, create a new one if not exists. + unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) + .whenComplete((metadata, ex) -> { + lookupSemaphore.release(); + if (ex == null) { + int partitions = metadata.partitions; + commandSender.sendPartitionMetadataResponse(partitions, requestId); } else { - log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, - topicName, ex.getMessage(), ex); - ServerError error = ServerError.ServiceNotReady; - if (ex instanceof RestException restException){ - int responseCode = restException.getResponse().getStatus(); - if (responseCode == NOT_FOUND.getStatusCode()){ - error = ServerError.TopicNotFound; - } else if (responseCode < INTERNAL_SERVER_ERROR.getStatusCode()){ - error = ServerError.MetadataError; + if (ex instanceof PulsarClientException) { + log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), + remoteAddress, topicName, ex.getMessage()); + commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, + ex.getMessage(), requestId); + } else { + log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, + topicName, ex.getMessage(), ex); + ServerError error = ServerError.ServiceNotReady; + if (ex instanceof RestException restException){ + int responseCode = restException.getResponse().getStatus(); + if (responseCode == NOT_FOUND.getStatusCode()){ + error = ServerError.TopicNotFound; + } else if (responseCode < INTERNAL_SERVER_ERROR.getStatusCode()){ + error = ServerError.MetadataError; + } } + commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), + requestId); } - commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId); } - } - lookupSemaphore.release(); - return null; - }); + }); + } + }); } else { final String msg = "Client is not authorized to Get Partition Metadata"; log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java new file mode 100644 index 0000000000000..51f643d2b7823 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Semaphore; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +@Slf4j +public class GetPartitionMetadataTest extends ProducerConsumerBase { + + private static final String DEFAULT_NS = "public/default"; + + private PulsarClientImpl clientWithHttpLookup; + private PulsarClientImpl clientWitBinaryLookup; + + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + clientWithHttpLookup = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + clientWitBinaryLookup = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + if (clientWithHttpLookup != null) { + clientWithHttpLookup.close(); + } + if (clientWitBinaryLookup != null) { + clientWitBinaryLookup.close(); + } + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + } + + private LookupService getLookupService(boolean isUsingHttpLookup) { + if (isUsingHttpLookup) { + return clientWithHttpLookup.getLookup(); + } else { + return clientWitBinaryLookup.getLookup(); + } + } + + @DataProvider(name = "topicDomains") + public Object[][] topicDomains() { + return new Object[][]{ + {TopicDomain.persistent}, + {TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "topicDomains") + public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(true); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + // HTTP client. + final String tp1 = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + clientWithHttpLookup.getPartitionsForTopic(tp1).join(); + Optional metadata1 = pulsar.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp1), true).join(); + assertTrue(metadata1.isPresent()); + assertEquals(metadata1.get().partitions, 3); + + // Binary client. + final String tp2 = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + clientWitBinaryLookup.getPartitionsForTopic(tp2).join(); + Optional metadata2 = pulsar.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp2), true).join(); + assertTrue(metadata2.isPresent()); + assertEquals(metadata2.get().partitions, 3); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + admin.topics().deletePartitionedTopic(tp1, false); + admin.topics().deletePartitionedTopic(tp2, false); + } + + @DataProvider(name = "autoCreationParamsAll") + public Object[][] autoCreationParamsAll(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true, TopicDomain.persistent}, + {true, true, false, TopicDomain.persistent}, + {true, false, true, TopicDomain.persistent}, + {true, false, false, TopicDomain.persistent}, + {false, true, true, TopicDomain.persistent}, + {false, true, false, TopicDomain.persistent}, + {false, false, true, TopicDomain.persistent}, + {false, false, false, TopicDomain.persistent}, + {true, true, true, TopicDomain.non_persistent}, + {true, true, false, TopicDomain.non_persistent}, + {true, false, true, TopicDomain.non_persistent}, + {true, false, false, TopicDomain.non_persistent}, + {false, true, true, TopicDomain.non_persistent}, + {false, true, false, TopicDomain.non_persistent}, + {false, false, true, TopicDomain.non_persistent}, + {false, false, false, TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "autoCreationParamsAll") + public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup, + TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + admin.topics().createNonPartitionedTopic(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = + lookup.getPartitionedTopicMetadata(topicName, paramMetadataAutoCreationEnabled).join(); + assertEquals(response.partitions, 0); + List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics.contains(topicNameStr)); + List topicList = admin.topics().getList("public/default"); + for (int i = 0; i < 3; i++) { + assertFalse(topicList.contains(topicName.getPartition(i))); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + admin.topics().delete(topicNameStr, false); + } + + @Test(dataProvider = "autoCreationParamsAll") + public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup, + TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + admin.topics().createPartitionedTopic(topicNameStr, 3); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = + lookup.getPartitionedTopicMetadata(topicName, paramMetadataAutoCreationEnabled).join(); + assertEquals(response.partitions, 3); + List topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + admin.topics().deletePartitionedTopic(topicNameStr, false); + } + + @DataProvider(name = "clients") + public Object[][] clients(){ + return new Object[][]{ + // isUsingHttpLookup. + {true, TopicDomain.persistent}, + {false, TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "clients") + public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(true); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = lookup.getPartitionedTopicMetadata(topicName, true).join(); + assertEquals(response.partitions, 3); + List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + assertTrue(partitionedTopics.contains(topicNameStr)); + List topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + for (int i = 0; i < 3; i++) { + // The API "getPartitionedTopicMetadata" only creates the partitioned metadata, it will not create the + // partitions. + assertFalse(topicList.contains(topicName.getPartition(i))); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + admin.topics().deletePartitionedTopic(topicNameStr, false); + } + + @Test(dataProvider = "clients") + public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + conf.setAllowAutoTopicCreation(true); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Create topic. + final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + PartitionedTopicMetadata response = lookup.getPartitionedTopicMetadata(topicName, true).join(); + assertEquals(response.partitions, 0); + List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + assertFalse(partitionedTopics.contains(topicNameStr)); + List topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + try { + admin.topics().delete(topicNameStr, false); + } catch (Exception ex) {} + } + + @DataProvider(name = "autoCreationParamsNotAllow") + public Object[][] autoCreationParamsNotAllow(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, false, true}, + {true, false, false}, + {false, false, true}, + {false, false, false}, + {false, true, true}, + {false, true, false}, + }; + } + + @Test(dataProvider = "autoCreationParamsNotAllow") + public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + // These test cases are for the following PR. + // Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206. + return; + } + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Define topic. + final String topicNameStr = BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + try { + lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); + fail("Expect a not found exception"); + } catch (Exception e) { + log.warn("", e); + Throwable unwrapEx = FutureUtil.unwrapCompletionException(e); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException + || unwrapEx instanceof PulsarClientException.NotFoundException); + } + + List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName); + assertFalse(partitionedTopics.contains(topicNameStr)); + List topicList = admin.topics().getList("public/default"); + assertFalse(topicList.contains(topicNameStr)); + for (int i = 0; i < 3; i++) { + assertFalse(topicList.contains(topicName.getPartition(i))); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + } + + @DataProvider(name = "autoCreationParamsForNonPersistentTopic") + public Object[][] autoCreationParamsForNonPersistentTopic(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true}, + {true, true, false}, + {false, true, true}, + {false, true, false}, + {false, false, true} + }; + } + + /** + * Regarding the API "get partitioned metadata" about non-persistent topic. + * The original behavior is: + * param-auto-create = true, broker-config-auto-create = true + * HTTP API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()} + * binary API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()} + * param-auto-create = true, broker-config-auto-create = false + * HTTP API: {partitions: 0} + * binary API: {partitions: 0} + * param-auto-create = false + * HTTP API: not found error + * binary API: not support + * This test only guarantees that the behavior is the same as before. The following separated PR will fix the + * incorrect behavior. + */ + @Test(dataProvider = "autoCreationParamsForNonPersistentTopic") + public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation); + setup(); + + Semaphore semaphore = pulsar.getBrokerService().getLookupRequestSemaphore(); + int lookupPermitsBefore = semaphore.availablePermits(); + + LookupService lookup = getLookupService(isUsingHttpLookup); + // Define topic. + final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp"); + final TopicName topicName = TopicName.get(topicNameStr); + // Verify. + // Regarding non-persistent topic, we do not know whether it exists or not. + // Broker will return a non-partitioned metadata if partitioned metadata does not exist. + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + + if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled && isUsingHttpLookup) { + try { + lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled) + .join(); + Assert.fail("Expected a not found ex"); + } catch (Exception ex) { + // Cleanup. + client.close(); + return; + } + } + + PartitionedTopicMetadata metadata = lookup + .getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join(); + if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + assertEquals(metadata.partitions, 3); + } else { + assertEquals(metadata.partitions, 0); + } + + List partitionedTopics = admin.topics().getPartitionedTopicList("public/default"); + pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(topicName); + if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) { + assertTrue(partitionedTopics.contains(topicNameStr)); + } else { + assertFalse(partitionedTopics.contains(topicNameStr)); + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + int lookupPermitsAfter = semaphore.availablePermits(); + assertEquals(lookupPermitsAfter, lookupPermitsBefore); + }); + + // Cleanup. + client.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index a75ae78cef393..4712682e71b57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -133,7 +134,7 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() // we want to skip the "lookup" phase, because it is blocked by the HTTP API LookupService mockLookup = mock(LookupService.class); ((PulsarClientImpl) pulsarClient).setLookup(mockLookup); - when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer( + when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer( i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getBroker(any())).thenAnswer(i -> { InetSocketAddress brokerAddress = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 0a6cffc7685d4..ea5365bcf4b2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -566,13 +566,13 @@ public void testExtensibleLoadManagerImplInternalTopicAutoCreations() try { pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create(); Assert.fail("Create should have failed."); - } catch (PulsarClientException.TopicDoesNotExistException e) { + } catch (PulsarClientException.TopicDoesNotExistException | PulsarClientException.NotFoundException e) { // expected } try { pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create(); Assert.fail("Create should have failed."); - } catch (PulsarClientException.TopicDoesNotExistException e) { + } catch (PulsarClientException.TopicDoesNotExistException | PulsarClientException.NotFoundException e) { // expected } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 9f561889aa825..2ce6728e98a08 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1022,12 +1022,12 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l // for PMR // 2 lookup will succeed long reqId1 = reqId++; - ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1); + ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1, true); CompletableFuture f1 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1)); long reqId2 = reqId++; - ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2); + ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2, true); CompletableFuture f2 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> { CompletableFuture future = clientCnx.newLookup(request2, reqId2); @@ -1042,17 +1042,17 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l // 3 lookup will fail latchRef.set(new CountDownLatch(1)); long reqId3 = reqId++; - ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3); + ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3, true); f1 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3)); long reqId4 = reqId++; - ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4); + ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4, true); f2 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4)); long reqId5 = reqId++; - ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5); + ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5, true); CompletableFuture f3 = pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> { CompletableFuture future = clientCnx.newLookup(request5, reqId5); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index b2cfe63e2e5b4..0d984e0675db5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -167,7 +167,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception { for (int i = 0; i < totalConsumers; i++) { long reqId = 0xdeadbeef + i; Future f = executor.submit(() -> { - ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId); + ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId, true); pool.getConnection(resolver.resolveHost()) .thenCompose(clientCnx -> clientCnx.newLookup(request, reqId)) .get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index ad7728319c9a7..626bce380b9de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -3573,7 +3573,7 @@ public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception doReturn(false).when(pulsar).isRunning(); assertTrue(channel.isActive()); - ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1); + ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1, true); channel.writeInbound(clientCommand); Object response = getResponse(); assertTrue(response instanceof CommandPartitionedTopicMetadataResponse); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index 3901f186d81c7..3f268c4b7c973 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -146,7 +146,7 @@ public void testTransactionBufferLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() @@ -251,7 +251,7 @@ public void testPendingAckLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 0a4c5b7a318b3..9319ea4e876b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -852,7 +852,7 @@ public void testMergeGetPartitionedMetadataRequests() throws Exception { // Verify the request is works after merge the requests. List> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { - futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName))); + futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false)); } for (CompletableFuture future : futures) { assertEquals(future.join().partitions, topicPartitions); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 78952fcaed8b3..6c46bce254f6f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -308,14 +308,33 @@ static ClientBuilder builder() { * *

This can be used to discover the partitions and create {@link Reader}, {@link Consumer} or {@link Producer} * instances directly on a particular partition. - * + * @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}. * @param topic * the topic name * @return a future that will yield a list of the topic partitions or {@link PulsarClientException} if there was any * error in the operation. + * * @since 2.3.0 */ - CompletableFuture> getPartitionsForTopic(String topic); + @Deprecated + default CompletableFuture> getPartitionsForTopic(String topic) { + return getPartitionsForTopic(topic, true); + } + + /** + * 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a + * partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists. + * 2. When {@param metadataAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned + * topic does not exist. You will get an {@link PulsarClientException.NotFoundException} or a + * {@link PulsarClientException.TopicDoesNotExistException}. + * 2-1. You will get a {@link PulsarClientException.NotSupportedException} with metadataAutoCreationEnabled=false + * on an old broker version which does not support getting partitions without partitioned metadata auto-creation. + * 3. When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using + * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. + * For the result, see case 1. + * @version 3.3.0. + */ + CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled); /** * Close the PulsarClient and release all the resources. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 9d01d863143e2..d6f4dd2dcac14 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -121,12 +121,14 @@ public CompletableFuture> getBroker(T * calls broker binaryProto-lookup api to get metadata of partitioned-topic. * */ - public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { + @Override + public CompletableFuture getPartitionedTopicMetadata( + TopicName topicName, boolean metadataAutoCreationEnabled) { final MutableObject newFutureCreated = new MutableObject<>(); try { return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { - CompletableFuture newFuture = - getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName); + CompletableFuture newFuture = getPartitionedTopicMetadata( + serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled); newFutureCreated.setValue(newFuture); return newFuture; }); @@ -222,13 +224,14 @@ private CompletableFuture> findBroker } private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress, - TopicName topicName) { + TopicName topicName, boolean metadataAutoCreationEnabled) { CompletableFuture partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); - ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); + ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, + metadataAutoCreationEnabled); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { log.warn("[{}] failed to get Partitioned metadata : {}", topicName, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 7686d0072cffb..7735f66e7838a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -136,9 +136,9 @@ public CompletableFuture> subscribeAsync() { if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { CompletableFuture retryLetterTopicMetadata = - client.getPartitionedTopicMetadata(oldRetryLetterTopic); + client.getPartitionedTopicMetadata(oldRetryLetterTopic, true); CompletableFuture deadLetterTopicMetadata = - client.getPartitionedTopicMetadata(oldDeadLetterTopic); + client.getPartitionedTopicMetadata(oldDeadLetterTopic, true); applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 6e8c2b4314e17..ba04aaa3b3117 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -108,10 +108,11 @@ public CompletableFuture> getBroker(T } @Override - public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { + public CompletableFuture getPartitionedTopicMetadata( + TopicName topicName, boolean metadataAutoCreationEnabled) { String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; - return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true", - PartitionedTopicMetadata.class); + return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=" + + metadataAutoCreationEnabled, PartitionedTopicMetadata.class); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 48ef67eae2047..978450ed6894d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -60,11 +60,30 @@ public interface LookupService extends AutoCloseable { /** * Returns {@link PartitionedTopicMetadata} for a given topic. - * - * @param topicName topic-name - * @return + * Note: this method will try to create the topic partitioned metadata if it does not exist. + * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean)}}. + */ + @Deprecated + default CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { + return getPartitionedTopicMetadata(topicName, true); + } + + /** + * 1.Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists; + * return "{partition: 0}" if a non-partitioned topic exists. + * 2. When {@param metadataAutoCreationEnabled} is "false," neither partitioned topic nor non-partitioned topic + * does not exist. You will get a {@link PulsarClientException.NotFoundException} or + * a {@link PulsarClientException.TopicDoesNotExistException}. + * 2-1. You will get a {@link PulsarClientException.NotSupportedException} with metadataAutoCreationEnabled=false + * on an old broker version which does not support getting partitions without partitioned metadata + * auto-creation. + * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using + * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. + * For the result, see case 1. + * @version 3.3.0. */ - CompletableFuture getPartitionedTopicMetadata(TopicName topicName); + CompletableFuture getPartitionedTopicMetadata(TopicName topicName, + boolean metadataAutoCreationEnabled); /** * Returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f2bce59a1e68e..62b6612fa3c26 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -954,7 +954,7 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo CompletableFuture subscribeResult = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topicName) + client.getPartitionedTopicMetadata(topicName, true) .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, createTopicIfDoesNotExist)) .exceptionally(ex1 -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index b92e039e5facd..9a5ec8b874bb6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -377,7 +377,7 @@ private CompletableFuture> createProducerAsync(String topic, ProducerInterceptors interceptors) { CompletableFuture> producerCreatedFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -519,7 +519,7 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC String topic = conf.getSingleTopic(); - getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -659,7 +659,7 @@ protected CompletableFuture> createSingleTopicReaderAsync( CompletableFuture> readerFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -1028,11 +1028,8 @@ public void reloadLookUp() throws PulsarClientException { } } - public CompletableFuture getNumberOfPartitions(String topic) { - return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions); - } - - public CompletableFuture getPartitionedTopicMetadata(String topic) { + public CompletableFuture getPartitionedTopicMetadata( + String topic, boolean metadataAutoCreationEnabled) { CompletableFuture metadataFuture = new CompletableFuture<>(); @@ -1045,7 +1042,7 @@ public CompletableFuture getPartitionedTopicMetadata(S .setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .create(); getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, - metadataFuture, new ArrayList<>()); + metadataFuture, new ArrayList<>(), metadataAutoCreationEnabled); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } @@ -1056,15 +1053,19 @@ private void getPartitionedTopicMetadata(TopicName topicName, Backoff backoff, AtomicLong remainingTime, CompletableFuture future, - List previousExceptions) { + List previousExceptions, + boolean metadataAutoCreationEnabled) { long startTime = System.nanoTime(); - lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { + CompletableFuture queryFuture = + lookup.getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled); + queryFuture.thenAccept(future::complete).exceptionally(e -> { remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); long nextDelay = Math.min(backoff.next(), remainingTime.get()); // skip retry scheduler when set lookup throttle in client or server side which will lead to // `TooManyRequestsException` boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause()) - || e.getCause() instanceof PulsarClientException.AuthenticationException; + || e.getCause() instanceof PulsarClientException.AuthenticationException + || e.getCause() instanceof PulsarClientException.NotFoundException; if (nextDelay <= 0 || isLookupThrottling) { PulsarClientException.setPreviousExceptions(e, previousExceptions); future.completeExceptionally(e); @@ -1076,15 +1077,16 @@ private void getPartitionedTopicMetadata(TopicName topicName, log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- " + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); - getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions); + getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions, + metadataAutoCreationEnabled); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); } @Override - public CompletableFuture> getPartitionsForTopic(String topic) { - return getPartitionedTopicMetadata(topic).thenApply(metadata -> { + public CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) { + return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled).thenApply(metadata -> { if (metadata.partitions > 0) { TopicName topicName = TopicName.get(topic); List partitions = new ArrayList<>(metadata.partitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 9e79fc203c225..499627f9c73f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -79,7 +79,8 @@ public void start() throws TransactionCoordinatorClientException { @Override public CompletableFuture startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { - return pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN) + return pulsarClient.getLookup() + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true) .thenCompose(partitionMeta -> { List> connectFutureList = new ArrayList<>(); if (LOG.isDebugEnabled()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index febec2bff3285..191124bb7b002 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture; import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -153,7 +154,8 @@ private MultiTopicsConsumerImpl createMultiTopicsConsumer( int completionDelayMillis = 100; Schema schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createDelayedCompletedFuture( + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + .thenAnswer(invocation -> createDelayedCompletedFuture( new PartitionedTopicMetadata(), completionDelayMillis)); MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl( clientMock, consumerConfData, executorProvider, @@ -201,7 +203,8 @@ public void testConsumerCleanupOnSubscribeFailure() { int completionDelayMillis = 10; Schema schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture( + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + .thenAnswer(invocation -> createExceptionFuture( new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis)); CompletableFuture> completeFuture = new CompletableFuture<>(); MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(clientMock, consumerConfData, @@ -237,7 +240,8 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc // Simulate non partitioned topics PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0); - when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata)); + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(metadata)); CompletableFuture> completeFuture = new CompletableFuture<>(); MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl<>( @@ -248,7 +252,7 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc // getPartitionedTopicMetadata should have been called only the first time, for each of the 3 topics, // but not anymore since the topics are not partitioned. - verify(clientMock, times(3)).getPartitionedTopicMetadata(any()); + verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean()); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index e0b25db891247..e13c060a052ec 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.nullable; @@ -106,7 +107,7 @@ public void testConsumerIsClosed() throws Exception { nullable(String.class))) .thenReturn(CompletableFuture.completedFuture( new GetTopicsResult(Collections.emptyList(), null, false, true))); - when(lookup.getPartitionedTopicMetadata(any(TopicName.class))) + when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())); when(lookup.getBroker(any())) .thenReturn(CompletableFuture.completedFuture( diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index faa5fbcd30130..01a1bd69e07b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -188,6 +188,7 @@ private static void setFeatureFlags(FeatureFlags flags) { flags.setSupportsAuthRefresh(true); flags.setSupportsBrokerEntryMetadata(true); flags.setSupportsPartialProducer(true); + flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true); } public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, @@ -880,11 +881,13 @@ public static ByteBuf newPartitionMetadataResponse(ServerError error, String err return serializeWithSize(newPartitionMetadataResponseCommand(error, errorMsg, requestId)); } - public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) { + public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, + boolean metadataAutoCreationEnabled) { BaseCommand cmd = localCmd(Type.PARTITIONED_METADATA); cmd.setPartitionMetadata() .setTopic(topic) - .setRequestId(requestId); + .setRequestId(requestId) + .setMetadataAutoCreationEnabled(metadataAutoCreationEnabled); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index f6fcb12f35939..0628d494af3af 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -199,9 +199,9 @@ public static CompletableFuture failedFuture(Throwable t) { public static Throwable unwrapCompletionException(Throwable ex) { if (ex instanceof CompletionException) { - return ex.getCause(); + return unwrapCompletionException(ex.getCause()); } else if (ex instanceof ExecutionException) { - return ex.getCause(); + return unwrapCompletionException(ex.getCause()); } else { return ex; } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index afe193eeb7e9d..f56df6ae9d103 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -300,6 +300,7 @@ message FeatureFlags { optional bool supports_broker_entry_metadata = 2 [default = false]; optional bool supports_partial_producer = 3 [default = false]; optional bool supports_topic_watchers = 4 [default = false]; + optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false]; } message CommandConnected { @@ -413,6 +414,7 @@ message CommandPartitionedTopicMetadata { // to the proxy. optional string original_auth_data = 4; optional string original_auth_method = 5; + optional bool metadata_auto_creation_enabled = 6 [default = true]; } message CommandPartitionedTopicMetadataResponse {