Skip to content

Commit

Permalink
[improve][broker] Make cluster metadata init command support metadata…
Browse files Browse the repository at this point in the history
… config path (apache#23269)

(cherry picked from commit 46f99b9)
(cherry picked from commit fa2e7d8)
  • Loading branch information
Demogorgon314 authored and nikhil-ctds committed Nov 6, 2024
1 parent 02e8f69 commit 8f9f701
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,17 @@ private static class Arguments {
hidden = false)
private String configurationMetadataStore;

@Parameter(names = {"-mscp",
"--metadata-store-config-path"}, description = "Metadata Store config path", hidden = false)
private String metadataStoreConfigPath;

@Parameter(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Parameter(names = {
"--initial-num-stream-storage-containers"
"--initial-num-stream-storage-containers"
}, description = "Num storage containers of BookKeeper stream storage")
private int numStreamStorageContainers = 16;

Expand Down Expand Up @@ -281,9 +290,11 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe
log.info("Setting up cluster {} with metadata-store={} configuration-metadata-store={}", arguments.cluster,
arguments.metadataStoreUrl, arguments.configurationMetadataStore);

MetadataStoreExtended localStore =
initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
MetadataStoreExtended localStore = initLocalMetadataStore(arguments.metadataStoreUrl,
arguments.metadataStoreConfigPath,
arguments.zkSessionTimeoutMillis);
MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore,
arguments.configurationStoreConfigPath,
arguments.zkSessionTimeoutMillis);

final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl
Expand Down Expand Up @@ -462,9 +473,17 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam
}
}

public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception {
public static MetadataStoreExtended initLocalMetadataStore(String connection,
int sessionTimeout) throws Exception {
return initLocalMetadataStore(connection, null, sessionTimeout);
}

public static MetadataStoreExtended initLocalMetadataStore(String connection,
String configPath,
int sessionTimeout) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
.configFilePath(configPath)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
Expand All @@ -473,10 +492,19 @@ public static MetadataStoreExtended initLocalMetadataStore(String connection, in
return store;
}

public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout)
public static MetadataStoreExtended initConfigMetadataStore(String connection,
int sessionTimeout)
throws Exception {
return initConfigMetadataStore(connection, null, sessionTimeout);
}

public static MetadataStoreExtended initConfigMetadataStore(String connection,
String configPath,
int sessionTimeout)
throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
.configFilePath(configPath)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ private static class Arguments {
"--configuration-store" }, description = "Configuration Store connection string", required = true)
private String configurationStore;

@Parameter(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Parameter(names = {
"--zookeeper-session-timeout-ms"
}, description = "Local zookeeper session timeout ms")
Expand Down Expand Up @@ -82,8 +87,10 @@ public static int doMain(String[] args) throws Exception {
return 1;
}

try (MetadataStore configStore = PulsarClusterMetadataSetup
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
try (MetadataStore configStore = PulsarClusterMetadataSetup.initConfigMetadataStore(
arguments.configurationStore,
arguments.configurationStoreConfigPath,
arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
for (String namespace : arguments.namespaces) {
NamespaceName namespaceName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ private static class Arguments {
"--configuration-store" }, description = "Configuration Store connection string", required = true)
private String configurationStore;

@Parameter(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Parameter(names = {
"--zookeeper-session-timeout-ms"
}, description = "Local zookeeper session timeout ms")
Expand Down Expand Up @@ -90,8 +95,10 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup.initConfigMetadataStore(
arguments.configurationStore,
arguments.configurationStoreConfigPath,
arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
// Create system tenant
PulsarClusterMetadataSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ public void testReSetupClusterMetadata() throws Exception {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data1 = localZkS.dumpData();
Expand Down Expand Up @@ -505,11 +506,9 @@ static class ZookeeperServerTest implements Closeable {
private ZooKeeperServer zks;
private NIOServerCnxnFactory serverFactory;
private final int zkPort;
private final String hostPort;

public ZookeeperServerTest(int zkPort) throws IOException {
this.zkPort = zkPort;
this.hostPort = "127.0.0.1:" + zkPort;
this.zkTmpDir = File.createTempFile("zookeeper", "test");
log.info("**** Start GZK on {} ****", zkTmpDir);
if (!zkTmpDir.delete() || !zkTmpDir.mkdir()) {
Expand All @@ -519,15 +518,17 @@ public ZookeeperServerTest(int zkPort) throws IOException {

public void start() throws IOException {
try {
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
zks.setMaxSessionTimeout(20000);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(zkPort), 1000);
serverFactory.configure(new InetSocketAddress("127.0.0.1", zkPort), 1000);
serverFactory.startup(zks);
} catch (Exception e) {
log.error("Exception while instantiating ZooKeeper", e);
}

String hostPort = "127.0.0.1:" + serverFactory.getLocalPort();
LocalBookkeeperEnsemble.waitForServerUp(hostPort, 30000);
log.info("ZooKeeper started at {}", hostPort);
}
Expand Down
20 changes: 20 additions & 0 deletions pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

zookeeper.sasl.client=true

0 comments on commit 8f9f701

Please sign in to comment.