Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.DNS.getHostname;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

Expand Down Expand Up @@ -266,18 +268,23 @@ private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocatio

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
return tracedFuture(
() -> this
.<GetMetaRegionLocationsResponse> call(
(c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d),
r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
.thenApply(MasterRegistry::transformMetaRegionLocations),
"MasterRegistry.getMetaRegionLocations");
}

@Override
public CompletableFuture<String> getClusterId() {
return this
return tracedFuture(() -> this
.<GetClusterIdResponse> call(
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
GetClusterIdResponse::hasClusterId, "getClusterId()")
.thenApply(GetClusterIdResponse::getClusterId);
.thenApply(GetClusterIdResponse::getClusterId), "MasterRegistry.getClusterId");
}

private static boolean hasActiveMaster(GetMastersResponse resp) {
Expand All @@ -300,21 +307,23 @@ private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOE

@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse)resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
return tracedFuture(() -> {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse) resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
}, "MasterRegistry.getActiveMaster");
}

private static List<ServerName> transformServerNames(GetMastersResponse resp) {
Expand All @@ -335,11 +344,13 @@ Set<ServerName> getParsedMasterServers() {

@Override
public void close() {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
trace(() -> {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
}, "MasterRegistry.close");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;

Expand Down Expand Up @@ -94,7 +95,9 @@ private static String getClusterId(byte[] data) throws DeserializationException

@Override
public CompletableFuture<String> getClusterId() {
return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId);
return tracedFuture(
() -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
"ZKConnectionRegistry.getClusterId");
}

ReadOnlyZKClient getZKClient() {
Expand Down Expand Up @@ -192,19 +195,20 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode)
.thenApply(children -> children.stream()
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
}, "ZKConnectionRegistry.getMetaRegionLocations");
}

private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
Expand All @@ -218,15 +222,17 @@ private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOExcep

@Override
public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
return tracedFuture(
() -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
}
HBaseProtos.ServerName snProto = proto.getMaster();
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode());
});
}),
"ZKConnectionRegistry.getActiveMaster");
}

@Override
Expand Down