Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private ConnectionRegistryFactory() {
*/
static ConnectionRegistry getRegistry(Configuration conf) {
Class<? extends ConnectionRegistry> clazz = conf.getClass(
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class MasterRegistry implements ConnectionRegistry {
} else {
finalConf = conf;
}
finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
masterServers = new HashSet<>();
Expand Down Expand Up @@ -146,12 +147,13 @@ private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
if (rpcResult == null) {
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
return;
}
if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed.
future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug)));

return;
}
future.complete(transformResult.apply(rpcResult));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,6 @@ public static Configuration getPeerClusterConfiguration(Configuration conf,
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}

return otherConf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.HbckService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public static Configuration createClusterConf(Configuration baseConf, String clu
* @return the merged configuration with override properties and cluster key applied
*/
public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
String overridePrefix) throws IOException {
String overridePrefix) throws IOException {
Configuration clusterConf = HBaseConfiguration.create(baseConf);
if (clusterKey != null && !clusterKey.isEmpty()) {
applyClusterKeyToConf(clusterConf, clusterKey);
Expand All @@ -268,14 +268,21 @@ public static Configuration createClusterConf(Configuration baseConf, String clu
* used to communicate with distant clusters
* @param conf configuration object to configure
* @param key string that contains the 3 required configuratins
* @throws IOException
*/
private static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException{
throws IOException {
ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
// Without the right registry, the above configs are useless. Also, we don't use setClass()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a check-then-fail that can be done to assert that the method invocation has significance? Maybe log a warning saying zk configs are being applied to a master registry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the logging. Don't think it should be warn though.

// here because the ConnectionRegistry* classes are not resolvable from this module.
// This will be broken if ZkConnectionRegistry class gets renamed or moved. Is there a better
// way?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use findClass and if there's an exception fall through to alternate or recovery code. Anyway, agreed, a reference to a class constant is not called for here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fall through to alternate or recovery code

There is no alternate or recovery from that point, no? The same error is propagated while creating the registry instance, so I guess we don't need to do it again here I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is going waste away. If user chooses zk registry, this code applies?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. It is ok to leave as-is since it is a no-op?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm finding this method used by SyncTable, TableMapReduceUtil, TableOutputFormat, VerifyReplication, ExportSnapshot ... Per earlier discussion, replication will continue to use the ZK registry... what about the general MapReduce/Spark/Flink use-cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Good point.

TableMapReduceUtil: This happens only in initCredentialsForCluster() or if hbase.mapred.output.quorum is specified. Basically it only happens for a "peer" cluster. (same for TableOutputFormat, unless QUORUM_ADDRESS for a target is specified this doesn't happen). Overall, I think all MR jobs running on a single cluster will use master registry. I think that answers the MR/Spark/Flink usecases.

Now coming to MR jobs spanning multiple clusters (source and target) Ex: VerifyReplication / ExportSnapshot/SyncTable/TableOutputFormat etc

I think these need to be rewritten with new config params like hbase.[source|target].master.addrs for clients to pass the addresses so that they can use master registry.

What do you think? Should we rewrite them with new configs or maintain compatibility and keep using zkregistry?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I think all MR jobs running on a single cluster will use master registry. I think that answers the MR/Spark/Flink usecases.

Sounds good.

I think these need to be rewritten with new config params like hbase.[source|target].master.addrs for clients to pass the addresses so that they can use master registry.

I think it's best to reduce the ZK-exposed surface area as much as possible. This seems a reasonable solution to me.

LOG.info("Overriding client registry implementation to {}",
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public enum OperationStatusCode {

public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;

/** Full class name of the Zookeeper based connection registry implementation */
public static final String ZK_CONNECTION_REGISTRY_CLASS =
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";

/** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public LocalHBaseCluster(final Configuration conf)
*/
public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
throws IOException {
this(conf, 1, noRegionServers, getMasterImplementation(conf),
this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf));
}

Expand All @@ -106,7 +106,7 @@ public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers)
throws IOException {
this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf));
}

Expand All @@ -122,6 +122,12 @@ private static Class<? extends HMaster> getMasterImplementation(final Configurat
HMaster.class);
}

public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass) throws IOException {
this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
}

/**
* Constructor.
* @param conf Configuration to use. Post construction has the master's
Expand All @@ -134,9 +140,9 @@ private static Class<? extends HMaster> getMasterImplementation(final Configurat
*/
@SuppressWarnings("unchecked")
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers, final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass)
throws IOException {
final int noAlwaysStandByMasters, final int noRegionServers,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh. The final constructor args are Configuration, int, int, int, Class, Class. It's time for a builder with named arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, too long and confusing now.

final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass) throws IOException {
this.conf = conf;

// When active, if a port selection is default then we switch to random
Expand Down Expand Up @@ -170,24 +176,22 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters,
this.masterClass = (Class<? extends HMaster>)
conf.getClass(HConstants.MASTER_IMPL, masterClass);
// Start the HMasters.
for (int i = 0; i < noMasters; i++) {
int i;
for (i = 0; i < noMasters; i++) {
addMaster(new Configuration(conf), i);
}

// Populate the master address host ports in the config. This is needed if a master based
// registry is configured for client metadata services (HBASE-18095)
List<String> masterHostPorts = new ArrayList<>();
getMasters().forEach(masterThread ->
masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));

for (int j = 0; j < noAlwaysStandByMasters; j++) {
Configuration c = new Configuration(conf);
c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
addMaster(c, i + j);
}
// Start the HRegionServers.
this.regionServerClass =
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
regionServerClass);

for (int i = 0; i < noRegionServers; i++) {
addRegionServer(new Configuration(conf), i);
for (int j = 0; j < noRegionServers; j++) {
addRegionServer(new Configuration(conf), j);
}
}

Expand Down Expand Up @@ -233,8 +237,13 @@ public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager.
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
(Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
this.masterThreads.add(mt);
// Refresh the master address config.
List<String> masterHostPorts = new ArrayList<>();
getMasters().forEach(masterThread ->
masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
return mt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public class ActiveMasterManager extends ZKListener {
final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
final AtomicBoolean clusterShutDown = new AtomicBoolean(false);

// This server's information.
private final ServerName sn;
private int infoPort;
private final Server master;
// This server's information. Package-private for child implementations.
int infoPort;
final ServerName sn;
final Server master;

// Active master's server name. Invalidated anytime active master changes (based on ZK
// notifications) and lazily fetched on-demand.
// ServerName is immutable, so we don't need heavy synchronization around it.
private volatile ServerName activeMasterServerName;
volatile ServerName activeMasterServerName;

/**
* @param watcher ZK watcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public HMaster(final Configuration conf) throws IOException {
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
} else {
this.metaRegionLocationCache = null;
this.activeMasterManager = null;
Expand All @@ -589,6 +589,15 @@ public HMaster(final Configuration conf) throws IOException {
}
}

/**
* Protected to have custom implementations in tests override the default ActiveMaster
* implementation.
*/
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
return new ActiveMasterManager(zk, sn, server);
}

@Override
protected String getUseThisHostnameInstead(Configuration conf) {
return conf.get(MASTER_HOSTNAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,17 @@ public boolean registerService(com.google.protobuf.Service instance) {
return true;
}

private Configuration unsetClientZookeeperQuorum() {
private Configuration cleanupConfiguration() {
Configuration conf = this.conf;
// We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
// - Decouples RS and master life cycles. RegionServers can continue be up independent of
// masters' availability.
// - Configuration management for region servers (cluster internal) is much simpler when adding
// new masters or removing existing masters, since only clients' config needs to be updated.
// - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
// other internal connections too.
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
// Use server ZK cluster for server-issued connections, so we clone
// the conf and unset the client ZK related properties
Expand Down Expand Up @@ -824,7 +833,7 @@ public String getClusterId() {
*/
protected final synchronized void setupClusterConnection() throws IOException {
if (asyncClusterConnection == null) {
Configuration conf = unsetClientZookeeperQuorum();
Configuration conf = cleanupConfiguration();
InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
User user = userProvider.getCurrent();
asyncClusterConnection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class HBasePolicyProvider extends PolicyProvider {
new Service("security.client.protocol.acl", AdminService.BlockingInterface.class),
new Service("security.client.protocol.acl",
MasterProtos.HbckService.BlockingInterface.class),
new Service("security.client.protocol.acl",
MasterProtos.ClientMetaService.BlockingInterface.class),
new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
new Service("security.masterregion.protocol.acl",
RegionServerStatusService.BlockingInterface.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,8 +1114,9 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c);
this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY,
c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
Expand Down Expand Up @@ -1231,6 +1232,7 @@ public void restartHBaseCluster(int servers, List<Integer> ports)
StartMiniClusterOption option =
StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
restartHBaseCluster(option);
invalidateConnection();
}

public void restartHBaseCluster(StartMiniClusterOption option)
Expand All @@ -1244,8 +1246,9 @@ public void restartHBaseCluster(StartMiniClusterOption option)
this.asyncConnection = null;
}
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
Expand Down Expand Up @@ -3045,6 +3048,26 @@ private void initConnection() throws IOException {
this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
}

/**
* Resets the connections so that the next time getConnection() is called, a new connection is
* created. This is needed in cases where the entire cluster / all the masters are shutdown and
* the connection is not valid anymore.
* TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
* written, not all start() stop() calls go through this class. Most tests directly operate on
* the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
* maintain the connection state automatically. Cleaning this is a much bigger refactor.
*/
public void invalidateConnection() throws IOException {
closeConnection();
// Update the master addresses if they changed.
final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
masterConfigBefore, masterConfAfter);
conf.set(HConstants.MASTER_ADDRS_KEY,
getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
}

/**
* Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it
* thread-safe).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public MiniClusterRule(final StartMiniClusterOption miniClusterOptions) {
this.miniClusterOptions = miniClusterOptions;
}

/**
* @return the underlying instance of {@link HBaseTestingUtility}
*/
public HBaseTestingUtility getTestingUtility() {
return testingUtility;
}

/**
* Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link AsyncConnection#close() close()} the connection when finished.
Expand Down
Loading