diff --git a/CHANGELOG.md b/CHANGELOG.md index dfc28098000..4311ca7ca22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/contrib/exporters/autoexport` package to provide configuration of trace exporters with useful defaults and envar support. (#2753, #4100, #4129, #4132, #4134) - `WithRouteTag` in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` adds HTTP route attribute to metrics. (#615) - Add `WithSpanOptions` option in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#3768) +- Add metrics to `NewTransport` in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#3769) ### Fixed diff --git a/instrumentation/net/http/otelhttp/common.go b/instrumentation/net/http/otelhttp/common.go index 303e5505e41..6bd7231d29a 100644 --- a/instrumentation/net/http/otelhttp/common.go +++ b/instrumentation/net/http/otelhttp/common.go @@ -37,6 +37,13 @@ const ( ServerLatency = "http.server.duration" // Incoming end to end duration, microseconds ) +// Client HTTP metrics. +const ( + ClientRequestContentLength = "http.client.request.size" // Outgoing request bytes total + ClientResponseContentLength = "http.client.response.size" // Outgoing response bytes total + ClientLatency = "http.client.duration" // Outgoing end to end duration, microseconds +) + // Filter is a predicate used to determine whether a given http.request should // be traced. A Filter must return true if the request should be traced. type Filter func(*http.Request) bool diff --git a/instrumentation/net/http/otelhttp/handler.go b/instrumentation/net/http/otelhttp/handler.go index 123365caed1..f3b51a58d2a 100644 --- a/instrumentation/net/http/otelhttp/handler.go +++ b/instrumentation/net/http/otelhttp/handler.go @@ -216,7 +216,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http next.ServeHTTP(w, r.WithContext(ctx)) - setAfterServeAttributes(span, bw.read, rww.written, rww.statusCode, bw.err, rww.err) + setAfterServeAttributes(span, bw.read.Load(), rww.written, rww.statusCode, bw.err, rww.err) // Add metrics attributes := append(labeler.Get(), semconvutil.HTTPServerRequest(h.server, r)...) @@ -224,7 +224,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http attributes = append(attributes, semconv.HTTPStatusCode(rww.statusCode)) } o := metric.WithAttributes(attributes...) - h.counters[RequestContentLength].Add(ctx, bw.read, o) + h.counters[RequestContentLength].Add(ctx, bw.read.Load(), o) h.counters[ResponseContentLength].Add(ctx, rww.written, o) // Use floating point division here for higher precision (instead of Millisecond method). diff --git a/instrumentation/net/http/otelhttp/test/transport_test.go b/instrumentation/net/http/otelhttp/test/transport_test.go index e9a62034aeb..ad3c810aa84 100644 --- a/instrumentation/net/http/otelhttp/test/transport_test.go +++ b/instrumentation/net/http/otelhttp/test/transport_test.go @@ -16,11 +16,14 @@ package test import ( "context" + "fmt" "io" + "net" "net/http" "net/http/httptest" "net/http/httptrace" "runtime" + "strconv" "strings" "testing" @@ -28,10 +31,16 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" ) @@ -238,3 +247,100 @@ func TestWithHTTPTrace(t *testing.T) { assert.Equal(t, spans[2].SpanContext().SpanID(), spans[0].Parent().SpanID()) assert.Equal(t, spans[1].SpanContext().SpanID(), spans[2].Parent().SpanID()) } + +func TestTransportMetrics(t *testing.T) { + reader := metric.NewManualReader() + meterProvider := metric.NewMeterProvider(metric.WithReader(reader)) + + content := []byte("Hello, world!") + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + if _, err := w.Write(content); err != nil { + t.Fatal(err) + } + })) + defer ts.Close() + + r, err := http.NewRequest(http.MethodGet, ts.URL, nil) + if err != nil { + t.Fatal(err) + } + + tr := otelhttp.NewTransport( + http.DefaultTransport, + otelhttp.WithMeterProvider(meterProvider), + ) + + c := http.Client{Transport: tr} + res, err := c.Do(r) + if err != nil { + t.Fatal(err) + } + require.NoError(t, res.Body.Close()) + + host, portStr, _ := net.SplitHostPort(r.Host) + if host == "" { + host = "127.0.0.1" + } + port, err := strconv.Atoi(portStr) + if err != nil { + port = 0 + } + + rm := metricdata.ResourceMetrics{} + err = reader.Collect(context.Background(), &rm) + require.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + attrs := attribute.NewSet( + semconv.NetPeerName(host), + semconv.NetPeerPort(port), + semconv.HTTPURL(ts.URL), + semconv.HTTPFlavorKey.String(fmt.Sprintf("1.%d", r.ProtoMinor)), + semconv.HTTPMethod("GET"), + semconv.HTTPResponseContentLength(13), + semconv.HTTPStatusCode(200), + ) + assertClientScopeMetrics(t, rm.ScopeMetrics[0], attrs) +} + +func assertClientScopeMetrics(t *testing.T, sm metricdata.ScopeMetrics, attrs attribute.Set) { + assert.Equal(t, instrumentation.Scope{ + Name: "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp", + Version: otelhttp.Version(), + }, sm.Scope) + + require.Len(t, sm.Metrics, 3) + + want := metricdata.Metrics{ + Name: "http.client.request_content_length", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: attrs, Value: 0}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + } + metricdatatest.AssertEqual(t, want, sm.Metrics[0], metricdatatest.IgnoreTimestamp()) + + want = metricdata.Metrics{ + Name: "http.client.response_content_length", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: attrs, Value: 13}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + } + metricdatatest.AssertEqual(t, want, sm.Metrics[1], metricdatatest.IgnoreTimestamp()) + + // Duration value is not predictable. + dur := sm.Metrics[2] + assert.Equal(t, "http.client.duration", dur.Name) + require.IsType(t, dur.Data, metricdata.Histogram[float64]{}) + hist := dur.Data.(metricdata.Histogram[float64]) + assert.Equal(t, metricdata.CumulativeTemporality, hist.Temporality) + require.Len(t, hist.DataPoints, 1) + dPt := hist.DataPoints[0] + assert.Equal(t, attrs, dPt.Attributes, "attributes") + assert.Equal(t, uint64(1), dPt.Count, "count") + assert.Equal(t, []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, dPt.Bounds, "bounds") +} diff --git a/instrumentation/net/http/otelhttp/transport.go b/instrumentation/net/http/otelhttp/transport.go index e835cac12e4..cb2c1407331 100644 --- a/instrumentation/net/http/otelhttp/transport.go +++ b/instrumentation/net/http/otelhttp/transport.go @@ -19,10 +19,13 @@ import ( "io" "net/http" "net/http/httptrace" + "time" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -32,12 +35,18 @@ import ( type Transport struct { rt http.RoundTripper - tracer trace.Tracer - propagators propagation.TextMapPropagator - spanStartOptions []trace.SpanStartOption - filters []Filter - spanNameFormatter func(string, *http.Request) string - clientTrace func(context.Context) *httptrace.ClientTrace + tracer trace.Tracer + meter metric.Meter + propagators propagation.TextMapPropagator + spanStartOptions []trace.SpanStartOption + readEvent bool + filters []Filter + spanNameFormatter func(string, *http.Request) string + clientTrace func(context.Context) *httptrace.ClientTrace + getRequestAttributes func(*http.Request) []attribute.KeyValue + getResponseAttributes func(response *http.Response) []attribute.KeyValue + counters map[string]metric.Int64Counter + valueRecorders map[string]metric.Float64Histogram } var _ http.RoundTripper = &Transport{} @@ -63,14 +72,17 @@ func NewTransport(base http.RoundTripper, opts ...Option) *Transport { c := newConfig(append(defaultOpts, opts...)...) t.applyConfig(c) + t.createMeasures() return &t } func (t *Transport) applyConfig(c *config) { t.tracer = c.Tracer + t.meter = c.Meter t.propagators = c.Propagators t.spanStartOptions = c.SpanStartOptions + t.readEvent = c.ReadEvent t.filters = c.Filters t.spanNameFormatter = c.SpanNameFormatter t.clientTrace = c.ClientTrace @@ -80,10 +92,29 @@ func defaultTransportFormatter(_ string, r *http.Request) string { return "HTTP " + r.Method } +func (t *Transport) createMeasures() { + t.counters = make(map[string]metric.Int64Counter) + t.valueRecorders = make(map[string]metric.Float64Histogram) + + requestBytesCounter, err := t.meter.Int64Counter(ClientRequestContentLength) + handleErr(err) + + responseBytesCounter, err := t.meter.Int64Counter(ClientResponseContentLength) + handleErr(err) + + clientLatencyMeasure, err := t.meter.Float64Histogram(ClientLatency) + handleErr(err) + + t.counters[ClientRequestContentLength] = requestBytesCounter + t.counters[ClientResponseContentLength] = responseBytesCounter + t.valueRecorders[ClientLatency] = clientLatencyMeasure +} + // RoundTrip creates a Span and propagates its context via the provided request's headers // before handing the request to the configured base RoundTripper. The created span will // end when the response body is closed or when a read from the body returns io.EOF. func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { + requestStartTime := time.Now() for _, f := range t.filters { if !f(r) { // Simply pass through to the base RoundTripper if a filter rejects the request @@ -109,8 +140,28 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx)) } + readRecordFunc := func(int64) {} + if t.readEvent { + readRecordFunc = func(n int64) { + span.AddEvent("read", trace.WithAttributes(ReadBytesKey.Int64(n))) + } + } + + var bw bodyWrapper + // if request body is nil or NoBody, we don't want to mutate the body as it + // will affect the identity of it in an unforeseeable way because we assert + // ReadCloser fulfills a certain interface, and it is indeed nil or NoBody. + if r.Body != nil && r.Body != http.NoBody { + bw.ReadCloser = r.Body + bw.record = readRecordFunc + r.Body = &bw + } + r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request. span.SetAttributes(semconvutil.HTTPClientRequest(r)...) + if t.getRequestAttributes != nil { + span.SetAttributes(t.getRequestAttributes(r)...) + } t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header)) res, err := t.rt.RoundTrip(r) @@ -118,12 +169,35 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) span.End() - return res, err + } else { + span.SetAttributes(semconvutil.HTTPClientResponse(res)...) + if t.getResponseAttributes != nil { + span.SetAttributes(t.getResponseAttributes(res)...) + } + span.SetStatus(semconvutil.HTTPClientStatus(res.StatusCode)) + res.Body = newWrappedBody(span, res.Body) + } + + // Add metrics + attributes := semconvutil.HTTPClientRequest(r) + if t.getRequestAttributes != nil { + attributes = append(attributes, t.getRequestAttributes(r)...) + } + if err == nil { + attributes = append(attributes, semconvutil.HTTPClientResponse(res)...) + if t.getResponseAttributes != nil { + attributes = append(attributes, t.getResponseAttributes(res)...) + } + } + o := metric.WithAttributes(attributes...) + t.counters[ClientRequestContentLength].Add(ctx, bw.read.Load(), o) + if err == nil { + t.counters[ClientResponseContentLength].Add(ctx, res.ContentLength, o) } - span.SetAttributes(semconvutil.HTTPClientResponse(res)...) - span.SetStatus(semconvutil.HTTPClientStatus(res.StatusCode)) - res.Body = newWrappedBody(span, res.Body) + // Use floating point division here for higher precision (instead of Millisecond method). + elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond) + t.valueRecorders[ClientLatency].Record(ctx, elapsedTime, o) return res, err } diff --git a/instrumentation/net/http/otelhttp/wrap.go b/instrumentation/net/http/otelhttp/wrap.go index 11a35ed167f..2852ec97171 100644 --- a/instrumentation/net/http/otelhttp/wrap.go +++ b/instrumentation/net/http/otelhttp/wrap.go @@ -18,6 +18,7 @@ import ( "context" "io" "net/http" + "sync/atomic" "go.opentelemetry.io/otel/propagation" ) @@ -30,14 +31,14 @@ type bodyWrapper struct { io.ReadCloser record func(n int64) // must not be nil - read int64 + read atomic.Int64 err error } func (w *bodyWrapper) Read(b []byte) (int, error) { n, err := w.ReadCloser.Read(b) n1 := int64(n) - w.read += n1 + w.read.Add(n1) w.err = err w.record(n1) return n, err