From e6d23f39294831bef78c3d0796f396d4d3ddf34d Mon Sep 17 00:00:00 2001 From: Alan Parra Date: Thu, 1 Feb 2024 11:22:15 -0300 Subject: [PATCH 1/4] Verify that intercepted stream Sends wrap io.EOF --- api/utils/grpc/interceptors/errors_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/api/utils/grpc/interceptors/errors_test.go b/api/utils/grpc/interceptors/errors_test.go index ffa0d28356e5c..e2d752562f17f 100644 --- a/api/utils/grpc/interceptors/errors_test.go +++ b/api/utils/grpc/interceptors/errors_test.go @@ -16,10 +16,10 @@ package interceptors_test import ( "context" - "errors" "io" "net" "testing" + "time" "github.com/gravitational/trace" "github.com/stretchr/testify/assert" @@ -88,15 +88,19 @@ func TestGRPCErrorWrapping(t *testing.T) { stream, err := client.AddMFADevice(context.Background()) require.NoError(t, err) + // Give the server time to close the stream. This allows us to more + // consistently hit the io.EOF error. + time.Sleep(100 * time.Millisecond) + //nolint:staticcheck // SA1019. The specific stream used here doesn't matter. sendErr := stream.Send(&proto.AddMFADeviceRequest{}) - // io.EOF means the server closed the stream, which can - // happen depending in timing. In either case, it is - // still safe to recv from the stream and check for + // Expect either a success (unlikely because of the Sleep) or an unwrapped + // io.EOF error (meaning the server errored and closed the stream). + // In either case, it is still safe to recv from the stream and check for // the already exists error. - if sendErr != nil && !errors.Is(sendErr, io.EOF) { - t.Fatalf("Unexpected error: %v", sendErr) + if sendErr != nil && sendErr != io.EOF /* == error comparsion on purpose! */ { + t.Fatalf("Unexpected error: %q (%T)", sendErr, sendErr) } _, err = stream.Recv() From 4b445acd502a10cdae21c9bae852f1a6940f5ed3 Mon Sep 17 00:00:00 2001 From: Alan Parra Date: Thu, 1 Feb 2024 11:23:22 -0300 Subject: [PATCH 2/4] fix: Do not wrap io.EOF intercepted by stream Sends --- api/utils/grpc/interceptors/errors.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/api/utils/grpc/interceptors/errors.go b/api/utils/grpc/interceptors/errors.go index e765cc10a94a0..7a254ce6c8dec 100644 --- a/api/utils/grpc/interceptors/errors.go +++ b/api/utils/grpc/interceptors/errors.go @@ -50,7 +50,11 @@ type grpcClientStreamWrapper struct { // SendMsg wraps around ClientStream.SendMsg func (s *grpcClientStreamWrapper) SendMsg(m interface{}) error { - if err := s.ClientStream.SendMsg(m); err != nil { + switch err := s.ClientStream.SendMsg(m); { + case errors.Is(err, io.EOF): + // Do not wrap io.EOF errors, they are often used as stop guards for streams. + return err + case err != nil: return &RemoteError{Err: trace.Unwrap(trail.FromGRPC(s.ClientStream.SendMsg(m)))} } return nil From 115f6400ad5a1a8f83e454f31b714782c3c7a167 Mon Sep 17 00:00:00 2001 From: Alan Parra Date: Thu, 1 Feb 2024 11:33:31 -0300 Subject: [PATCH 3/4] Use a helper func, fix duplicate Send/Recv calls --- api/utils/grpc/interceptors/errors.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/api/utils/grpc/interceptors/errors.go b/api/utils/grpc/interceptors/errors.go index 7a254ce6c8dec..3b9e834b60c18 100644 --- a/api/utils/grpc/interceptors/errors.go +++ b/api/utils/grpc/interceptors/errors.go @@ -50,26 +50,24 @@ type grpcClientStreamWrapper struct { // SendMsg wraps around ClientStream.SendMsg func (s *grpcClientStreamWrapper) SendMsg(m interface{}) error { - switch err := s.ClientStream.SendMsg(m); { - case errors.Is(err, io.EOF): - // Do not wrap io.EOF errors, they are often used as stop guards for streams. - return err - case err != nil: - return &RemoteError{Err: trace.Unwrap(trail.FromGRPC(s.ClientStream.SendMsg(m)))} - } - return nil + return wrapStreamErr(s.ClientStream.SendMsg(m)) } // RecvMsg wraps around ClientStream.RecvMsg func (s *grpcClientStreamWrapper) RecvMsg(m interface{}) error { - switch err := s.ClientStream.RecvMsg(m); { + return wrapStreamErr(s.ClientStream.RecvMsg(m)) +} + +func wrapStreamErr(err error) error { + switch { + case err == nil: + return nil case errors.Is(err, io.EOF): // Do not wrap io.EOF errors, they are often used as stop guards for streams. return err - case err != nil: - return &RemoteError{Err: trace.Unwrap(trail.FromGRPC(s.ClientStream.RecvMsg(m)))} + default: + return &RemoteError{Err: trace.Unwrap(trail.FromGRPC(err))} } - return nil } // GRPCServerUnaryErrorInterceptor is a gRPC unary server interceptor that From d2aba7c59dfd8984f5e951f06046b57bc0091903 Mon Sep 17 00:00:00 2001 From: Alan Parra Date: Thu, 1 Feb 2024 11:40:25 -0300 Subject: [PATCH 4/4] Fix typo --- api/utils/grpc/interceptors/errors_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/utils/grpc/interceptors/errors_test.go b/api/utils/grpc/interceptors/errors_test.go index e2d752562f17f..3e01ffca507ff 100644 --- a/api/utils/grpc/interceptors/errors_test.go +++ b/api/utils/grpc/interceptors/errors_test.go @@ -99,7 +99,7 @@ func TestGRPCErrorWrapping(t *testing.T) { // io.EOF error (meaning the server errored and closed the stream). // In either case, it is still safe to recv from the stream and check for // the already exists error. - if sendErr != nil && sendErr != io.EOF /* == error comparsion on purpose! */ { + if sendErr != nil && sendErr != io.EOF /* == error comparison on purpose! */ { t.Fatalf("Unexpected error: %q (%T)", sendErr, sendErr) }