diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index c417604d4885e..f4160edf3737b 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -425,10 +425,13 @@ public List getChildren(String path, boolean watch) throws KeeperExcepti if (path.length() >= item.length()) { continue; } - - String child = item.substring(path.length() + 1); - if (!child.contains("/")) { - children.add(child); + String child = item.substring(path.length()); + if (child.indexOf("/") == 0) { + child = child.substring(1); + log.debug("child: '{}'", child); + if (!child.contains("/")) { + children.add(child); + } } } } @@ -465,10 +468,13 @@ public void getChildren(final String path, boolean watcher, final Children2Callb } else if (item.equals(path)) { continue; } else { - String child = item.substring(path.length() + 1); - log.debug("child: '{}'", child); - if (!child.contains("/")) { - children.add(child); + String child = item.substring(path.length()); + if (child.indexOf("/") == 0) { + child = child.substring(1); + log.debug("child: '{}'", child); + if (!child.contains("/")) { + children.add(child); + } } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 02ebd064aa4d3..e9d559ef014e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.servlet.ServletContext; @@ -66,9 +68,9 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperCache; -import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -111,6 +113,11 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } + protected void zkCreateOptimisticAsync(String path, byte[] content, AsyncCallback.StringCallback callback) { + ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, callback, null); + } + protected boolean zkPathExists(String path) throws KeeperException, InterruptedException { Stat stat = globalZk().exists(path, false); if (null != stat) { @@ -119,6 +126,21 @@ protected boolean zkPathExists(String path) throws KeeperException, InterruptedE return false; } + protected void zkSync(String path) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue()); + globalZk().sync(path, (rc2, s, ctx) -> { + if (KeeperException.Code.OK.intValue() != rc2) { + rc.set(rc2); + } + latch.countDown(); + }, null); + latch.await(); + if (KeeperException.Code.OK.intValue() != rc.get()) { + throw KeeperException.create(KeeperException.Code.get(rc.get())); + } + } + /** * Get the domain of the topic (whether it's persistent or non-persistent) */ @@ -233,6 +255,37 @@ protected List getListOfNamespaces(String property) throws Exception { return namespaces; } + protected void tryCreatePartitionsAsync(int numPartitions) { + if (!topicName.isPersistent()) { + return; + } + for (int i = 0; i < numPartitions; i++) { + tryCreatePartitionAsync(i); + } + } + + private void tryCreatePartitionAsync(final int partition) { + zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], + (rc, s, o, s1) -> { + if (KeeperException.Code.OK.intValue() == rc) { + if (log.isDebugEnabled()) { + log.debug("[{}] Topic partition {} created.", clientAppId(), + topicName.getPartition(partition)); + } + } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { + log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(), + topicName.getPartition(partition)); + } else if (KeeperException.Code.BADVERSION.intValue() == rc) { + log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.", + clientAppId(), topicName.getPartition(partition)); + tryCreatePartitionAsync(partition); + } else { + log.error("[{}] Fail to create topic partition {}", clientAppId(), + topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc))); + } + }); + } + protected NamespaceName namespaceName; protected void validateNamespaceName(String property, String namespace) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java index c60422d16995f..95954f610d25e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java @@ -31,6 +31,10 @@ public static String partitionedTopicPath(TopicName name) { name.getNamespace(), name.getDomain().value(), name.getEncodedLocalName()); } + public static String managedLedgerPath(TopicName name) { + return "/managed-ledgers/" + name.getPersistenceNamingEncoding(); + } + public static String namespacePoliciesPath(NamespaceName name) { return adminPath(POLICIES, name.toString()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 972e1c3207bd9..7d4c8eb288d0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -116,7 +116,6 @@ public class PersistentTopicsBase extends AdminResource { private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class); - public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000; private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10; private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v"; private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21); @@ -414,8 +413,9 @@ protected void internalCreatePartitionedTopic(int numPartitions) { String path = ZkAdminPaths.partitionedTopicPath(topicName); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + tryCreatePartitionsAsync(numPartitions); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); } catch (KeeperException.NodeExistsException e) { log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); @@ -533,6 +533,13 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL } } + protected void internalCreateMissedPartitions() { + PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false); + if (metadata != null) { + tryCreatePartitionsAsync(metadata.partitions); + } + } + private CompletableFuture updatePartitionInOtherCluster(int numPartitions, Set clusters) { List> results = new ArrayList<>(clusters.size() -1); clusters.forEach(cluster -> { @@ -620,8 +627,8 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole try { globalZk().delete(path, -1); globalZkCache().invalidate(path); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName); asyncResponse.resume(Response.noContent().build()); return; @@ -1839,24 +1846,28 @@ private CompletableFuture createSubscriptions(TopicName topicName, int num } admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> { - stats.subscriptions.keySet().forEach(subscription -> { - List> subscriptionFutures = new ArrayList<>(); - for (int i = partitionMetadata.partitions; i < numPartitions; i++) { - final String topicNamePartition = topicName.getPartition(i).toString(); + if (stats.subscriptions.size() == 0) { + result.complete(null); + } else { + stats.subscriptions.keySet().forEach(subscription -> { + List> subscriptionFutures = new ArrayList<>(); + for (int i = partitionMetadata.partitions; i < numPartitions; i++) { + final String topicNamePartition = topicName.getPartition(i).toString(); - subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition, - subscription, MessageId.latest)); - } + subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition, + subscription, MessageId.latest)); + } - FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { - log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName); - result.complete(null); - }).exceptionally(ex -> { - log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex); - result.completeExceptionally(ex); - return null; + FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { + log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName); + result.complete(null); + }).exceptionally(ex -> { + log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex); + result.completeExceptionally(ex); + return null; + }); }); - }); + } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { // The first partition doesn't exist, so there are currently to subscriptions to recreate diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 76671672d42b8..0179847590bb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -144,8 +144,8 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path topicName.getEncodedLocalName()); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); } catch (KeeperException.NodeExistsException e) { log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index eeaeb96a649ee..a41db33b9f19c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -190,8 +190,8 @@ public void createPartitionedTopic( topicName.getEncodedLocalName()); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); } catch (KeeperException.NodeExistsException e) { log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 9eec8ae23ef84..08411dd5a33d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -246,7 +246,7 @@ public void createNonPartitionedTopic( */ @POST @Path("/{tenant}/{namespace}/{topic}/partitions") - @ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") + @ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -270,6 +270,30 @@ public void updatePartitionedTopic( internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly); } + + @POST + @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions") + @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant does not exist"), + @ApiResponse(code = 409, message = "Partitioned topic does not exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void createMissedPartitions( + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic) { + + validatePartitionedTopicName(tenant, namespace, encodedTopic); + internalCreateMissedPartitions(); + } + @GET @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Get partitioned topic metadata.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index aa0e1982577ca..b9fd0a7b0e027 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -75,7 +75,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -83,7 +82,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -1821,10 +1819,15 @@ private CompletableFuture createDefaultPartitionedTopi partitionedTopicPath(topicName), content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> { if (rc == KeeperException.Code.OK.intValue()) { - // we wait for the data to be synced in all quorums and the observers - executor().schedule( - SafeRunnable.safeRun(() -> partitionedTopicFuture.complete(configMetadata)), - PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, TimeUnit.MILLISECONDS); + // Sync data to all quorums and the observers + pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName), + (rc2, path2, ctx2) -> { + if (rc2 == KeeperException.Code.OK.intValue()) { + partitionedTopicFuture.complete(configMetadata); + } else { + partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2)); + } + }, null); } else { partitionedTopicFuture.completeExceptionally(KeeperException.create(rc)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 855a20daf2686..a0b48cfd5bc56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -815,9 +815,8 @@ public void partitionedTopics(String topicName) throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4); - // check if the virtual topic doesn't get created List topics = admin.topics().getList("prop-xyz/ns1"); - assertEquals(topics.size(), 0); + assertEquals(topics.size(), 4); assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions, 0); @@ -829,15 +828,8 @@ public void partitionedTopics(String topicName) throws Exception { assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, false).partitions.size(), 0); - try { - admin.topics().getSubscriptions(partitionedTopicName); - fail("should have failed"); - } catch (PulsarAdminException e) { - // ok - assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode()); - } catch (Exception e) { - fail(e.getMessage()); - } + List subscriptions = admin.topics().getSubscriptions(partitionedTopicName); + assertEquals(subscriptions.size(), 0); // create consumer and subscription PulsarClient client = PulsarClient.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index cdeb41886e38f..a4cd3258198ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -159,17 +159,7 @@ public void testGetSubscriptions() { // 3) Create the partitioned topic persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3); - // 4) Confirm that the topic partitions has not been created yet - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", - true); - errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); - Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), - Response.Status.NOT_FOUND.getStatusCode()); - Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic partitions were not yet created"); - - // 5) Create a subscription + // 4) Create a subscription response = mock(AsyncResponse.class); persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl) MessageId.earliest, false); @@ -177,26 +167,26 @@ public void testGetSubscriptions() { verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - // 6) Confirm that the subscription exists + // 5) Confirm that the subscription exists response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test")); - // 7) Delete the subscription + // 6) Delete the subscription response = mock(AsyncResponse.class); persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - // 8) Confirm that the subscription does not exist + // 7) Confirm that the subscription does not exist response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); verify(response, timeout(5000).times(1)).resume(Lists.newArrayList()); - // 9) Delete the partitioned topic + // 8) Delete the partitioned topic response = mock(AsyncResponse.class); persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true); responseCaptor = ArgumentCaptor.forClass(Response.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index e3965bd56e139..cf5db2b660220 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -782,9 +782,8 @@ public void partitionedTopics(String topicName) throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4); - // check if the virtual topic doesn't get created List topics = admin.topics().getList("prop-xyz/use/ns1"); - assertEquals(topics.size(), 0); + assertEquals(topics.size(), 4); assertEquals( admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds2").partitions, @@ -809,6 +808,9 @@ public void partitionedTopics(String topicName) throws Exception { fail(e.getMessage()); } + List subscriptions = admin.topics().getSubscriptions(partitionedTopicName); + assertEquals(subscriptions.size(), 1); + Consumer consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub-1") .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java new file mode 100644 index 0000000000000..2647bbba380a8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.admin.ZkAdminPaths; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper; + +public class PartitionCreationTest extends ProducerConsumerBase { + + @DataProvider(name = "topicDomainProvider") + public Object[][] topicDomainProvider() { + return new Object[][] { + { TopicDomain.persistent }, + { TopicDomain.non_persistent } + }; + } + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setManagedLedgerCacheEvictionFrequency(0.1); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(false); + final String topic = domain.value() + "://public/default/testCreateConsumerWhenDisableTopicAutoCreation"; + admin.topics().createPartitionedTopic(topic, 3); + Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarClientException { + conf.setAllowAutoTopicCreation(false); + final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation"; + try { + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + if (domain == TopicDomain.persistent) { + Assert.fail("should be failed"); + } else { + // passed non persistent topic here since we can not avoid auto creation on non persistent topic now. + Assert.assertNotNull(consumer); + } + } catch (PulsarClientException.TopicDoesNotExistException e) { + //ok + } + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(true); + final String topic = domain.value() + "://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation"; + admin.topics().createPartitionedTopic(topic, 3); + Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarClientException { + conf.setAllowAutoTopicCreation(true); + final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation"; + Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); + } + + @Test + public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws PulsarClientException, PulsarAdminException { + conf.setAllowAutoTopicCreation(false); + final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation"; + admin.topics().createPartitionedTopic(topic, 3); + MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.assertNotNull(consumer); + Assert.assertEquals(consumer.getConsumers().size(), 3); + consumer.close(); + admin.topics().updatePartitionedTopic(topic, 5); + consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.assertNotNull(consumer); + Assert.assertEquals(consumer.getConsumers().size(), 5); + } + + @Test + public void testCreateMissedPartitions() throws JsonProcessingException, KeeperException, InterruptedException, PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(false); + final String topic = "testCreateMissedPartitions"; + String path = ZkAdminPaths.partitionedTopicPath(TopicName.get(topic)); + int numPartitions = 3; + byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); + // simulate partitioned topic without partitions + ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Consumer consumer = null; + try { + consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribeAsync().get(3, TimeUnit.SECONDS); + } catch (Exception e) { + //ok here, consumer will create failed with 'Topic does not exist' + } + Assert.assertNull(consumer); + admin.topics().createMissedPartitions(topic); + consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.assertNotNull(consumer); + Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl); + Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(), 3); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 7413825d9668a..a5b20f3586929 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -487,11 +487,11 @@ public void testStartEmptyPatternConsumer() throws Exception { .receiverQueueSize(4) .subscribe(); - // 3. verify consumer get methods, to get 0 number of partitions and topics. + // 3. verify consumer get methods, to get 5 number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 0); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 0); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 5); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 5); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 2); // 4. create producer String messagePredicate = "my-message-" + key + "-"; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 39c007c7ffba0..c5ec04120c022 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -211,6 +211,16 @@ List getListInBundle(String namespace, String bundleRange) */ void createNonPartitionedTopic(String topic) throws PulsarAdminException; + /** + * Create missed partitions for partitioned topic. + *

+ * When disable topic auto creation, use this method to try create missed partitions while + * partitions create failed or users already have partitioned topic without partitions. + * + * @param topic partitioned topic name + */ + void createMissedPartitions(String topic) throws PulsarAdminException; + /** * Create a partitioned topic asynchronously. *

@@ -233,6 +243,16 @@ List getListInBundle(String namespace, String bundleRange) */ CompletableFuture createNonPartitionedTopicAsync(String topic); + /** + * Create missed partitions for partitioned topic asynchronously. + *

+ * When disable topic auto creation, use this method to try create missed partitions while + * partitions create failed or users already have partitioned topic without partitions. + * + * @param topic partitioned topic name + */ + CompletableFuture createMissedPartitionsAsync(String topic); + /** * Update number of partitions of a non-global partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 3c318453eefac..75c3d59a55d7c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -224,6 +224,20 @@ public void createNonPartitionedTopic(String topic) throws PulsarAdminException } } + @Override + public void createMissedPartitions(String topic) throws PulsarAdminException { + try { + createMissedPartitionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public CompletableFuture createNonPartitionedTopicAsync(String topic){ TopicName tn = validateTopic(topic); @@ -239,6 +253,13 @@ public CompletableFuture createPartitionedTopicAsync(String topic, int num return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON)); } + @Override + public CompletableFuture createMissedPartitionsAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "createMissedPartitions"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void updatePartitionedTopic(String topic, int numPartitions) throws PulsarAdminException { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 06775e1d38d0e..f751cc234a997 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -713,6 +713,8 @@ public static PulsarClientException unwrap(Throwable t) { return new ChecksumException(msg); } else if (cause instanceof CryptoException) { return new CryptoException(msg); + } else if (cause instanceof TopicDoesNotExistException) { + return new TopicDoesNotExistException(msg); } else { return new PulsarClientException(t); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9a2ae1b6149f0..af5aa02e7a2a3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -82,6 +82,7 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions()); jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd()); + jcommander.addCommand("create-missed-partitions", new CreateMissedPartitionsCmd()); jcommander.addCommand("create", new CreateNonPartitionedCmd()); jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd()); jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); @@ -214,6 +215,21 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Try to create partitions for partitioned topic. \n" + + "\t\t The partitions of partition topic has to be created, can be used by repair partitions when \n" + + "\t\t topic auto creation is disabled") + private class CreateMissedPartitionsCmd extends CliCommand { + + @Parameter(description = "persistent://tenant/namespace/topic\n", required = true) + private java.util.List params; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + topics.createMissedPartitions(topic); + } + } + @Parameters(commandDescription = "Create a non-partitioned topic.") private class CreateNonPartitionedCmd extends CliCommand { diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md index fe0ab9853d35a..a6507e073b759 100644 --- a/site2/docs/admin-api-partitioned-topics.md +++ b/site2/docs/admin-api-partitioned-topics.md @@ -62,6 +62,34 @@ int numPartitions = 4; admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); ``` +### Create missed partitions + +Try to create partitions for partitioned topic. The partitions of partition topic has to be created, +can be used by repair partitions when topic auto creation is disabled + +#### pulsar-admin + +You can create missed partitions using the [`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions) +command and specifying the topic name as an argument. + +Here's an example: + +```shell +$ bin/pulsar-admin topics create-missed-partitions \ + persistent://my-tenant/my-namespace/my-topic \ +``` + +#### REST API + +{@inject: endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic|operation/createMissedPartitions} + +#### Java + +```java +String topicName = "persistent://my-tenant/my-namespace/my-topic"; +admin.persistentTopics().createMissedPartitions(topicName); +``` + ### Get metadata Partitioned topics have metadata associated with them that you can fetch as a JSON object. diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 6e9f7a33ff6d5..af8c88b50ad69 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1603,6 +1603,7 @@ Subcommands * `offload` * `offload-status` * `create-partitioned-topic` +* `create-missed-partitions` * `delete-partitioned-topic` * `create` * `get-partitioned-topic-metadata` @@ -1704,6 +1705,15 @@ Options |---|---|---| |`-p`, `--partitions`|The number of partitions for the topic|0| +### `create-missed-partitions` +Try to create partitions for partitioned topic. The partitions of partition topic has to be created, +can be used by repair partitions when topic auto creation is disabled + +Usage +```bash +$ pulsar-admin topics create-missed-partitions persistent://tenant/namespace/topic +``` + ### `delete-partitioned-topic` Delete a partitioned topic. This will also delete all the partitions of the topic if they exist.