Skip to content

Commit

Permalink
Add a util function to generate metadata service url
Browse files Browse the repository at this point in the history
  • Loading branch information
murong00 committed Feb 25, 2020
1 parent 5761996 commit 786b310
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
if (StringUtils.isNotBlank(conf.getBookkeeperServiceUri())) {
bkConf.setMetadataServiceUri(conf.getBookkeeperServiceUri());
} else {
PulsarService pulsar = new PulsarService(conf);
String metadataServiceUri = pulsar.getMetadataServiceUri();
String metadataServiceUri = PulsarService.bookieMetadataServiceUri(conf);
bkConf.setMetadataServiceUri(metadataServiceUri);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,19 +713,11 @@ public ZooKeeper getZkClient() {
return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
}

/**
* Get default bookkeeper metadata service uri.
*/
public String getMetadataServiceUri() {
ClientConfiguration bkConf = new ClientConfiguration();
// init bookkeeper metadata service uri
String metadataServiceUri = null;
try {
String zkServers = this.getConfiguration().getZookeeperServers();
String ledgerManagerType = bkConf.getLedgerManagerLayoutStringFromFactoryClass();
metadataServiceUri = String.format("zk+%s://%s%s", ledgerManagerType,
zkServers.replace(",", ";"), "/ledgers");
} catch (ConfigurationException e) {
LOG.error("Failed to set bookkeeper metadata service uri", e);
}
return metadataServiceUri;
return bookieMetadataServiceUri(this.getConfiguration());
}

public InternalConfigurationData getInternalConfigurationData() {
Expand Down Expand Up @@ -1071,6 +1063,27 @@ public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
}

/**
* Get bookkeeper metadata service uri.
*
* @param config broker configuration
* @return the metadata service uri that bookkeeper is used
*/
public static String bookieMetadataServiceUri(ServiceConfiguration config) {
ClientConfiguration bkConf = new ClientConfiguration();
// init bookkeeper metadata service uri
String metadataServiceUri = null;
try {
String zkServers = config.getZookeeperServers();
String ledgerManagerType = bkConf.getLedgerManagerLayoutStringFromFactoryClass();
metadataServiceUri = String.format("zk+%s://%s%s", ledgerManagerType,
zkServers.replace(",", ";"), "/ledgers");
} catch (ConfigurationException e) {
LOG.error("Failed to get bookkeeper metadata service uri", e);
}
return metadataServiceUri;
}

private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|brokerClientAuthenticationParameters|||
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication ||
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
|bookkeeperServiceUri| Metadata service uri that bookkeeper is used for loading corresponding metadata driver and resolving its metadata service location ||
|bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies ||
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementatation specifics parameters name and values ||
|bookkeeperClientAuthenticationParameters|||
Expand Down

0 comments on commit 786b310

Please sign in to comment.