From 4b8aeac3a63ccaf6df276457beb934a4649b3e7d Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Wed, 18 Mar 2020 23:04:48 -0700 Subject: [PATCH 1/2] Fix: topic with one partition cannot be updated --- .../broker/admin/impl/PersistentTopicsBase.java | 5 ----- .../pulsar/broker/admin/IncrementPartitionsTest.java | 12 +++++++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c5dabb40523ab..8c2a0ccd4fa5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2162,11 +2162,6 @@ private CompletableFuture createSubscriptions(TopicName topicName, int num String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getPersistenceNamingEncoding()); CompletableFuture result = new CompletableFuture<>(); pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions <= 1) { - result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic")); - return; - } - if (partitionMetadata.partitions >= numPartitions) { result.completeExceptionally(new RestException(Status.CONFLICT, "number of partitions must be more than existing " + partitionMetadata.partitions)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java index 5c5a8b341eb9f..f43d38199d763 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -77,11 +77,17 @@ public void testIncrementPartitionsOfTopicOnUnusedTopic() throws Exception { public void testIncrementPartitionsOfTopic() throws Exception { final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic-2"; - admin.topics().createPartitionedTopic(partitionedTopicName, 10); - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10); + admin.topics().createPartitionedTopic(partitionedTopicName, 1); + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1); Consumer consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("sub-1") - .subscribe(); + .subscribe(); + + admin.topics().updatePartitionedTopic(partitionedTopicName, 2); + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2); + + admin.topics().updatePartitionedTopic(partitionedTopicName, 10); + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10); admin.topics().updatePartitionedTopic(partitionedTopicName, 20); assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20); From d1feb43bfff34366294656be6f91597702a041bc Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Thu, 19 Mar 2020 10:50:20 -0700 Subject: [PATCH 2/2] addressing issue --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 8c2a0ccd4fa5d..8555b65b6e290 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2162,6 +2162,11 @@ private CompletableFuture createSubscriptions(TopicName topicName, int num String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getPersistenceNamingEncoding()); CompletableFuture result = new CompletableFuture<>(); pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions < 1) { + result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic")); + return; + } + if (partitionMetadata.partitions >= numPartitions) { result.completeExceptionally(new RestException(Status.CONFLICT, "number of partitions must be more than existing " + partitionMetadata.partitions));