diff --git a/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClientTest.java b/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClientTest.java index 80fd473b3d9..7c3736d8102 100644 --- a/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClientTest.java +++ b/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClientTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import java.io.IOException; +import java.io.InterruptedIOException; import java.time.Duration; import java.util.List; import java.util.Map; @@ -57,6 +58,7 @@ class OkHttpRestClientTest { private static final Map TEST_HEADERS = Map.of("foo", "bar"); + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); private static final String TEST_PATH = "v1/test"; private final MockWebServer mockWebServer = new MockWebServer(); @@ -92,7 +94,7 @@ void getsResponseAsync() throws InterruptedException { .addHeader("Content-Type", "application/json")); final SafeFuture>> responseFuture = - underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition); + underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -118,7 +120,7 @@ void getsResponseAsync() throws InterruptedException { void getsResponseAsyncIgnoringResponseBody() throws InterruptedException { mockWebServer.enqueue(new MockResponse().setResponseCode(200)); - final SafeFuture> responseFuture = underTest.getAsync(TEST_PATH); + final SafeFuture> responseFuture = underTest.getAsync(TEST_PATH, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -134,13 +136,33 @@ void getsResponseAsyncIgnoringResponseBody() throws InterruptedException { assertThat(request.getMethod()).isEqualTo("GET"); } + @Test + void getsResponseAsyncTimeouts() throws InterruptedException { + mockWebServer.enqueue( + new MockResponse().setResponseCode(200).setHeadersDelay(1, TimeUnit.SECONDS)); + + final SafeFuture> responseFuture = + underTest.getAsync(TEST_PATH, Duration.ofMillis(100)); + + assertThat(responseFuture) + .failsWithin(Duration.ofSeconds(1)) + .withThrowableThat() + .withCauseInstanceOf(InterruptedIOException.class) + .withMessageContaining("timeout"); + + final RecordedRequest request = mockWebServer.takeRequest(); + + assertThat(request.getPath()).isEqualTo("/" + TEST_PATH); + assertThat(request.getMethod()).isEqualTo("GET"); + } + @Test void getAsyncHandlesFailures() throws InterruptedException { final String errorBody = "{\"code\":400,\"message\":\"Invalid block: missing signature\"}"; mockWebServer.enqueue(new MockResponse().setResponseCode(400).setBody(errorBody)); final SafeFuture>> responseFuture = - underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition); + underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -157,7 +179,7 @@ void getAsyncHandlesFailures() throws InterruptedException { mockWebServer.enqueue(new MockResponse().setResponseCode(500)); final SafeFuture>> secondResponseFuture = - underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition); + underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition, TEST_TIMEOUT); assertThat(secondResponseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -195,7 +217,8 @@ void postsAsync() throws InterruptedException { final RequestContainer requestBodyObject = new RequestContainer(2); final SafeFuture>> responseFuture = - underTest.postAsync(TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition); + underTest.postAsync( + TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -232,7 +255,8 @@ void postsAsSszAsync() throws InterruptedException { final RequestContainer requestBodyObject = new RequestContainer(2); final SafeFuture>> responseFuture = - underTest.postAsync(TEST_PATH, Map.of(), requestBodyObject, true, responseTypeDefinition); + underTest.postAsync( + TEST_PATH, Map.of(), requestBodyObject, true, responseTypeDefinition, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -272,7 +296,8 @@ void postsAsyncDoesNotThrowExceptionsInOtherThreadsWhenRequestCreationFails() { doAnswer(invocation -> failingSchema).when(requestBodyObject).getSchema(); final SafeFuture>> responseFuture = - underTest.postAsync(TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition); + underTest.postAsync( + TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition, TEST_TIMEOUT); // this will fail if there are uncaught exceptions in other threads Waiter.waitFor(() -> assertThat(responseFuture).isDone(), 30, TimeUnit.SECONDS, false); @@ -291,7 +316,7 @@ void postsAsyncIgnoringResponseBody() throws InterruptedException { final RequestContainer requestBodyObject = new RequestContainer(2); final SafeFuture> responseFuture = - underTest.postAsync(TEST_PATH, requestBodyObject, false); + underTest.postAsync(TEST_PATH, requestBodyObject, false, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -318,7 +343,7 @@ void postsAsSszAsyncIgnoringResponseBody() throws InterruptedException { final RequestContainer requestBodyObject = new RequestContainer(2); final SafeFuture> responseFuture = - underTest.postAsync(TEST_PATH, requestBodyObject, true); + underTest.postAsync(TEST_PATH, requestBodyObject, true, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) @@ -350,7 +375,7 @@ void postsAsSszAsyncDetectsUnsupportedMediaTypeError() throws InterruptedExcepti final RequestContainer requestBodyObject = new RequestContainer(2); final SafeFuture> responseFuture = - underTest.postAsync(TEST_PATH, requestBodyObject, true); + underTest.postAsync(TEST_PATH, requestBodyObject, true, TEST_TIMEOUT); assertThat(responseFuture) .succeedsWithin(Duration.ofSeconds(1)) diff --git a/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClientTest.java b/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClientTest.java index 197077a4caf..3cf4d56929b 100644 --- a/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClientTest.java +++ b/ethereum/executionclient/src/integration-test/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClientTest.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Resources; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.net.ConnectException; import java.nio.charset.StandardCharsets; @@ -36,7 +37,7 @@ import java.util.Locale; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Pattern; import okhttp3.OkHttpClient; @@ -332,18 +333,23 @@ void registerValidators_trySszAfterBackoffTime() { @TestTemplate void registerValidators_shouldNotFallbackWhenTimingOut() { - // Creates a RestBuilderClient that always timeout + // Creates a RestBuilderClient that has a very tiny timeout restBuilderClient = new RestBuilderClient( - forcedTimeoutRestBuilderClientOptions(), okHttpRestClient, timeProvider, spec, true); + tinyTimeoutRestBuilderClientOptions(), okHttpRestClient, timeProvider, spec, true); final SszList signedValidatorRegistrations = createSignedValidatorRegistrations(); + // set up delayed 200 response + mockWebServer.enqueue( + new MockResponse().setResponseCode(200).setHeadersDelay(1, TimeUnit.SECONDS)); + assertThat(restBuilderClient.registerValidators(SLOT, signedValidatorRegistrations)) .failsWithin(WAIT_FOR_CALL_COMPLETION) .withThrowableThat() - .withCauseInstanceOf(TimeoutException.class); + .withCauseInstanceOf(InterruptedIOException.class) + .withMessageContaining("timeout"); verifyPostRequestSsz("/eth/v1/builder/validators", signedValidatorRegistrations); // Check that we do not fallback into JSON (only 1 request) @@ -356,7 +362,7 @@ void registerValidators_shouldNotFallbackWhenFailingForNonHttpReasons() { // Mock asyncPost with ssz to fail doReturn(SafeFuture.failedFuture(new ConnectException())) .when(okHttpRestClient) - .postAsync(eq(BuilderApiMethod.REGISTER_VALIDATOR.getPath()), any(), eq(true)); + .postAsync(eq(BuilderApiMethod.REGISTER_VALIDATOR.getPath()), any(), eq(true), any()); restBuilderClient = new RestBuilderClient(restBuilderClientOptions, okHttpRestClient, timeProvider, spec, true); @@ -846,8 +852,11 @@ private static String changeResponseVersion(final String json, final SpecMilesto "(?<=version\":\\s?\")\\w+", newVersion.toString().toLowerCase(Locale.ROOT)); } - private RestBuilderClientOptions forcedTimeoutRestBuilderClientOptions() { + private RestBuilderClientOptions tinyTimeoutRestBuilderClientOptions() { return new RestBuilderClientOptions( - Duration.ofSeconds(0), Duration.ofSeconds(0), Duration.ofSeconds(0), Duration.ofSeconds(0)); + Duration.ofMillis(100), + Duration.ofMillis(100), + Duration.ofMillis(100), + Duration.ofMillis(100)); } } diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClient.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClient.java index 63d6984e872..dbaf436d413 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClient.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/OkHttpRestClient.java @@ -18,8 +18,10 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URL; +import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import okhttp3.Call; import okhttp3.Callback; import okhttp3.HttpUrl; @@ -49,29 +51,33 @@ public OkHttpRestClient(final OkHttpClient httpClient, final String baseEndpoint } @Override - public SafeFuture> getAsync(final String apiPath) { + public SafeFuture> getAsync(final String apiPath, final Duration timeout) { final Request request = createGetRequest(apiPath, NO_HEADERS); - return makeAsyncVoidRequest(request); + return makeAsyncVoidRequest(request, timeout); } @Override public SafeFuture>> getAsync( final String apiPath, final Map headers, - final ResponseSchemaAndDeserializableTypeDefinition responseSchema) { + final ResponseSchemaAndDeserializableTypeDefinition responseSchema, + final Duration timeout) { final Request request = createGetRequest(apiPath, headers); - return makeAsyncRequest(request, Optional.of(responseSchema)); + return makeAsyncRequest(request, Optional.of(responseSchema), timeout); } @Override public SafeFuture> postAsync( - final String apiPath, final TReq requestBodyObject, final boolean postAsSsz) { + final String apiPath, + final TReq requestBodyObject, + final boolean postAsSsz, + final Duration timeout) { final RequestBody requestBody = postAsSsz ? createOctetStreamRequestBody(requestBodyObject) : createJsonRequestBody(requestBodyObject); final Request request = createPostRequest(apiPath, requestBody, NO_HEADERS); - return makeAsyncVoidRequest(request); + return makeAsyncVoidRequest(request, timeout); } @Override @@ -81,13 +87,14 @@ SafeFuture>> postAsync( final Map headers, final TReq requestBodyObject, final boolean postAsSsz, - final ResponseSchemaAndDeserializableTypeDefinition responseSchema) { + final ResponseSchemaAndDeserializableTypeDefinition responseSchema, + final Duration timeout) { final RequestBody requestBody = postAsSsz ? createOctetStreamRequestBody(requestBodyObject) : createJsonRequestBody(requestBodyObject); final Request request = createPostRequest(apiPath, requestBody, headers); - return makeAsyncRequest(request, Optional.of(responseSchema)); + return makeAsyncRequest(request, Optional.of(responseSchema), timeout); } private Request createGetRequest(final String apiPath, final Map headers) { @@ -151,16 +158,20 @@ private URL createHttpUrl(final String apiPath) { private SafeFuture>> makeAsyncRequest( final Request request, - final Optional> responseSchemaMaybe) { + final Optional> responseSchemaMaybe, + final Duration timeout) { final Call call = httpClient.newCall(request); + call.timeout().timeout(timeout.toMillis(), TimeUnit.MILLISECONDS); final ResponseHandler responseHandler = new ResponseHandler<>(responseSchemaMaybe); final Callback responseCallback = createResponseCallback(request, responseHandler); call.enqueue(responseCallback); return responseHandler.getFutureResponse(); } - private SafeFuture> makeAsyncVoidRequest(final Request request) { + private SafeFuture> makeAsyncVoidRequest( + final Request request, final Duration timeout) { final Call call = httpClient.newCall(request); + call.timeout().timeout(timeout.toMillis(), TimeUnit.MILLISECONDS); final ResponseHandlerVoid responseHandler = new ResponseHandlerVoid(); final Callback responseCallback = createResponseCallback(request, responseHandler); call.enqueue(responseCallback); diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClient.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClient.java index 9c9fe814668..5dfca3659e6 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClient.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestBuilderClient.java @@ -100,9 +100,8 @@ public RestBuilderClient( @Override public SafeFuture> status() { - return restClient - .getAsync(BuilderApiMethod.GET_STATUS.getPath()) - .orTimeout(options.builderStatusTimeout()); + return restClient.getAsync( + BuilderApiMethod.GET_STATUS.getPath(), options.builderStatusTimeout()); } @Override @@ -114,8 +113,7 @@ public SafeFuture> registerValidators( } if (nextSszRegisterValidatorsTryMillis.isGreaterThan(timeProvider.getTimeInMillis())) { - return registerValidatorsUsingJson(signedValidatorRegistrations) - .orTimeout(options.builderRegisterValidatorTimeout()); + return registerValidatorsUsingJson(signedValidatorRegistrations); } return registerValidatorsUsingSsz(signedValidatorRegistrations) @@ -134,20 +132,25 @@ public SafeFuture> registerValidators( } return SafeFuture.completedFuture(response); - }) - .orTimeout(options.builderRegisterValidatorTimeout()); + }); } private SafeFuture> registerValidatorsUsingJson( final SszList signedValidatorRegistrations) { return restClient.postAsync( - BuilderApiMethod.REGISTER_VALIDATOR.getPath(), signedValidatorRegistrations, false); + BuilderApiMethod.REGISTER_VALIDATOR.getPath(), + signedValidatorRegistrations, + false, + options.builderRegisterValidatorTimeout()); } private SafeFuture> registerValidatorsUsingSsz( final SszList signedValidatorRegistrations) { return restClient.postAsync( - BuilderApiMethod.REGISTER_VALIDATOR.getPath(), signedValidatorRegistrations, true); + BuilderApiMethod.REGISTER_VALIDATOR.getPath(), + signedValidatorRegistrations, + true, + options.builderRegisterValidatorTimeout()); } @Override @@ -183,8 +186,8 @@ public SafeFuture>> getHeader( setUserAgentHeader ? GET_HEADER_HTTP_HEADERS_WITH_USER_AGENT : GET_HEADER_HTTP_HEADERS_WITHOUT_USER_AGENT, - responseTypeDefinition) - .orTimeout(options.builderProposalDelayTolerance()) + responseTypeDefinition, + options.builderProposalDelayTolerance()) .thenApply( response -> response.unwrapVersioned( @@ -217,12 +220,12 @@ public SafeFuture> getPayload( ACCEPT_HEADER), signedBlindedBeaconBlock, LAST_RECEIVED_HEADER_WAS_IN_SSZ.get(), - responseTypeDefinition) + responseTypeDefinition, + options.builderGetPayloadTimeout()) .thenApply( response -> response.unwrapVersioned( - this::extractBuilderPayload, milestone, BuilderApiResponse::version, false)) - .orTimeout(options.builderGetPayloadTimeout()); + this::extractBuilderPayload, milestone, BuilderApiResponse::version, false)); } private diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestClient.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestClient.java index 2d195e644e5..e32df53b857 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestClient.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/rest/RestClient.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.ethereum.executionclient.rest; +import java.time.Duration; import java.util.Collections; import java.util.Map; import tech.pegasys.teku.ethereum.executionclient.schema.BuilderApiResponse; @@ -25,15 +26,16 @@ public interface RestClient { Map NO_HEADERS = Collections.emptyMap(); - SafeFuture> getAsync(String apiPath); + SafeFuture> getAsync(String apiPath, Duration timeout); SafeFuture>> getAsync( String apiPath, Map headers, - ResponseSchemaAndDeserializableTypeDefinition responseSchema); + ResponseSchemaAndDeserializableTypeDefinition responseSchema, + Duration timeout); SafeFuture> postAsync( - String apiPath, TReq requestBodyObject, boolean postAsSsz); + String apiPath, TReq requestBodyObject, boolean postAsSsz, Duration timeout); SafeFuture>> postAsync( @@ -41,7 +43,8 @@ SafeFuture>> postAsync( Map headers, TReq requestBodyObject, boolean postAsSsz, - ResponseSchemaAndDeserializableTypeDefinition responseSchema); + ResponseSchemaAndDeserializableTypeDefinition responseSchema, + Duration timeout); record ResponseSchemaAndDeserializableTypeDefinition( SszSchema responseSchema,