From 223d5d83b92f5e01b64347876463990ca2db0613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Tue, 7 Nov 2023 17:45:24 +0100 Subject: [PATCH 1/4] otelgrpc: StreamClientInterceptor ends spans synchronously --- .../grpc/otelgrpc/interceptor.go | 91 ++++--------------- 1 file changed, 20 insertions(+), 71 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go index c737d29dcd0..2f22f0e2bed 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go @@ -123,27 +123,13 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor { } } -type streamEventType int - -type streamEvent struct { - Type streamEventType - Err error -} - -const ( - receiveEndEvent streamEventType = iota - errorEvent -) - // clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and // SendMsg method call. type clientStream struct { grpc.ClientStream + desc *grpc.StreamDesc - desc *grpc.StreamDesc - events chan streamEvent - eventsDone chan struct{} - finished chan error + span trace.Span receivedEvent bool sentEvent bool @@ -158,11 +144,11 @@ func (w *clientStream) RecvMsg(m interface{}) error { err := w.ClientStream.RecvMsg(m) if err == nil && !w.desc.ServerStreams { - w.sendStreamEvent(receiveEndEvent, nil) + w.endSpan(nil) } else if err == io.EOF { - w.sendStreamEvent(receiveEndEvent, nil) + w.endSpan(nil) } else if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } else { w.receivedMessageID++ @@ -184,7 +170,7 @@ func (w *clientStream) SendMsg(m interface{}) error { } if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } return err @@ -193,7 +179,7 @@ func (w *clientStream) SendMsg(m interface{}) error { func (w *clientStream) Header() (metadata.MD, error) { md, err := w.ClientStream.Header() if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } return md, err @@ -202,54 +188,32 @@ func (w *clientStream) Header() (metadata.MD, error) { func (w *clientStream) CloseSend() error { err := w.ClientStream.CloseSend() if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } return err } -func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream { - events := make(chan streamEvent) - eventsDone := make(chan struct{}) - finished := make(chan error) - - go func() { - defer close(eventsDone) - - for { - select { - case event := <-events: - switch event.Type { - case receiveEndEvent: - finished <- nil - return - case errorEvent: - finished <- event.Err - return - } - case <-ctx.Done(): - finished <- ctx.Err() - return - } - } - }() - +func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream { return &clientStream{ ClientStream: s, + span: span, desc: desc, - events: events, - eventsDone: eventsDone, - finished: finished, receivedEvent: cfg.ReceivedEvent, sentEvent: cfg.SentEvent, } } -func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { - select { - case <-w.eventsDone: - case w.events <- streamEvent{Type: eventType, Err: err}: +func (w *clientStream) endSpan(err error) { + if err != nil { + s, _ := status.FromError(err) + w.span.SetStatus(codes.Error, s.Message()) + w.span.SetAttributes(statusCodeAttr(s.Code())) + } else { + w.span.SetAttributes(statusCodeAttr(grpc_codes.OK)) } + + w.span.End() } // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable @@ -302,22 +266,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { span.End() return s, err } - stream := wrapClientStream(ctx, s, desc, cfg) - - go func() { - err := <-stream.finished - - if err != nil { - s, _ := status.FromError(err) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(statusCodeAttr(s.Code())) - } else { - span.SetAttributes(statusCodeAttr(grpc_codes.OK)) - } - - span.End() - }() - + stream := wrapClientStream(ctx, s, desc, span, cfg) return stream, nil } } From 271aec73fb24602fa8ec45a55ad8af7c8d18f4b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Tue, 7 Nov 2023 17:48:25 +0100 Subject: [PATCH 2/4] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5741c65439e..468e2cfc706 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/contrib/samplers/jaegerremote` sampler does not panic when the default HTTP round-tripper (`http.DefaultTransport`) is not `*http.Transport`. (#4045) - The `UnaryServerInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now sets gRPC status code correctly for the `rpc.server.duration` metric. (#4481) +- Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537) ## [1.20.0/0.45.0/0.14.0] - 2023-09-28 From b6316b2d44c1e2f7881930ef56cc0bd31dc5a4b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Wed, 8 Nov 2023 15:58:14 +0100 Subject: [PATCH 3/4] Update grpc_test.go --- .../google.golang.org/grpc/otelgrpc/test/grpc_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index d1837ee0bf1..6d8a82676ba 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -19,7 +19,6 @@ import ( "net" "strconv" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -137,12 +136,6 @@ func TestInterceptors(t *testing.T) { }) t.Run("StreamClientSpans", func(t *testing.T) { - // StreamClientInterceptor ends the spans asynchronously. - // We need to wait for all spans before asserting them. - require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Len(c, clientStreamSR.Ended(), 3) - }, 5*time.Second, 100*time.Millisecond) - checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String()) }) From 5763eb3dd46bee45c98cbaf1e118a6b01bbbc58d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 10 Nov 2023 12:39:27 +0100 Subject: [PATCH 4/4] Add BenchmarkStreamClientInterceptor --- .../otelgrpc/test/grpc_stats_handler_test.go | 5 ++-- .../grpc/otelgrpc/test/grpc_test.go | 27 +++++++++---------- .../grpc/otelgrpc/test/interceptor_test.go | 18 +++++++++++++ 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 6768dfcb4d6..e6fd212f904 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -49,8 +49,7 @@ func TestStatsHandler(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to open port") - err = newGrpcTest( - listener, + client := newGrpcTest(t, listener, []grpc.DialOption{ grpc.WithStatsHandler(otelgrpc.NewClientHandler( otelgrpc.WithTracerProvider(clientTP), @@ -66,7 +65,7 @@ func TestStatsHandler(t *testing.T) { ), }, ) - require.NoError(t, err) + doCalls(client) t.Run("ClientSpans", func(t *testing.T) { checkClientSpans(t, clientSR.Ended()) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index 9c99abf4b02..faec8107016 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -45,14 +45,18 @@ var wantInstrumentationScope = instrumentation.Scope{ Version: otelgrpc.Version(), } -// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down. -func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error { +// newGrpcTest creats a grpc server, starts it, and returns the client, closes everything down during test cleanup. +func newGrpcTest(t testing.TB, listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) pb.TestServiceClient { grpcServer := grpc.NewServer(sOpt...) pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer()) errCh := make(chan error) go func() { errCh <- grpcServer.Serve(listener) }() + t.Cleanup(func() { + grpcServer.Stop() + assert.NoError(t, <-errCh) + }) ctx := context.Background() cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -67,17 +71,12 @@ func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.Serv listener.Addr().String(), cOpt..., ) - if err != nil { - return err - } - client := pb.NewTestServiceClient(conn) - - doCalls(client) - - conn.Close() - grpcServer.Stop() + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, conn.Close()) + }) - return <-errCh + return pb.NewTestServiceClient(conn) } func doCalls(client pb.TestServiceClient) { @@ -105,7 +104,7 @@ func TestInterceptors(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to open port") - err = newGrpcTest(listener, + client := newGrpcTest(t, listener, []grpc.DialOption{ //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release. grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor( @@ -132,7 +131,7 @@ func TestInterceptors(t *testing.T) { )), }, ) - require.NoError(t, err) + doCalls(client) t.Run("UnaryClientSpans", func(t *testing.T) { checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String()) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go index 400f970dba0..e4714211caa 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc" grpc_codes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/interop" "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -1128,3 +1129,20 @@ func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name s require.Len(t, rm.ScopeMetrics, 1) metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) } + +func BenchmarkStreamClientInterceptor(b *testing.B) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(b, err, "failed to open port") + client := newGrpcTest(b, listener, + []grpc.DialOption{ + //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release. + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), + }, + []grpc.ServerOption{}, + ) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + interop.DoClientStreaming(client) + } +}