From 7b7b8563d452bf9fe0501700fc9abb66de25000c Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 9 Sep 2024 10:11:21 +0800 Subject: [PATCH 1/2] Make cluster metadata init command support configurationStoreConfigPath --- .../pulsar/PulsarClusterMetadataSetup.java | 26 ++++++++++++++++--- .../pulsar/PulsarInitialNamespaceSetup.java | 11 ++++++-- ...arTransactionCoordinatorMetadataSetup.java | 11 ++++++-- .../zookeeper/ClusterMetadataSetupTest.java | 7 ++--- .../resources/conf/zk_client_enable_sasl.conf | 20 ++++++++++++++ 5 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index c818dee124a88..53696cd915e81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -140,6 +141,15 @@ private static class Arguments { hidden = false) private String configurationMetadataStore; + @Option(names = {"-mscp", + "--metadata-store-config-path"}, description = "Metadata Store config path", hidden = false) + private String metadataStoreConfigPath; + + @Option(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Option(names = { "--initial-num-stream-storage-containers" }, description = "Num storage containers of BookKeeper stream storage") @@ -283,9 +293,11 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe log.info("Setting up cluster {} with metadata-store={} configuration-metadata-store={}", arguments.cluster, arguments.metadataStoreUrl, arguments.configurationMetadataStore); - MetadataStoreExtended localStore = - initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis); + MetadataStoreExtended localStore = initLocalMetadataStore(arguments.metadataStoreUrl, + arguments.metadataStoreConfigPath, + arguments.zkSessionTimeoutMillis); MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore, + arguments.configurationStoreConfigPath, arguments.zkSessionTimeoutMillis); final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl @@ -464,9 +476,12 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam } } - public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception { + public static MetadataStoreExtended initLocalMetadataStore(String connection, + String configPath, + int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .configFilePath(configPath) .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { @@ -475,10 +490,13 @@ public static MetadataStoreExtended initLocalMetadataStore(String connection, in return store; } - public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout) + public static MetadataStoreExtended initConfigMetadataStore(String connection, + String configPath, + int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .configFilePath(configPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java index 891aa1aa42120..912f43958f469 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java @@ -44,6 +44,11 @@ private static class Arguments { "--configuration-store" }, description = "Configuration Store connection string", required = true) private String configurationStore; + @Option(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Option(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -85,8 +90,10 @@ public static int doMain(String[] args) throws Exception { return 1; } - try (MetadataStore configStore = PulsarClusterMetadataSetup - .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + try (MetadataStore configStore = PulsarClusterMetadataSetup.initConfigMetadataStore( + arguments.configurationStore, + arguments.configurationStoreConfigPath, + arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); for (String namespace : arguments.namespaces) { NamespaceName namespaceName = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java index 57b67b011913f..06b68decf36f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java @@ -44,6 +44,11 @@ private static class Arguments { "--configuration-store" }, description = "Configuration Store connection string", required = true) private String configurationStore; + @Option(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Option(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -92,8 +97,10 @@ public static void main(String[] args) throws Exception { System.exit(1); } - try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup - .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup.initConfigMetadataStore( + arguments.configurationStore, + arguments.configurationStoreConfigPath, + arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); // Create system tenant PulsarClusterMetadataSetup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java index 4267c7564fa6f..d1fe0fc497b34 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -74,10 +74,11 @@ public void testReSetupClusterMetadata() throws Exception { "--cluster", "testReSetupClusterMetadata-cluster", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", "--web-service-url", "http://127.0.0.1:8080", "--web-service-url-tls", "https://127.0.0.1:8443", "--broker-service-url", "pulsar://127.0.0.1:6650", - "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651" + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651" }; PulsarClusterMetadataSetup.main(args); SortedMap data1 = localZkS.dumpData(); @@ -330,7 +331,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception { PulsarClusterMetadataSetup.main(args); try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup - .initLocalMetadataStore(zkConnection, 30000)) { + .initLocalMetadataStore(zkConnection, null, 30000)) { // expected not exist assertFalse(localStore.exists("/ledgers").get()); @@ -347,7 +348,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception { PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs); try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup - .initLocalMetadataStore(zkConnection, 30000)) { + .initLocalMetadataStore(zkConnection, null, 30000)) { // expected not exist assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get()); } diff --git a/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf b/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf new file mode 100644 index 0000000000000..c59e093450d39 --- /dev/null +++ b/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +zookeeper.sasl.client=true From 2b8eaf5d2c17ca02ce85e46f1ecce29ebc8ead4d Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 9 Sep 2024 14:41:20 +0800 Subject: [PATCH 2/2] Keep backwards compatibility --- .../apache/pulsar/PulsarClusterMetadataSetup.java | 12 +++++++++++- .../broker/zookeeper/ClusterMetadataSetupTest.java | 4 ++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 53696cd915e81..96ea8877c5b61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -35,7 +35,6 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.common.conf.InternalConfigurationData; -import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -476,6 +475,11 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam } } + public static MetadataStoreExtended initLocalMetadataStore(String connection, + int sessionTimeout) throws Exception { + return initLocalMetadataStore(connection, null, sessionTimeout); + } + public static MetadataStoreExtended initLocalMetadataStore(String connection, String configPath, int sessionTimeout) throws Exception { @@ -490,6 +494,12 @@ public static MetadataStoreExtended initLocalMetadataStore(String connection, return store; } + public static MetadataStoreExtended initConfigMetadataStore(String connection, + int sessionTimeout) + throws Exception { + return initConfigMetadataStore(connection, null, sessionTimeout); + } + public static MetadataStoreExtended initConfigMetadataStore(String connection, String configPath, int sessionTimeout) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java index d1fe0fc497b34..0c402a83e4227 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -331,7 +331,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception { PulsarClusterMetadataSetup.main(args); try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup - .initLocalMetadataStore(zkConnection, null, 30000)) { + .initLocalMetadataStore(zkConnection, 30000)) { // expected not exist assertFalse(localStore.exists("/ledgers").get()); @@ -348,7 +348,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception { PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs); try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup - .initLocalMetadataStore(zkConnection, null, 30000)) { + .initLocalMetadataStore(zkConnection, 30000)) { // expected not exist assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get()); }