Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.60.1.jar
No changes.
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.GrpcSenderConfig (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize()
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.HttpSenderConfig (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize()
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.60.1.jar
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long)
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ public GrpcExporter build() {
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService,
grpcChannel));
grpcChannel,
// 1kb since don't do anything with responses today
1024L));
Comment thread
jack-berg marked this conversation as resolved.
Outdated
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static ImmutableGrpcSenderConfig create(
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService,
@Nullable Object managedChannel) {
@Nullable Object managedChannel,
long maxResponseBodySize) {
return new AutoValue_ImmutableGrpcSenderConfig(
endpoint,
fullMethodName,
Expand All @@ -49,6 +50,10 @@ public static ImmutableGrpcSenderConfig create(
sslContext,
trustManager,
executorService,
managedChannel);
managedChannel,
maxResponseBodySize);
}

@Override
public abstract long getMaxResponseBodySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ public HttpExporter build() {
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService));
executorService,
// 1kb since don't do anything with responses today
1024L));
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());

return new HttpExporter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ static HttpSenderConfig create(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
return new AutoValue_ImmutableHttpSenderConfig(
endpoint,
contentType,
Expand All @@ -47,6 +48,10 @@ static HttpSenderConfig create(
retryPolicy,
sslContext,
trustManager,
executorService);
executorService,
maxResponseBodySize);
}

@Override
public abstract long getMaxResponseBodySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void setUp() {
null,
null,
null,
null),
null,
Long.MAX_VALUE),
InternalTelemetryVersion.LATEST,
ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER),
MeterProvider::noop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class JdkHttpSender implements HttpSender {
private final Supplier<Map<String, List<String>>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;
private final Predicate<IOException> retryExceptionPredicate;
private final long maxResponseBodySize;

// Visible for testing
JdkHttpSender(
Expand All @@ -82,7 +83,8 @@ public final class JdkHttpSender implements HttpSender {
Duration timeout,
Supplier<Map<String, List<String>>> headerSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JaegerRemoteSamplerBuilder.setMaxSamplingStrategyResponseBodySize validates bytes > 0. But the sender constructors (JdkHttpSender, OkHttpHttpSender, OkHttpGrpcSender) accept any long without
validation. A caller can bypass the guard by constructing a sender directly with 0 or -1. Consider adding the validation at the sender level too, or document the expected invariant.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the sender constructors are internal, and have a bunch of other unvalidated parameters which could be equally corrupted if a user goes around the guards. There's a conversation here discussing where to add additional null checks beyond the guarantees from nullaway. I think we should adopt a policy of adding additional null checks at the API boundaries, but trusting nullaway once we're within the walled garden of internal code. The same would apply to parameter validation, I think.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well... setting directly to a low value like -1 or 0 will effectively bloc data export...
see:

  if (bodyBytes.length > maxResponseBodySize) {
    throw new ResponseBodyTooLargeException(
        "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes");
  }

All bodies will be bigger that that.

this.client = client;
this.endpoint = endpoint;
this.contentType = contentType;
Expand All @@ -101,6 +103,7 @@ public final class JdkHttpSender implements HttpSender {
this.executorService = executorService;
this.managedExecutor = false;
}
this.maxResponseBodySize = maxResponseBodySize;
}

JdkHttpSender(
Expand All @@ -113,7 +116,8 @@ public final class JdkHttpSender implements HttpSender {
@Nullable RetryPolicy retryPolicy,
@Nullable ProxyOptions proxyOptions,
@Nullable SSLContext sslContext,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
this(
configureClient(sslContext, connectTimeout, proxyOptions),
endpoint,
Expand All @@ -122,7 +126,8 @@ public final class JdkHttpSender implements HttpSender {
timeout,
headerSupplier,
retryPolicy,
executorService);
executorService,
maxResponseBodySize);
}

private static ExecutorService newExecutor() {
Expand Down Expand Up @@ -287,7 +292,19 @@ private static String responseStringRepresentation(HttpResponse<?> response) {
private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());
return client.send(
requestBuilder.build(),
responseInfo ->
HttpResponse.BodySubscribers.mapping(
HttpResponse.BodySubscribers.ofInputStream(),
inputStream -> {
try (inputStream) {
return inputStream.readNBytes(
(int) Math.min(maxResponseBodySize, Integer.MAX_VALUE));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) {
httpSenderConfig.getRetryPolicy(),
httpSenderConfig.getProxyOptions(),
httpSenderConfig.getSslContext(),
httpSenderConfig.getExecutorService());
httpSenderConfig.getExecutorService(),
httpSenderConfig.getMaxResponseBodySize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void setup() throws IOException, InterruptedException {
Duration.ofSeconds(10),
Collections::emptyMap,
RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(),
null);
null,
Long.MAX_VALUE);
Comment thread
jack-berg marked this conversation as resolved.
}

@Test
Expand Down Expand Up @@ -121,7 +122,8 @@ void sendInternal_RetryableConnectException() throws IOException, InterruptedExc
Duration.ofSeconds(10),
Collections::emptyMap,
RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(),
null);
null,
Long.MAX_VALUE);

assertThatThrownBy(() -> sender.sendInternal(new NoOpRequestBodyWriter()))
.satisfies(
Expand Down Expand Up @@ -177,7 +179,8 @@ void connectTimeout() {
null,
null,
null,
null);
null,
Long.MAX_VALUE);

assertThat(sender)
.extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public final class OkHttpGrpcSender implements GrpcSender {
private final HttpUrl url;
@Nullable private final Compressor compressor;
private final Supplier<Map<String, List<String>>> headersSupplier;
private final long maxResponseBodySize;

/** Creates a new {@link OkHttpGrpcSender}. */
@SuppressWarnings("TooManyParameters")
Expand All @@ -95,7 +96,8 @@ public OkHttpGrpcSender(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE);
int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE);

Expand Down Expand Up @@ -133,6 +135,7 @@ public OkHttpGrpcSender(
this.compressor = compressor;
this.headersSupplier = headersSupplier;
this.url = HttpUrl.get(endpoint);
this.maxResponseBodySize = maxResponseBodySize;
}

@Override
Expand Down Expand Up @@ -169,7 +172,15 @@ public void onResponse(Call call, Response response) {
// Must consume body before accessing trailers
byte[] bodyBytes = null;
try {
bodyBytes = getResponseMessageBytes(body.bytes());
Buffer buffer = new Buffer();
while (buffer.size() < maxResponseBodySize) {
long n =
body.source().read(buffer, maxResponseBodySize - buffer.size());
if (n == -1L) {
break;
}
}
bodyBytes = getResponseMessageBytes(buffer.readByteArray());
} catch (IOException e) {
bodyBytes = new byte[0];
logger.log(Level.FINE, "Failed to read response body", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public GrpcSender createSender(GrpcSenderConfig grpcSenderConfig) {
grpcSenderConfig.getRetryPolicy(),
grpcSenderConfig.getSslContext(),
grpcSenderConfig.getTrustManager(),
grpcSenderConfig.getExecutorService());
grpcSenderConfig.getExecutorService(),
grpcSenderConfig.getMaxResponseBodySize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.Buffer;
import okio.BufferedSink;
import okio.Okio;

Expand All @@ -58,6 +59,7 @@ public final class OkHttpHttpSender implements HttpSender {
private final Supplier<Map<String, List<String>>> headerSupplier;
private final MediaType mediaType;
@Nullable private final Compressor compressor;
private final long maxResponseBodySize;

/** Create a sender. */
@SuppressWarnings("TooManyParameters")
Expand All @@ -72,7 +74,8 @@ public OkHttpHttpSender(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE);
int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE);

Expand Down Expand Up @@ -111,6 +114,7 @@ public OkHttpHttpSender(
this.mediaType = MediaType.parse(contentType);
this.compressor = compressor;
this.headerSupplier = headerSupplier;
this.maxResponseBodySize = maxResponseBodySize;
}

@Override
Expand Down Expand Up @@ -160,7 +164,16 @@ public String getStatusMessage() {
public byte[] getResponseBody() {
if (bodyBytes == null) {
try {
bodyBytes = body.bytes();
Buffer buffer = new Buffer();
while (buffer.size() < maxResponseBodySize) {
Comment thread
jack-berg marked this conversation as resolved.
Outdated
long n =
body.source()
.read(buffer, maxResponseBodySize - buffer.size());
if (n == -1L) {
break;
}
}
bodyBytes = buffer.readByteArray();
} catch (IOException e) {
bodyBytes = new byte[0];
logger.log(Level.WARNING, "Failed to read response body", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) {
httpSenderConfig.getRetryPolicy(),
httpSenderConfig.getSslContext(),
httpSenderConfig.getTrustManager(),
httpSenderConfig.getExecutorService());
httpSenderConfig.getExecutorService(),
httpSenderConfig.getMaxResponseBodySize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception {
null,
null,
null,
null);
null,
Long.MAX_VALUE);

CompletableResultCode sendResult = new CompletableResultCode();
sender.send(
Expand Down Expand Up @@ -132,7 +133,8 @@ void shutdown_NonManagedExecutor_ReturnsImmediately() {
null,
null,
null,
customExecutor); // Pass custom executor -> managedExecutor = false
customExecutor, // Pass custom executor -> managedExecutor = false
Long.MAX_VALUE);

CompletableResultCode shutdownResult = sender.shutdown();

Expand Down Expand Up @@ -169,7 +171,8 @@ void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exc
null,
null,
null,
null); // null executor = managed
null, // null executor = managed
Long.MAX_VALUE);

// Start multiple requests to ensure threads are busy
CountDownLatch blockCallbacks = new CountDownLatch(1);
Expand Down Expand Up @@ -231,7 +234,8 @@ void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception {
null,
null,
null,
null);
null,
Long.MAX_VALUE);

// Trigger some activity
sender.send(new TestMessageWriter(), response -> {}, error -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ OkHttpGrpcSender createSender(String endpoint) {
null,
null,
null,
null);
null,
Long.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ OkHttpHttpSender createSender(String endpoint) {
null,
null,
null,
null);
null,
Long.MAX_VALUE);
}
}
Loading
Loading