diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index c818dee124a88..96ea8877c5b61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -140,6 +140,15 @@ private static class Arguments { hidden = false) private String configurationMetadataStore; + @Option(names = {"-mscp", + "--metadata-store-config-path"}, description = "Metadata Store config path", hidden = false) + private String metadataStoreConfigPath; + + @Option(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Option(names = { "--initial-num-stream-storage-containers" }, description = "Num storage containers of BookKeeper stream storage") @@ -283,9 +292,11 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe log.info("Setting up cluster {} with metadata-store={} configuration-metadata-store={}", arguments.cluster, arguments.metadataStoreUrl, arguments.configurationMetadataStore); - MetadataStoreExtended localStore = - initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis); + MetadataStoreExtended localStore = initLocalMetadataStore(arguments.metadataStoreUrl, + arguments.metadataStoreConfigPath, + arguments.zkSessionTimeoutMillis); MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore, + arguments.configurationStoreConfigPath, arguments.zkSessionTimeoutMillis); final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl @@ -464,9 +475,17 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam } } - public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception { + public static MetadataStoreExtended initLocalMetadataStore(String connection, + int sessionTimeout) throws Exception { + return initLocalMetadataStore(connection, null, sessionTimeout); + } + + public static MetadataStoreExtended initLocalMetadataStore(String connection, + String configPath, + int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .configFilePath(configPath) .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { @@ -475,10 +494,19 @@ public static MetadataStoreExtended initLocalMetadataStore(String connection, in return store; } - public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout) + public static MetadataStoreExtended initConfigMetadataStore(String connection, + int sessionTimeout) + throws Exception { + return initConfigMetadataStore(connection, null, sessionTimeout); + } + + public static MetadataStoreExtended initConfigMetadataStore(String connection, + String configPath, + int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .configFilePath(configPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java index 891aa1aa42120..912f43958f469 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java @@ -44,6 +44,11 @@ private static class Arguments { "--configuration-store" }, description = "Configuration Store connection string", required = true) private String configurationStore; + @Option(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Option(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -85,8 +90,10 @@ public static int doMain(String[] args) throws Exception { return 1; } - try (MetadataStore configStore = PulsarClusterMetadataSetup - .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + try (MetadataStore configStore = PulsarClusterMetadataSetup.initConfigMetadataStore( + arguments.configurationStore, + arguments.configurationStoreConfigPath, + arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); for (String namespace : arguments.namespaces) { NamespaceName namespaceName = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java index 57b67b011913f..06b68decf36f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java @@ -44,6 +44,11 @@ private static class Arguments { "--configuration-store" }, description = "Configuration Store connection string", required = true) private String configurationStore; + @Option(names = {"-cmscp", + "--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path", + hidden = false) + private String configurationStoreConfigPath; + @Option(names = { "--zookeeper-session-timeout-ms" }, description = "Local zookeeper session timeout ms") @@ -92,8 +97,10 @@ public static void main(String[] args) throws Exception { System.exit(1); } - try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup - .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup.initConfigMetadataStore( + arguments.configurationStore, + arguments.configurationStoreConfigPath, + arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); // Create system tenant PulsarClusterMetadataSetup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java index 4267c7564fa6f..0c402a83e4227 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -74,10 +74,11 @@ public void testReSetupClusterMetadata() throws Exception { "--cluster", "testReSetupClusterMetadata-cluster", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", "--web-service-url", "http://127.0.0.1:8080", "--web-service-url-tls", "https://127.0.0.1:8443", "--broker-service-url", "pulsar://127.0.0.1:6650", - "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651" + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651" }; PulsarClusterMetadataSetup.main(args); SortedMap data1 = localZkS.dumpData(); diff --git a/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf b/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf new file mode 100644 index 0000000000000..c59e093450d39 --- /dev/null +++ b/pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +zookeeper.sasl.client=true