diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a10ca26a5b..9cca0fec7bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Return spec-compliant `TraceIdRatioBased` description. This is a breaking behavioral change, but it is necessary to make the implementation [spec-compliant](https://opentelemetry.io/docs/specs/otel/trace/sdk/#traceidratiobased). (#8027) - Fix a race condition in `go.opentelemetry.io/otel/sdk/metric` where the lastvalue aggregation could collect the value 0 even when no zero-value measurements were recorded. (#8056) +- Fix missing `request.GetBody` in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp` to correctly handle HTTP2 GOAWAY frame. (#8096) diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index c2dcb8a39f1..2a25e3db4bb 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -263,6 +263,7 @@ func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, erro case NoCompression: r.ContentLength = int64(len(body)) req.bodyReader = bodyReader(body) + req.GetBody = bodyReaderErr(body) case GzipCompression: // Ensure the content length is not used. r.ContentLength = -1 @@ -283,6 +284,7 @@ func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, erro } req.bodyReader = bodyReader(b.Bytes()) + req.GetBody = bodyReaderErr(body) } return req, nil @@ -295,6 +297,13 @@ func bodyReader(buf []byte) func() io.ReadCloser { } } +// bodyReaderErr returns a closure returning a new reader for buf. +func bodyReaderErr(buf []byte) func() (io.ReadCloser, error) { + return func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(buf)), nil + } +} + // request wraps an http.Request with a resettable body reader. type request struct { *http.Request diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go index d7141c3690c..c66c99745fb 100644 --- a/exporters/otlp/otlplog/otlploghttp/client_test.go +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -20,6 +20,7 @@ import ( "math/big" "net" "net/http" + "net/http/httptest" "net/url" "strings" "sync" @@ -833,6 +834,58 @@ func TestConfig(t *testing.T) { }) } +func TestGetBodyCalledOnRedirect(t *testing.T) { + // Test that req.GetBody is set correctly, allowing the HTTP transport + // to re-send the body on 307 redirects. + var mu sync.Mutex + var requestBodies [][]byte + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + mu.Lock() + requestBodies = append(requestBodies, body) + isFirstRequest := len(requestBodies) == 1 + mu.Unlock() + + if isFirstRequest { + w.Header().Set("Location", "/v1/logs/final") + w.WriteHeader(http.StatusTemporaryRedirect) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(http.StatusOK) + }) + + server := httptest.NewServer(handler) + defer server.Close() + + opts := []Option{WithEndpoint(server.Listener.Addr().String()), WithInsecure()} + cfg := newConfig(opts) + client, err := newHTTPClient(t.Context(), cfg) + require.NoError(t, err) + + exporter, err := newExporter(client, cfg) + require.NoError(t, err) + ctx := t.Context() + defer func() { _ = exporter.Shutdown(ctx) }() + + err = exporter.Export(ctx, make([]log.Record, 1)) + require.NoError(t, err) + + mu.Lock() + defer mu.Unlock() + + require.Len(t, requestBodies, 2, "expected 2 requests (original + redirect)") + assert.NotEmpty(t, requestBodies[0], "original request body should not be empty") + assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original") +} + // SetExporterID sets the exporter ID counter to v and returns the previous // value. //