From 27cc86b469d919d39e65187bdfa02719e59262d3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 23 May 2024 23:07:28 +0800 Subject: [PATCH 1/4] [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation --- .../pulsar/client/impl/ClientCnxTest.java | 46 +++++++++++++++++++ .../client/impl/BinaryProtoLookupService.java | 5 ++ .../apache/pulsar/client/impl/ClientCnx.java | 5 ++ .../pulsar/common/protocol/Commands.java | 1 + 4 files changed, 57 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index dfd52d494ae15..83547bc7b1381 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -20,17 +20,23 @@ import com.google.common.collect.Sets; import io.netty.channel.ChannelHandlerContext; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -124,4 +130,44 @@ public void testClientVersion() throws Exception { producer.close(); consumer.close(); } + + @Test + public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception { + final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp"); + admin.topics().createNonPartitionedTopic(topic); + PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) PulsarClient.builder() + .maxNumberOfRejectedRequestPerConnection(1) + .connectionMaxIdleSeconds(Integer.MAX_VALUE) + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + ProducerImpl producer = (ProducerImpl) clientWitBinaryLookup.newProducer().topic(topic).create(); + + // Verify: the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker is true. + Awaitility.await().untilAsserted(() -> { + ClientCnx clientCnx = producer.getClientCnx(); + Assert.assertNotNull(clientCnx); + Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()); + }); + Assert.assertEquals( + clientWitBinaryLookup.getPartitionsForTopic(topic, true).get().size(), 1); + + // Inject a "false" value for the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation". + // Verify: client will get a not support error. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (CompletableFuture clientCnxFuture : clientWitBinaryLookup.getCnxPool().getConnections()) { + field.set(clientCnxFuture.get(), false); + } + try { + clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); + Assert.fail("Expected an error that the broker version is too old."); + } catch (Exception ex) { + Assert.assertTrue(ex.getMessage().contains("version is too old to support getting partitions")); + } + + // cleanup. + producer.close(); + clientWitBinaryLookup.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index b363d6e4366ad..55a5c88e658b7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -254,6 +254,11 @@ private CompletableFuture getPartitionedTopicMetadata( CompletableFuture partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { + partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The broker" + + " version is too old to support getting partitions without auto-creation")); + return; + } long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, metadataAutoCreationEnabled); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 03e0f406dd2f2..6f343a2ee5855 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -191,6 +191,8 @@ public class ClientCnx extends PulsarHandler { protected AuthenticationDataProvider authenticationDataProvider; private TransactionBufferHandler transactionBufferHandler; private boolean supportsTopicWatchers; + @Getter + private boolean supportsGetPartitionedMetadataWithoutAutoCreation; /** Idle stat. **/ @Getter @@ -400,6 +402,9 @@ protected void handleConnected(CommandConnected connected) { supportsTopicWatchers = connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers(); + supportsGetPartitionedMetadataWithoutAutoCreation = + connected.hasFeatureFlags() + && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation(); // set remote protocol version to the correct version before we complete the connection future setRemoteEndpointProtocolVersion(connected.getProtocolVersion()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cbee7f354c4df..224e093baf112 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -301,6 +301,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max connected.setProtocolVersion(versionToAdvertise); connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers); + connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true); return cmd; } From 2136979fe4fc52a627c826eb6aa6db448eec8d98 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 31 May 2024 09:16:58 +0800 Subject: [PATCH 2/4] address comment --- .../apache/pulsar/client/impl/BinaryProtoLookupService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 55a5c88e658b7..bf015c564b9cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -255,8 +255,9 @@ private CompletableFuture getPartitionedTopicMetadata( client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { - partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The broker" - + " version is too old to support getting partitions without auto-creation")); + partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of" + + " getting partitions without auto-creation is not supported from the broker," + + " please upgrade the broker to the latest version.")); return; } long requestId = client.newRequestId(); From 4fb89861ff8264446cfc8cc2995e55144ab14e81 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 4 Jun 2024 16:33:40 +0800 Subject: [PATCH 3/4] checkstyle --- .../test/java/org/apache/pulsar/client/impl/ClientCnxTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 83547bc7b1381..56744a1053054 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -36,7 +35,6 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; From 38cfadd5e47b3e8d89cb5ced7f68ea6f7a81cc1c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 6 Jun 2024 11:28:44 +0800 Subject: [PATCH 4/4] fix test --- .../test/java/org/apache/pulsar/client/impl/ClientCnxTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 56744a1053054..df6b1b8a8f92f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -160,7 +160,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); Assert.fail("Expected an error that the broker version is too old."); } catch (Exception ex) { - Assert.assertTrue(ex.getMessage().contains("version is too old to support getting partitions")); + Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker")); } // cleanup.