diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index f58dfba405a8..f56a82e93284 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -83,7 +83,7 @@ class AsyncConnectionImpl implements AsyncConnection { private final User user; - final AsyncRegistry registry; + final ConnectionRegistry registry; private final int rpcTimeout; @@ -118,7 +118,7 @@ class AsyncConnectionImpl implements AsyncConnection { private final ClusterStatusListener clusterStatusListener; - public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, + public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, User user) { this.conf = conf; this.user = user; @@ -248,7 +248,7 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException { CompletableFuture getMasterStub() { return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { CompletableFuture future = new CompletableFuture<>(); - addListener(registry.getMasterAddress(), (addr, error) -> { + addListener(registry.getActiveMaster(), (addr, error) -> { if (error != null) { future.completeExceptionally(error); } else if (addr == null) { @@ -342,7 +342,7 @@ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName @Override public CompletableFuture getHbck() { CompletableFuture future = new CompletableFuture<>(); - addListener(registry.getMasterAddress(), (sn, error) -> { + addListener(registry.getActiveMaster(), (sn, error) -> { if (error != null) { future.completeExceptionally(error); } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 50dab24f2925..9df8efb8a63d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -36,14 +36,14 @@ @InterfaceAudience.Private class AsyncMetaRegionLocator { - private final AsyncRegistry registry; + private final ConnectionRegistry registry; private final AtomicReference metaRegionLocations = new AtomicReference<>(); private final AtomicReference> metaRelocateFuture = new AtomicReference<>(); - AsyncMetaRegionLocator(AsyncRegistry registry) { + AsyncMetaRegionLocator(ConnectionRegistry registry) { this.registry = registry; } @@ -58,7 +58,7 @@ class AsyncMetaRegionLocator { */ CompletableFuture getRegionLocations(int replicaId, boolean reload) { return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload, - registry::getMetaRegionLocation, locs -> isGood(locs, replicaId), "meta region location"); + registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location"); } private HRegionLocation getCacheLocation(HRegionLocation loc) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index 0e116ab6f3d7..fa3ea1ca4dfa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -55,7 +55,7 @@ public CompletableFuture getRegionLocation(byte[] row, int repl @Override public CompletableFuture> getAllRegionLocations() { if (TableName.isMetaTableName(tableName)) { - return conn.registry.getMetaRegionLocation() + return conn.registry.getMetaRegionLocations() .thenApply(locs -> Arrays.asList(locs.getRegionLocations())); } return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index b36485fdf011..f91b2107c6b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -281,7 +281,7 @@ public static CompletableFuture createAsyncConnection(Configura public static CompletableFuture createAsyncConnection(Configuration conf, final User user) { CompletableFuture future = new CompletableFuture<>(); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); + ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); addListener(registry.getClusterId(), (clusterId, error) -> { if (error != null) { registry.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 7bdda26e05be..06e243bf4c45 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -217,7 +217,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { /** * Cluster registry of basic info such as clusterid and meta region location. 
*/ - private final AsyncRegistry registry; + private final ConnectionRegistry registry; private final ClientBackoffPolicy backoffPolicy; @@ -303,7 +303,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.conf.get(BufferedMutator.CLASSNAME_KEY); try { - this.registry = AsyncRegistryFactory.getRegistry(conf); + this.registry = ConnectionRegistryFactory.getRegistry(conf); retrieveClusterId(); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); @@ -434,7 +434,7 @@ public Admin getAdmin() throws IOException { @Override public Hbck getHbck() throws IOException { - return getHbck(get(registry.getMasterAddress())); + return getHbck(get(registry.getActiveMaster())); } @Override @@ -811,7 +811,7 @@ private RegionLocations locateMeta(final TableName tableName, } // Look up from zookeeper - locations = get(this.registry.getMetaRegionLocation()); + locations = get(this.registry.getMetaRegionLocations()); if (locations != null) { cacheLocation(tableName, locations); } @@ -1162,7 +1162,7 @@ private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) */ private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() throws IOException, KeeperException { - ServerName sn = get(registry.getMasterAddress()); + ServerName sn = get(registry.getActiveMaster()); if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); @@ -1211,7 +1211,7 @@ MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { @Override public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException { - return getAdmin(get(registry.getMasterAddress())); + return getAdmin(get(registry.getActiveMaster())); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java similarity index 77% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java index 9537777db1ad..cd22d7861b4c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java @@ -24,16 +24,17 @@ import org.apache.yetus.audience.InterfaceAudience; /** - * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.. + * Registry for meta information needed for connection setup to a HBase cluster. Implementations + * hold cluster information such as this cluster's id, location of hbase:meta, etc.. * Internal use only. */ @InterfaceAudience.Private -interface AsyncRegistry extends Closeable { +interface ConnectionRegistry extends Closeable { /** - * Get the location of meta region. + * Get the location of meta region(s). */ - CompletableFuture getMetaRegionLocation(); + CompletableFuture getMetaRegionLocations(); /** * Should only be called once. @@ -43,9 +44,9 @@ interface AsyncRegistry extends Closeable { CompletableFuture getClusterId(); /** - * Get the address of HMaster. + * Get the address of active HMaster. 
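The renamed interface, whose methods continue just below, exposes exactly three asynchronous lookups: cluster id, active master, and meta region locations. A minimal sketch of how code might consume it; the method names come from this patch, while the helper itself and the logging are purely illustrative. Note the interface is @InterfaceAudience.Private and package-private, so a real caller would live in org.apache.hadoop.hbase.client, as AsyncConnectionImpl does.

  // Hypothetical helper, assumed to sit in the org.apache.hadoop.hbase.client package.
  static void dumpClusterMetaInfo(ConnectionRegistry registry) {
    // Each getter returns a CompletableFuture; the resolved value may be null when the
    // information cannot be determined (for example, no active master registered).
    registry.getClusterId().thenAccept(id -> System.out.println("cluster id: " + id));
    registry.getActiveMaster().thenAccept(sn -> System.out.println("active master: " + sn));
    registry.getMetaRegionLocations()
        .thenAccept(locs -> System.out.println("meta locations: " + locs));
  }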
*/ - CompletableFuture getMasterAddress(); + CompletableFuture getActiveMaster(); /** * Closes this instance and releases any system resources associated with it diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java similarity index 66% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index 28726ae5dd20..80d358bc71a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,26 +18,28 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.conf.Configuration; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; /** - * Get instance of configured Registry. + * Factory class to get the instance of configured connection registry. */ @InterfaceAudience.Private -final class AsyncRegistryFactory { +final class ConnectionRegistryFactory { - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY = + "hbase.client.connection.registry.impl"; - private AsyncRegistryFactory() { + private ConnectionRegistryFactory() { } /** - * @return The cluster registry implementation to use. + * @return The connection registry implementation to use. */ - static AsyncRegistry getRegistry(Configuration conf) { - Class clazz = - conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class); + static ConnectionRegistry getRegistry(Configuration conf) { + Class clazz = conf.getClass( + CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class, + ConnectionRegistry.class); return ReflectionUtils.newInstance(clazz, conf); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index e48438390832..ee32e429eea8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; - import com.google.protobuf.Message; import com.google.protobuf.RpcChannel; import java.io.IOException; @@ -46,7 +45,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -98,14 +96,12 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; @@ -722,7 +718,7 @@ public CompletableFuture isTableAvailable(TableName tableName, byte[][] private CompletableFuture isTableAvailable(TableName tableName, Optional splitKeys) { if (TableName.isMetaTableName(tableName)) { - return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream + return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); } CompletableFuture future = new CompletableFuture<>(); @@ -784,7 +780,8 @@ private boolean compareRegionsWithSplitKeys(List locations, byt } @Override - public CompletableFuture addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { + public CompletableFuture addColumnFamily( + TableName tableName, ColumnFamilyDescriptor columnFamily) { return this. procedureCall(tableName, RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), @@ -838,10 +835,10 @@ public CompletableFuture getNamespaceDescriptor(String name . newMasterCaller() .action( (controller, stub) -> this - . call( - controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c, - req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil - .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); + . 
+ call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), + (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) + -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); } @Override @@ -859,13 +856,12 @@ public CompletableFuture> listNamespaces() { @Override public CompletableFuture> listNamespaceDescriptors() { return this - .> newMasterCaller() - .action( - (controller, stub) -> this - .> call( - controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, - done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil - .toNamespaceDescriptorList(resp))).call(); + .> newMasterCaller().action((controller, stub) -> this + .> call(controller, stub, + ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) -> + s.listNamespaceDescriptors(c, req, done), + (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call(); } @Override @@ -882,7 +878,7 @@ public CompletableFuture> getRegions(ServerName serverName) { @Override public CompletableFuture> getRegions(TableName tableName) { if (tableName.equals(META_TABLE_NAME)) { - return connection.registry.getMetaRegionLocation() + return connection.registry.getMetaRegionLocations() .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) .collect(Collectors.toList())); } else { @@ -1097,9 +1093,7 @@ private CompletableFuture compactRegion(byte[] regionName, byte[] columnFa private CompletableFuture> getTableHRegionLocations(TableName tableName) { if (TableName.META_TABLE_NAME.equals(tableName)) { CompletableFuture> future = new CompletableFuture<>(); - // For meta table, we use zk to fetch all locations. - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); - addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> { + addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { if (err != null) { future.completeExceptionally(err); } else if (metaRegions == null || metaRegions.isEmpty() || @@ -1108,8 +1102,6 @@ private CompletableFuture> getTableHRegionLocations(TableN } else { future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); } - // close the registry. - IOUtils.closeQuietly(registry); }); return future; } else { @@ -1127,7 +1119,7 @@ private CompletableFuture compact(TableName tableName, byte[] columnFamily switch (compactType) { case MOB: - addListener(connection.registry.getMasterAddress(), (serverName, err) -> { + addListener(connection.registry.getActiveMaster(), (serverName, err) -> { if (err != null) { future.completeExceptionally(err); return; @@ -1792,11 +1784,8 @@ private CompletableFuture> listReplicationPeers return this .> newMasterCaller() .action( - (controller, stub) -> this - .> call( - controller, - stub, - request, + (controller, stub) -> this.> call(controller, stub, request, (s, c, req, done) -> s.listReplicationPeers(c, req, done), (resp) -> resp.getPeerDescList().stream() .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) @@ -2307,11 +2296,13 @@ public CompletableFuture getLocks() { } @Override - public CompletableFuture decommissionRegionServers(List servers, boolean offload) { + public CompletableFuture decommissionRegionServers( + List servers, boolean offload) { return this. newMasterCaller() .action((controller, stub) -> this . 
call( - controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload), + controller, stub, + RequestConverter.buildDecommissionRegionServersRequest(servers, offload), (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) .call(); } @@ -2333,11 +2324,11 @@ List> call( public CompletableFuture recommissionRegionServer(ServerName server, List encodedRegionNames) { return this. newMasterCaller() - .action((controller, stub) -> this - . call(controller, - stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), - (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) - .call(); + .action((controller, stub) -> + this. call( + controller, stub, RequestConverter.buildRecommissionRegionServerRequest( + server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer( + c, req, done), resp -> null)).call(); } /** @@ -2358,7 +2349,7 @@ CompletableFuture getRegionLocation(byte[] regionNameOrEncodedR String encodedName = Bytes.toString(regionNameOrEncodedRegionName); if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { // old format encodedName, should be meta region - future = connection.registry.getMetaRegionLocation() + future = connection.registry.getMetaRegionLocations() .thenApply(locs -> Stream.of(locs.getRegionLocations()) .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); } else { @@ -2369,7 +2360,7 @@ CompletableFuture getRegionLocation(byte[] regionNameOrEncodedR RegionInfo regionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); if (regionInfo.isMetaRegion()) { - future = connection.registry.getMetaRegionLocation() + future = connection.registry.getMetaRegionLocations() .thenApply(locs -> Stream.of(locs.getRegionLocations()) .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) .findFirst()); @@ -2403,7 +2394,6 @@ CompletableFuture getRegionLocation(byte[] regionNameOrEncodedR * Get the region info for the passed region name. The region name may be a full region name or * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException * wrapped by a {@link CompletableFuture} - * @param regionNameOrEncodedRegionName * @return region info, wrapped by a {@link CompletableFuture} */ private CompletableFuture getRegionInfo(byte[] regionNameOrEncodedRegionName) { @@ -2894,10 +2884,11 @@ public CompletableFuture> getSecurityCapabilities() { .> newMasterCaller() .action( (controller, stub) -> this - .> call( - controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req, - done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil - .toSecurityCapabilityList(resp.getCapabilitiesList()))).call(); + .> + call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), + (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), + (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) + .call(); } @Override @@ -2942,7 +2933,7 @@ public CompletableFuture getCompactionState(TableName tableName switch (compactType) { case MOB: - addListener(connection.registry.getMasterAddress(), (serverName, err) -> { + addListener(connection.registry.getActiveMaster(), (serverName, err) -> { if (err != null) { future.completeExceptionally(err); return; @@ -3074,14 +3065,10 @@ public CompletableFuture> getLastMajorCompactionTimestamp(TableNa MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return this - .> newMasterCaller() - .action( - (controller, stub) -> this - .> call( - controller, stub, request, - (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), - ProtobufUtil::toOptionalTimestamp)).call(); + return this.> newMasterCaller().action((controller, stub) -> + this.> + call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp( + c, req, done), ProtobufUtil::toOptionalTimestamp)).call(); } @Override @@ -3221,11 +3208,10 @@ public CompletableFuture balance(boolean forcible) { public CompletableFuture isBalancerEnabled() { return this . newMasterCaller() - .action( - (controller, stub) -> this. call( - controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), - (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) - .call(); + .action((controller, stub) -> + this. call(controller, + stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done) + -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java similarity index 92% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index b6bacc03e60b..42a418859f18 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -50,15 +50,15 @@ * Zookeeper based registry implementation. 
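ZKConnectionRegistry, below, is the default implementation that ConnectionRegistryFactory falls back to. For reference, a hedged example of how a deployment (or a test, as the test changes later in this patch do) can swap in a different implementation through the renamed key; MyRegistry is a placeholder for any ConnectionRegistry implementation with a Configuration-argument constructor, which the factory's ReflectionUtils.newInstance call requires, and checked-exception handling is omitted.

  Configuration conf = HBaseConfiguration.create();
  // The string below is ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY.
  // MyRegistry is hypothetical: it must implement ConnectionRegistry and expose a
  // constructor taking a Configuration, because the factory instantiates it reflectively.
  conf.setClass("hbase.client.connection.registry.impl", MyRegistry.class, ConnectionRegistry.class);
  // Connections built from this conf now use MyRegistry instead of ZKConnectionRegistry.
  Connection connection = ConnectionFactory.createConnection(conf);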
*/ @InterfaceAudience.Private -class ZKAsyncRegistry implements AsyncRegistry { +class ZKConnectionRegistry implements ConnectionRegistry { - private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class); + private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class); private final ReadOnlyZKClient zk; private final ZNodePaths znodePaths; - ZKAsyncRegistry(Configuration conf) { + ZKConnectionRegistry(Configuration conf) { this.znodePaths = new ZNodePaths(conf); this.zk = new ReadOnlyZKClient(conf); } @@ -93,7 +93,7 @@ private static String getClusterId(byte[] data) throws DeserializationException @Override public CompletableFuture getClusterId() { - return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId); + return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId); } @VisibleForTesting @@ -144,7 +144,7 @@ private void getMetaRegionLocation(CompletableFuture future, int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode); String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode); if (replicaId == DEFAULT_REPLICA_ID) { - addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { + addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -162,7 +162,7 @@ private void getMetaRegionLocation(CompletableFuture future, tryComplete(remaining, locs, future); }); } else { - addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> { + addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> { if (future.isDone()) { return; } @@ -191,7 +191,7 @@ private void getMetaRegionLocation(CompletableFuture future, } @Override - public CompletableFuture getMetaRegionLocation() { + public CompletableFuture getMetaRegionLocations() { CompletableFuture future = new CompletableFuture<>(); addListener( zk.list(znodePaths.baseZNode) @@ -217,8 +217,8 @@ private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOExcep } @Override - public CompletableFuture getMasterAddress() { - return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) + public CompletableFuture getActiveMaster() { + return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto) .thenApply(proto -> { if (proto == null) { return null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 74a21afb5edd..46cef7bfc2f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionLoadStats; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionStatesCount; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -93,6 +94,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.protobuf.ProtobufMessageConverter; import org.apache.hadoop.hbase.quotas.QuotaScope; @@ -375,7 +377,9 @@ private static IOException makeIOExceptionOfException(Exception e) { * @see #toServerName(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName) */ public static HBaseProtos.ServerName toServerName(final ServerName serverName) { - if (serverName == null) return null; + if (serverName == null) { + return null; + } HBaseProtos.ServerName.Builder builder = HBaseProtos.ServerName.newBuilder(); builder.setHostName(serverName.getHostname()); @@ -3071,6 +3075,44 @@ public static ProcedureDescription buildProcedureDescription(String signature, S return builder.build(); } + /** + * Get the Meta region state from the passed data bytes. Can handle both old and new style + * server names. + * @param data protobuf serialized data with meta server name. + * @param replicaId replica ID for this region + * @return RegionState instance corresponding to the serialized data. + * @throws DeserializationException if the data is invalid. + */ + public static RegionState parseMetaRegionStateFrom(final byte[] data, int replicaId) + throws DeserializationException { + RegionState.State state = RegionState.State.OPEN; + ServerName serverName; + if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) { + try { + int prefixLen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.MetaRegionServer rl = + ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, + data.length - prefixLen); + if (rl.hasState()) { + state = RegionState.State.convert(rl.getState()); + } + HBaseProtos.ServerName sn = rl.getServer(); + serverName = ServerName.valueOf( + sn.getHostName(), sn.getPort(), sn.getStartCode()); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException("Unable to parse meta region location"); + } + } else { + // old style of meta region location? + serverName = parseServerNameFrom(data); + } + if (serverName == null) { + state = RegionState.State.OFFLINE; + } + return new RegionState(RegionReplicaUtil.getRegionInfoForReplica( + RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), state, serverName); + } + /** * Get a ServerName from the passed in data bytes. 
* @param data Data with a serialize server name in it; can handle the old style diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index 98d7eb72eb0b..5c49808807ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -41,7 +41,8 @@ public class ZNodePaths { // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved. public static final char ZNODE_PATH_SEPARATOR = '/'; - private static final String META_ZNODE_PREFIX = "meta-region-server"; + public static final String META_ZNODE_PREFIX_CONF_KEY = "zookeeper.znode.metaserver"; + public static final String META_ZNODE_PREFIX = "meta-region-server"; private static final String DEFAULT_SNAPSHOT_CLEANUP_ZNODE = "snapshot-cleanup"; // base znode for this cluster @@ -104,7 +105,7 @@ public class ZNodePaths { public ZNodePaths(Configuration conf) { baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); ImmutableMap.Builder builder = ImmutableMap.builder(); - metaZNodePrefix = conf.get("zookeeper.znode.metaserver", META_ZNODE_PREFIX); + metaZNodePrefix = conf.get(META_ZNODE_PREFIX_CONF_KEY, META_ZNODE_PREFIX); String defaultMetaReplicaZNode = ZNodePaths.joinZNode(baseZNode, metaZNodePrefix); builder.put(DEFAULT_REPLICA_ID, defaultMetaReplicaZNode); int numMetaReplicas = conf.getInt(META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM); @@ -189,7 +190,19 @@ public String getZNodeForReplica(int replicaId) { } /** - * Parse the meta replicaId from the passed znode name. + * Parses the meta replicaId from the passed path. + * @param path the name of the full path which includes baseZNode. + * @return replicaId + */ + public int getMetaReplicaIdFromPath(String path) { + // Extract the znode from path. The prefix is of the following format. + // baseZNode + PATH_SEPARATOR. + int prefixLen = baseZNode.length() + 1; + return getMetaReplicaIdFromZnode(path.substring(prefixLen)); + } + + /** + * Parse the meta replicaId from the passed znode * @param znode the name of the znode, does not include baseZNode * @return replicaId */ diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java similarity index 89% rename from hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java rename to hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java index 8c7b07384c72..4bd66877b1b4 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java @@ -27,13 +27,13 @@ * Registry that does nothing. Otherwise, default Registry wants zookeeper up and running. 
*/ @InterfaceAudience.Private -class DoNothingAsyncRegistry implements AsyncRegistry { +class DoNothingConnectionRegistry implements ConnectionRegistry { - public DoNothingAsyncRegistry(Configuration conf) { + public DoNothingConnectionRegistry(Configuration conf) { } @Override - public CompletableFuture getMetaRegionLocation() { + public CompletableFuture getMetaRegionLocations() { return CompletableFuture.completedFuture(null); } @@ -43,7 +43,7 @@ public CompletableFuture getClusterId() { } @Override - public CompletableFuture getMasterAddress() { + public CompletableFuture getActiveMaster() { return CompletableFuture.completedFuture(null); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java index 46d786e77e94..ce5c321767fc 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java @@ -142,7 +142,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class), any()); - conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", UserProvider.instantiate(CONF).getCurrent()) { @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java index f29c3bfcc8e2..b306500c8b13 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -43,21 +43,21 @@ public class TestAsyncMetaRegionLocatorFailFast { private static AsyncMetaRegionLocator LOCATOR; - private static final class FaultyAsyncRegistry extends DoNothingAsyncRegistry { + private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry { - public FaultyAsyncRegistry(Configuration conf) { + public FaultyConnectionRegistry(Configuration conf) { super(conf); } @Override - public CompletableFuture getMetaRegionLocation() { + public CompletableFuture getMetaRegionLocations() { return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error")); } } @BeforeClass public static void setUp() { - LOCATOR = new AsyncMetaRegionLocator(new FaultyAsyncRegistry(CONF)); + LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF)); } @Test(expected = DoNotRetryIOException.class) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 6afe0b59b82e..bf7f03f53576 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -462,7 +462,7 @@ private static interface ResponseGenerator { * Returns our async process. 
*/ static class MyConnectionImpl extends ConnectionImplementation { - public static class TestRegistry extends DoNothingAsyncRegistry { + public static class TestRegistry extends DoNothingConnectionRegistry { public TestRegistry(Configuration conf) { super(conf); @@ -481,8 +481,8 @@ protected MyConnectionImpl(Configuration conf) throws IOException { } private static Configuration setupConf(Configuration conf) { - conf.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, - AsyncRegistry.class); + conf.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + TestRegistry.class, ConnectionRegistry.class); return conf; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 56dcf10beefa..8c22946649dc 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -175,7 +175,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); - conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", UserProvider.instantiate(CONF).getCurrent()) { @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java index f8e12954b1e9..bb065f46be23 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java @@ -58,7 +58,8 @@ public static class MyBufferedMutator extends BufferedMutatorImpl { public void testAlternateBufferedMutatorImpl() throws IOException { BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(name.getMethodName())); Configuration conf = HBaseConfiguration.create(); - conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingAsyncRegistry.class.getName()); + conf.set(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + DoNothingConnectionRegistry.class.getName()); try (Connection connection = ConnectionFactory.createConnection(conf)) { BufferedMutator bm = connection.getBufferedMutator(params); // Assert we get default BM if nothing specified. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index 19f98f1d8940..41504843e5c7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -127,7 +127,7 @@ public void setUp() throws Exception { /** * Simple cluster registry inserted in place of our usual zookeeper based one. 
*/ - static class SimpleRegistry extends DoNothingAsyncRegistry { + static class SimpleRegistry extends DoNothingConnectionRegistry { final ServerName META_HOST = META_SERVERNAME; public SimpleRegistry(Configuration conf) { @@ -135,7 +135,7 @@ public SimpleRegistry(Configuration conf) { } @Override - public CompletableFuture getMetaRegionLocation() { + public CompletableFuture getMetaRegionLocations() { return CompletableFuture.completedFuture(new RegionLocations( new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegistryLeak.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java similarity index 83% rename from hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegistryLeak.java rename to hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java index 7c210756d57e..f02ec422a922 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegistryLeak.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java @@ -38,17 +38,17 @@ import org.junit.experimental.categories.Category; @Category({ ClientTests.class, SmallTests.class }) -public class TestAsyncRegistryLeak { +public class TestConnectionRegistryLeak { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncRegistryLeak.class); + HBaseClassTestRule.forClass(TestConnectionRegistryLeak.class); - public static final class AsyncRegistryForTest extends DoNothingAsyncRegistry { + public static final class ConnectionRegistryForTest extends DoNothingConnectionRegistry { private boolean closed = false; - public AsyncRegistryForTest(Configuration conf) { + public ConnectionRegistryForTest(Configuration conf) { super(conf); CREATED.add(this); } @@ -64,14 +64,14 @@ public void close() { } } - private static final List CREATED = new ArrayList<>(); + private static final List CREATED = new ArrayList<>(); private static Configuration CONF = HBaseConfiguration.create(); @BeforeClass public static void setUp() { - CONF.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, AsyncRegistryForTest.class, - AsyncRegistry.class); + CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + ConnectionRegistryForTest.class, ConnectionRegistry.class); } @Test diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 45e806848fd4..7ff284717c0b 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -1196,3 +1196,47 @@ service HbckService { rpc FixMeta(FixMetaRequest) returns(FixMetaResponse); } + +/** Request and response to get the clusterID for this cluster */ +message GetClusterIdRequest { +} +message GetClusterIdResponse { + /** Not set if cluster ID could not be determined. */ + optional string cluster_id = 1; +} + +/** Request and response to get the currently active master name for this cluster */ +message GetActiveMasterRequest { +} +message GetActiveMasterResponse { + /** Not set if an active master could not be determined. 
*/ + optional ServerName server_name = 1; +} + +/** Request and response to get the current list of meta region locations */ +message GetMetaRegionLocationsRequest { +} +message GetMetaRegionLocationsResponse { + /** Not set if meta region locations could not be determined. */ + repeated RegionLocation meta_locations = 1; +} + +/** + * Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment. + */ +service ClientMetaService { + /** + * Get Cluster ID for this cluster. + */ + rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); + + /** + * Get active master server name for this cluster. + */ + rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse); + + /** + * Get current meta replicas' region locations. + */ + rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index 50798ed60a0a..99cab625b85c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,25 +17,24 @@ * limitations under the License. */ package org.apache.hadoop.hbase.master; - import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** * Handles everything on master-side related to master election. @@ -57,12 +56,18 @@ public class ActiveMasterManager extends ZKListener { final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false); final AtomicBoolean clusterShutDown = new AtomicBoolean(false); + // This server's information. private final ServerName sn; private int infoPort; private final Server master; + // Active master's server name. Invalidated anytime active master changes (based on ZK + // notifications) and lazily fetched on-demand. + // ServerName is immutable, so we don't need heavy synchronization around it. + private volatile ServerName activeMasterServerName; + /** - * @param watcher + * @param watcher ZK watcher * @param sn ServerName * @param master In an instance of a Master. 
*/ @@ -106,6 +111,30 @@ void handle(final String path) { } } + /** + * Fetches the active master's ServerName from zookeeper. + */ + private void fetchAndSetActiveMasterServerName() { + LOG.debug("Attempting to fetch active master sn from zk"); + try { + activeMasterServerName = MasterAddressTracker.getMasterAddress(watcher); + } catch (IOException | KeeperException e) { + // Log and ignore for now and re-fetch later if needed. + LOG.error("Error fetching active master information", e); + } + } + + public Optional getActiveMasterServerName() { + if (!clusterHasActiveMaster.get()) { + return Optional.empty(); + } + if (activeMasterServerName == null) { + fetchAndSetActiveMasterServerName(); + } + // It could still be null, but return whatever we have. + return Optional.ofNullable(activeMasterServerName); + } + /** * Handle a change in the master node. Doesn't matter whether this was called * from a nodeCreated or nodeDeleted event because there are no guarantees @@ -134,6 +163,9 @@ private void handleMasterNodeChange() { // Notify any thread waiting to become the active master clusterHasActiveMaster.notifyAll(); } + // Reset the active master sn. Will be re-fetched later if needed. + // We don't want to make a synchronous RPC under a monitor. + activeMasterServerName = null; } } catch (KeeperException ke) { master.abort("Received an unexpected KeeperException, aborting", ke); @@ -151,8 +183,8 @@ private void handleMasterNodeChange() { * @param checkInterval the interval to check if the master is stopped * @param startupStatus the monitor status to track the progress * @return True if no issue becoming active master else false if another - * master was running or if some other problem (zookeeper, stop flag has been - * set on this Master) + * master was running or if some other problem (zookeeper, stop flag has been + * set on this Master) */ boolean blockUntilBecomingActiveMaster( int checkInterval, MonitoredTask startupStatus) { @@ -178,10 +210,14 @@ boolean blockUntilBecomingActiveMaster( // We are the master, return startupStatus.setStatus("Successfully registered as active master."); this.clusterHasActiveMaster.set(true); + activeMasterServerName = sn; LOG.info("Registered as active master=" + this.sn); return true; } + // Invalidate the active master name so that subsequent requests do not get any stale + // master information. Will be re-fetched if needed. + activeMasterServerName = null; // There is another active master running elsewhere or this is a restart // and the master ephemeral node has not expired yet. this.clusterHasActiveMaster.set(true); @@ -208,7 +244,8 @@ boolean blockUntilBecomingActiveMaster( ZKUtil.deleteNode(this.watcher, this.watcher.getZNodePaths().masterAddressZNode); // We may have failed to delete the znode at the previous step, but - // we delete the file anyway: a second attempt to delete the znode is likely to fail again. + // we delete the file anyway: a second attempt to delete the znode is likely to fail + // again. 
ZNodeClearer.deleteMyEphemeralNodeOnDisk(); } else { msg = "Another master is the active master, " + currentMaster + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CachedClusterId.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CachedClusterId.java new file mode 100644 index 000000000000..9ca739987618 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CachedClusterId.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client + * RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not + * already exist. Standby masters just read the information from the file system. This class is + * thread-safe. + * + * TODO: Make it a singleton without affecting concurrent junit tests. + */ +@InterfaceAudience.Private +public class CachedClusterId { + + public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class); + private static final int MAX_FETCH_TIMEOUT_MS = 10000; + + private Path rootDir; + private FileSystem fs; + + // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to + // avoid multiple fetches from FS and let only one thread fetch the information. + AtomicBoolean fetchInProgress = new AtomicBoolean(false); + + // When true, it means that the cluster ID has been fetched successfully from fs. + private AtomicBoolean isClusterIdSet = new AtomicBoolean(false); + // Immutable once set and read multiple times. + private ClusterId clusterId; + + // cache stats for testing. + private AtomicInteger cacheMisses = new AtomicInteger(0); + + public CachedClusterId(Configuration conf) throws IOException { + rootDir = FSUtils.getRootDir(conf); + fs = rootDir.getFileSystem(conf); + } + + /** + * Succeeds only once, when setting to a non-null value. Overwrites are not allowed. 
+ */ + private void setClusterId(ClusterId id) { + if (id == null || isClusterIdSet.get()) { + return; + } + clusterId = id; + isClusterIdSet.set(true); + } + + /** + * Returns a cached copy of the cluster ID. null if the cache is not populated. + */ + private String getClusterId() { + if (!isClusterIdSet.get()) { + return null; + } + // It is ok to read without a lock since clusterId is immutable once set. + return clusterId.toString(); + } + + /** + * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress, + * synchronously fetches the cluster ID and sets it. If an attempt is already in progress, + * returns right away and the caller is expected to wait for the fetch to finish. + * @return true if the attempt is done, false if another thread is already fetching it. + */ + private boolean attemptFetch() { + if (fetchInProgress.compareAndSet(false, true)) { + // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify + // the waiting threads. + try { + cacheMisses.incrementAndGet(); + setClusterId(FSUtils.getClusterId(fs, rootDir)); + } catch (IOException e) { + LOG.warn("Error fetching cluster ID", e); + } finally { + Preconditions.checkState(fetchInProgress.compareAndSet(true, false)); + synchronized (fetchInProgress) { + fetchInProgress.notifyAll(); + } + } + return true; + } + return false; + } + + private void waitForFetchToFinish() throws InterruptedException { + synchronized (fetchInProgress) { + while (fetchInProgress.get()) { + // We don't want the fetches to block forever, for example if there are bugs + // of missing notifications. + fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS); + } + } + } + + /** + * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached + * copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are + * trying get from a clean cache. + * + * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does + * not exist on the file system. + */ + public String getFromCacheOrFetch() { + String id = getClusterId(); + if (id != null) { + return id; + } + if (!attemptFetch()) { + // A fetch is in progress. + try { + waitForFetchToFinish(); + } catch (InterruptedException e) { + // pass and return whatever is in the cache. + } + } + return getClusterId(); + } + + @VisibleForTesting + public int getCacheStats() { + return cacheMisses.get(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0b78bc4d2e0a..5e28ac3c9848 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -350,6 +350,12 @@ public void run() { // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; + /** + * Cache for the meta region replica's locations. Also tracks their changes to avoid stale + * cache entries. 
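Stepping back to CachedClusterId, which ends just above and which the HMaster changes below construct and consult from getClusterId() on standby masters: its contract is that any number of concurrent callers on a cold cache trigger at most one filesystem read. A rough sketch of that behaviour under stated assumptions; it presumes conf points at an existing hbase.rootdir whose cluster id file is readable, and the java.util / java.util.concurrent imports and checked exceptions are omitted.

  CachedClusterId cachedClusterId = new CachedClusterId(conf);
  ExecutorService pool = Executors.newFixedThreadPool(16);
  List<Future<String>> ids = new ArrayList<>();
  for (int i = 0; i < 16; i++) {
    // All sixteen tasks race; the one that wins the CAS fetches, the rest wait on the monitor.
    ids.add(pool.submit(cachedClusterId::getFromCacheOrFetch));
  }
  for (Future<String> id : ids) {
    System.out.println("cluster id: " + id.get());                 // same value for every caller
  }
  System.out.println("misses: " + cachedClusterId.getCacheStats()); // 1 in the happy path
  pool.shutdown();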
+ */ + private final MetaRegionLocationCache metaRegionLocationCache; + // manager of replication private ReplicationPeerManager replicationPeerManager; @@ -444,6 +450,9 @@ public void run() { private final boolean maintenanceMode; static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode"; + // Cached clusterId on stand by masters to serve clusterID requests from clients. + private final CachedClusterId cachedClusterId; + public static class RedirectServlet extends HttpServlet { private static final long serialVersionUID = 2894774810058302473L; private final int regionServerInfoPort; @@ -498,8 +507,7 @@ public void doGet(HttpServletRequest request, * #finishActiveMasterInitialization(MonitoredTask) after * the master becomes the active one. */ - public HMaster(final Configuration conf) - throws IOException, KeeperException { + public HMaster(final Configuration conf) throws IOException { super(conf); TraceUtil.initTracer(conf); try { @@ -512,7 +520,6 @@ public HMaster(final Configuration conf) } else { maintenanceMode = false; } - this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(), @@ -560,10 +567,13 @@ public HMaster(final Configuration conf) // Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) { + this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper); this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); } else { + this.metaRegionLocationCache = null; this.activeMasterManager = null; } + cachedClusterId = new CachedClusterId(conf); } catch (Throwable t) { // Make sure we log the exception. HMaster is often started via reflection and the // cause of failed startup is lost. 
@@ -3765,7 +3775,10 @@ public HbckChore getHbckChore() { return this.hbckChore; } - @Override + public Optional getActiveMaster() { + return activeMasterManager.getActiveMasterServerName(); + } + public void runReplicationBarrierCleaner() { ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner; if (rbc != null) { @@ -3776,4 +3789,15 @@ public void runReplicationBarrierCleaner() { public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() { return this.snapshotQuotaChore; } + + public String getClusterId() { + if (activeMaster) { + return super.getClusterId(); + } + return cachedClusterId.getFromCacheOrFetch(); + } + + public MetaRegionLocationCache getMetaRegionLocationCache() { + return this.metaRegionLocationCache; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index a0b3e23ec054..b1f0c07d9238 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; - import java.io.FileNotFoundException; import java.io.IOException; import java.net.BindException; @@ -30,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.Server; @@ -116,11 +117,9 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; @@ -161,6 +160,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; @@ -185,12 +185,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; @@ -349,9 +355,10 @@ */ @InterfaceAudience.Private @SuppressWarnings("deprecation") -public class MasterRpcServices extends RSRpcServices - implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface, - LockService.BlockingInterface, HbckService.BlockingInterface { +public class MasterRpcServices extends RSRpcServices implements + MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface, + LockService.BlockingInterface, HbckService.BlockingInterface, + ClientMetaService.BlockingInterface { private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName()); private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."+MasterRpcServices.class.getName()); @@ -360,7 +367,7 @@ public class MasterRpcServices extends RSRpcServices /** * @return Subset of configuration to pass initializing regionservers: e.g. - * the filesystem to use and root directory to use. + * the filesystem to use and root directory to use. 
*/ private RegionServerStartupResponse.Builder createConfigurationSubset() { RegionServerStartupResponse.Builder resp = addConfig( @@ -486,15 +493,17 @@ boolean synchronousBalanceSwitch(final boolean b) throws IOException { protected List getServices() { List bssi = new ArrayList<>(5); bssi.add(new BlockingServiceAndInterface( - MasterService.newReflectiveBlockingService(this), - MasterService.BlockingInterface.class)); + MasterService.newReflectiveBlockingService(this), + MasterService.BlockingInterface.class)); bssi.add(new BlockingServiceAndInterface( - RegionServerStatusService.newReflectiveBlockingService(this), - RegionServerStatusService.BlockingInterface.class)); + RegionServerStatusService.newReflectiveBlockingService(this), + RegionServerStatusService.BlockingInterface.class)); bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this), LockService.BlockingInterface.class)); bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this), HbckService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), + ClientMetaService.BlockingInterface.class)); bssi.addAll(super.getServices()); return bssi; } @@ -621,7 +630,9 @@ public AssignRegionResponse assignRegion(RpcController controller, final byte[] regionName = req.getRegion().getValue().toByteArray(); final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName); - if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + if (regionInfo == null) { + throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + } final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build(); if (master.cpHost != null) { @@ -666,7 +677,7 @@ public CreateNamespaceResponse createNamespace(RpcController controller, @Override public CreateTableResponse createTable(RpcController controller, CreateTableRequest req) - throws ServiceException { + throws ServiceException { TableDescriptor tableDescriptor = ProtobufUtil.toTableDescriptor(req.getTableSchema()); byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req); try { @@ -1063,7 +1074,7 @@ public GetSchemaAlterStatusResponse getSchemaAlterStatus( * Get list of TableDescriptors for requested tables. * @param c Unused (set to null). * @param req GetTableDescriptorsRequest that contains: - * - tableNames: requested tables, or if empty, all are requested + * - tableNames: requested tables, or if empty, all are requested. * @return GetTableDescriptorsResponse * @throws ServiceException */ @@ -1207,9 +1218,9 @@ public IsProcedureDoneResponse isProcedureDone(RpcController controller, /** * Checks if the specified snapshot is done. * @return true if the snapshot is in file system ready to use, - * false if the snapshot is in the process of completing + * false if the snapshot is in the process of completing * @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or - * a wrapped HBaseSnapshotException with progress failure reason. + * a wrapped HBaseSnapshotException with progress failure reason. 
*/ @Override public IsSnapshotDoneResponse isSnapshotDone(RpcController controller, @@ -1451,7 +1462,9 @@ public OfflineRegionResponse offlineRegion(RpcController controller, final byte[] regionName = request.getRegion().getValue().toByteArray(); final RegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName); - if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + if (hri == null) { + throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + } if (master.cpHost != null) { master.cpHost.preRegionOffline(hri); @@ -2298,8 +2311,8 @@ public RegionSpaceUseReportResponse reportRegionSpaceUse(RpcController controlle report.getRegionSize(), now); } } else { - LOG.debug( - "Received region space usage report but HMaster is not ready to process it, skipping"); + LOG.debug("Received region space usage report but HMaster is not ready to process it, " + + "skipping"); } return RegionSpaceUseReportResponse.newBuilder().build(); } catch (Exception e) { @@ -2335,8 +2348,8 @@ public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes( } return builder.build(); } else { - LOG.debug( - "Received space quota region size report but HMaster is not ready to process it, skipping"); + LOG.debug("Received space quota region size report but HMaster is not ready to process it, " + + "skipping"); } return builder.build(); } catch (Exception e) { @@ -2880,4 +2893,34 @@ private boolean shouldSubmitSCP(ServerName serverName) { return true; } + @Override + public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request) + throws ServiceException { + GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder(); + String clusterId = master.getClusterId(); + if (clusterId != null) { + resp.setClusterId(clusterId); + } + return resp.build(); + } + + @Override + public GetActiveMasterResponse getActiveMaster(RpcController rpcController, + GetActiveMasterRequest request) throws ServiceException { + GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder(); + Optional<ServerName> serverName = master.getActiveMaster(); + serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name))); + return resp.build(); + } + + @Override + public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController, + GetMetaRegionLocationsRequest request) throws ServiceException { + GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder(); + Optional<List<HRegionLocation>> metaLocations = + master.getMetaRegionLocationCache().getMetaRegionLocations(); + metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach( + location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location)))); + return response.build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java new file mode 100644 index 000000000000..f4e91b56051d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ThreadFactory; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + +/** + * A cache of meta region location metadata. Registers a listener on ZK to track changes to the + * meta table znodes. Clients are expected to retry if the meta information is stale. This class + * is thread-safe (a single instance of this class can be shared by multiple threads without race + * conditions). + */ +@InterfaceAudience.Private +public class MetaRegionLocationCache extends ZKListener { + + private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class); + + /** + * Maximum number of times we retry when ZK operation times out. + */ + private static final int MAX_ZK_META_FETCH_RETRIES = 10; + /** + * Sleep interval ms between ZK operation retries. + */ + private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000; + private static final int SLEEP_INTERVAL_MS_MAX = 10000; + private final RetryCounterFactory retryCounterFactory = + new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES); + + /** + * Cached meta region locations indexed by replica ID. + * CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during + * client requests. Even though CopyOnWriteArrayMap copies the data structure for every write, + * that should be OK since the size of the list is often small and mutations are not too often + * and we do not need to block client requests while mutations are in progress. + */ + private final CopyOnWriteArrayMap cachedMetaLocations; + + private enum ZNodeOpType { + INIT, + CREATED, + CHANGED, + DELETED + } + + public MetaRegionLocationCache(ZKWatcher zkWatcher) { + super(zkWatcher); + cachedMetaLocations = new CopyOnWriteArrayMap<>(); + watcher.registerListener(this); + // Populate the initial snapshot of data from meta znodes. + // This is needed because stand-by masters can potentially start after the initial znode + // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers + // are established. Subsequent updates are handled by the registered listener. 
Also, this runs + // in a separate thread in the background to not block master init. + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build(); + RetryCounterFactory retryFactory = new RetryCounterFactory( + Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX); + threadFactory.newThread( + ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start(); + } + + /** + * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers + * a watcher on base znode to check for any CREATE/DELETE events on the children. + * @param retryCounter controls the number of retries and sleep between retries. + */ + private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) { + List znodes = null; + while (retryCounter.shouldRetry()) { + try { + znodes = watcher.getMetaReplicaNodesAndWatchChildren(); + break; + } catch (KeeperException ke) { + LOG.debug("Error populating initial meta locations", ke); + if (!retryCounter.shouldRetry()) { + // Retries exhausted and watchers not set. This is not a desirable state since the cache + // could remain stale forever. Propagate the exception. + watcher.abort("Error populating meta locations", ke); + return; + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException ie) { + LOG.error("Interrupted while loading meta locations from ZK", ie); + Thread.currentThread().interrupt(); + return; + } + } + } + if (znodes == null || znodes.isEmpty()) { + // No meta znodes exist at this point but we registered a watcher on the base znode to listen + // for updates. They will be handled via nodeChildrenChanged(). + return; + } + if (znodes.size() == cachedMetaLocations.size()) { + // No new meta znodes got added. + return; + } + for (String znode: znodes) { + String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode); + updateMetaLocation(path, opType); + } + } + + /** + * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for + * future updates. + * @param replicaId ReplicaID of the region. + * @return HRegionLocation for the meta replica. + * @throws KeeperException if there is any issue fetching/parsing the serialized data. + */ + private HRegionLocation getMetaRegionLocation(int replicaId) + throws KeeperException { + RegionState metaRegionState; + try { + byte[] data = ZKUtil.getDataAndWatch(watcher, + watcher.getZNodePaths().getZNodeForReplica(replicaId)); + metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName()); + } + + private void updateMetaLocation(String path, ZNodeOpType opType) { + if (!isValidMetaZNode(path)) { + return; + } + LOG.debug("Updating meta znode for path {}: {}", path, opType.name()); + int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path); + RetryCounter retryCounter = retryCounterFactory.create(); + HRegionLocation location = null; + while (retryCounter.shouldRetry()) { + try { + if (opType == ZNodeOpType.DELETED) { + if (!ZKUtil.watchAndCheckExists(watcher, path)) { + // The path does not exist, we've set the watcher and we can break for now. + break; + } + // If it is a transient error and the node appears right away, we fetch the + // latest meta state. 
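+ // (watchAndCheckExists() above has already re-set a watch on this znode, so a later + // create/delete is delivered to this listener and refreshes the entry again.)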
+ } + location = getMetaRegionLocation(replicaId); + break; + } catch (KeeperException e) { + LOG.debug("Error getting meta location for path {}", path, e); + if (!retryCounter.shouldRetry()) { + LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e); + break; + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + if (location == null) { + cachedMetaLocations.remove(replicaId); + return; + } + cachedMetaLocations.put(replicaId, location); + } + + /** + * @return list of HRegionLocations for all meta replicas, or an empty Optional if the cache + * is not populated. + */ + public Optional<List<HRegionLocation>> getMetaRegionLocations() { + ConcurrentNavigableMap<Integer, HRegionLocation> snapshot = + cachedMetaLocations.tailMap(cachedMetaLocations.firstKey()); + if (snapshot.isEmpty()) { + // This could be possible if the master has not successfully initialized yet or meta region + // is stuck in some weird state. + return Optional.empty(); + } + List<HRegionLocation> result = new ArrayList<>(); + // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying + // ArrayValueCollection does not implement toArray(). + snapshot.values().forEach(location -> result.add(location)); + return Optional.of(result); + } + + /** + * Helper to check if the given 'path' corresponds to a meta znode. This listener is only + * interested in changes to meta znodes. + */ + private boolean isValidMetaZNode(String path) { + return watcher.getZNodePaths().isAnyMetaReplicaZNode(path); + } + + @Override + public void nodeCreated(String path) { + updateMetaLocation(path, ZNodeOpType.CREATED); + } + + @Override + public void nodeDeleted(String path) { + updateMetaLocation(path, ZNodeOpType.DELETED); + } + + @Override + public void nodeDataChanged(String path) { + updateMetaLocation(path, ZNodeOpType.CHANGED); + } + + @Override + public void nodeChildrenChanged(String path) { + if (!path.equals(watcher.getZNodePaths().baseZNode)) { + return; + } + loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCachedClusterId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCachedClusterId.java new file mode 100644 index 000000000000..932cb3bb7e15 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCachedClusterId.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.master.CachedClusterId; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestCachedClusterId { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCachedClusterId.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static String clusterId; + private static HMaster activeMaster; + private static HMaster standByMaster; + + private static class GetClusterIdThread extends TestThread { + CachedClusterId cachedClusterId; + public GetClusterIdThread(TestContext ctx, CachedClusterId clusterId) { + super(ctx); + cachedClusterId = clusterId; + } + + @Override + public void doWork() throws Exception { + assertEquals(clusterId, cachedClusterId.getFromCacheOrFetch()); + } + } + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + clusterId = activeMaster.getClusterId(); + standByMaster = TEST_UTIL.getHBaseCluster().startMaster().getMaster(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testClusterIdMatch() { + assertEquals(clusterId, standByMaster.getClusterId()); + } + + @Test + public void testMultiThreadedGetClusterId() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + CachedClusterId cachedClusterId = new CachedClusterId(conf); + TestContext context = new TestContext(conf); + int numThreads = 100; + for (int i = 0; i < numThreads; i++) { + context.addThread(new GetClusterIdThread(context, cachedClusterId)); + } + context.startThreads(); + context.stop(); + int cacheMisses = cachedClusterId.getCacheStats(); + assertEquals(1, cacheMisses); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index 3d7228ce00d0..473e23309cb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -90,6 +89,7 @@ public class TestZooKeeperTableArchiveClient { private static RegionServerServices rss; private static DirScanPool POOL; + /** * Setup the config for the cluster */ @@ -132,9 +132,13 @@ public void tearDown() throws Exception { @AfterClass public static void cleanupTest() throws Exception { - CONNECTION.close(); + if (CONNECTION != null) { + CONNECTION.close(); + } UTIL.shutdownMiniZKCluster(); - POOL.shutdownNow(); + if (POOL != null) { + POOL.shutdownNow(); + } } /** @@
-338,6 +342,7 @@ private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Pa * @throws IOException on failure * @throws KeeperException on failure */ + @SuppressWarnings("checkstyle:EmptyBlock") private List turnOnArchiving(String tableName, HFileCleaner cleaner) throws IOException, KeeperException { // turn on hfile retention diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java index 4668d150d5f4..f96daf66e128 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java @@ -58,7 +58,8 @@ protected static void startClusterAndCreateTable() throws Exception { } UTIL.getAdmin().createTable(td, SPLIT_KEYS); UTIL.waitTableAvailable(TABLE_NAME); - try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(UTIL.getConfiguration())) { + try (ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(), registry, REGION_REPLICATION); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java index ba3e74c278d3..73924a38393f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java @@ -44,7 +44,7 @@ private RegionReplicaTestHelper() { // waits for all replicas to have region location static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf, - AsyncRegistry registry, int regionReplication) throws IOException { + ConnectionRegistry registry, int regionReplication) throws IOException { Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true, new ExplainingPredicate() { @Override @@ -55,7 +55,7 @@ public String explainFailure() throws IOException { @Override public boolean evaluate() throws IOException { try { - RegionLocations locs = registry.getMetaRegionLocation().get(); + RegionLocations locs = registry.getMetaRegionLocations().get(); if (locs.size() < regionReplication) { return false; } @@ -66,7 +66,7 @@ public boolean evaluate() throws IOException { } return true; } catch (Exception e) { - TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e); + TestZKConnectionRegistry.LOG.warn("Failed to get meta region locations", e); return false; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java index 1401afeae8f4..6d7d36839824 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java @@ -53,7 +53,8 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TestAsyncAdminBase.setUpBeforeClass(); - try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { + try 
(ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { RegionReplicaTestHelper .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 1f0d40b5f4f7..609a12926206 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -45,7 +45,7 @@ public class TestAsyncMetaRegionLocator { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static AsyncRegistry REGISTRY; + private static ConnectionRegistry REGISTRY; private static AsyncMetaRegionLocator LOCATOR; @@ -54,7 +54,7 @@ public static void setUp() throws Exception { TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.startMiniCluster(3); - REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); RegionReplicaTestHelper .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); TEST_UTIL.getAdmin().balancerSwitch(false, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index 6bc024ac1212..70f867a5bd68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -79,7 +79,8 @@ public class TestAsyncNonMetaRegionLocator { public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().balancerSwitch(false, true); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java index 8cdb4a9363f7..52242f176297 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -123,7 +123,8 @@ public static void setUp() throws Exception { conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED); TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().balancerSwitch(false, true); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java index 0a94def619f5..677dc93aec91 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -98,7 +98,8 @@ public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), User.getCurrent()); LOCATOR = CONN.getLocator(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 8959c1d4f3bc..4bf6bb05c1eb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -71,7 +71,8 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java index e74d3fa291f5..ed6c66f8e5dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java @@ -92,7 +92,7 @@ public static void setUp() throws Exception { conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FailPrimaryMetaScanCp.class.getName()); UTIL.startMiniCluster(3); - try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf)) { + try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) { RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3); } try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java new file mode 100644 index 000000000000..1205b05a996c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MultithreadedTestUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MetaRegionLocationCache; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({SmallTests.class, MasterTests.class }) +public class TestMetaRegionLocationCache { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static ConnectionRegistry REGISTRY; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); + TEST_UTIL.startMiniCluster(3); + REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation( + TEST_UTIL.getConfiguration(), REGISTRY, 3); + TEST_UTIL.getAdmin().balancerSwitch(false, true); + } + + @AfterClass + public static void cleanUp() throws Exception { + IOUtils.closeQuietly(REGISTRY); + TEST_UTIL.shutdownMiniCluster(); + } + + private List getCurrentMetaLocations(ZKWatcher zk) throws Exception { + List result = new ArrayList<>(); + for (String znode: zk.getMetaReplicaNodes()) { + String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode); + int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path); + RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId); + result.add(new HRegionLocation(state.getRegion(), state.getServerName())); + } + return result; + } + + // Verifies that the cached meta locations in the given master are in sync with what is in ZK. + private void verifyCachedMetaLocations(HMaster master) throws Exception { + // Wait until initial meta locations are loaded. 
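+ // The cache is populated asynchronously by a daemon thread started from the + // MetaRegionLocationCache constructor, so poll for a short while instead of asserting right away.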
+ int retries = 0; + while (!master.getMetaRegionLocationCache().getMetaRegionLocations().isPresent()) { + Thread.sleep(1000); + if (++retries == 10) { + break; + } + } + List metaHRLs = + master.getMetaRegionLocationCache().getMetaRegionLocations().get(); + assertFalse(metaHRLs.isEmpty()); + ZKWatcher zk = master.getZooKeeper(); + List metaZnodes = zk.getMetaReplicaNodes(); + assertEquals(metaZnodes.size(), metaHRLs.size()); + List actualHRLs = getCurrentMetaLocations(zk); + Collections.sort(metaHRLs); + Collections.sort(actualHRLs); + assertEquals(actualHRLs, metaHRLs); + } + + @Test public void testInitialMetaLocations() throws Exception { + verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster()); + } + + @Test public void testStandByMetaLocations() throws Exception { + HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster(); + verifyCachedMetaLocations(standBy); + } + + /* + * Shuffles the meta region replicas around the cluster and makes sure the cache is not stale. + */ + @Test public void testMetaLocationsChange() throws Exception { + List currentMetaLocs = + getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper()); + // Move these replicas to random servers. + for (HRegionLocation location: currentMetaLocs) { + RegionReplicaTestHelper.moveRegion(TEST_UTIL, location); + } + RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation( + TEST_UTIL.getConfiguration(), REGISTRY, 3); + for (JVMClusterUtil.MasterThread masterThread: + TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) { + verifyCachedMetaLocations(masterThread.getMaster()); + } + } + + /** + * Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base + * znode for notifications. + */ + @Test public void testMetaRegionLocationCache() throws Exception { + final String parentZnodeName = "/randomznodename"; + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName); + ServerName sn = ServerName.valueOf("localhost", 1234, 5678); + try (ZKWatcher zkWatcher = new ZKWatcher(conf, null, null, true)) { + // A thread that repeatedly creates and drops an unrelated child znode. This is to simulate + // some ZK activity in the background. + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); + ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) { + @Override public void doAnAction() throws Exception { + final String testZnode = parentZnodeName + "/child"; + ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes()); + ZKUtil.deleteNode(zkWatcher, testZnode); + } + }); + ctx.startThreads(); + try { + MetaRegionLocationCache metaCache = new MetaRegionLocationCache(zkWatcher); + // meta znodes do not exist at this point, cache should be empty. + assertFalse(metaCache.getMetaRegionLocations().isPresent()); + // Set the meta locations for a random meta replicas, simulating an active hmaster meta + // assignment. + for (int i = 0; i < 3; i++) { + // Updates the meta znodes. + MetaTableLocator.setMetaLocation(zkWatcher, sn, i, RegionState.State.OPEN); + } + // Wait until the meta cache is populated. 
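+ // The cache registers itself as a ZKListener, so the setMetaLocation() calls above should be + // reflected via nodeCreated()/nodeDataChanged() notifications.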
+ int iters = 0; + while (iters++ < 10) { + if (metaCache.getMetaRegionLocations().isPresent() + && metaCache.getMetaRegionLocations().get().size() == 3) { + break; + } + Thread.sleep(1000); + } + List metaLocations = metaCache.getMetaRegionLocations().get(); + assertEquals(3, metaLocations.size()); + for (HRegionLocation location : metaLocations) { + assertEquals(sn, location.getServerName()); + } + } finally { + // clean up. + ctx.stop(); + ZKUtil.deleteChildrenRecursively(zkWatcher, parentZnodeName); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java similarity index 89% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java index 3e4ca94eca98..cb45ee5bd291 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java @@ -48,16 +48,16 @@ import org.slf4j.LoggerFactory; @Category({ MediumTests.class, ClientTests.class }) -public class TestZKAsyncRegistry { +public class TestZKConnectionRegistry { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKAsyncRegistry.class); + HBaseClassTestRule.forClass(TestZKConnectionRegistry.class); - static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class); + static final Logger LOG = LoggerFactory.getLogger(TestZKConnectionRegistry.class); static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static ZKAsyncRegistry REGISTRY; + private static ZKConnectionRegistry REGISTRY; @BeforeClass public static void setUp() throws Exception { @@ -67,7 +67,7 @@ public static void setUp() throws Exception { // make sure that we do not depend on this config when getting locations for meta replicas, see // HBASE-21658. 
conf.setInt(META_REPLICAS_NUM, 1); - REGISTRY = new ZKAsyncRegistry(conf); + REGISTRY = new ZKConnectionRegistry(conf); } @AfterClass @@ -84,10 +84,10 @@ public void test() throws InterruptedException, ExecutionException, IOException assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, clusterId); assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), - REGISTRY.getMasterAddress().get()); + REGISTRY.getActiveMaster().get()); RegionReplicaTestHelper .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); - RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); + RegionLocations locs = REGISTRY.getMetaRegionLocations().get(); assertEquals(3, locs.getRegionLocations().length); IntStream.range(0, 3).forEach(i -> { HRegionLocation loc = locs.getRegionLocation(i); @@ -102,7 +102,7 @@ public void testIndependentZKConnections() throws IOException { try (ReadOnlyZKClient zk1 = REGISTRY.getZKClient()) { Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration()); otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); - try (ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) { + try (ZKConnectionRegistry otherRegistry = new ZKConnectionRegistry(otherConf)) { ReadOnlyZKClient zk2 = otherRegistry.getZKClient(); assertNotSame("Using a different configuration / quorum should result in different " + "backing zk connection.", zk1, zk2); @@ -119,9 +119,9 @@ public void testIndependentZKConnections() throws IOException { public void testNoMetaAvailable() throws InterruptedException { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.set("zookeeper.znode.metaserver", "whatever"); - try (ZKAsyncRegistry registry = new ZKAsyncRegistry(conf)) { + try (ZKConnectionRegistry registry = new ZKConnectionRegistry(conf)) { try { - registry.getMetaRegionLocation().get(); + registry.getMetaRegionLocations().get(); fail("Should have failed since we set an incorrect meta znode prefix"); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(IOException.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java index 55d00f0bead8..ad766421a361 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -91,6 +92,7 @@ public static void tearDownAfterClass() throws Exception { ActiveMasterManager activeMasterManager = dummyMaster.getActiveMasterManager(); assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + assertFalse(activeMasterManager.getActiveMasterServerName().isPresent()); // First test becoming the active master uninterrupted MonitoredTask status = Mockito.mock(MonitoredTask.class); @@ -99,6 +101,7 @@ public static void tearDownAfterClass() throws Exception { activeMasterManager.blockUntilBecomingActiveMaster(100, status); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, master); + assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); // Now pretend master restart DummyMaster secondDummyMaster 
= new DummyMaster(zk,master); @@ -108,6 +111,8 @@ public static void tearDownAfterClass() throws Exception { activeMasterManager.blockUntilBecomingActiveMaster(100, status); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, master); + assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); + assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get()); } /** @@ -135,6 +140,7 @@ public void testActiveMasterManagerFromZK() throws Exception { ActiveMasterManager activeMasterManager = ms1.getActiveMasterManager(); assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + assertFalse(activeMasterManager.getActiveMasterServerName().isPresent()); // First test becoming the active master uninterrupted ClusterStatusTracker clusterStatusTracker = @@ -144,6 +150,7 @@ public void testActiveMasterManagerFromZK() throws Exception { Mockito.mock(MonitoredTask.class)); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, firstMasterAddress); + assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); // New manager will now try to become the active master in another thread WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress); @@ -161,6 +168,8 @@ public void testActiveMasterManagerFromZK() throws Exception { assertTrue(t.manager.clusterHasActiveMaster.get()); // But secondary one should not be the active master assertFalse(t.isActiveMaster); + // Verify the active master ServerName is populated in standby master. + assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get()); // Close the first server and delete it's master node ms1.stop("stopping first server"); @@ -189,6 +198,7 @@ public void testActiveMasterManagerFromZK() throws Exception { assertTrue(t.manager.clusterHasActiveMaster.get()); assertTrue(t.isActiveMaster); + assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get()); LOG.info("Deleting master node"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java new file mode 100644 index 000000000000..428aee2a142c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; +import static org.junit.Assert.assertEquals; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; + +@Category({MediumTests.class, MasterTests.class}) +public class TestClientMetaServiceRPCs { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClientMetaServiceRPCs.class); + + // Total number of masters (active + stand by) for the purpose of this test. + private static final int MASTER_COUNT = 3; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf; + private static int rpcTimeout; + private static RpcClient rpcClient; + + @BeforeClass + public static void setUp() throws Exception { + // Start the mini cluster with stand-by masters. 
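+ // Every master, active or stand-by, runs MasterRpcServices, so each of them should serve the + // ClientMetaService RPCs exercised by the tests below.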
+ StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); + builder.numMasters(MASTER_COUNT).numRegionServers(3); + TEST_UTIL.startMiniCluster(builder.build()); + conf = TEST_UTIL.getConfiguration(); + rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.MILLISECONDS.toNanos( + conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + rpcClient = RpcClientFactory.createClient(conf, + TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId()); + } + + @AfterClass + public static void tearDown() throws Exception { + if (rpcClient != null) { + rpcClient.close(); + } + TEST_UTIL.shutdownMiniCluster(); + } + + private static ClientMetaService.BlockingInterface getMasterStub(ServerName server) + throws IOException { + return ClientMetaService.newBlockingStub( + rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout)); + } + + private static HBaseRpcController getRpcController() { + return RpcControllerFactory.instantiate(conf).newController(); + } + + /** + * Verifies the cluster ID from all running masters. + */ + @Test public void TestClusterID() throws Exception { + HBaseRpcController rpcController = getRpcController(); + String clusterID = TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId(); + int rpcCount = 0; + for (JVMClusterUtil.MasterThread masterThread: + TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) { + ClientMetaService.BlockingInterface stub = + getMasterStub(masterThread.getMaster().getServerName()); + GetClusterIdResponse resp = + stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance()); + assertEquals(clusterID, resp.getClusterId()); + rpcCount++; + } + assertEquals(MASTER_COUNT, rpcCount); + } + + /** + * Verifies the active master ServerName as seen by all masters. + */ + @Test public void TestActiveMaster() throws Exception { + HBaseRpcController rpcController = getRpcController(); + ServerName activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName(); + int rpcCount = 0; + for (JVMClusterUtil.MasterThread masterThread: + TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) { + ClientMetaService.BlockingInterface stub = + getMasterStub(masterThread.getMaster().getServerName()); + GetActiveMasterResponse resp = + stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance()); + assertEquals(activeMaster, ProtobufUtil.toServerName(resp.getServerName())); + rpcCount++; + } + assertEquals(MASTER_COUNT, rpcCount); + } + + /** + * Verifies that the meta region locations RPC returns consistent results across all masters. 
+ */ + @Test public void TestMetaLocations() throws Exception { + HBaseRpcController rpcController = getRpcController(); + List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster() + .getMetaRegionLocationCache().getMetaRegionLocations().get(); + Collections.sort(metaLocations); + int rpcCount = 0; + for (JVMClusterUtil.MasterThread masterThread: + TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) { + ClientMetaService.BlockingInterface stub = + getMasterStub(masterThread.getMaster().getServerName()); + GetMetaRegionLocationsResponse resp = stub.getMetaRegionLocations( + rpcController, GetMetaRegionLocationsRequest.getDefaultInstance()); + List<HRegionLocation> result = new ArrayList<>(); + resp.getMetaLocationsList().forEach( + location -> result.add(ProtobufUtil.toRegionLocation(location))); + Collections.sort(result); + assertEquals(metaLocations, result); + rpcCount++; + } + assertEquals(MASTER_COUNT, rpcCount); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java index ba4d53510faa..492222464f99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCloseAnOpeningRegion.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -65,7 +64,7 @@ public class TestCloseAnOpeningRegion { public static final class MockHMaster extends HMaster { - public MockHMaster(Configuration conf) throws IOException, KeeperException { + public MockHMaster(Configuration conf) throws IOException { super(conf); } @@ -141,4 +140,4 @@ public void test() throws IOException, InterruptedException { table.put(new Put(Bytes.toBytes(0)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(0))); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java index 338173e473c9..0a83fa5c7e18 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterRestartFailover.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.zookeeper.KeeperException; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -164,7 +163,7 @@ private void setupTable() throws Exception { public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java index 22554d355b9a..d29e061d07fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryConfigManager.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -120,7 +119,7 @@ public void testChoreSchedule() throws Exception { // Make it public so that JVMClusterUtil can access it. public static class TestHMaster extends HMaster { - public TestHMaster(Configuration conf) throws IOException, KeeperException { + public TestHMaster(Configuration conf) throws IOException { super(conf); } } @@ -144,4 +143,4 @@ public boolean isStopped() { } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestShutdownBackupMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestShutdownBackupMaster.java index d3a85209ccac..a42a4046d54f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestShutdownBackupMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestShutdownBackupMaster.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -56,7 +55,7 @@ public class TestShutdownBackupMaster { public static final class MockHMaster extends HMaster { - public MockHMaster(Configuration conf) throws IOException, KeeperException { + public MockHMaster(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java index 9f5b6fc118b2..be9dadaefa69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -74,7 +73,7 @@ void persistToMeta(RegionStateNode regionNode) throws IOException { public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java index 0463721656b4..a25368f4e420 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java @@ -105,7 +105,7 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition( public static 
final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java index 0d8202b69f26..44af256b92af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionAssignedToMultipleRegionServers.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -110,7 +109,7 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition( public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java index acad88cda9dd..4dede8988648 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -106,7 +105,7 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition( public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java index 6c9e5eb34ba6..1de806fe0ffb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -117,7 +116,7 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition( public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, 
KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java index 6c191c9be8a1..71c4693cfa60 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionRetry.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -84,7 +83,7 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition( public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java index cbbdbdc8d157..d676af929392 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSCPGetRegionsRace.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -130,7 +129,7 @@ public List getRegionsOnServer(ServerName serverName) { public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java index 47c70a156997..62e31615def4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -198,7 +197,7 @@ public List createDestinationServersList() { public static final class HMasterForTest extends HMaster { - public HMasterForTest(Configuration conf) throws IOException, KeeperException { + public HMasterForTest(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java index ff29df838b5d..69e656fbe52c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; @@ -29,13 +28,17 @@ import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column; @@ -51,11 +54,12 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer; /** * Class to test ProtobufUtil. */ -@Category({MiscTests.class, SmallTests.class}) +@Category({ MiscTests.class, SmallTests.class}) public class TestProtobufUtil { @ClassRule @@ -348,4 +352,32 @@ public void testToCell() throws Exception { ProtobufUtil.toCell(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY), cell); assertTrue(CellComparatorImpl.COMPARATOR.compare(offheapKV, newOffheapKV) == 0); } + + @Test + public void testMetaRegionState() throws Exception { + ServerName serverName = ServerName.valueOf("localhost", 1234, 5678); + // New region state style. + for (RegionState.State state: RegionState.State.values()) { + RegionState regionState = + new RegionState(RegionInfoBuilder.FIRST_META_REGIONINFO, state, serverName); + MetaRegionServer metars = MetaRegionServer.newBuilder() + .setServer(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(serverName)) + .setRpcVersion(HConstants.RPC_CURRENT_VERSION) + .setState(state.convert()).build(); + // Serialize + byte[] data = ProtobufUtil.prependPBMagic(metars.toByteArray()); + ProtobufUtil.prependPBMagic(data); + // Deserialize + RegionState regionStateNew = + org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.parseMetaRegionStateFrom(data, 1); + assertEquals(regionState.getServerName(), regionStateNew.getServerName()); + assertEquals(regionState.getState(), regionStateNew.getState()); + } + // old style. 
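+ // Old-style znode data is just the ServerName bytes, with no PB magic prefix and no state field, so the parsed state is expected to default to OPEN.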
+ RegionState rs = + org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.parseMetaRegionStateFrom( + serverName.getVersionedBytes(), 1); + assertEquals(serverName, rs.getServerName()); + assertEquals(rs.getState(), RegionState.State.OPEN); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java index aaf2d2eb9a04..f61a77ec3937 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java @@ -122,7 +122,7 @@ public void stopCapturing() { * This test HMaster class will always throw ServerNotRunningYetException if checked. */ public static class NeverInitializedMaster extends HMaster { - public NeverInitializedMaster(Configuration conf) throws IOException, KeeperException { + public NeverInitializedMaster(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java index a2ae0b434249..e9fcc6634664 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -134,7 +133,7 @@ public static final class MockHMaster extends HMaster { private ReplicationPeerManager manager; - public MockHMaster(Configuration conf) throws IOException, KeeperException { + public MockHMaster(Configuration conf) throws IOException { super(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index 2787f44422cd..39a2789d39a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -73,9 +72,7 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; - import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; /** diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 9105d7e149fb..c7b45fe9b656 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -34,12 +34,7 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer; /** @@ -266,40 +261,17 @@ public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperExcepti * @throws KeeperException if a ZooKeeper operation fails */ public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId) - throws KeeperException { - RegionState.State state = RegionState.State.OPEN; - ServerName serverName = null; + throws KeeperException { + RegionState regionState = null; try { byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId)); - if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) { - try { - int prefixLen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.MetaRegionServer rl = - ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen, - data.length - prefixLen); - if (rl.hasState()) { - state = RegionState.State.convert(rl.getState()); - } - HBaseProtos.ServerName sn = rl.getServer(); - serverName = ServerName.valueOf( - sn.getHostName(), sn.getPort(), sn.getStartCode()); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException("Unable to parse meta region location"); - } - } else { - // old style of meta region location? 
- serverName = ProtobufUtil.parseServerNameFrom(data); - } + regionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId); } catch (DeserializationException e) { throw ZKUtil.convert(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - if (serverName == null) { - state = RegionState.State.OFFLINE; - } - return new RegionState(RegionReplicaUtil.getRegionInfoForReplica( - RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), state, serverName); + return regionState; } /** diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index e0b34e4f8007..a39c4135c513 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -23,10 +23,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.AuthUtil; @@ -81,10 +79,6 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { // listeners to be notified private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); - // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL - // negotiation to complete - private CountDownLatch saslLatch = new CountDownLatch(1); - private final Configuration conf; /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ @@ -383,13 +377,32 @@ public String prefix(final String str) { */ public List<String> getMetaReplicaNodes() throws KeeperException { List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); + return filterMetaReplicaNodes(childrenOfBaseNode); + } + + /** + * Same as {@link #getMetaReplicaNodes()} except that this also registers a watcher on base znode + * for subsequent CREATE/DELETE operations on child nodes. + */ + public List<String> getMetaReplicaNodesAndWatchChildren() throws KeeperException { + List<String> childrenOfBaseNode = + ZKUtil.listChildrenAndWatchForNewChildren(this, znodePaths.baseZNode); + return filterMetaReplicaNodes(childrenOfBaseNode); + } + + /** + * @param nodes Input list of znodes + * @return Filtered list of znodes from nodes that belong to meta replica(s). + */ + private List<String> filterMetaReplicaNodes(List<String> nodes) { + if (nodes == null || nodes.isEmpty()) { + return new ArrayList<>(); + } List<String> metaReplicaNodes = new ArrayList<>(2); - if (childrenOfBaseNode != null) { - String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server"); - for (String child : childrenOfBaseNode) { - if (child.startsWith(pattern)) { - metaReplicaNodes.add(child); - } + String pattern = conf.get(ZNodePaths.META_ZNODE_PREFIX_CONF_KEY, ZNodePaths.META_ZNODE_PREFIX); + for (String child : nodes) { + if (child.startsWith(pattern)) { + metaReplicaNodes.add(child); } } return metaReplicaNodes;