diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md index 569844dca56..225c802623e 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/CHANGELOG.md @@ -7,6 +7,10 @@ Notes](../../RELEASENOTES.md). ## Unreleased +* Limit how much of the response body is read when export fails and + error logging is enabled. + ([#7017](https://github.com/open-telemetry/opentelemetry-dotnet/pull/7017)) + ## 1.15.1 Released 2026-Mar-27 diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs index 88a52c629a5..2d66a1dc463 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpExportClient.cs @@ -4,7 +4,11 @@ #if NETFRAMEWORK using System.Net.Http; #endif +#if NET +using System.Buffers; +#endif using System.Net.Http.Headers; +using System.Text; using OpenTelemetry.Internal; namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; @@ -65,9 +69,9 @@ public bool Shutdown(int timeoutMilliseconds) return true; } - protected static string? TryGetResponseBody(HttpResponseMessage? httpResponse, CancellationToken cancellationToken) + protected internal static string? TryGetResponseBody(HttpResponseMessage? httpResponse, CancellationToken cancellationToken) { - if (httpResponse?.Content == null) + if (httpResponse?.Content == null || cancellationToken.IsCancellationRequested) { return null; } @@ -76,16 +80,95 @@ public bool Shutdown(int timeoutMilliseconds) { #if NET var stream = httpResponse.Content.ReadAsStream(cancellationToken); - using var reader = new StreamReader(stream); - return reader.ReadToEnd(); #else - return httpResponse.Content.ReadAsStringAsync().GetAwaiter().GetResult(); + var stream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult(); +#endif + + // See https://github.com/open-telemetry/opentelemetry-proto/pull/781 + const int MessageSizeLimit = 4 * 1024 * 1024; // 4MiB + + var length = GetBufferLength(stream, MessageSizeLimit); + +#if NET + var buffer = ArrayPool.Shared.Rent(length); +#else + var buffer = new byte[length]; +#endif + + string result; + + try + { + var count = 0; + + // Read raw bytes so the size limit applies to bytes rather than characters + while (count < length && !cancellationToken.IsCancellationRequested) + { + var read = stream.Read(buffer, count, length - count); + + if (read is 0) + { + break; + } + + count += read; + } + + // Decode using the charset from the response content headers, if available + var encoding = GetEncoding(httpResponse.Content.Headers.ContentType?.CharSet); + result = encoding.GetString(buffer, 0, count); + + if (result.Length is MessageSizeLimit) + { + result += "[TRUNCATED]"; + } + } + finally + { +#if NET + ArrayPool.Shared.Return(buffer); #endif + } + + return result; } catch (Exception) { return null; } + + static int GetBufferLength(Stream stream, int limit) + { + try + { + // Avoid allocating an overly large buffer if the stream is smaller than the size limit + return stream.Length < limit ? (int)stream.Length : limit; + } + catch (Exception) + { + // Not all Stream types support Length, so default to the maximum + return limit; + } + } + + static Encoding GetEncoding(string? name) + { + Encoding encoding = Encoding.UTF8; + + if (!string.IsNullOrWhiteSpace(name)) + { + try + { + encoding = Encoding.GetEncoding(name); + } + catch (Exception) + { + // Invalid encoding name + } + } + + return encoding; + } } protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength) @@ -114,16 +197,14 @@ protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength) return request; } - protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) - { + protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) => #if NET // Note: SendAsync must be used with HTTP/2 because synchronous send is // not supported. - return this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform + this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform ? this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult() : this.HttpClient.Send(request, cancellationToken); #else - return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); + this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); #endif - } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs index 521575cb7c8..b0678278625 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcExportClient.cs @@ -4,6 +4,7 @@ #if NETFRAMEWORK using System.Net.Http; #endif +using System.Diagnostics.Tracing; using System.Net.Http.Headers; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc; @@ -123,8 +124,12 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten catch (HttpRequestException ex) { // Handle non-retryable HTTP errors. - var response = TryGetResponseBody(httpResponse, cancellationToken); - OpenTelemetryProtocolExporterEventSource.Log.HttpRequestFailed(this.Endpoint, response, ex); + if (OpenTelemetryProtocolExporterEventSource.Log.IsEnabled(EventLevel.Error, EventKeywords.All)) + { + var response = TryGetResponseBody(httpResponse, cancellationToken); + OpenTelemetryProtocolExporterEventSource.Log.HttpRequestFailed(this.Endpoint, response, ex); + } + return new ExportClientGrpcResponse( success: false, deadlineUtc: deadlineUtc, @@ -165,12 +170,10 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten } } - private static bool IsTransientNetworkError(HttpRequestException ex) - { - return ex.InnerException is System.Net.Sockets.SocketException socketEx - && (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut - || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset - || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable - || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionRefused); - } + private static bool IsTransientNetworkError(HttpRequestException ex) => + ex.InnerException is System.Net.Sockets.SocketException socketEx + && (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut + || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset + || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable + || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionRefused); } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs index dfff80fffcb..b6980966674 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpExportClient.cs @@ -4,6 +4,7 @@ #if NETFRAMEWORK using System.Net.Http; #endif +using System.Diagnostics.Tracing; using System.Net.Http.Headers; namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; @@ -35,8 +36,12 @@ public override ExportClientResponse SendExportRequest(byte[] buffer, int conten } catch (HttpRequestException ex) { - var response = TryGetResponseBody(httpResponse, cancellationToken); - OpenTelemetryProtocolExporterEventSource.Log.HttpRequestFailed(this.Endpoint, response, ex); + if (OpenTelemetryProtocolExporterEventSource.Log.IsEnabled(EventLevel.Error, EventKeywords.All)) + { + var response = TryGetResponseBody(httpResponse, cancellationToken); + OpenTelemetryProtocolExporterEventSource.Log.HttpRequestFailed(this.Endpoint, response, ex); + } + return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex); } diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExportClientTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExportClientTests.cs new file mode 100644 index 00000000000..25a971aba32 --- /dev/null +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExportClientTests.cs @@ -0,0 +1,307 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#if NETFRAMEWORK +using System.Net.Http; +#endif +using System.IO.Compression; +using System.Net; +using System.Text; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; + +namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests; + +public class OtlpExportClientTests +{ + private const int MessageSizeLimit = 4 * 1024 * 1024; + private const string TruncationSuffix = "[TRUNCATED]"; + + [Fact] + public void TryGetResponseBody_NullHttpResponse_ReturnsNull() + { + // Arrange + HttpResponseMessage? httpResponse = null; + var cancellationToken = CancellationToken.None; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Null(actual); + } + + [Fact] + public void TryGetResponseBody_HttpResponseWithoutContent_ReturnsCorrectResult() + { + // Arrange + using var httpResponse = new HttpResponseMessage() { Content = null }; + var cancellationToken = CancellationToken.None; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert +#if NETFRAMEWORK + Assert.Null(actual); +#else + Assert.Equal(string.Empty, actual); +#endif + } + + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(256)] + [InlineData(1024)] + [InlineData((30 * 1024) - 1)] + [InlineData(30 * 1024)] + public void TryGetResponseBody_SmallContent_ReturnsFullContent(int length) + { + // Arrange + var expected = new string('A', length); + var cancellationToken = CancellationToken.None; + + using var httpResponse = new HttpResponseMessage() + { + Content = new StringContent(expected), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Equal(expected, actual); + } + + [Theory] + [InlineData(1)] + [InlineData(1024)] + [InlineData(2048)] + public void TryGetResponseBody_ContentExceedsLimit_ReturnsTruncatedContent(int excess) + { + // Arrange + var content = new string('C', MessageSizeLimit + excess); + var cancellationToken = CancellationToken.None; + + using var httpResponse = new HttpResponseMessage() + { + Content = new StringContent(content), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.NotNull(actual); + Assert.Equal(MessageSizeLimit + TruncationSuffix.Length, actual.Length); + Assert.Equal(new string('C', MessageSizeLimit) + TruncationSuffix, actual); + } + + [Theory] + [InlineData(1)] + [InlineData(1024)] + [InlineData(2048)] + public void TryGetResponseBody_DecompressedContentExceedsLimit_ReturnsTruncatedContent(int excess) + { + // Arrange + var content = new string('G', MessageSizeLimit + excess); + var rawBytes = Encoding.UTF8.GetBytes(content); + var cancellationToken = CancellationToken.None; + + using var memoryStream = new MemoryStream(); + + using (var compressor = new GZipStream(memoryStream, CompressionMode.Compress, leaveOpen: true)) + { + compressor.Write(rawBytes, 0, rawBytes.Length); + } + + memoryStream.Seek(0, SeekOrigin.Begin); + + Assert.True( + memoryStream.Length < MessageSizeLimit, + $"The compressed message length {memoryStream.Length} is not less than {MessageSizeLimit}."); + + using var compressed = new GZipStream(memoryStream, CompressionMode.Decompress, leaveOpen: true); + + using var httpResponse = new HttpResponseMessage() + { + Content = new StreamContent(compressed), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.NotNull(actual); + Assert.Equal(MessageSizeLimit + TruncationSuffix.Length, actual.Length); + Assert.Equal(new string('G', MessageSizeLimit) + TruncationSuffix, actual); + } + + [Fact] + public void TryGetResponseBody_EmptyContent_ReturnsEmptyString() + { + // Arrange + var cancellationToken = CancellationToken.None; + + using var httpResponse = new HttpResponseMessage() + { + Content = new StringContent(string.Empty), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Equal(string.Empty, actual); + } + + [Fact] + public void TryGetResponseBody_ExceptionDuringRead_ReturnsNull() + { + // Arrange + var cancellationToken = CancellationToken.None; + + using var httpResponse = new HttpResponseMessage() + { + Content = new ThrowingHttpContent(), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Null(actual); + } + + [Fact] + public void TryGetResponseBody_CancellationTokenSignalled_ReturnsNull() + { + // Arrange + var cancellationToken = new CancellationToken(canceled: true); + + using var httpResponse = new HttpResponseMessage() + { + Content = new StringContent("foo"), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Null(actual); + } + + [Fact] + public void TryGetResponseBody_NonSeekableStream_ReturnsContent() + { + // Arrange + var expected = "non-seekable response body"; + var cancellationToken = CancellationToken.None; + + using var httpResponse = new HttpResponseMessage() + { + Content = new NonSeekableStreamContent(Encoding.UTF8.GetBytes(expected)), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Equal(expected, actual); + } + + [Fact] + public void TryGetResponseBody_NonUtf8Charset_ReturnsCorrectlyDecodedContent() + { + // Arrange + var expected = "iso-8859-1 response body: caf\u00e9"; + var cancellationToken = CancellationToken.None; + var iso8859 = Encoding.GetEncoding("iso-8859-1"); + + using var httpResponse = new HttpResponseMessage() + { + Content = new StringContent(expected, iso8859), + }; + + // Act + var actual = OtlpExportClient.TryGetResponseBody(httpResponse, cancellationToken); + + // Assert + Assert.Equal(expected, actual); + } + + private sealed class ThrowingHttpContent : HttpContent + { + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) + => throw new InvalidOperationException("Test exception"); + +#if NET + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken) + => throw new InvalidOperationException("Test exception"); +#endif + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } + } + + private sealed class NonSeekableStreamContent : HttpContent + { + private readonly byte[] data; + + public NonSeekableStreamContent(byte[] data) + { + this.data = data; + this.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("text/plain") { CharSet = "utf-8" }; + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) + => stream.WriteAsync(this.data, 0, this.data.Length); + +#if NET + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken) + => stream.WriteAsync(this.data, cancellationToken).AsTask(); + + protected override Stream CreateContentReadStream(CancellationToken cancellationToken) + => new NonSeekableStream(new MemoryStream(this.data)); +#endif + + protected override Task CreateContentReadStreamAsync() + => Task.FromResult(new NonSeekableStream(new MemoryStream(this.data))); + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } + } + + private sealed class NonSeekableStream(Stream inner) : Stream + { + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() => inner.Flush(); + + public override int Read(byte[] buffer, int offset, int count) => inner.Read(buffer, offset, count); + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } +}