Skip to content

Commit

Permalink
Retry ConnectException, add retry logging (#6614)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Aug 9, 2024
1 parent e2936d4 commit 910c7cc
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.exporter.sender.jdk.internal;

import static java.util.stream.Collectors.joining;

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
Expand All @@ -25,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -33,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
Expand All @@ -52,6 +57,8 @@ public final class JdkHttpSender implements HttpSender {
private static final ThreadLocal<ByteBufferPool> threadLocalByteBufPool =
ThreadLocal.withInitial(ByteBufferPool::new);

private static final Logger logger = Logger.getLogger(JdkHttpSender.class.getName());

private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final HttpClient client;
private final URI uri;
Expand Down Expand Up @@ -211,11 +218,37 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
exception = e;
}

if (httpResponse != null && !retryableStatusCodes.contains(httpResponse.statusCode())) {
return httpResponse;
if (httpResponse != null) {
boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode());
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
"Attempt "
+ attempt
+ " returned "
+ (retryable ? "retryable" : "non-retryable")
+ " response: "
+ responseStringRepresentation(httpResponse));
}
if (!retryable) {
return httpResponse;
}
}
if (exception != null && !isRetryableException(exception)) {
throw exception;
if (exception != null) {
boolean retryable = isRetryableException(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
"Attempt "
+ attempt
+ " failed with "
+ (retryable ? "retryable" : "non-retryable")
+ " exception",
exception);
}
if (!retryable) {
throw exception;
}
}
} while (attempt < retryPolicy.getMaxAttempts());

Expand All @@ -225,6 +258,17 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
throw exception;
}

private static String responseStringRepresentation(HttpResponse<?> response) {
StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}");
joiner.add("code=" + response.statusCode());
joiner.add(
"headers="
+ response.headers().map().entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
.collect(joining(",", "[", "]")));
return joiner.toString();
}

private void write(Marshaler marshaler, OutputStream os) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(os);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.time.Duration;
Expand Down Expand Up @@ -53,8 +55,8 @@ void setup() throws IOException, InterruptedException {
sender =
new JdkHttpSender(
mockHttpClient,
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
// Connecting to a non-routable IP address to trigger connection timeout
"http://10.255.255.1",
null,
false,
"text/plain",
Expand All @@ -74,6 +76,44 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru
verify(mockHttpClient, times(2)).send(any(), any());
}

@Test
void sendInternal_RetryableConnectException() throws IOException, InterruptedException {
sender =
new JdkHttpSender(
mockHttpClient,
// Connecting to localhost on an unused port address to trigger
// java.net.ConnectException (or java.net.http.HttpConnectTimeoutException on linux java
// 11+)
"http://localhost:" + freePort(),
null,
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
RetryPolicy.builder()
.setMaxAttempts(2)
.setInitialBackoff(Duration.ofMillis(1))
.build());

assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.satisfies(
e ->
assertThat(
(e instanceof ConnectException)
|| (e instanceof HttpConnectTimeoutException))
.isTrue());

verify(mockHttpClient, times(2)).send(any(), any());
}

private static int freePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import static java.util.stream.Collectors.joining;

import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Locale;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Interceptor;
import okhttp3.Response;

Expand All @@ -23,6 +29,8 @@
*/
public final class RetryInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(RetryInterceptor.class.getName());

private final RetryPolicy retryPolicy;
private final Function<Response, Boolean> isRetryable;
private final Function<IOException, Boolean> isRetryableException;
Expand Down Expand Up @@ -84,12 +92,39 @@ public Response intercept(Chain chain) throws IOException {
} catch (IOException e) {
exception = e;
}
if (response != null && !Boolean.TRUE.equals(isRetryable.apply(response))) {
return response;
if (response != null) {
boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response));
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
"Attempt "
+ attempt
+ " returned "
+ (retryable ? "retryable" : "non-retryable")
+ " response: "
+ responseStringRepresentation(response));
}
if (!retryable) {
return response;
}
}
if (exception != null && !Boolean.TRUE.equals(isRetryableException.apply(exception))) {
throw exception;
if (exception != null) {
boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception));
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
"Attempt "
+ attempt
+ " failed with "
+ (retryable ? "retryable" : "non-retryable")
+ " exception",
exception);
}
if (!retryable) {
throw exception;
}
}

} while (attempt < retryPolicy.getMaxAttempts());

if (response != null) {
Expand All @@ -98,15 +133,30 @@ public Response intercept(Chain chain) throws IOException {
throw exception;
}

private static String responseStringRepresentation(Response response) {
StringJoiner joiner = new StringJoiner(",", "Response{", "}");
joiner.add("code=" + response.code());
joiner.add(
"headers="
+ response.headers().toMultimap().entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
.collect(joining(",", "[", "]")));
return joiner.toString();
}

// Visible for testing
static boolean isRetryableException(IOException e) {
if (!(e instanceof SocketTimeoutException)) {
return false;
if (e instanceof SocketTimeoutException) {
String message = e.getMessage();
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect
// timed out"
return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out");
} else if (e instanceof ConnectException) {
// Exceptions resemble: java.net.ConnectException: Failed to connect to
// localhost/[0:0:0:0:0:0:0:1]:62611
return true;
}
String message = e.getMessage();
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect timed
// out"
return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out");
return false;
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -157,6 +159,34 @@ void connectTimeout() throws Exception {
verify(sleeper, times(4)).sleep(anyLong());
}

@Test
void connectException() throws Exception {
client = connectTimeoutClient();
when(random.get(anyLong())).thenReturn(1L);
doNothing().when(sleeper).sleep(anyLong());

// Connecting to localhost on an unused port address to trigger java.net.ConnectException
int openPort = freePort();
assertThatThrownBy(
() ->
client
.newCall(new Request.Builder().url("http://localhost:" + openPort).build())
.execute())
.isInstanceOf(ConnectException.class);

verify(isRetryableException, times(5)).apply(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}

private static int freePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
void nonRetryableException() throws InterruptedException {
client = connectTimeoutClient();
Expand Down

0 comments on commit 910c7cc

Please sign in to comment.