Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;

/**
* Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
* uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
* By default the refresh happens periodically (configured via
* {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
* {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two
* should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
*/
@InterfaceAudience.Private
public class MasterAddressRefresher implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.master_registry.min_secs_between_refreshes";
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

private final ExecutorService pool;
private final MasterRegistry registry;
private final long periodicRefreshMs;
private final long timeBetweenRefreshesMs;
private final Object refreshMasters = new Object();

@Override
public void close() {
pool.shutdownNow();
}

/**
* Thread that refreshes the master end points until it is interrupted via {@link #close()}.
* Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
*/
private class RefreshThread implements Runnable {
@Override
public void run() {
long lastRpcTs = 0;
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(periodicRefreshMs);
}
long currentTs = EnvironmentEdgeManager.currentTime();
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
continue;
}
lastRpcTs = currentTs;
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
registry.populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are aborting refresh by breaking out of the loop and basically, we are done refreshing master stubs. Better to log this at ERROR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That also happens during a regular pool shutdown. Added another log later in the method after loop exist for observability.

break;
} catch (ExecutionException | IOException e) {
LOG.debug("Error populating latest list of masters.", e);
}
}
LOG.info("Master end point refresher loop exited.");
}
}

MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
this.registry = registry;
pool.submit(new RefreshThread());
}

/**
* Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
* See class comment for details.
*/
void refreshNow() {
synchronized (refreshMasters) {
refreshMasters.notify();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
Expand All @@ -57,10 +59,11 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;

Expand Down Expand Up @@ -89,11 +92,14 @@ public class MasterRegistry implements ConnectionRegistry {
private final int hedgedReadFanOut;

// Configured list of masters to probe the meta information from.
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;

// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;

protected final MasterAddressRefresher masterAddressRefresher;

/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
Expand All @@ -115,20 +121,27 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno
MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
// Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
// by fetching the end points from this list.
populateMasterStubs(parseMasterAddrs(conf));
masterAddressRefresher = new MasterAddressRefresher(conf, this);
}

void populateMasterStubs(Set<ServerName> masters) throws IOException {
Preconditions.checkNotNull(masters);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
ImmutableMap.builderWithExpectedSize(masters.size());
User user = User.getCurrent();
for (ServerName masterAddr : masterAddrs) {
for (ServerName masterAddr : masters) {
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterAddr2Stub = builder.build();
}
Expand Down Expand Up @@ -169,7 +182,13 @@ private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interfac
CompletableFuture<T> future = new CompletableFuture<>();
callable.call(controller, stub, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
IOException failureReason = controller.getFailed();
future.completeExceptionally(failureReason);
if (ClientExceptionsUtil.isConnectionException(failureReason)) {
// RPC has failed, trigger a refresh of master end points. We can have some spurious
// refreshes, but that is okay since the RPC is not expensive and not in a hot path.
masterAddressRefresher.refreshNow();
}
} else {
future.complete(resp);
}
Expand All @@ -188,8 +207,9 @@ private IOException badResponse(String debug) {
* been tried and all of them are failed, we will fail the future.
*/
private <T extends Message> void groupCall(CompletableFuture<T> future,
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
Expand All @@ -210,10 +230,10 @@ private <T extends Message> void groupCall(CompletableFuture<T> future,
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
new MasterRegistryFetchException(masterServers, ex));
} else {
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
errors);
groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
isValidResp, debug, errors);
}
}
} else {
Expand All @@ -226,17 +246,20 @@ private <T extends Message> void groupCall(CompletableFuture<T> future,

private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
Set<ServerName> masterServers = masterAddr2StubRef.keySet();
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
new ConcurrentLinkedQueue<>());
return future;
}

/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList()
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
Expand All @@ -247,7 +270,7 @@ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsRespo
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
}

@Override
Expand All @@ -259,17 +282,54 @@ public CompletableFuture<String> getClusterId() {
.thenApply(GetClusterIdResponse::getClusterId);
}

private ServerName transformServerName(GetActiveMasterResponse resp) {
return ProtobufUtil.toServerName(resp.getServerName());
private static boolean hasActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
return activeMasters.size() == 1;
}

private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
if (activeMasters.size() != 1) {
throw new IOException(String.format("Incorrect number of active masters encountered." +
" Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
}
return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse)resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
}

private static List<ServerName> transformServerNames(GetMastersResponse resp) {
return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
s.getServerName())).collect(Collectors.toList());
}

CompletableFuture<List<ServerName>> getMasters() {
System.out.println("getMasters()");
return this
.<GetActiveMasterResponse> call(
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
.thenApply(this::transformServerName);
.<GetMastersResponse> call((c, s, d) -> s.getMasters(
c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
"getMasters()").thenApply(MasterRegistry::transformServerNames);
}

@VisibleForTesting
Expand All @@ -279,6 +339,9 @@ Set<ServerName> getParsedMasterServers() {

@Override
public void close() {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class MetricsConnection implements StatisticTrackable {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

private static final String CNT_BASE = "rpcCount_";
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
Expand Down Expand Up @@ -303,6 +304,8 @@ private static interface NewMetric<T> {
LOAD_FACTOR, CONCURRENCY_LEVEL);
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
@VisibleForTesting protected final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
Expand Down Expand Up @@ -434,8 +437,7 @@ private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> f
}

/** Update call stats for non-critical-path methods */
private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
final String methodName = method.getService().getName() + "_" + method.getName();
private void updateRpcGeneric(String methodName, CallStats stats) {
getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
Expand All @@ -450,6 +452,9 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
if (callsPerServer > 0) {
concurrentCallsPerServerHist.update(callsPerServer);
}
// Update the counter that tracks RPCs by type.
final String methodName = method.getService().getName() + "_" + method.getName();
getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
// this implementation is tied directly to protobuf implementation details. would be better
// if we could dispatch based on something static, ie, request Message type.
if (method.getService() == ClientService.getDescriptor()) {
Expand Down Expand Up @@ -511,7 +516,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
}
}
// Fallback to dynamic registry lookup for DDL methods.
updateRpcGeneric(method, stats);
updateRpcGeneric(methodName, stats);
}

public void incrCacheDroppingExceptions(Object exception) {
Expand Down
Loading