Stop using the remote HTTP cache when it becomes unreachable #24685

Open · wants to merge 2 commits into master
@@ -227,6 +227,17 @@ private static boolean shouldEnableRemoteOutputService(RemoteOptions options) {
return retry;
};

public static final Predicate<? super Exception> HTTP_SUCCESS_CODES =
e -> {
boolean success = false;
if (e instanceof HttpException) {
int status = ((HttpException) e).response().status().code();
success =
status == HttpResponseStatus.NOT_FOUND.code();
}
return success;
};

private void initHttpAndDiskCache(
CommandEnvironment env,
Credentials credentials,
@@ -247,7 +258,7 @@ private void initHttpAndDiskCache(
digestUtil,
executorService,
new RemoteRetrier(
remoteOptions, RETRIABLE_HTTP_ERRORS, retryScheduler, circuitBreaker));
remoteOptions, RETRIABLE_HTTP_ERRORS, retryScheduler, circuitBreaker, HTTP_SUCCESS_CODES));
} catch (IOException e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
@@ -474,7 +485,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
CircuitBreakerFactory.createCircuitBreaker(remoteOptions);
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker);
remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker, RemoteRetrier.GRPC_SUCCESS_CODES);

if (!Strings.isNullOrEmpty(remoteOptions.remoteOutputService)) {
var bazelOutputServiceChannel =
@@ -648,7 +659,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally
retryScheduler,
circuitBreaker);
circuitBreaker,
RemoteRetrier.GRPC_SUCCESS_CODES);
remoteExecutor =
new ExperimentalGrpcRemoteExecutor(
remoteOptions, execChannel.retain(), callCredentialsProvider, execRetrier);
@@ -658,7 +670,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
retryScheduler,
circuitBreaker);
circuitBreaker,
RemoteRetrier.GRPC_SUCCESS_CODES);
remoteExecutor =
new GrpcRemoteExecutor(execChannel.retain(), callCredentialsProvider, execRetrier);
}
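With this change, the HTTP cache retrier counts only a 404 from the cache as a successful call for circuit-breaker purposes; any other HttpException is recorded as a failure. Below is a minimal, self-contained sketch of that classification; the HttpException type here is a hypothetical stand-in for the Netty-based exception Bazel actually uses.

import java.util.function.Predicate;

public class HttpSuccessCodesSketch {
  // Hypothetical stand-in for the HTTP cache client's exception type.
  static class HttpException extends RuntimeException {
    private final int statusCode;
    HttpException(int statusCode) { this.statusCode = statusCode; }
    int statusCode() { return statusCode; }
  }

  // Mirrors HTTP_SUCCESS_CODES: a 404 is a cache miss, not a sign of an unhealthy cache.
  static final Predicate<Exception> HTTP_SUCCESS_CODES =
      e -> (e instanceof HttpException) && ((HttpException) e).statusCode() == 404;

  public static void main(String[] args) {
    System.out.println(HTTP_SUCCESS_CODES.test(new HttpException(404))); // true  -> recordSuccess()
    System.out.println(HTTP_SUCCESS_CODES.test(new HttpException(500))); // false -> recordFailure()
  }
}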
@@ -67,12 +67,30 @@ private static Status fromException(Exception e) {
return RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND);
};

public static final Predicate<? super Exception> GRPC_SUCCESS_CODES =
e -> {
Status s = fromException(e);
if (s == null) {
// It's not a gRPC error.
return false;
}
switch (s.getCode()) {
case INVALID_ARGUMENT:
case NOT_FOUND:
case ALREADY_EXISTS:
case OUT_OF_RANGE:
return true;
default:
return false;
}
};

public RemoteRetrier(
RemoteOptions options,
Predicate<? super Exception> shouldRetry,
ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
this(
super(
options.remoteMaxRetryAttempts > 0
? () -> new ExponentialBackoff(options)
: () -> RETRIES_DISABLED,
@@ -81,6 +99,22 @@ public RemoteRetrier(
circuitBreaker);
}

public RemoteRetrier(
RemoteOptions options,
Predicate<? super Exception> shouldRetry,
ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker,
Predicate<? super Exception> isSuccess) {
super(
options.remoteMaxRetryAttempts > 0
? () -> new ExponentialBackoff(options)
: () -> RETRIES_DISABLED,
shouldRetry,
retryScheduler,
circuitBreaker,
isSuccess);
}

public RemoteRetrier(
Supplier<Backoff> backoff,
Predicate<? super Exception> shouldRetry,
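The GRPC_SUCCESS_CODES predicate above classifies a handful of non-retriable gRPC codes as "successful" calls so they do not push the circuit breaker toward tripping. The following is a simplified, self-contained version of that classification, using io.grpc.Status.fromThrowable in place of Bazel's RemoteRetrierUtils helper; it assumes the grpc-api artifact is on the classpath.

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.function.Predicate;

public class GrpcSuccessCodesSketch {
  // Codes that indicate a client-side condition (missing blob, bad request, duplicate upload)
  // rather than an unhealthy server.
  static final Predicate<Exception> GRPC_SUCCESS_CODES =
      e -> {
        switch (Status.fromThrowable(e).getCode()) {
          case INVALID_ARGUMENT:
          case NOT_FOUND:
          case ALREADY_EXISTS:
          case OUT_OF_RANGE:
            return true;
          default:
            return false; // includes UNKNOWN, which fromThrowable returns for non-gRPC errors
        }
      };

  public static void main(String[] args) {
    System.out.println(GRPC_SUCCESS_CODES.test(new StatusRuntimeException(Status.NOT_FOUND)));   // true
    System.out.println(GRPC_SUCCESS_CODES.test(new StatusRuntimeException(Status.UNAVAILABLE))); // false
  }
}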
36 changes: 33 additions & 3 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
@@ -176,11 +176,18 @@ public int getRetryAttempts() {
}
}

/* Default predicate that treats every error as a successful API call, preserving the previous circuit-breaker behavior. */
public static final Predicate<? super Exception> ALWAYS_SUCCESS =
e -> {
return true;
};

private final Supplier<Backoff> backoffSupplier;
private final Predicate<? super Exception> shouldRetry;
private final CircuitBreaker circuitBreaker;
private final ListeningScheduledExecutorService retryService;
private final Sleeper sleeper;
private final Predicate<? super Exception> isSuccess;

public Retrier(
Supplier<Backoff> backoffSupplier,
@@ -191,6 +198,20 @@ public Retrier(
backoffSupplier, shouldRetry, retryScheduler, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
}

public Retrier(
Supplier<Backoff> backoffSupplier,
Predicate<? super Exception> shouldRetry,
ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker,
Predicate<? super Exception> isSuccess) {
this.backoffSupplier = backoffSupplier;
this.shouldRetry = shouldRetry;
this.retryService = retryScheduler;
this.circuitBreaker = circuitBreaker;
this.sleeper = TimeUnit.MILLISECONDS::sleep;
this.isSuccess = isSuccess;
}

@VisibleForTesting
Retrier(
Supplier<Backoff> backoffSupplier,
@@ -203,6 +224,7 @@ public Retrier(
this.retryService = retryService;
this.circuitBreaker = circuitBreaker;
this.sleeper = sleeper;
this.isSuccess = ALWAYS_SUCCESS;
}

ListeningScheduledExecutorService getRetryService() {
@@ -248,8 +270,12 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
} catch (Exception e) {
Throwables.throwIfInstanceOf(e, InterruptedException.class);
if (!shouldRetry.test(e)) {
// A non-retriable error doesn't represent server failure.
circuitBreaker.recordSuccess();
// A non-retriable error may represent either success or server failure.
if (isSuccess.test(e)) {
circuitBreaker.recordSuccess();
} else {
circuitBreaker.recordFailure();
}
throw e;
}
circuitBreaker.recordFailure();
@@ -321,7 +347,11 @@ private <T> ListenableFuture<T> onExecuteAsyncFailure(
// gRPC errors such as NOT_FOUND, OUT_OF_RANGE, and ALREADY_EXISTS are non-retriable but don't
// represent a server issue, so they are recorded as successful API calls.
circuitBreaker.recordSuccess();
if (isSuccess.test(t)) {
circuitBreaker.recordSuccess();
} else {
circuitBreaker.recordFailure();
}
return Futures.immediateFailedFuture(t);
}
}
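Condensed, the new error handling in Retrier works like this: retriable errors are still recorded as failures and retried, while a non-retriable error is now recorded as a success only if the isSuccess predicate accepts it. The sketch below is a single-attempt simplification of that decision, with no backoff loop and a hypothetical CircuitBreaker interface standing in for Retrier.CircuitBreaker.

import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class CircuitBreakerAccountingSketch {
  // Minimal stand-in for Retrier.CircuitBreaker.
  interface CircuitBreaker {
    void recordSuccess();
    void recordFailure();
  }

  static <T> T callOnce(
      Callable<T> call,
      Predicate<Exception> shouldRetry,
      Predicate<Exception> isSuccess,
      CircuitBreaker circuitBreaker)
      throws Exception {
    try {
      T result = call.call();
      circuitBreaker.recordSuccess();
      return result;
    } catch (Exception e) {
      if (!shouldRetry.test(e)) {
        if (isSuccess.test(e)) {
          circuitBreaker.recordSuccess(); // e.g. NOT_FOUND: a cache miss, not a server problem
        } else {
          circuitBreaker.recordFailure(); // e.g. PERMISSION_DENIED: the endpoint is not usable
        }
        throw e;
      }
      circuitBreaker.recordFailure(); // retriable error: the real Retrier would back off and retry
      throw e;
    }
  }
}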
@@ -19,6 +19,7 @@
/** Factory for {@link Retrier.CircuitBreaker} */
public class CircuitBreakerFactory {
public static final int DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE = 100;
public static final int DEFAULT_MIN_FAIL_COUNT_TO_COMPUTE_FAILURE_RATE = 12;

private CircuitBreakerFactory() {}

@@ -35,6 +35,7 @@ public class FailureCircuitBreaker implements Retrier.CircuitBreaker {
private final int failureRateThreshold;
private final int slidingWindowSize;
private final int minCallCountToComputeFailureRate;
private final int minFailCountToComputeFailureRate;
private final ScheduledExecutorService scheduledExecutor;

/**
@@ -52,6 +53,8 @@ public FailureCircuitBreaker(int failureRateThreshold, int slidingWindowSize) {
this.slidingWindowSize = slidingWindowSize;
this.minCallCountToComputeFailureRate =
CircuitBreakerFactory.DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE;
this.minFailCountToComputeFailureRate =
CircuitBreakerFactory.DEFAULT_MIN_FAIL_COUNT_TO_COMPUTE_FAILURE_RATE;
this.state = State.ACCEPT_CALLS;
this.scheduledExecutor =
slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
@@ -72,7 +75,7 @@ public void recordFailure() {
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
}

if (totalCallCount < minCallCountToComputeFailureRate) {
if (totalCallCount < minCallCountToComputeFailureRate && failureCount < minFailCountToComputeFailureRate) {
// Both the remote call count and the failure count are below the thresholds required to
// compute the failure rate.
return;
}
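With the extra guard, the failure rate is evaluated as soon as either threshold is met: at least 100 calls in the sliding window or at least 12 failures. The standalone sketch below reproduces the check with the two constants copied from CircuitBreakerFactory; the shouldTrip helper itself is illustrative, not Bazel API.

public class FailureRateGuardSketch {
  static final int MIN_CALLS = 100; // DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE
  static final int MIN_FAILS = 12;  // DEFAULT_MIN_FAIL_COUNT_TO_COMPUTE_FAILURE_RATE

  // Returns true when the breaker should trip, given the counts in the current window.
  static boolean shouldTrip(int failures, int totalCalls, int failureRateThresholdPercent) {
    if (totalCalls < MIN_CALLS && failures < MIN_FAILS) {
      return false; // not enough signal yet to judge the failure rate
    }
    int failureRatePercent = (failures * 100) / totalCalls;
    return failureRatePercent >= failureRateThresholdPercent;
  }

  public static void main(String[] args) {
    // 12 failures out of 20 calls: previously ignored (fewer than 100 calls), now trips at a 10% threshold.
    System.out.println(shouldTrip(12, 20, 10)); // true
    // 8 failures out of 20 calls: still below both thresholds, so the breaker stays closed.
    System.out.println(shouldTrip(8, 20, 10));  // false
  }
}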
@@ -375,6 +375,45 @@ public void testCircuitBreakerFailureAndSuccessCallOnDifferentGrpcError() {
assertThat(cb.state).isEqualTo(State.ACCEPT_CALLS);
}

@Test
public void testCircuitBreakerFailureAndSuccessCallOnNonRetriableGrpcError() {
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/ 2);
List<Status> nonRetriableFailure =
Arrays.asList(Status.PERMISSION_DENIED, Status.UNIMPLEMENTED, Status.DATA_LOSS);
List<Status> nonRetriableSuccess =
Arrays.asList(Status.NOT_FOUND, Status.OUT_OF_RANGE, Status.ALREADY_EXISTS);
TripAfterNCircuitBreaker cb =
new TripAfterNCircuitBreaker(nonRetriableFailure.size());
Retrier r = new Retrier(s, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, cb, RemoteRetrier.GRPC_SUCCESS_CODES);

int expectedConsecutiveFailures = 0;

for (Status status : nonRetriableFailure) {
ListenableFuture<Void> res =
r.executeAsync(
() -> {
throw new StatusRuntimeException(status);
});
expectedConsecutiveFailures += 1;
assertThrows(ExecutionException.class, res::get);
assertThat(cb.consecutiveFailures).isEqualTo(expectedConsecutiveFailures);
}

assertThat(cb.state).isEqualTo(State.REJECT_CALLS);
cb.trialCall();

for (Status status : nonRetriableSuccess) {
ListenableFuture<Void> res =
r.executeAsync(
() -> {
throw new StatusRuntimeException(status);
});
assertThat(cb.consecutiveFailures).isEqualTo(0);
assertThrows(ExecutionException.class, res::get);
}
assertThat(cb.state).isEqualTo(State.ACCEPT_CALLS);
}

/** Simple circuit breaker that trips after N consecutive failures. */
@ThreadSafe
private static class TripAfterNCircuitBreaker implements CircuitBreaker {
@@ -49,8 +49,8 @@ public void testRecordFailure_circuitTrips() throws InterruptedException {
listOfSuccessAndFailureCalls.stream().parallel().forEach(Runnable::run);
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);

// Sleep for windowInterval + 1ms.
Thread.sleep(windowInterval + 1 /*to compensate any delay*/);
// Sleep for windowInterval + 5ms.
Thread.sleep(windowInterval + 5 /*to compensate any delay*/);

// make calls equals to threshold number of not ignored failure calls in parallel.
listOfSuccessAndFailureCalls.stream().parallel().forEach(Runnable::run);
@@ -64,21 +64,40 @@

@Test
public void testRecordFailure_minCallCriteriaNotMet() throws InterruptedException {
final int failureRateThreshold = 10;
final int failureRateThreshold = 0;
final int windowInterval = 100;
final int minCallToComputeFailure =
CircuitBreakerFactory.DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE;
FailureCircuitBreaker failureCircuitBreaker =
new FailureCircuitBreaker(failureRateThreshold, windowInterval);

// make half failure call, half success call and number of total call less than
// make success calls and a failure call, keeping the total call count less than
// minCallToComputeFailure.
IntStream.range(0, minCallToComputeFailure >> 1)
.parallel()
.forEach(i -> failureCircuitBreaker.recordFailure());
IntStream.range(0, minCallToComputeFailure >> 1)
IntStream.range(0, minCallToComputeFailure - 2)
.parallel()
.forEach(i -> failureCircuitBreaker.recordSuccess());
failureCircuitBreaker.recordFailure();
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);

// Sleep for less than windowInterval.
Thread.sleep(windowInterval - 50);
failureCircuitBreaker.recordFailure();
assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS);
}

@Test
public void testRecordFailure_minFailCriteriaNotMet() throws InterruptedException {
final int failureRateThreshold = 10;
final int windowInterval = 100;
final int minFailToComputeFailure =
CircuitBreakerFactory.DEFAULT_MIN_FAIL_COUNT_TO_COMPUTE_FAILURE_RATE;
FailureCircuitBreaker failureCircuitBreaker =
new FailureCircuitBreaker(failureRateThreshold, windowInterval);

// make number of failure calls less than minFailToComputeFailure.
IntStream.range(0, minFailToComputeFailure - 1)
.parallel()
.forEach(i -> failureCircuitBreaker.recordFailure());
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);

// Sleep for less than windowInterval.