Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class AsyncConnectionImpl implements AsyncConnection {

private final User user;

final AsyncRegistry registry;
final ConnectionRegistry registry;

private final int rpcTimeout;

Expand Down Expand Up @@ -118,7 +118,7 @@ class AsyncConnectionImpl implements AsyncConnection {

private final ClusterStatusListener clusterStatusListener;

public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
User user) {
this.conf = conf;
this.user = user;
Expand Down Expand Up @@ -248,7 +248,7 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (addr, error) -> {
addListener(registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
Expand Down Expand Up @@ -342,7 +342,7 @@ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName
@Override
public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> {
addListener(registry.getActiveMaster(), (sn, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
@InterfaceAudience.Private
class AsyncMetaRegionLocator {

private final AsyncRegistry registry;
private final ConnectionRegistry registry;

private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();

private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
new AtomicReference<>();

AsyncMetaRegionLocator(AsyncRegistry registry) {
AsyncMetaRegionLocator(ConnectionRegistry registry) {
this.registry = registry;
}

Expand All @@ -58,7 +58,7 @@ class AsyncMetaRegionLocator {
*/
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
registry::getMetaRegionLocation, locs -> isGood(locs, replicaId), "meta region location");
registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
}

private HRegionLocation getCacheLocation(HRegionLocation loc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int repl
@Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
if (TableName.isMetaTableName(tableName)) {
return conn.registry.getMetaRegionLocation()
return conn.registry.getMetaRegionLocations()
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}
return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) {
registry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
/**
* Cluster registry of basic info such as clusterid and meta region location.
*/
private final AsyncRegistry registry;
private final ConnectionRegistry registry;

private final ClientBackoffPolicy backoffPolicy;

Expand Down Expand Up @@ -303,7 +303,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.conf.get(BufferedMutator.CLASSNAME_KEY);

try {
this.registry = AsyncRegistryFactory.getRegistry(conf);
this.registry = ConnectionRegistryFactory.getRegistry(conf);
retrieveClusterId();

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
Expand Down Expand Up @@ -434,7 +434,7 @@ public Admin getAdmin() throws IOException {

@Override
public Hbck getHbck() throws IOException {
return getHbck(get(registry.getMasterAddress()));
return getHbck(get(registry.getActiveMaster()));
}

@Override
Expand Down Expand Up @@ -811,7 +811,7 @@ private RegionLocations locateMeta(final TableName tableName,
}

// Look up from zookeeper
locations = get(this.registry.getMetaRegionLocation());
locations = get(this.registry.getMetaRegionLocations());
if (locations != null) {
cacheLocation(tableName, locations);
}
Expand Down Expand Up @@ -1162,7 +1162,7 @@ private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
*/
private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
throws IOException, KeeperException {
ServerName sn = get(registry.getMasterAddress());
ServerName sn = get(registry.getActiveMaster());
if (sn == null) {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
Expand Down Expand Up @@ -1211,7 +1211,7 @@ MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {

@Override
public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
return getAdmin(get(registry.getMasterAddress()));
return getAdmin(get(registry.getActiveMaster()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@
import org.apache.yetus.audience.InterfaceAudience;

/**
* Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Registry for meta information needed for connection setup to a HBase cluster. Implementations
* hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Internal use only.
*/
@InterfaceAudience.Private
interface AsyncRegistry extends Closeable {
interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region.
* Get the location of meta region(s).
*/
CompletableFuture<RegionLocations> getMetaRegionLocation();
CompletableFuture<RegionLocations> getMetaRegionLocations();

/**
* Should only be called once.
Expand All @@ -43,9 +44,9 @@ interface AsyncRegistry extends Closeable {
CompletableFuture<String> getClusterId();

/**
* Get the address of HMaster.
* Get the address of active HMaster.
*/
CompletableFuture<ServerName> getMasterAddress();
CompletableFuture<ServerName> getActiveMaster();

/**
* Closes this instance and releases any system resources associated with it
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,26 +18,28 @@
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Get instance of configured Registry.
* Factory class to get the instance of configured connection registry.
*/
@InterfaceAudience.Private
final class AsyncRegistryFactory {
final class ConnectionRegistryFactory {

static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.connection.registry.impl";

private AsyncRegistryFactory() {
private ConnectionRegistryFactory() {
}

/**
* @return The cluster registry implementation to use.
* @return The connection registry implementation to use.
*/
static AsyncRegistry getRegistry(Configuration conf) {
Class<? extends AsyncRegistry> clazz =
conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
static ConnectionRegistry getRegistry(Configuration conf) {
Class<? extends ConnectionRegistry> clazz = conf.getClass(
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][]
private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
Optional<byte[][]> splitKeys) {
if (TableName.isMetaTableName(tableName)) {
return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
.of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
Expand Down Expand Up @@ -882,7 +882,7 @@ public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
@Override
public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) {
return connection.registry.getMetaRegionLocation()
return connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
.collect(Collectors.toList()));
} else {
Expand Down Expand Up @@ -1098,8 +1098,9 @@ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableN
if (TableName.META_TABLE_NAME.equals(tableName)) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// For meta table, we use zk to fetch all locations.
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(
connection.getConfiguration());
addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (metaRegions == null || metaRegions.isEmpty() ||
Expand Down Expand Up @@ -1127,7 +1128,7 @@ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily

switch (compactType) {
case MOB:
addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
Expand Down Expand Up @@ -2358,7 +2359,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedR
String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
// old format encodedName, should be meta region
future = connection.registry.getMetaRegionLocation()
future = connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations())
.filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
} else {
Expand All @@ -2369,7 +2370,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedR
RegionInfo regionInfo =
MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
if (regionInfo.isMetaRegion()) {
future = connection.registry.getMetaRegionLocation()
future = connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations())
.filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
.findFirst());
Expand Down Expand Up @@ -2942,7 +2943,7 @@ public CompletableFuture<CompactionState> getCompactionState(TableName tableName

switch (compactType) {
case MOB:
addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@
* Zookeeper based registry implementation.
*/
@InterfaceAudience.Private
class ZKAsyncRegistry implements AsyncRegistry {
class ZKConnectionRegistry implements ConnectionRegistry {

private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class);
private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);

private final ReadOnlyZKClient zk;

private final ZNodePaths znodePaths;

ZKAsyncRegistry(Configuration conf) {
ZKConnectionRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
}
Expand Down Expand Up @@ -93,7 +93,7 @@ private static String getClusterId(byte[] data) throws DeserializationException

@Override
public CompletableFuture<String> getClusterId() {
return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId);
}

@VisibleForTesting
Expand Down Expand Up @@ -144,7 +144,7 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
if (replicaId == DEFAULT_REPLICA_ID) {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
Expand All @@ -162,7 +162,7 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
tryComplete(remaining, locs, future);
});
} else {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) {
return;
}
Expand Down Expand Up @@ -191,7 +191,7 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode)
Expand All @@ -217,8 +217,8 @@ private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOExcep
}

@Override
public CompletableFuture<ServerName> getMasterAddress() {
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
Expand Down
Loading