@@ -487,6 +487,10 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
if (isServerOverloaded) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
}

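Note: the async callers touched in this change (batch, single-request, and scan) all record the new metric the same way — the connection's MetricsConnection is optional, so the timer is updated only when client metrics are enabled and the failure is classified as server overload (via a precomputed flag here, via HBaseServerException.isServerOverloaded in the other callers). A minimal standalone sketch of that guard, reusing the names from the hunks above; it illustrates the pattern and is not a literal excerpt:

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.client.MetricsConnection;

// Guard shared by the async retrying callers: only count backoff time toward
// the overloaded-backoff timer when the server reported overload and metrics
// are actually enabled on the connection.
final class OverloadedBackoffGuardSketch {
  static void recordBackoff(Optional<MetricsConnection> metrics, Throwable error, long delayNs) {
    if (HBaseServerException.isServerOverloaded(error)) {
      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
    }
  }
}
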
@@ -747,6 +747,12 @@ private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttemp
backOffTime = errorsByServer.calculateBackoffTime(oldServer,
asyncProcess.connectionConfiguration.getPauseMillis());
}

MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
}

if (numAttempt > asyncProcess.startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
@@ -139,6 +139,11 @@ private void tryScheduleRetry(Throwable error) {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
tries++;

if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
}

@@ -113,6 +113,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {

private final Runnable completeWhenNoMoreResultsInRegion;

protected final AsyncConnectionImpl conn;

private final CompletableFuture<Boolean> future;

private final HBaseRpcController controller;
@@ -318,6 +320,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan;
this.scanMetrics = scanMetrics;
this.scannerId = scannerId;
@@ -441,6 +444,10 @@ private void onError(Throwable error) {
return;
}
tries++;
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
}

@@ -299,10 +299,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {

this.stats = ServerStatisticTracker.create(conf);
this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);

this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

boolean shouldListen =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -330,6 +328,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.metaCache = new MetaCache(this.metrics);

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory =
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

// Do we publish the status?
if (shouldListen) {
@@ -1056,6 +1058,11 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou
// Only relocate the parent region if necessary
relocateMeta =
!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);

if (metrics != null && HBaseServerException.isServerOverloaded(e)) {
metrics.incrementServerOverloadedBackoffTime(
ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS);
}
} finally {
userRegionLock.unlock();
}
@@ -2183,8 +2190,8 @@ public TableState getTableState(TableName tableName) throws IOException {

@Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor,
this.getStatisticsTracker());
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
metrics);
}

@Override
@@ -1305,8 +1305,9 @@ public <R extends Message> void batchCoprocessorService(
final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];

AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
AsyncProcess asyncProcess = new AsyncProcess(
connection, configuration, RpcRetryingCallerFactory.instantiate(configuration,
connection.getStatisticsTracker(), connection.getConnectionMetrics()),
RpcControllerFactory.instantiate(configuration));

Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
@@ -422,7 +422,8 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a
this.addr = addr;
this.multiplexer = htableMultiplexer;
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
conn == null ? null : conn.getConnectionMetrics());
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
@@ -315,6 +315,7 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;
protected final Timer overloadedBackoffTimer;

// dynamic metrics

@@ -377,6 +378,9 @@ protected Ratio getRatio() {
this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

this.overloadedBackoffTimer =
registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start();
}
@@ -449,6 +453,10 @@ public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
this.runnerStats.updateDelayInterval(interval);
}

public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}

/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
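Note: behind incrementServerOverloadedBackoffTime, the new overloadedBackoffTimer is a Dropwizard metrics Timer registered under the name "overloadedBackoffDurationMs" and exported through the JMX reporter that MetricsConnection already starts. A self-contained sketch of the underlying metrics-core API (plain Dropwizard, no HBase classes; assumes the unshaded com.codahale.metrics artifact is on the classpath):

import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

// The new method effectively forwards to Timer.update(duration, unit): each
// recorded backoff adds one sample to the timer's rate and duration histogram.
public class OverloadedBackoffTimerSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Timer backoff = registry.timer("overloadedBackoffDurationMs");
    backoff.update(150, TimeUnit.MILLISECONDS); // one 150 ms overload backoff
    // Snapshot durations are stored in nanoseconds internally.
    System.out.println("samples=" + backoff.getCount()
        + ", meanMs=" + backoff.getSnapshot().getMean() / 1_000_000.0);
  }
}
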
@@ -33,17 +33,20 @@ public class RpcRetryingCallerFactory {
private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
private final MetricsConnection metrics;

public RpcRetryingCallerFactory(Configuration conf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
}

public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
MetricsConnection metrics) {
this.conf = conf;
this.connectionConf = new ConnectionConfiguration(conf);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor;
this.metrics = metrics;
}

/**
@@ -54,7 +57,7 @@ public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
// is cheap as it does not require parsing a complex structure.
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, rpcTimeout);
interceptor, startLogErrorsCnt, rpcTimeout, metrics);
}

/**
@@ -65,26 +68,30 @@ public <T> RpcRetryingCaller<T> newCaller() {
// is cheap as it does not require parsing a complex structure.
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout());
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ServerStatisticTracker stats) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
ServerStatisticTracker stats, MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) {
factory = new RpcRetryingCallerFactory(configuration, interceptor);
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
} else {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration });
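Note: with the constructor and instantiate overloads now taking a MetricsConnection, every call site passes either the connection's metrics or null, as the remaining hunks show. A hedged usage fragment (assumes a non-null ClusterConnection named connection and the usual org.apache.hadoop.hbase.client imports in scope):

// Thread the connection's metrics into the factory so RpcRetryingCallerImpl
// can attribute retry backoff to server overload; pass null where unavailable.
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
    connection.getConfiguration(), connection.getConnectionMetrics());
RpcRetryingCaller<Result[]> caller = factory.<Result[]> newCaller();
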
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
@@ -63,15 +64,11 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context;
private final RetryingTimeTracker tracker;
private final MetricsConnection metrics;

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
int startLogErrorsCnt) {
this(pause, pauseForServerOverloaded, retries,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
}

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
MetricsConnection metricsConnection) {
this.pause = pause;
this.pauseForServerOverloaded = pauseForServerOverloaded;
this.maxAttempts = retries2Attempts(retries);
@@ -80,6 +77,7 @@ public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retr
this.startLogErrorsCnt = startLogErrorsCnt;
this.tracker = new RetryingTimeTracker();
this.rpcTimeout = rpcTimeout;
this.metrics = metricsConnection;
}

@Override
@@ -158,6 +156,9 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+ t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
}
if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
}
} finally {
interceptor.updateFailureInfo(context);
}
@@ -179,8 +179,9 @@ public Result[] call(int timeout) throws IOException {
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<>(
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
regionReplication * 5);
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
cConnection == null ? null : cConnection.getConnectionMetrics()),
pool, regionReplication * 5);

AtomicBoolean done = new AtomicBoolean(false);
replicaSwitched.set(false);
@@ -381,8 +382,11 @@ class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>,
// and we can't invoke it multiple times at the same time)
this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf).<
Result[]> newCaller();
this.caller =
RpcRetryingCallerFactory
.instantiate(ScannerCallableWithReplicas.this.conf,
cConnection == null ? null : cConnection.getConnectionMetrics())
.<Result[]> newCaller();
}
}

@@ -69,7 +69,7 @@ protected String rpcCall() throws Exception {
return response.getBulkToken();
}
};
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
@@ -91,7 +91,7 @@ protected Void rpcCall() throws Exception {
return null;
}
};
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null).<Void> newCaller()
.callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
@@ -246,7 +246,8 @@ public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
}
});

return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) throws IOException, RuntimeException {
@@ -307,7 +308,7 @@ static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse> {
private final IOException e;

public CallerWithFailure(IOException e) {
super(100, 500, 100, 9);
super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
this.e = e;
}

@@ -412,7 +413,8 @@ public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
replicaCalls.incrementAndGet();
}

return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) throws IOException, RuntimeException {
@@ -222,7 +222,8 @@ public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
});
});
mr.addException(REGION_INFO.getRegionName(), IOE);
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) {
@@ -58,8 +58,8 @@ private void itUsesSpecialPauseForServerOverloaded(
long pauseMillis = 1;
long specialPauseMillis = 2;

RpcRetryingCallerImpl<Void> caller =
new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0);
RpcRetryingCallerImpl<Void> caller = new RpcRetryingCallerImpl<>(pauseMillis,
specialPauseMillis, 2, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 0, 0, null);

RetryingCallable<Void> callable =
new ThrowingCallable(CallQueueTooBigException.class, specialPauseMillis);
@@ -645,7 +645,8 @@ public HRegionServer(final Configuration conf) throws IOException {
serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf,
clusterConnection == null ? null : clusterConnection.getConnectionMetrics());

// login the zookeeper client principal (if using security)
ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
@@ -390,8 +390,8 @@ public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection c
this.sink = sink;
this.connection = connection;
this.operationTimeout = operationTimeout;
this.rpcRetryingCallerFactory =
RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
this.rpcRetryingCallerFactory = RpcRetryingCallerFactory
.instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool;
this.tableDescriptors = tableDescriptors;
@@ -871,7 +871,7 @@ protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]>
List<LoadQueueItem> toRetry = new ArrayList<>();
try {
Configuration conf = getConf();
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null).<byte[]> newCaller()
.callWithRetries(serviceCallable, Integer.MAX_VALUE);
if (region == null) {
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)