@@ -486,6 +486,10 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
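// When the retry is caused by server overload, record the backoff delay in the connection metrics.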
if (isServerOverloaded) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
}
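For context, a minimal self-contained sketch of the Optional-based guard used here and in the other async retry paths below; `BackoffMetrics` is a hypothetical stand-in for `MetricsConnection` and is not part of the change:

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// The connection exposes its metrics as an Optional, so the update is simply a
// no-op when client-side metrics are disabled.
public class OptionalMetricsSketch {

  @FunctionalInterface
  interface BackoffMetrics {
    void incrementServerOverloadedBackoffTime(long time, TimeUnit unit);
  }

  static void recordBackoff(Optional<BackoffMetrics> metrics, long delayNs) {
    metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
  }

  public static void main(String[] args) {
    // Metrics disabled: nothing is recorded.
    recordBackoff(Optional.empty(), 1_000_000L);
    // Metrics enabled: the delay reaches the metrics sink.
    recordBackoff(
      Optional.of((time, unit) -> System.out.println("backed off " + unit.toMillis(time) + " ms")),
      5_000_000L);
  }
}
```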

@@ -754,6 +754,12 @@ private void resubmit(ServerName oldServer, List<Action> toReplay,
backOffTime = errorsByServer.calculateBackoffTime(oldServer,
asyncProcess.connectionConfiguration.getPauseMillis());
}

MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
}

if (numAttempt > asyncProcess.startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
@@ -140,6 +140,11 @@ private void tryScheduleRetry(Throwable error) {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
tries++;

if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
}

@@ -111,6 +111,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {

private final Runnable completeWhenNoMoreResultsInRegion;

protected final AsyncConnectionImpl conn;

private final CompletableFuture<Boolean> future;

private final HBaseRpcController controller;
@@ -311,6 +313,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan;
this.scanMetrics = scanMetrics;
this.scannerId = scannerId;
@@ -436,6 +439,11 @@ private void onError(Throwable error) {
return;
}
tries++;

if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
}

@@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -271,10 +272,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {

this.stats = ServerStatisticTracker.create(conf);
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);


boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -300,8 +299,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.metrics = null;
}
this.metaCache = new MetaCache(this.metrics);

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
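// Note: the controller/caller factories and the AsyncProcess are now created after the
// metrics and the RPC client, so the MetricsConnection can be handed to the caller factory.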
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

// Do we publish the status?
if (shouldListen) {
@@ -1018,10 +1019,15 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou
// Only relocate the parent region if necessary
relocateMeta =
!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);

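// Record (approximately) the pause that will be applied below so overload-induced
// backoff shows up in the connection metrics.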
if (metrics != null && HBaseServerException.isServerOverloaded(e)) {
metrics.incrementServerOverloadedBackoffTime(
ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS);
}
} finally {
userRegionLock.unlock();
}
try{
try {
Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Giving up trying to locate region in " +
@@ -2154,7 +2160,7 @@ public TableState getTableState(TableName tableName) throws IOException {
@Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory
.instantiate(conf, this.interceptor, this.getStatisticsTracker());
.instantiate(conf, this.interceptor, this.getStatisticsTracker(), metrics);
}

@Override
@@ -317,6 +317,7 @@ private static interface NewMetric<T> {
protected final Counter hedgedReadWin;
protected final Histogram concurrentCallsPerServerHist;
protected final Histogram numActionsPerServerHist;
protected final Timer overloadedBackoffTimer;

// dynamic metrics

@@ -379,6 +380,8 @@ protected Ratio getRatio() {
"concurrentCallsPerServer", scope));
this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
"numActionsPerServer", scope));
this.overloadedBackoffTimer = registry.timer(name(this.getClass(),
"overloadedBackoffDurationMs", scope));

this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start();
@@ -452,6 +455,11 @@ public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
this.runnerStats.updateDelayInterval(interval);
}

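/**
 * Update the timer tracking how long the client has backed off because a server reported overload.
 */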
public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}

/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
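For orientation, a standalone sketch of what the new timer records, using the Dropwizard metrics library that MetricsConnection already builds on; the metric name mirrors the diff, everything else is illustrative:

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

public class OverloadedBackoffTimerSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Timer overloadedBackoffTimer = registry.timer("overloadedBackoffDurationMs");

    // The blocking client reports milliseconds, the async client nanoseconds;
    // both feed the same Timer, which normalizes durations internally.
    overloadedBackoffTimer.update(100, TimeUnit.MILLISECONDS);
    overloadedBackoffTimer.update(200_000_000L, TimeUnit.NANOSECONDS);

    // A Timer captures both how often the client backed off ...
    System.out.println("backoffs: " + overloadedBackoffTimer.getCount());
    // ... and the distribution of delays (snapshot values are in nanoseconds).
    Snapshot snapshot = overloadedBackoffTimer.getSnapshot();
    System.out.println("mean backoff ms: "
      + TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()));
  }
}
```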
@@ -21,6 +21,8 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;

/**
* Factory to create an {@link RpcRetryingCaller}
@@ -34,22 +36,29 @@ public class RpcRetryingCallerFactory {
private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
private final MetricsConnection metrics;

/* Note: the fields below appear to be unused. */
private final boolean enableBackPressure;
private ServerStatisticTracker stats;

public RpcRetryingCallerFactory(Configuration conf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
}

public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
this(conf, interceptor, null);
}

public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
MetricsConnection metrics) {
this.conf = conf;
this.connectionConf = new ConnectionConfiguration(conf);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor;
enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
this.metrics = metrics;
}

/**
@@ -69,7 +78,7 @@ public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(),
connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, rpcTimeout);
interceptor, startLogErrorsCnt, rpcTimeout, metrics);
}

/**
@@ -83,26 +92,35 @@ public <T> RpcRetryingCaller<T> newCaller() {
connectionConf.getPauseMillisForServerOverloaded(),
connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt,
connectionConf.getRpcTimeout());
connectionConf.getRpcTimeout(), metrics);
}

// TODO(baugenreich): should the other instantiate() usages pass metrics as well? For example,
// HTableMultiplexer uses this to create a new AsyncProcess.
public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ServerStatisticTracker stats) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, null);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
return instantiate(configuration, interceptor, stats, null);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats, MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) {
factory = new RpcRetryingCallerFactory(configuration, interceptor);
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
} else {
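// Note: custom factories configured via CUSTOM_CALLER_CONF_KEY are built with the
// Configuration-only constructor, so the MetricsConnection is not passed to them.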
factory = ReflectionUtils.instantiateWithCustomCtor(
rpcCallerFactoryClazz, new Class[] { Configuration.class },
@@ -28,6 +28,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -68,14 +69,11 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final RetryingCallerInterceptorContext context;
private final RetryingTimeTracker tracker;

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
int startLogErrorsCnt) {
this(pause, pauseForServerOverloaded, retries,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
}
private final MetricsConnection metrics;

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
MetricsConnection metricsConnection) {
this.pause = pause;
this.pauseForServerOverloaded = pauseForServerOverloaded;
this.maxAttempts = retries2Attempts(retries);
@@ -84,6 +82,12 @@ public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retr
this.startLogErrorsCnt = startLogErrorsCnt;
this.tracker = new RetryingTimeTracker();
this.rpcTimeout = rpcTimeout;
this.metrics = metricsConnection;
}

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
int startLogErrorsCnt) {
this(pause, pauseForServerOverloaded, retries,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0, null);
}

@Override
@@ -162,6 +166,10 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
": " + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
}

if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
}
} finally {
interceptor.updateFailureInfo(context);
}