diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dc3df24c23..ce22af7fa7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Fix bad log message when key-value pairs are dropped because of key duplication in `go.opentelemetry.io/otel/sdk/log`. (#7662) - Fix `DroppedAttributes` on `Record` in `go.opentelemetry.io/otel/sdk/log` to not count the non-attribute key-value pairs dropped because of key duplication. (#7662) - Fix `SetAttributes` on `Record` in `go.opentelemetry.io/otel/sdk/log` to not log that attributes are dropped when they are actually not dropped. (#7662) +- Fix missing `request.GetBody` in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` to correctly handle HTTP2 GOAWAY frame. (#7794) diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index d0688c2016f..5af309fea09 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -270,6 +270,7 @@ func (d *client) newRequest(body []byte) (request, error) { case NoCompression: r.ContentLength = int64(len(body)) req.bodyReader = bodyReader(body) + req.GetBody = bodyReaderErr(body) case GzipCompression: // Ensure the content length is not used. r.ContentLength = -1 @@ -290,6 +291,7 @@ func (d *client) newRequest(body []byte) (request, error) { } req.bodyReader = bodyReader(b.Bytes()) + req.GetBody = bodyReaderErr(b.Bytes()) } return req, nil @@ -315,6 +317,13 @@ func bodyReader(buf []byte) func() io.ReadCloser { } } +// bodyReaderErr returns a closure returning a new reader for buf. +func bodyReaderErr(buf []byte) func() (io.ReadCloser, error) { + return func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(buf)), nil + } +} + // request wraps an http.Request with a resettable body reader. type request struct { *http.Request diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index 6eabae06102..002f68daa77 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -6,9 +6,12 @@ package otlptracehttp_test import ( "context" "fmt" + "io" "net/http" + "net/http/httptest" "net/url" "strings" + "sync" "testing" "time" @@ -582,6 +585,59 @@ func TestClientInstrumentation(t *testing.T) { metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...) } +func TestGetBodyCalledOnRedirect(t *testing.T) { + // Test that req.GetBody is set correctly, allowing the HTTP transport + // to re-send the body on 307 redirects. + + var mu sync.Mutex + var requestBodies [][]byte + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + mu.Lock() + requestBodies = append(requestBodies, body) + isFirstRequest := len(requestBodies) == 1 + mu.Unlock() + + if isFirstRequest { + w.Header().Set("Location", "/v1/traces/final") + w.WriteHeader(http.StatusTemporaryRedirect) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(http.StatusOK) + }) + + server := httptest.NewServer(handler) + defer server.Close() + + client := otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(server.Listener.Addr().String()), + otlptracehttp.WithInsecure(), + ) + + ctx := t.Context() + exporter, err := otlptrace.New(ctx, client) + require.NoError(t, err) + defer func() { _ = exporter.Shutdown(ctx) }() + + err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) + require.NoError(t, err) + + mu.Lock() + defer mu.Unlock() + + require.Len(t, requestBodies, 2, "expected 2 requests (original + redirect)") + assert.NotEmpty(t, requestBodies[0], "original request body should not be empty") + assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original") +} + func BenchmarkExporterExportSpans(b *testing.B) { const n = 10