Skip to content
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add unmarshaling and validation for `BatchLogRecordProcessor`, `BatchSpanProcessor`, and `PeriodicMetricReader` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8049)
- Add unmarshaling and validation for `TextMapPropagator` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8052)

### Changed

- Improve performance by reducing allocations in the gRPC stats handler in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#8035)

### Removed

- Drop support for [Go 1.23]. (#7831)
Expand Down
38 changes: 25 additions & 13 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
type gRPCContextKey struct{}

type gRPCContext struct {
inMessages int64
outMessages int64
metricAttrs []attribute.KeyValue
record bool
inMessages int64
outMessages int64
metricAttrs []attribute.KeyValue
metricAttrSet attribute.Set
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny story: I spent the last couple of hours going through a profile from production, and came to this repo to suggest this exact optimisation!

record bool
}

type serverHandler struct {
Expand All @@ -38,8 +39,8 @@ type serverHandler struct {
tracer trace.Tracer

duration rpcconv.ServerDuration
inSize rpcconv.ServerRequestSize
outSize rpcconv.ServerResponseSize
inSize int64Hist
outSize int64Hist
inMsg rpcconv.ServerRequestsPerRPC
outMsg rpcconv.ServerResponsesPerRPC
}
Expand Down Expand Up @@ -111,9 +112,12 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
}

if record {
// Make a new slice to avoid aliasing into the same attrs slice used by metrics.
spanAttributes := make([]attribute.KeyValue, 0, len(attrs)+len(h.SpanAttributes))
spanAttributes = append(append(spanAttributes, attrs...), h.SpanAttributes...)
opts := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(append(attrs, h.SpanAttributes...)...),
trace.WithAttributes(spanAttributes...),
}
if h.PublicEndpoint || (h.PublicEndpointFn != nil && h.PublicEndpointFn(ctx, info)) {
opts = append(opts, trace.WithNewRoot())
Expand All @@ -133,6 +137,7 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
metricAttrs: append(attrs, h.MetricAttributes...),
record: record,
}
gctx.metricAttrSet = attribute.NewSet(gctx.metricAttrs...)

return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}
Expand All @@ -157,8 +162,8 @@ type clientHandler struct {
tracer trace.Tracer

duration rpcconv.ClientDuration
inSize rpcconv.ClientResponseSize
outSize rpcconv.ClientRequestSize
inSize int64Hist
outSize int64Hist
inMsg rpcconv.ClientResponsesPerRPC
outMsg rpcconv.ClientRequestsPerRPC
}
Expand Down Expand Up @@ -219,18 +224,22 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
}

if record {
// Make a new slice to avoid aliasing into the same attrs slice used by metrics.
spanAttributes := make([]attribute.KeyValue, 0, len(attrs)+len(h.SpanAttributes))
spanAttributes = append(append(spanAttributes, attrs...), h.SpanAttributes...)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(append(attrs, h.SpanAttributes...)...),
trace.WithAttributes(spanAttributes...),
)
}

gctx := gRPCContext{
metricAttrs: append(attrs, h.MetricAttributes...),
record: record,
}
gctx.metricAttrSet = attribute.NewSet(gctx.metricAttrs...)

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.Propagators)
}
Expand Down Expand Up @@ -262,7 +271,7 @@ func (*clientHandler) HandleConn(context.Context, stats.ConnStats) {
}

type int64Hist interface {
Record(context.Context, int64, ...attribute.KeyValue)
RecordSet(context.Context, int64, attribute.Set)
}

func (c *config) handleRPC(
Expand All @@ -286,7 +295,7 @@ func (c *config) handleRPC(
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.inMessages, 1)
inSize.Record(ctx, int64(rs.Length), gctx.metricAttrs...)
inSize.RecordSet(ctx, int64(rs.Length), gctx.metricAttrSet)
}

if c.ReceivedEvent && span.IsRecording() {
Expand All @@ -302,7 +311,7 @@ func (c *config) handleRPC(
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.outMessages, 1)
outSize.Record(ctx, int64(rs.Length), gctx.metricAttrs...)
outSize.RecordSet(ctx, int64(rs.Length), gctx.metricAttrSet)
}

if c.SentEvent && span.IsRecording() {
Expand Down Expand Up @@ -343,6 +352,9 @@ func (c *config) handleRPC(

var metricAttrs []attribute.KeyValue
if gctx != nil {
// Don't use gctx.metricAttrSet here, because it requires passing
// multiple RecordOptions, which would call metric.mergeSets and
// allocate a new set for each Record call.
metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

metricnoop "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/trace"
tracenoop "go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc/peer"
Expand All @@ -29,7 +30,7 @@ func benchmarkServerHandlerHandleRPC(b *testing.B, stat stats.RPCStats) {
WithTracerProvider(trace.NewTracerProvider(
trace.WithSampler(trace.AlwaysSample()),
)),
WithMeterProvider(metricnoop.NewMeterProvider()),
WithMeterProvider(metric.NewMeterProvider()),
WithMessageEvents(ReceivedEvents, SentEvents),
)
ctx := b.Context()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -178,8 +179,8 @@ func TestNilInstruments(t *testing.T) {
h := hIface.(*serverHandler)

assert.NotPanics(t, func() { h.duration.Record(ctx, 0) }, "duration")
assert.NotPanics(t, func() { h.inSize.Record(ctx, 0) }, "inSize")
assert.NotPanics(t, func() { h.outSize.Record(ctx, 0) }, "outSize")
assert.NotPanics(t, func() { h.inSize.RecordSet(ctx, 0, *attribute.EmptySet()) }, "inSize")
assert.NotPanics(t, func() { h.outSize.RecordSet(ctx, 0, *attribute.EmptySet()) }, "outSize")
assert.NotPanics(t, func() { h.inMsg.Record(ctx, 0) }, "inMsg")
assert.NotPanics(t, func() { h.outMsg.Record(ctx, 0) }, "outMsg")
})
Expand All @@ -192,8 +193,8 @@ func TestNilInstruments(t *testing.T) {
h := hIface.(*clientHandler)

assert.NotPanics(t, func() { h.duration.Record(ctx, 0) }, "duration")
assert.NotPanics(t, func() { h.inSize.Record(ctx, 0) }, "inSize")
assert.NotPanics(t, func() { h.outSize.Record(ctx, 0) }, "outSize")
assert.NotPanics(t, func() { h.inSize.RecordSet(ctx, 0, *attribute.EmptySet()) }, "inSize")
assert.NotPanics(t, func() { h.outSize.RecordSet(ctx, 0, *attribute.EmptySet()) }, "outSize")
assert.NotPanics(t, func() { h.inMsg.Record(ctx, 0) }, "inMsg")
assert.NotPanics(t, func() { h.outMsg.Record(ctx, 0) }, "outMsg")
})
Expand Down