diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 6e4ed552931f..11579fee51ab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -486,6 +486,10 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
     } else {
       delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
+    if (isServerOverloaded) {
+      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
+      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
+    }
     retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 5b536feeb955..a0dc7ef2a90d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -754,6 +754,12 @@ private void resubmit(ServerName oldServer, List<Action> toReplay,
       backOffTime = errorsByServer.calculateBackoffTime(oldServer,
         asyncProcess.connectionConfiguration.getPauseMillis());
     }
+
+    MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
+    if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
+      metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
+    }
+
     if (numAttempt > asyncProcess.startLogErrorsCnt) {
       // We use this value to have some logs when we have multiple failures, but not too many
       // logs, as errors are to be expected when a region moves, splits and so on
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 82ab90a47fa7..5a911690c77d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -140,6 +140,11 @@ private void tryScheduleRetry(Throwable error) {
       delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     tries++;
+
+    if (HBaseServerException.isServerOverloaded(error)) {
+      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
+      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
+    }
     retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 8bcf77a882d6..9fcb45c5b5dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -111,6 +111,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final Runnable completeWhenNoMoreResultsInRegion;
 
+  protected final AsyncConnectionImpl conn;
+
   private final CompletableFuture<Boolean> future;
 
   private final HBaseRpcController controller;
@@ -311,6 +313,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl
     long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
     int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
+    this.conn = conn;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     this.scannerId = scannerId;
@@ -436,6 +439,11 @@ private void onError(Throwable error) {
       return;
     }
     tries++;
+
+    if (HBaseServerException.isServerOverloaded(error)) {
+      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
+      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
+    }
     retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
   }
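All four async callers above gate the new metric on the same predicate. As a standalone illustration (not part of the patch), HBaseServerException.isServerOverloaded(Throwable) is true only for server exceptions flagged as overload, such as CallQueueTooBigException; the String constructor and the printed results below are assumptions based on the HBase version this patch targets.

import java.io.IOException;

import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseServerException;

public class OverloadedCheckDemo {
  public static void main(String[] args) {
    // CallQueueTooBigException is an HBaseServerException marked as overload.
    Throwable overloaded = new CallQueueTooBigException("call queue is full");
    // Anything that is not an overloaded HBaseServerException is false.
    Throwable unrelated = new IOException("connection reset");

    System.out.println(HBaseServerException.isServerOverloaded(overloaded)); // true
    System.out.println(HBaseServerException.isServerOverloaded(unrelated)); // false
  }
}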
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 33797e763a29..5f89119d3882 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -38,6 +38,7 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -271,10 +272,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.stats = ServerStatisticTracker.create(conf);
     this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
-    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
-    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
+
     boolean shouldListen =
       conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -300,8 +299,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       this.metrics = null;
     }
     this.metaCache = new MetaCache(this.metrics);
-
     this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
+    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
+    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
 
     // Do we publish the status?
     if (shouldListen) {
@@ -1018,10 +1019,15 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout
           // Only relocate the parent region if necessary
           relocateMeta = !(e instanceof RegionOfflineException ||
            e instanceof NoServerForRegionException);
+
+          if (metrics != null && HBaseServerException.isServerOverloaded(e)) {
+            metrics.incrementServerOverloadedBackoffTime(
+              ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS);
+          }
         } finally {
           userRegionLock.unlock();
         }
-        try{
+        try {
           Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
         } catch (InterruptedException e) {
           throw new InterruptedIOException("Giving up trying to location region in " +
@@ -2154,7 +2160,7 @@ public TableState getTableState(TableName tableName) throws IOException {
   @Override
   public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
     return RpcRetryingCallerFactory
-      .instantiate(conf, this.interceptor, this.getStatisticsTracker());
+      .instantiate(conf, this.interceptor, this.getStatisticsTracker(), metrics);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index d73c8cea52d4..b4b93c2ecfa6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -317,6 +317,7 @@ private static interface NewMetric<T> {
   protected final Counter hedgedReadWin;
   protected final Histogram concurrentCallsPerServerHist;
   protected final Histogram numActionsPerServerHist;
+  protected final Timer overloadedBackoffTimer;
 
   // dynamic metrics
@@ -379,6 +380,8 @@ protected Ratio getRatio() {
       "concurrentCallsPerServer", scope));
     this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
       "numActionsPerServer", scope));
+    this.overloadedBackoffTimer = registry.timer(name(this.getClass(),
+      "overloadedBackoffDurationMs", scope));
 
     this.reporter = JmxReporter.forRegistry(this.registry).build();
     this.reporter.start();
@@ -452,6 +455,11 @@ public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
     this.runnerStats.updateDelayInterval(interval);
   }
 
+  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
+    overloadedBackoffTimer.update(time, timeUnit);
+  }
+
   /**
    * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
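The new metric is a Dropwizard Timer, so updates in any unit are normalized internally (to nanoseconds) and exposed through the usual count and percentile views; the "Ms" in overloadedBackoffDurationMs reflects JmxReporter's default duration unit of milliseconds, not what update() accepts. A minimal, self-contained sketch of that behavior using plain Dropwizard, not HBase code:

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

public class BackoffTimerSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Timer backoffTimer = registry.timer("overloadedBackoffDurationMs");

    // Both kinds of call site feed the same Timer: the sync path reports
    // milliseconds, the async path nanoseconds; Dropwizard normalizes.
    backoffTimer.update(100, TimeUnit.MILLISECONDS);
    backoffTimer.update(50_000_000L, TimeUnit.NANOSECONDS); // 50 ms

    Snapshot snapshot = backoffTimer.getSnapshot();
    System.out.println("samples: " + backoffTimer.getCount()); // 2
    // Snapshot values are in nanoseconds; convert for display.
    System.out.println("mean backoff ms: " + snapshot.getMean() / 1_000_000.0); // 75.0
  }
}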
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 7425f8837f62..768f9161dbbd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -34,22 +34,30 @@ public class RpcRetryingCallerFactory {
   private final ConnectionConfiguration connectionConf;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  private final MetricsConnection metrics;
+  // TODO: enableBackPressure and stats are currently unused in this class.
   private final boolean enableBackPressure;
   private ServerStatisticTracker stats;
 
   public RpcRetryingCallerFactory(Configuration conf) {
-    this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+    this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
   }
 
   public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
+    this(conf, interceptor, null);
+  }
+
+  public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
+      MetricsConnection metrics) {
     this.conf = conf;
     this.connectionConf = new ConnectionConfiguration(conf);
     startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
-        AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+      AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.interceptor = interceptor;
     enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
-        HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+      HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+    this.metrics = metrics;
   }
 
   /**
@@ -69,7 +77,7 @@ public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
       connectionConf.getPauseMillis(),
       connectionConf.getPauseMillisForServerOverloaded(),
       connectionConf.getRetriesNumber(),
-      interceptor, startLogErrorsCnt, rpcTimeout);
+      interceptor, startLogErrorsCnt, rpcTimeout, metrics);
   }
 
   /**
@@ -83,26 +91,34 @@ public <T> RpcRetryingCaller<T> newCaller() {
       connectionConf.getPauseMillisForServerOverloaded(),
       connectionConf.getRetriesNumber(),
       interceptor, startLogErrorsCnt,
-      connectionConf.getRpcTimeout());
+      connectionConf.getRpcTimeout(), metrics);
   }
 
+  // TODO(baugenreich): audit the other instantiate() call sites (for example,
+  // HTableMultiplexer uses one to create a new AsyncProcess) and pass metrics
+  // through where available.
   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
-    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null);
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration,
       ServerStatisticTracker stats) {
-    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, null);
+  }
+
+  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
+      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
+    return instantiate(configuration, interceptor, stats, null);
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration,
-      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
+      RetryingCallerInterceptor interceptor, ServerStatisticTracker stats, MetricsConnection metrics) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
       configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
     RpcRetryingCallerFactory factory;
     if (rpcCallerFactoryClazz.equals(clazzName)) {
-      factory = new RpcRetryingCallerFactory(configuration, interceptor);
+      factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
     } else {
       factory = ReflectionUtils.instantiateWithCustomCtor(
         rpcCallerFactoryClazz, new Class[] { Configuration.class },
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
index 57a864174439..443e2d31c8c1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -28,6 +28,7 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -68,14 +69,11 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
   private final RetryingCallerInterceptorContext context;
   private final RetryingTimeTracker tracker;
 
-  public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
-      int startLogErrorsCnt) {
-    this(pause, pauseForServerOverloaded, retries,
-      RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
-  }
+  private final MetricsConnection metrics;
 
   public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
-      RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
+      RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
+      MetricsConnection metricsConnection) {
     this.pause = pause;
     this.pauseForServerOverloaded = pauseForServerOverloaded;
     this.maxAttempts = retries2Attempts(retries);
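For callers, the factory change is source-compatible: every pre-existing overload keeps its signature and delegates with a null MetricsConnection, which the caller implementation guards against before recording. A hypothetical call-site sketch follows; the names tracker and metrics are stand-ins, and the class is assumed to live in org.apache.hadoop.hbase.client so the package-private types are reachable.

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CallerFactorySketch {
  static RpcRetryingCallerFactory build(ServerStatisticTracker tracker,
      MetricsConnection metrics) {
    Configuration conf = HBaseConfiguration.create();
    // New call sites can thread the connection's metrics through:
    RpcRetryingCallerFactory withMetrics = RpcRetryingCallerFactory.instantiate(
      conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, tracker, metrics);
    // Legacy three-argument call sites compile unchanged and simply record
    // nothing, because the old overload delegates with a null MetricsConnection:
    RpcRetryingCallerFactory legacy = RpcRetryingCallerFactory.instantiate(
      conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, tracker);
    return metrics != null ? withMetrics : legacy;
  }
}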
@@ -84,6 +82,13 @@ public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retr
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.tracker = new RetryingTimeTracker();
     this.rpcTimeout = rpcTimeout;
+    this.metrics = metricsConnection;
+  }
+
+  public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
+      int startLogErrorsCnt) {
+    this(pause, pauseForServerOverloaded, retries,
+      RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0, null);
   }
 
   @Override
@@ -162,6 +167,10 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
           ": " + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
         throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
       }
+
+      if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
+        metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
+      }
     } finally {
       interceptor.updateFailureInfo(context);
     }
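The patch itself adds no test for the new timer. One way to observe it end to end, sketched under several assumptions (client-side metrics enabled, a server that actually responds with an overload exception such as CallQueueTooBigException, and test code living in org.apache.hadoop.hbase.client so the protected overloadedBackoffTimer field and the private ClusterConnection API are reachable):

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class OverloadedBackoffMetricSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);

    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // ClusterConnection is private API; the cast is assumed acceptable in a test.
      MetricsConnection metrics = ((ClusterConnection) connection).getConnectionMetrics();
      long before = metrics.overloadedBackoffTimer.getCount();

      // ... issue enough load that a region server throws
      // CallQueueTooBigException, forcing overload-aware retries ...

      long after = metrics.overloadedBackoffTimer.getCount();
      System.out.println("backoff samples recorded: " + (after - before));
    }
  }
}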