Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public void onRejection(Exception e) {

public abstract boolean shouldRetry(Exception e);

protected long calculateDelay(long previousDelay) {
return Math.min(previousDelay * 2, Integer.MAX_VALUE);
protected long calculateDelayBound(long previousDelayBound) {
return Math.min(previousDelayBound * 2, Integer.MAX_VALUE);
}

protected long minimumDelayMillis() {
return 1L;
return 0L;
}

public void onFinished() {
Expand Down Expand Up @@ -145,10 +145,12 @@ public void onFailure(Exception e) {
} else {
addException(e);

final long nextDelayMillisBound = calculateDelay(delayMillisBound);
final long nextDelayMillisBound = calculateDelayBound(delayMillisBound);
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
final Runnable runnable = createRunnable(retryingListener);
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + minimumDelayMillis();
int range = Math.toIntExact((delayMillisBound + 1) / 2);
final long delayMillis = Randomness.get().nextInt(range) + delayMillisBound - range + 1L;
assert delayMillis > 0;
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testRetryableActionTimeout() {
final AtomicInteger retryCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(1), future) {
TimeValue.timeValueMillis(randomFrom(1, 10, randomIntBetween(100, 2000))), TimeValue.timeValueSeconds(1), future) {

@Override
public void tryAction(ActionListener<Boolean> listener) {
Expand All @@ -119,6 +119,7 @@ public boolean shouldRetry(Exception e) {
return e instanceof EsRejectedExecutionException;
}
};
long begin = taskQueue.getCurrentTimeMillis();
retryableAction.run();
taskQueue.runAllRunnableTasks();
long previousDeferredTime = 0;
Expand All @@ -133,6 +134,10 @@ public boolean shouldRetry(Exception e) {
assertFalse(taskQueue.hasRunnableTasks());

expectThrows(EsRejectedExecutionException.class, future::actionGet);

long end = taskQueue.getCurrentTimeMillis();
// max 3x timeout since we minimum wait half the bound for every retry.
assertThat(end - begin, lessThanOrEqualTo(3000L));
}

public void testTimeoutOfZeroMeansNoRetry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ private abstract class MlRetryableAction<Request, Response> extends RetryableAct
final Consumer<String> msgHandler;
final BiConsumer<Request, ActionListener<Response>> action;
volatile int currentAttempt = 0;
volatile long currentMin = MIN_RETRY_SLEEP_MILLIS;
volatile long currentMax = MIN_RETRY_SLEEP_MILLIS;

MlRetryableAction(String jobId,
Expand Down Expand Up @@ -453,30 +452,21 @@ public boolean shouldRetry(Exception e) {
}

@Override
protected long calculateDelay(long previousDelay) {
// Since we exponentially increase, we don't want force randomness to have an excessively long sleep
if (currentMax < MAX_RETRY_SLEEP_MILLIS) {
currentMin = currentMax;
}
protected long calculateDelayBound(long previousDelayBound) {
// Exponential backoff calculation taken from: https://en.wikipedia.org/wiki/Exponential_backoff
int uncappedBackoff = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * (50);
currentMax = Math.min(uncappedBackoff, MAX_RETRY_SLEEP_MILLIS);
// Its good to have a random window along the exponentially increasing curve
// so that not all bulk requests rest for the same amount of time
int randBound = (int)(1 + (currentMax - currentMin));
String msg = new ParameterizedMessage(
"failed to {} after [{}] attempts. Will attempt again.",
getName(),
currentAttempt)
.getFormattedMessage();
LOGGER.warn(() -> new ParameterizedMessage("[{}] {}", jobId, msg));
msgHandler.accept(msg);
return randBound;
}

@Override
protected long minimumDelayMillis() {
return currentMin;
// RetryableAction randomizes in the interval [currentMax/2 ; currentMax].
// Its good to have a random window along the exponentially increasing curve
// so that not all bulk requests rest for the same amount of time
return currentMax;
}

@Override
Expand Down