Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,6 +58,7 @@
class OkHttpRestClientTest {

private static final Map<String, String> TEST_HEADERS = Map.of("foo", "bar");
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
private static final String TEST_PATH = "v1/test";

private final MockWebServer mockWebServer = new MockWebServer();
Expand Down Expand Up @@ -92,7 +94,7 @@ void getsResponseAsync() throws InterruptedException {
.addHeader("Content-Type", "application/json"));

final SafeFuture<Response<BuilderApiResponse<ResponseContainer>>> responseFuture =
underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition);
underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand All @@ -118,7 +120,7 @@ void getsResponseAsync() throws InterruptedException {
void getsResponseAsyncIgnoringResponseBody() throws InterruptedException {
mockWebServer.enqueue(new MockResponse().setResponseCode(200));

final SafeFuture<Response<Void>> responseFuture = underTest.getAsync(TEST_PATH);
final SafeFuture<Response<Void>> responseFuture = underTest.getAsync(TEST_PATH, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand All @@ -134,13 +136,33 @@ void getsResponseAsyncIgnoringResponseBody() throws InterruptedException {
assertThat(request.getMethod()).isEqualTo("GET");
}

@Test
void getsResponseAsyncTimeouts() throws InterruptedException {
mockWebServer.enqueue(
new MockResponse().setResponseCode(200).setHeadersDelay(1, TimeUnit.SECONDS));

final SafeFuture<Response<Void>> responseFuture =
underTest.getAsync(TEST_PATH, Duration.ofMillis(100));

assertThat(responseFuture)
.failsWithin(Duration.ofSeconds(1))
.withThrowableThat()
.withCauseInstanceOf(InterruptedIOException.class)
.withMessageContaining("timeout");

final RecordedRequest request = mockWebServer.takeRequest();

assertThat(request.getPath()).isEqualTo("/" + TEST_PATH);
assertThat(request.getMethod()).isEqualTo("GET");
}

@Test
void getAsyncHandlesFailures() throws InterruptedException {
final String errorBody = "{\"code\":400,\"message\":\"Invalid block: missing signature\"}";
mockWebServer.enqueue(new MockResponse().setResponseCode(400).setBody(errorBody));

final SafeFuture<Response<BuilderApiResponse<ResponseContainer>>> responseFuture =
underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition);
underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand All @@ -157,7 +179,7 @@ void getAsyncHandlesFailures() throws InterruptedException {
mockWebServer.enqueue(new MockResponse().setResponseCode(500));

final SafeFuture<Response<BuilderApiResponse<ResponseContainer>>> secondResponseFuture =
underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition);
underTest.getAsync(TEST_PATH, TEST_HEADERS, responseTypeDefinition, TEST_TIMEOUT);

assertThat(secondResponseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand Down Expand Up @@ -195,7 +217,8 @@ void postsAsync() throws InterruptedException {

final RequestContainer requestBodyObject = new RequestContainer(2);
final SafeFuture<Response<BuilderApiResponse<ResponseContainer>>> responseFuture =
underTest.postAsync(TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition);
underTest.postAsync(
TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand Down Expand Up @@ -232,7 +255,8 @@ void postsAsSszAsync() throws InterruptedException {

final RequestContainer requestBodyObject = new RequestContainer(2);
final SafeFuture<Response<BuilderApiResponse<ResponseContainer>>> responseFuture =
underTest.postAsync(TEST_PATH, Map.of(), requestBodyObject, true, responseTypeDefinition);
underTest.postAsync(
TEST_PATH, Map.of(), requestBodyObject, true, responseTypeDefinition, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand Down Expand Up @@ -272,7 +296,8 @@ void postsAsyncDoesNotThrowExceptionsInOtherThreadsWhenRequestCreationFails() {
doAnswer(invocation -> failingSchema).when(requestBodyObject).getSchema();

final SafeFuture<Response<BuilderApiResponse<ResponseContainer>>> responseFuture =
underTest.postAsync(TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition);
underTest.postAsync(
TEST_PATH, Map.of(), requestBodyObject, false, responseTypeDefinition, TEST_TIMEOUT);

// this will fail if there are uncaught exceptions in other threads
Waiter.waitFor(() -> assertThat(responseFuture).isDone(), 30, TimeUnit.SECONDS, false);
Expand All @@ -291,7 +316,7 @@ void postsAsyncIgnoringResponseBody() throws InterruptedException {

final RequestContainer requestBodyObject = new RequestContainer(2);
final SafeFuture<Response<Void>> responseFuture =
underTest.postAsync(TEST_PATH, requestBodyObject, false);
underTest.postAsync(TEST_PATH, requestBodyObject, false, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand All @@ -318,7 +343,7 @@ void postsAsSszAsyncIgnoringResponseBody() throws InterruptedException {

final RequestContainer requestBodyObject = new RequestContainer(2);
final SafeFuture<Response<Void>> responseFuture =
underTest.postAsync(TEST_PATH, requestBodyObject, true);
underTest.postAsync(TEST_PATH, requestBodyObject, true, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand Down Expand Up @@ -350,7 +375,7 @@ void postsAsSszAsyncDetectsUnsupportedMediaTypeError() throws InterruptedExcepti

final RequestContainer requestBodyObject = new RequestContainer(2);
final SafeFuture<Response<Void>> responseFuture =
underTest.postAsync(TEST_PATH, requestBodyObject, true);
underTest.postAsync(TEST_PATH, requestBodyObject, true, TEST_TIMEOUT);

assertThat(responseFuture)
.succeedsWithin(Duration.ofSeconds(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -332,18 +333,23 @@ void registerValidators_trySszAfterBackoffTime() {

@TestTemplate
void registerValidators_shouldNotFallbackWhenTimingOut() {
// Creates a RestBuilderClient that always timeout
// Creates a RestBuilderClient that has a very tiny timeout
restBuilderClient =
new RestBuilderClient(
forcedTimeoutRestBuilderClientOptions(), okHttpRestClient, timeProvider, spec, true);
tinyTimeoutRestBuilderClientOptions(), okHttpRestClient, timeProvider, spec, true);

final SszList<SignedValidatorRegistration> signedValidatorRegistrations =
createSignedValidatorRegistrations();

// set up delayed 200 response
mockWebServer.enqueue(
new MockResponse().setResponseCode(200).setHeadersDelay(1, TimeUnit.SECONDS));

assertThat(restBuilderClient.registerValidators(SLOT, signedValidatorRegistrations))
.failsWithin(WAIT_FOR_CALL_COMPLETION)
.withThrowableThat()
.withCauseInstanceOf(TimeoutException.class);
.withCauseInstanceOf(InterruptedIOException.class)
.withMessageContaining("timeout");

verifyPostRequestSsz("/eth/v1/builder/validators", signedValidatorRegistrations);
// Check that we do not fallback into JSON (only 1 request)
Expand All @@ -356,7 +362,7 @@ void registerValidators_shouldNotFallbackWhenFailingForNonHttpReasons() {
// Mock asyncPost with ssz to fail
doReturn(SafeFuture.failedFuture(new ConnectException()))
.when(okHttpRestClient)
.postAsync(eq(BuilderApiMethod.REGISTER_VALIDATOR.getPath()), any(), eq(true));
.postAsync(eq(BuilderApiMethod.REGISTER_VALIDATOR.getPath()), any(), eq(true), any());
restBuilderClient =
new RestBuilderClient(restBuilderClientOptions, okHttpRestClient, timeProvider, spec, true);

Expand Down Expand Up @@ -846,8 +852,11 @@ private static String changeResponseVersion(final String json, final SpecMilesto
"(?<=version\":\\s?\")\\w+", newVersion.toString().toLowerCase(Locale.ROOT));
}

private RestBuilderClientOptions forcedTimeoutRestBuilderClientOptions() {
private RestBuilderClientOptions tinyTimeoutRestBuilderClientOptions() {
return new RestBuilderClientOptions(
Duration.ofSeconds(0), Duration.ofSeconds(0), Duration.ofSeconds(0), Duration.ofSeconds(0));
Duration.ofMillis(100),
Duration.ofMillis(100),
Duration.ofMillis(100),
Duration.ofMillis(100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
Expand Down Expand Up @@ -49,29 +51,33 @@ public OkHttpRestClient(final OkHttpClient httpClient, final String baseEndpoint
}

@Override
public SafeFuture<Response<Void>> getAsync(final String apiPath) {
public SafeFuture<Response<Void>> getAsync(final String apiPath, final Duration timeout) {
final Request request = createGetRequest(apiPath, NO_HEADERS);
return makeAsyncVoidRequest(request);
return makeAsyncVoidRequest(request, timeout);
}

@Override
public <TResp extends SszData> SafeFuture<Response<BuilderApiResponse<TResp>>> getAsync(
final String apiPath,
final Map<String, String> headers,
final ResponseSchemaAndDeserializableTypeDefinition<TResp> responseSchema) {
final ResponseSchemaAndDeserializableTypeDefinition<TResp> responseSchema,
final Duration timeout) {
final Request request = createGetRequest(apiPath, headers);
return makeAsyncRequest(request, Optional.of(responseSchema));
return makeAsyncRequest(request, Optional.of(responseSchema), timeout);
}

@Override
public <TReq extends SszData> SafeFuture<Response<Void>> postAsync(
final String apiPath, final TReq requestBodyObject, final boolean postAsSsz) {
final String apiPath,
final TReq requestBodyObject,
final boolean postAsSsz,
final Duration timeout) {
final RequestBody requestBody =
postAsSsz
? createOctetStreamRequestBody(requestBodyObject)
: createJsonRequestBody(requestBodyObject);
final Request request = createPostRequest(apiPath, requestBody, NO_HEADERS);
return makeAsyncVoidRequest(request);
return makeAsyncVoidRequest(request, timeout);
}

@Override
Expand All @@ -81,13 +87,14 @@ SafeFuture<Response<BuilderApiResponse<TResp>>> postAsync(
final Map<String, String> headers,
final TReq requestBodyObject,
final boolean postAsSsz,
final ResponseSchemaAndDeserializableTypeDefinition<TResp> responseSchema) {
final ResponseSchemaAndDeserializableTypeDefinition<TResp> responseSchema,
final Duration timeout) {
final RequestBody requestBody =
postAsSsz
? createOctetStreamRequestBody(requestBodyObject)
: createJsonRequestBody(requestBodyObject);
final Request request = createPostRequest(apiPath, requestBody, headers);
return makeAsyncRequest(request, Optional.of(responseSchema));
return makeAsyncRequest(request, Optional.of(responseSchema), timeout);
}

private Request createGetRequest(final String apiPath, final Map<String, String> headers) {
Expand Down Expand Up @@ -151,16 +158,20 @@ private URL createHttpUrl(final String apiPath) {

private <TResp extends SszData> SafeFuture<Response<BuilderApiResponse<TResp>>> makeAsyncRequest(
final Request request,
final Optional<ResponseSchemaAndDeserializableTypeDefinition<TResp>> responseSchemaMaybe) {
final Optional<ResponseSchemaAndDeserializableTypeDefinition<TResp>> responseSchemaMaybe,
final Duration timeout) {
final Call call = httpClient.newCall(request);
call.timeout().timeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
final ResponseHandler<TResp> responseHandler = new ResponseHandler<>(responseSchemaMaybe);
final Callback responseCallback = createResponseCallback(request, responseHandler);
call.enqueue(responseCallback);
return responseHandler.getFutureResponse();
}

private SafeFuture<Response<Void>> makeAsyncVoidRequest(final Request request) {
private SafeFuture<Response<Void>> makeAsyncVoidRequest(
final Request request, final Duration timeout) {
final Call call = httpClient.newCall(request);
call.timeout().timeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
final ResponseHandlerVoid responseHandler = new ResponseHandlerVoid();
final Callback responseCallback = createResponseCallback(request, responseHandler);
call.enqueue(responseCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ public RestBuilderClient(

@Override
public SafeFuture<Response<Void>> status() {
return restClient
.getAsync(BuilderApiMethod.GET_STATUS.getPath())
.orTimeout(options.builderStatusTimeout());
return restClient.getAsync(
BuilderApiMethod.GET_STATUS.getPath(), options.builderStatusTimeout());
}

@Override
Expand All @@ -114,8 +113,7 @@ public SafeFuture<Response<Void>> registerValidators(
}

if (nextSszRegisterValidatorsTryMillis.isGreaterThan(timeProvider.getTimeInMillis())) {
return registerValidatorsUsingJson(signedValidatorRegistrations)
.orTimeout(options.builderRegisterValidatorTimeout());
return registerValidatorsUsingJson(signedValidatorRegistrations);
}

return registerValidatorsUsingSsz(signedValidatorRegistrations)
Expand All @@ -134,20 +132,25 @@ public SafeFuture<Response<Void>> registerValidators(
}

return SafeFuture.completedFuture(response);
})
.orTimeout(options.builderRegisterValidatorTimeout());
});
}

private SafeFuture<Response<Void>> registerValidatorsUsingJson(
final SszList<SignedValidatorRegistration> signedValidatorRegistrations) {
return restClient.postAsync(
BuilderApiMethod.REGISTER_VALIDATOR.getPath(), signedValidatorRegistrations, false);
BuilderApiMethod.REGISTER_VALIDATOR.getPath(),
signedValidatorRegistrations,
false,
options.builderRegisterValidatorTimeout());
}

private SafeFuture<Response<Void>> registerValidatorsUsingSsz(
final SszList<SignedValidatorRegistration> signedValidatorRegistrations) {
return restClient.postAsync(
BuilderApiMethod.REGISTER_VALIDATOR.getPath(), signedValidatorRegistrations, true);
BuilderApiMethod.REGISTER_VALIDATOR.getPath(),
signedValidatorRegistrations,
true,
options.builderRegisterValidatorTimeout());
}

@Override
Expand Down Expand Up @@ -183,8 +186,8 @@ public SafeFuture<Response<Optional<SignedBuilderBid>>> getHeader(
setUserAgentHeader
? GET_HEADER_HTTP_HEADERS_WITH_USER_AGENT
: GET_HEADER_HTTP_HEADERS_WITHOUT_USER_AGENT,
responseTypeDefinition)
.orTimeout(options.builderProposalDelayTolerance())
responseTypeDefinition,
options.builderProposalDelayTolerance())
.thenApply(
response ->
response.unwrapVersioned(
Expand Down Expand Up @@ -217,12 +220,12 @@ public SafeFuture<Response<BuilderPayload>> getPayload(
ACCEPT_HEADER),
signedBlindedBeaconBlock,
LAST_RECEIVED_HEADER_WAS_IN_SSZ.get(),
responseTypeDefinition)
responseTypeDefinition,
options.builderGetPayloadTimeout())
.thenApply(
response ->
response.unwrapVersioned(
this::extractBuilderPayload, milestone, BuilderApiResponse::version, false))
.orTimeout(options.builderGetPayloadTimeout());
this::extractBuilderPayload, milestone, BuilderApiResponse::version, false));
}

private <T extends BuilderPayload>
Expand Down
Loading