diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 0baebc31cf127d..793bdcb13fc291 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -8,6 +8,7 @@ package( filegroup( name = "srcs", srcs = glob(["*"]) + [ + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs", "//src/main/java/com/google/devtools/build/lib/remote/common:srcs", "//src/main/java/com/google/devtools/build/lib/remote/disk:srcs", "//src/main/java/com/google/devtools/build/lib/remote/downloader:srcs", @@ -85,6 +86,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/exec/local", "//src/main/java/com/google/devtools/build/lib/packages/semantics", "//src/main/java/com/google/devtools/build/lib/profiler", + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 9add1cd34bfdcd..470d3845dc3273 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -500,4 +500,8 @@ ListenableFuture uploadChunker( MoreExecutors.directExecutor()); return f; } + + Retrier getRetrier() { + return this.retrier; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index d30dff698408e7..97abfd36eefbf2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -252,4 +252,8 @@ public void close() { } channel.release(); } + + RemoteRetrier getRetrier() { + return this.retrier; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 8291eb06e6e68c..84e7571c697ad2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -60,6 +60,7 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry; import com.google.devtools.build.lib.exec.SpawnStrategyRegistry; import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; +import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; @@ -510,12 +511,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { GoogleAuthUtils.newCallCredentialsProvider(credentials); CallCredentials callCredentials = callCredentialsProvider.getCallCredentials(); + Retrier.CircuitBreaker circuitBreaker = + CircuitBreakerFactory.createCircuitBreaker(remoteOptions); RemoteRetrier retrier = new RemoteRetrier( - remoteOptions, - RemoteRetrier.RETRIABLE_GRPC_ERRORS, - retryScheduler, - Retrier.ALLOW_ALL_CALLS); + remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker); // We only check required capabilities for a given endpoint. // @@ -636,7 +636,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally retryScheduler, - Retrier.ALLOW_ALL_CALLS); + circuitBreaker); remoteExecutor = new ExperimentalGrpcRemoteExecutor( executionCapabilities, @@ -650,7 +650,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions, RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS, retryScheduler, - Retrier.ALLOW_ALL_CALLS); + circuitBreaker); remoteExecutor = new GrpcRemoteExecutor( executionCapabilities, execChannel.retain(), callCredentialsProvider, execRetrier); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index 7a42a266b7550f..9b2101ac95aba7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -57,6 +57,7 @@ import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult; import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs; +import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.OperationObserver; import com.google.devtools.build.lib.remote.options.RemoteOptions; @@ -655,6 +656,8 @@ private void report(Event evt) { private static RemoteRetrier createExecuteRetrier( RemoteOptions options, ListeningScheduledExecutorService retryService) { return new ExecuteRetrier( - options.remoteMaxRetryAttempts, retryService, Retrier.ALLOW_ALL_CALLS); + options.remoteMaxRetryAttempts, + retryService, + CircuitBreakerFactory.createCircuitBreaker(options)); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java index 4711a06eb9e454..457880268764d5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java @@ -100,7 +100,7 @@ enum State { State state(); /** Called after an execution failed. */ - void recordFailure(); + void recordFailure(Exception e); /** Called after an execution succeeded. */ void recordSuccess(); @@ -130,7 +130,7 @@ public State state() { } @Override - public void recordFailure() {} + public void recordFailure(Exception e) {} @Override public void recordSuccess() {} @@ -245,7 +245,7 @@ public T execute(Callable call, Backoff backoff) throws Exception { circuitBreaker.recordSuccess(); return r; } catch (Exception e) { - circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(e); Throwables.throwIfInstanceOf(e, InterruptedException.class); if (State.TRIAL_CALL.equals(circuitState)) { throw e; @@ -272,19 +272,35 @@ public ListenableFuture executeAsync(AsyncCallable call) { * backoff. */ public ListenableFuture executeAsync(AsyncCallable call, Backoff backoff) { + final State circuitState = circuitBreaker.state(); + if (State.REJECT_CALLS.equals(circuitState)) { + return Futures.immediateFailedFuture(new CircuitBreakerException()); + } try { + ListenableFuture future = + Futures.transformAsync( + call.call(), + (f) -> { + circuitBreaker.recordSuccess(); + return Futures.immediateFuture(f); + }, + MoreExecutors.directExecutor()); return Futures.catchingAsync( - call.call(), + future, Exception.class, - t -> onExecuteAsyncFailure(t, call, backoff), + t -> onExecuteAsyncFailure(t, call, backoff, circuitState), MoreExecutors.directExecutor()); } catch (Exception e) { - return onExecuteAsyncFailure(e, call, backoff); + return onExecuteAsyncFailure(e, call, backoff, circuitState); } } private ListenableFuture onExecuteAsyncFailure( - Exception t, AsyncCallable call, Backoff backoff) { + Exception t, AsyncCallable call, Backoff backoff, State circuitState) { + circuitBreaker.recordFailure(t); + if (circuitState.equals(State.TRIAL_CALL)) { + return Futures.immediateFailedFuture(t); + } if (isRetriable(t)) { long waitMillis = backoff.nextDelayMillis(t); if (waitMillis >= 0) { @@ -310,4 +326,8 @@ public Backoff newBackoff() { public boolean isRetriable(Exception e) { return shouldRetry.test(e); } + + CircuitBreaker getCircuitBreaker() { + return this.circuitBreaker; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD new file mode 100644 index 00000000000000..caa358ec8bbd44 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD @@ -0,0 +1,23 @@ +load("@rules_java//java:defs.bzl", "java_library") + +package( + default_applicable_licenses = ["//:license"], + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + srcs = glob(["*"]), + visibility = ["//src:__subpackages__"], +) + +java_library( + name = "circuitbreaker", + srcs = glob(["*.java"]), + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", + "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/options", + "//third_party:guava", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java new file mode 100644 index 00000000000000..be8835b7c0b2d3 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java @@ -0,0 +1,45 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.circuitbreaker; + +import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.remote.Retrier; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.options.RemoteOptions; + +/** Factory for {@link Retrier.CircuitBreaker} */ +public class CircuitBreakerFactory { + + public static final ImmutableSet> DEFAULT_IGNORED_ERRORS = + ImmutableSet.of(CacheNotFoundException.class); + + private CircuitBreakerFactory() {} + + /** + * Creates the instance of the {@link Retrier.CircuitBreaker} as per the strategy defined in + * {@link RemoteOptions}. In case of undefined strategy defaults to {@link + * Retrier.ALLOW_ALL_CALLS} implementation. + * + * @param remoteOptions The configuration for the CircuitBreaker implementation. + * @return an instance of CircuitBreaker. + */ + public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) { + if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) { + return new FailureCircuitBreaker( + remoteOptions.remoteFailureThreshold, + (int) remoteOptions.remoteFailureWindowInterval.toMillis()); + } + return Retrier.ALLOW_ALL_CALLS; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java new file mode 100644 index 00000000000000..b1b5739fd44c96 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java @@ -0,0 +1,83 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.circuitbreaker; + +import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.remote.Retrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents + * further calls to a remote cache once the number of failures within a given window exceeds a + * specified threshold for a build. In the context of Bazel, a new instance of {@link + * Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit breaker trips during + * a build, the remote cache will be disabled for that build. However, it will be enabled again for + * the next build as a new instance of {@link Retrier.CircuitBreaker} will be created. + */ +public class FailureCircuitBreaker implements Retrier.CircuitBreaker { + + private State state; + private final AtomicInteger failures; + private final int failureThreshold; + private final int slidingWindowSize; + private final ScheduledExecutorService scheduledExecutor; + private final ImmutableSet> ignoredErrors; + + /** + * Creates a {@link FailureCircuitBreaker}. + * + * @param failureThreshold is used to set the number of failures required to trip the circuit + * breaker in given time window. + * @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number + * of failures. + */ + public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) { + this.failureThreshold = failureThreshold; + this.failures = new AtomicInteger(0); + this.slidingWindowSize = slidingWindowSize; + this.state = State.ACCEPT_CALLS; + this.scheduledExecutor = + slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null; + this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS; + } + + @Override + public State state() { + return this.state; + } + + @Override + public void recordFailure(Exception e) { + if (!ignoredErrors.contains(e.getClass())) { + int failureCount = failures.incrementAndGet(); + if (slidingWindowSize > 0) { + var unused = + scheduledExecutor.schedule( + failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS); + } + // Since the state can only be changed to the open state, synchronization is not required. + if (failureCount > this.failureThreshold) { + this.state = State.REJECT_CALLS; + } + } + } + + @Override + public void recordSuccess() { + // do nothing, implement if we need to set threshold on failure rate instead of count. + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java index cd5f89b77fc875..10254348cf4527 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java @@ -660,6 +660,43 @@ public RemoteOutputsStrategyConverter() { + "cache misses and retries.") public boolean remoteDiscardMerkleTrees; + @Option( + name = "experimental_circuit_breaker_strategy", + documentationCategory = OptionDocumentationCategory.REMOTE, + defaultValue = "null", + effectTags = {OptionEffectTag.EXECUTION}, + converter = CircuitBreakerStrategy.Converter.class, + help = + "Specifies the strategy for the circuit breaker to use. Available strategies are" + + " \"failure\". On invalid value for the option the behavior same as the option is" + + " not set.") + public CircuitBreakerStrategy circuitBreakerStrategy; + + @Option( + name = "experimental_remote_failure_threshold", + defaultValue = "100", + documentationCategory = OptionDocumentationCategory.REMOTE, + effectTags = {OptionEffectTag.EXECUTION}, + help = + "Sets the allowed number of failures in a specific time window after which it stops" + + " calling to the remote cache/executor. By default the value is 100. Setting this" + + " to 0 or negative means no limitation.") + public int remoteFailureThreshold; + + @Option( + name = "experimental_remote_failure_window_interval", + defaultValue = "60s", + documentationCategory = OptionDocumentationCategory.REMOTE, + effectTags = {OptionEffectTag.EXECUTION}, + converter = RemoteDurationConverter.class, + help = + "The interval in which the failure count of the remote requests are computed. On zero or" + + " negative value the failure duration is computed the whole duration of the" + + " execution.Following units can be used: Days (d), hours (h), minutes (m), seconds" + + " (s), and milliseconds (ms). If the unit is omitted, the value is interpreted as" + + " seconds.") + public Duration remoteFailureWindowInterval; + // The below options are not configurable by users, only tests. // This is part of the effort to reduce the overall number of flags. @@ -749,4 +786,16 @@ public boolean shouldPrintMessages(boolean success) { || this == ExecutionMessagePrintMode.ALL); } } + + /** An enum for specifying different strategy for circuit breaker. */ + public enum CircuitBreakerStrategy { + FAILURE; + + /** Converts to {@link CircuitBreakerStrategy}. */ + public static class Converter extends EnumConverter { + public Converter() { + super(CircuitBreakerStrategy.class, "CircuitBreaker strategy"); + } + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index a010b14eb4341d..2f3da6d42a7562 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -9,6 +9,7 @@ filegroup( name = "srcs", testonly = 0, srcs = glob(["**"]) + [ + "//src/test/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs", "//src/test/java/com/google/devtools/build/lib/remote/downloader:srcs", "//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs", "//src/test/java/com/google/devtools/build/lib/remote/http:srcs", @@ -74,6 +75,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote:abstract_action_input_prefetcher", "//src/main/java/com/google/devtools/build/lib/remote:remote_output_checker", + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java index a56673388d1779..f94953092b1bdc 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java @@ -42,6 +42,7 @@ import com.google.devtools.build.lib.exec.BinTools; import com.google.devtools.build.lib.exec.ExecutionOptions; import com.google.devtools.build.lib.pkgcache.PackageOptions; +import com.google.devtools.build.lib.remote.circuitbreaker.FailureCircuitBreaker; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.runtime.BlazeRuntime; import com.google.devtools.build.lib.runtime.BlazeServerStartupOptions; @@ -72,6 +73,7 @@ import java.net.URI; import java.time.Duration; import java.util.ArrayList; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -79,7 +81,8 @@ /** Tests for {@link RemoteModule}. */ @RunWith(JUnit4.class) public final class RemoteModuleTest { - + private static final String EXECUTION_SERVER_NAME = "execution-server"; + private static final String CACHE_SERVER_NAME = "cache-server"; private static final ServerCapabilities CACHE_ONLY_CAPS = ServerCapabilities.newBuilder() .setLowApiVersion(ApiVersion.current.toSemVer()) @@ -106,6 +109,32 @@ public final class RemoteModuleTest { CacheCapabilities.newBuilder().addDigestFunctions(Value.SHA256).build()) .build(); + private static final ServerCapabilities EXEC_ONLY_CAPS = + ServerCapabilities.newBuilder() + .setLowApiVersion(ApiVersion.current.toSemVer()) + .setHighApiVersion(ApiVersion.current.toSemVer()) + .setExecutionCapabilities( + ExecutionCapabilities.newBuilder() + .setExecEnabled(true) + .setDigestFunction(Value.SHA256) + .build()) + .build(); + + private static final ServerCapabilities NONE_CAPS = + ServerCapabilities.newBuilder() + .setLowApiVersion(ApiVersion.current.toSemVer()) + .setHighApiVersion(ApiVersion.current.toSemVer()) + .build(); + + private static final CapabilitiesImpl INACCESSIBLE_GRPC_REMOTE = + new CapabilitiesImpl(null) { + @Override + public void getCapabilities( + GetCapabilitiesRequest request, StreamObserver responseObserver) { + responseObserver.onError(new UnsupportedOperationException()); + } + }; + private static CommandEnvironment createTestCommandEnvironment( RemoteModule remoteModule, RemoteOptions remoteOptions) throws IOException, AbruptExitException { @@ -188,21 +217,27 @@ private static Server createFakeServer(String serverName, BindableService... ser .build(); } + private RemoteModule remoteModule; + private RemoteOptions remoteOptions; + + @Before + public void initialize() { + remoteModule = new RemoteModule(); + remoteModule.setChannelFactory( + (target, proxy, options, interceptors) -> + InProcessChannelBuilder.forName(target).directExecutor().build()); + remoteOptions = Options.getDefaults(RemoteOptions.class); + } + @Test public void testVerifyCapabilities_executionAndCacheForSingleEndpoint() throws Exception { CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -210,6 +245,7 @@ public void testVerifyCapabilities_executionAndCacheForSingleEndpoint() throws E assertThat(Thread.interrupted()).isFalse(); assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); executionServer.awaitTermination(); @@ -219,18 +255,11 @@ public void testVerifyCapabilities_executionAndCacheForSingleEndpoint() throws E @Test public void testVerifyCapabilities_cacheOnlyEndpoint() throws Exception { CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -238,6 +267,7 @@ public void testVerifyCapabilities_cacheOnlyEndpoint() throws Exception { assertThat(Thread.interrupted()).isFalse(); assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { cacheServer.shutdownNow(); cacheServer.awaitTermination(); @@ -246,37 +276,18 @@ public void testVerifyCapabilities_cacheOnlyEndpoint() throws Exception { @Test public void testVerifyCapabilities_executionAndCacheForDifferentEndpoints() throws Exception { - ServerCapabilities caps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .setExecutionCapabilities( - ExecutionCapabilities.newBuilder() - .setExecEnabled(true) - .setDigestFunction(Value.SHA256) - .build()) - .setCacheCapabilities( - CacheCapabilities.newBuilder().addDigestFunctions(Value.SHA256).build()) - .build(); - CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(caps); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); - CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(caps); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -285,6 +296,7 @@ public void testVerifyCapabilities_executionAndCacheForDifferentEndpoints() thro assertThat(Thread.interrupted()).isFalse(); assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); cacheServer.shutdownNow(); @@ -296,42 +308,18 @@ public void testVerifyCapabilities_executionAndCacheForDifferentEndpoints() thro @Test public void testVerifyCapabilities_executionOnlyAndCacheOnlyEndpoints() throws Exception { - ServerCapabilities executionOnlyCaps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .setExecutionCapabilities( - ExecutionCapabilities.newBuilder() - .setExecEnabled(true) - .setDigestFunction(Value.SHA256) - .build()) - .build(); - CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(executionOnlyCaps); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_ONLY_CAPS); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); - ServerCapabilities cacheOnlyCaps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .setCacheCapabilities( - CacheCapabilities.newBuilder().addDigestFunctions(Value.SHA256).build()) - .build(); - CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(cacheOnlyCaps); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -340,6 +328,7 @@ public void testVerifyCapabilities_executionOnlyAndCacheOnlyEndpoints() throws E assertThat(Thread.interrupted()).isFalse(); assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); cacheServer.shutdownNow(); @@ -352,24 +341,13 @@ public void testVerifyCapabilities_executionOnlyAndCacheOnlyEndpoints() throws E @Test public void testLocalFallback_shouldErrorForRemoteCacheWithoutRequiredCapabilities() throws Exception { - ServerCapabilities noneCaps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .build(); - CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(noneCaps); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(NONE_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; remoteOptions.remoteLocalFallback = true; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -383,26 +361,12 @@ public void testLocalFallback_shouldErrorForRemoteCacheWithoutRequiredCapabiliti @Test public void testLocalFallback_shouldErrorInaccessibleGrpcRemoteCacheIfFlagNotSet() throws Exception { - String cacheServerName = "cache-server"; - CapabilitiesImplBase cacheServerCapabilitiesImpl = - new CapabilitiesImplBase() { - @Override - public void getCapabilities( - GetCapabilitiesRequest request, StreamObserver responseObserver) { - responseObserver.onError(new UnsupportedOperationException()); - } - }; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, INACCESSIBLE_GRPC_REMOTE); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; remoteOptions.remoteLocalFallback = false; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -415,26 +379,12 @@ public void getCapabilities( @Test public void testLocalFallback_shouldIgnoreInaccessibleGrpcRemoteCache() throws Exception { - String cacheServerName = "cache-server"; - CapabilitiesImplBase cacheServerCapabilitiesImpl = - new CapabilitiesImplBase() { - @Override - public void getCapabilities( - GetCapabilitiesRequest request, StreamObserver responseObserver) { - responseObserver.onError(new UnsupportedOperationException()); - } - }; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, INACCESSIBLE_GRPC_REMOTE); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; remoteOptions.remoteLocalFallback = true; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -445,6 +395,7 @@ public void getCapabilities( assertThat(actionContextProvider).isNotNull(); assertThat(actionContextProvider.getRemoteCache()).isNull(); assertThat(actionContextProvider.getRemoteExecutionClient()).isNull(); + assertCircuitBreakerInstance(); } finally { cacheServer.shutdownNow(); cacheServer.awaitTermination(); @@ -453,26 +404,12 @@ public void getCapabilities( @Test public void testLocalFallback_shouldIgnoreInaccessibleGrpcRemoteExecutor() throws Exception { - CapabilitiesImplBase executionServerCapabilitiesImpl = - new CapabilitiesImplBase() { - @Override - public void getCapabilities( - GetCapabilitiesRequest request, StreamObserver responseObserver) { - responseObserver.onError(new UnsupportedOperationException()); - } - }; - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + Server executionServer = createFakeServer(EXECUTION_SERVER_NAME, INACCESSIBLE_GRPC_REMOTE); executionServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; remoteOptions.remoteLocalFallback = true; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -483,6 +420,7 @@ public void getCapabilities( assertThat(actionContextProvider).isNotNull(); assertThat(actionContextProvider.getRemoteCache()).isNull(); assertThat(actionContextProvider.getRemoteExecutionClient()).isNull(); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); executionServer.awaitTermination(); @@ -496,8 +434,6 @@ public void testNetrc_netrcWithoutRemoteCache() throws Exception { Scratch scratch = new Scratch(fileSystem); scratch.file(netrc, "machine foo.example.org login baruser password barpass"); AuthAndTLSOptions authAndTLSOptions = Options.getDefaults(AuthAndTLSOptions.class); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - Cache>> credentialCache = Caffeine.newBuilder().build(); @@ -523,18 +459,11 @@ public void testNetrc_netrcWithoutRemoteCache() throws Exception { @Test public void testCacheCapabilities_propagatedToRemoteCache() throws Exception { CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -555,18 +484,12 @@ public void testCacheCapabilities_propagatedToRemoteCache() throws Exception { @Test public void testCacheCapabilities_propagatedToRemoteExecutionCache() throws Exception { CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -583,4 +506,80 @@ public void testCacheCapabilities_propagatedToRemoteExecutionCache() throws Exce executionServer.awaitTermination(); } } + + @Test + public void testVerifyCapabilities_executionAndCacheForSingleEndpointWithCircuitBreaker() + throws Exception { + CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); + executionServer.start(); + + try { + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; + remoteOptions.circuitBreakerStrategy = RemoteOptions.CircuitBreakerStrategy.FAILURE; + + CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); + + remoteModule.beforeCommand(env); + + assertThat(Thread.interrupted()).isFalse(); + assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); + } finally { + executionServer.shutdownNow(); + executionServer.awaitTermination(); + } + } + + @Test + public void testVerifyCapabilities_cacheOnlyEndpointWithCircuitBreaker() throws Exception { + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); + cacheServer.start(); + + try { + remoteOptions.remoteCache = CACHE_SERVER_NAME; + remoteOptions.circuitBreakerStrategy = RemoteOptions.CircuitBreakerStrategy.FAILURE; + + CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); + + remoteModule.beforeCommand(env); + + assertThat(Thread.interrupted()).isFalse(); + assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); + } finally { + cacheServer.shutdownNow(); + cacheServer.awaitTermination(); + } + } + + private void assertCircuitBreakerInstance() { + RemoteActionContextProvider actionContextProvider = remoteModule.getActionContextProvider(); + assertThat(actionContextProvider).isNotNull(); + + Retrier.CircuitBreaker circuitBreaker; + if (actionContextProvider.getRemoteCache() != null) { + circuitBreaker = + ((GrpcCacheClient) actionContextProvider.getRemoteCache().cacheProtocol) + .getRetrier() + .getCircuitBreaker(); + } else if (actionContextProvider.getRemoteExecutionClient() != null) { + circuitBreaker = + ((GrpcRemoteExecutor) actionContextProvider.getRemoteExecutionClient()) + .getRetrier() + .getCircuitBreaker(); + } else { + // no remote cache or execution configured, circuitBreaker is null + return; + } + + if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) { + assertThat(circuitBreaker).isInstanceOf(FailureCircuitBreaker.class); + } + if (remoteOptions.circuitBreakerStrategy == null) { + assertThat(circuitBreaker).isEqualTo(Retrier.ALLOW_ALL_CALLS); + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java index a739309c21b2c1..7c30e1bf6eddc3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java @@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -93,7 +94,7 @@ public void retryShouldWork_failure() throws Exception { assertThat(e).hasMessageThat().isEqualTo("call failed"); assertThat(numCalls.get()).isEqualTo(3); - verify(alwaysOpen, times(3)).recordFailure(); + verify(alwaysOpen, times(3)).recordFailure(any(Exception.class)); verify(alwaysOpen, never()).recordSuccess(); } @@ -117,7 +118,7 @@ public void retryShouldWorkNoRetries_failure() throws Exception { assertThat(e).hasMessageThat().isEqualTo("call failed"); assertThat(numCalls.get()).isEqualTo(1); - verify(alwaysOpen, times(1)).recordFailure(); + verify(alwaysOpen, times(1)).recordFailure(e); verify(alwaysOpen, never()).recordSuccess(); } @@ -138,7 +139,7 @@ public void retryShouldWork_success() throws Exception { }); assertThat(val).isEqualTo(1); - verify(alwaysOpen, times(2)).recordFailure(); + verify(alwaysOpen, times(2)).recordFailure(any(Exception.class)); verify(alwaysOpen, times(1)).recordSuccess(); } @@ -350,7 +351,7 @@ public synchronized State state() { } @Override - public synchronized void recordFailure() { + public synchronized void recordFailure(Exception e) { consecutiveFailures++; if (consecutiveFailures >= maxConsecutiveFailures) { state = State.REJECT_CALLS; diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD new file mode 100644 index 00000000000000..987a81bbcbadaa --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD @@ -0,0 +1,31 @@ +load("@rules_java//java:defs.bzl", "java_test") + +package( + default_applicable_licenses = ["//:license"], + default_testonly = 1, + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + testonly = 0, + srcs = glob(["**"]), + visibility = ["//src:__subpackages__"], +) + +java_test( + name = "circuitbreaker", + srcs = glob(["*.java"]), + test_class = "com.google.devtools.build.lib.AllTests", + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote", + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", + "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/options", + "//src/main/java/com/google/devtools/common/options", + "//src/test/java/com/google/devtools/build/lib:test_runner", + "//third_party:junit4", + "//third_party:truth", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java new file mode 100644 index 00000000000000..51baee1162f2eb --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java @@ -0,0 +1,44 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.circuitbreaker; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.devtools.build.lib.remote.Retrier; +import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.options.RemoteOptions.CircuitBreakerStrategy; +import com.google.devtools.common.options.Options; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link CircuitBreakerFactory}. */ +@RunWith(JUnit4.class) +public class CircuitBreakerFactoryTest { + @Test + public void testCreateCircuitBreaker_failureStrategy() { + RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); + remoteOptions.circuitBreakerStrategy = CircuitBreakerStrategy.FAILURE; + + assertThat(CircuitBreakerFactory.createCircuitBreaker(remoteOptions)) + .isInstanceOf(FailureCircuitBreaker.class); + } + + @Test + public void testCreateCircuitBreaker_nullStrategy() { + RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); + assertThat(CircuitBreakerFactory.createCircuitBreaker(remoteOptions)) + .isEqualTo(Retrier.ALLOW_ALL_CALLS); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java new file mode 100644 index 00000000000000..2d00a8e0e816ab --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java @@ -0,0 +1,68 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.circuitbreaker; + +import static com.google.common.truth.Truth.assertThat; + +import build.bazel.remote.execution.v2.Digest; +import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class FailureCircuitBreakerTest { + + @Test + public void testRecordFailure() throws InterruptedException { + final int failureThreshold = 10; + final int windowInterval = 100; + FailureCircuitBreaker failureCircuitBreaker = + new FailureCircuitBreaker(failureThreshold, windowInterval); + + List listOfExceptionThrownOnFailure = new ArrayList<>(); + for (int index = 0; index < failureThreshold; index++) { + listOfExceptionThrownOnFailure.add(new Exception()); + } + for (int index = 0; index < failureThreshold * 9; index++) { + listOfExceptionThrownOnFailure.add(new CacheNotFoundException(Digest.newBuilder().build())); + } + + Collections.shuffle(listOfExceptionThrownOnFailure); + + // make calls equals to threshold number of not ignored failure calls in parallel. + listOfExceptionThrownOnFailure.stream() + .parallel() + .forEach(failureCircuitBreaker::recordFailure); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); + + // Sleep for windowInterval + 1ms. + Thread.sleep(windowInterval + 1 /*to compensate any delay*/); + + // make calls equals to threshold number of not ignored failure calls in parallel. + listOfExceptionThrownOnFailure.stream() + .parallel() + .forEach(failureCircuitBreaker::recordFailure); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); + + // Sleep for less than windowInterval. + Thread.sleep(windowInterval - 5); + failureCircuitBreaker.recordFailure(new Exception()); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS); + } +}