From f06ff56299e9aff6bbbe8e454ceaeedb2e4d679f Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 29 Apr 2025 06:52:11 +0000 Subject: [PATCH 01/18] return error code Internal instead of EOF --- stream.go | 8 ++++++++ test/end2end_test.go | 3 --- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/stream.go b/stream.go index 01e66c1ed88f..aa693f356470 100644 --- a/stream.go +++ b/stream.go @@ -542,6 +542,8 @@ type clientStream struct { sentLast bool // sent an end stream + recvMsg bool // received msg from server + methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1134,11 +1136,17 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } + if cs.desc.ClientStreams && !cs.desc.ServerStreams && !cs.recvMsg{ + return status.Errorf(codes.Internal, "client streaming cardinality violation") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + if cs.desc.ClientStreams { + cs.recvMsg = true + } if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index a425877155e8..5015dd67d595 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3588,9 +3588,6 @@ func testClientStreamingError(t *testing.T, e env) { // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server doesn't send a message before returning status OK. func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { - // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` - // after this is fixed. - t.Skip() ss := &stubserver.StubServer{ StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { // Returning status OK without sending a response message.This is a From 154642748b3bf6b65abad08412acfe3d06bb0d66 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 12 May 2025 04:47:47 +0000 Subject: [PATCH 02/18] change variable name from recvMsg to recvFirstMsg --- stream.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stream.go b/stream.go index aa693f356470..ad5fdc90ad14 100644 --- a/stream.go +++ b/stream.go @@ -542,7 +542,7 @@ type clientStream struct { sentLast bool // sent an end stream - recvMsg bool // received msg from server + recvFirstMsg bool // received first msg from server methodConfig *MethodConfig @@ -1136,7 +1136,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } - if cs.desc.ClientStreams && !cs.desc.ServerStreams && !cs.recvMsg{ + if cs.desc.ClientStreams && !cs.desc.ServerStreams && !cs.recvFirstMsg { return status.Errorf(codes.Internal, "client streaming cardinality violation") } return io.EOF // indicates successful end of stream. @@ -1145,7 +1145,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return toRPCErr(err) } if cs.desc.ClientStreams { - cs.recvMsg = true + cs.recvFirstMsg = true } if a.trInfo != nil { a.mu.Lock() From ca4860a82f627928b4b0d104480030caa4178ac5 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 12 May 2025 19:37:24 +0000 Subject: [PATCH 03/18] changes for unary rpc --- stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index ad5fdc90ad14..3650741f91b4 100644 --- a/stream.go +++ b/stream.go @@ -1136,7 +1136,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } - if cs.desc.ClientStreams && !cs.desc.ServerStreams && !cs.recvFirstMsg { + if !cs.desc.ServerStreams && !cs.recvFirstMsg { return status.Errorf(codes.Internal, "client streaming cardinality violation") } return io.EOF // indicates successful end of stream. @@ -1144,7 +1144,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return toRPCErr(err) } - if cs.desc.ClientStreams { + if !cs.desc.ServerStreams { cs.recvFirstMsg = true } if a.trInfo != nil { From c3ca09b6192587a3a187b784be21ca9ca973d6d6 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 13 May 2025 09:39:12 +0000 Subject: [PATCH 04/18] added comment --- stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream.go b/stream.go index 3650741f91b4..b2c4ab39ae35 100644 --- a/stream.go +++ b/stream.go @@ -1136,6 +1136,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } + // received no msg and status ok for non-server streaming rpcs. if !cs.desc.ServerStreams && !cs.recvFirstMsg { return status.Errorf(codes.Internal, "client streaming cardinality violation") } From bbd83666804d26f92254d19ab4366c818e9edff2 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 15 May 2025 17:05:28 +0000 Subject: [PATCH 05/18] added test --- stream.go | 7 +--- test/end2end_test.go | 79 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/stream.go b/stream.go index b2c4ab39ae35..d2c88241ab84 100644 --- a/stream.go +++ b/stream.go @@ -542,8 +542,6 @@ type clientStream struct { sentLast bool // sent an end stream - recvFirstMsg bool // received first msg from server - methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1137,7 +1135,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return statusErr } // received no msg and status ok for non-server streaming rpcs. - if !cs.desc.ServerStreams && !cs.recvFirstMsg { + if !cs.desc.ServerStreams { return status.Errorf(codes.Internal, "client streaming cardinality violation") } return io.EOF // indicates successful end of stream. @@ -1145,9 +1143,6 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return toRPCErr(err) } - if !cs.desc.ServerStreams { - cs.recvFirstMsg = true - } if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index 5015dd67d595..a40148ca56fa 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3736,6 +3736,85 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%s) = %v", lis.Addr().String(), err) + } + defer cc.Close() + + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("lis.Accept() = %v", err) + return + } + defer conn.Close() + framer := http2.NewFramer(conn, conn) + + if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { + t.Errorf("Error reading client preface: %v", err) + return + } + if err := framer.WriteSettings(); err != nil { + t.Errorf("Error writing server settings: %v", err) + return + } + if err := framer.WriteSettingsAck(); err != nil { + t.Errorf("Error writing settings ack: %v", err) + return + } + + for ctx.Err() == nil { + frame, err := framer.ReadFrame() + if err != nil { + t.Errorf("Error reading frame: %v", err) + return + } + + switch frame := frame.(type) { + case *http2.HeadersFrame: + if frame.Header().StreamID != 1 { + t.Errorf("Expected stream ID 1, got %d", frame.Header().StreamID) + return + } + + var buf bytes.Buffer + enc := hpack.NewEncoder(&buf) + _ = enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) + _ = enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + _ = enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"}) + + if err := framer.WriteHeaders(http2.HeadersFrameParam{ + StreamID: 1, + BlockFragment: buf.Bytes(), + EndHeaders: true, + EndStream: true, + }); err != nil { + t.Errorf("Error while writing headers: %v", err) + return + } + time.Sleep(50 * time.Millisecond) + default: + t.Logf("Server received frame: %v", frame) + } + } + }() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + func (s) TestExceedMaxStreamsLimit(t *testing.T) { for _, e := range listTestEnv() { testExceedMaxStreamsLimit(t, e) From 1a23bb7c6bb72c7de3e2271ab867944f4468a829 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 15 May 2025 17:54:00 +0000 Subject: [PATCH 06/18] add comment before test --- test/end2end_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/end2end_test.go b/test/end2end_test.go index a40148ca56fa..a21e580aa739 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3736,6 +3736,8 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests that a client receives a cardinality violation error for unary +// RPCs if the server doesn't send a message before returning status OK. func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { From f37ea78478c33241982a5444db971c5dbe6182bf Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 26 May 2025 03:15:51 +0000 Subject: [PATCH 07/18] formatting changes --- test/end2end_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index a21e580aa739..637176e01ce1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3749,14 +3749,14 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { defer cancel() cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%s) = %v", lis.Addr().String(), err) + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) } defer cc.Close() go func() { conn, err := lis.Accept() if err != nil { - t.Errorf("lis.Accept() = %v", err) + t.Errorf("lis.Accept() failed unexpectedly: %v", err) return } defer conn.Close() @@ -3791,9 +3791,9 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { var buf bytes.Buffer enc := hpack.NewEncoder(&buf) - _ = enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - _ = enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) - _ = enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"}) + enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) + enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"}) if err := framer.WriteHeaders(http2.HeadersFrameParam{ StreamID: 1, From 9436092e759af133a5d8d6b9f9afd5fc7485f49d Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 26 May 2025 17:05:16 +0000 Subject: [PATCH 08/18] modify error statement --- stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream.go b/stream.go index d2c88241ab84..ae3b5c97f338 100644 --- a/stream.go +++ b/stream.go @@ -1136,7 +1136,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } // received no msg and status ok for non-server streaming rpcs. if !cs.desc.ServerStreams { - return status.Errorf(codes.Internal, "client streaming cardinality violation") + return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") } return io.EOF // indicates successful end of stream. } From 11da0dc0ec426ba4c99a7dcb46f595eb430f7784 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 26 May 2025 19:51:09 +0000 Subject: [PATCH 09/18] changed test to not use low-level framer --- test/end2end_test.go | 79 ++++++++++++-------------------------------- 1 file changed, 21 insertions(+), 58 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 637176e01ce1..d0a10c0f6994 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3745,6 +3745,27 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } defer lis.Close() + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "EmptyCall", + Handler: func(any, grpc.ServerStream) error { + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + Metadata: "grpc/testing/test.proto", + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -3753,64 +3774,6 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } defer cc.Close() - go func() { - conn, err := lis.Accept() - if err != nil { - t.Errorf("lis.Accept() failed unexpectedly: %v", err) - return - } - defer conn.Close() - framer := http2.NewFramer(conn, conn) - - if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { - t.Errorf("Error reading client preface: %v", err) - return - } - if err := framer.WriteSettings(); err != nil { - t.Errorf("Error writing server settings: %v", err) - return - } - if err := framer.WriteSettingsAck(); err != nil { - t.Errorf("Error writing settings ack: %v", err) - return - } - - for ctx.Err() == nil { - frame, err := framer.ReadFrame() - if err != nil { - t.Errorf("Error reading frame: %v", err) - return - } - - switch frame := frame.(type) { - case *http2.HeadersFrame: - if frame.Header().StreamID != 1 { - t.Errorf("Expected stream ID 1, got %d", frame.Header().StreamID) - return - } - - var buf bytes.Buffer - enc := hpack.NewEncoder(&buf) - enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) - enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) - enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"}) - - if err := framer.WriteHeaders(http2.HeadersFrameParam{ - StreamID: 1, - BlockFragment: buf.Bytes(), - EndHeaders: true, - EndStream: true, - }); err != nil { - t.Errorf("Error while writing headers: %v", err) - return - } - time.Sleep(50 * time.Millisecond) - default: - t.Logf("Server received frame: %v", frame) - } - } - }() - client := testgrpc.NewTestServiceClient(cc) if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) From 7808f8ac6aae0d9c305541b2af362c36e1582781 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 27 May 2025 05:58:39 +0000 Subject: [PATCH 10/18] added test --- stream.go | 7 ++++++- test/end2end_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index ae3b5c97f338..5bdb7b03f016 100644 --- a/stream.go +++ b/stream.go @@ -542,6 +542,8 @@ type clientStream struct { sentLast bool // sent an end stream + recvFirstMsg bool // received first msg + methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1135,7 +1137,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return statusErr } // received no msg and status ok for non-server streaming rpcs. - if !cs.desc.ServerStreams { + if !cs.desc.ServerStreams && !cs.recvFirstMsg { return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") } return io.EOF // indicates successful end of stream. @@ -1143,6 +1145,9 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return toRPCErr(err) } + if !cs.desc.ServerStreams { + cs.recvFirstMsg = true + } if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index d0a10c0f6994..c29ce4cb2941 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3736,6 +3736,38 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests for a successful RPC, client will continue to receive io.EOF for successive calls to CloseAndRecv(). +func (s) TestClientStreaming_ClientCallCloseAndRecvAfterCloseAndRecv(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if _, err := stream.CloseAndRecv(); err != nil { + t.Fatalf("stream.CloseAndRecv() = %v , want ", err) + } + if _, err := stream.CloseAndRecv(); err != io.EOF { + t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, io.EOF) + } +} + // Tests that a client receives a cardinality violation error for unary // RPCs if the server doesn't send a message before returning status OK. func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { From ad4ce90072e8f3c917e0869ddda642989b442bb2 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 28 May 2025 10:22:06 +0000 Subject: [PATCH 11/18] add test for unary RPC --- test/end2end_test.go | 52 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index c29ce4cb2941..45f50388cc5d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3736,8 +3736,8 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests for a successful RPC, client will continue to receive io.EOF for successive calls to CloseAndRecv(). -func (s) TestClientStreaming_ClientCallCloseAndRecvAfterCloseAndRecv(t *testing.T) { +// Tests for a successful RPC, client will receive io.EOF for second call to RecvMsg(). +func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { @@ -3760,11 +3760,15 @@ func (s) TestClientStreaming_ClientCallCloseAndRecvAfterCloseAndRecv(t *testing. if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { t.Fatalf("stream.Send(_) = %v, want ", err) } - if _, err := stream.CloseAndRecv(); err != nil { - t.Fatalf("stream.CloseAndRecv() = %v , want ", err) + if err := stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() = %v, want ", err) + } + resp := new(testpb.StreamingInputCallResponse) + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) } - if _, err := stream.CloseAndRecv(); err != io.EOF { - t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, io.EOF) + if err := stream.RecvMsg(resp); err != io.EOF { + t.Fatalf("stream.RecvMsg() = %v, want error %s", err, io.EOF) } } @@ -3812,6 +3816,42 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } } +// Tests for a successful unary RPC, client will receive io.EOF for second call to RecvMsg(). +func (s) TestUnaryCall_ClientCallRecvMsgTwice(t *testing.T) { + e := tcpTLSEnv + te := newTest(t, e) + defer te.tearDown() + + te.startServer(&testServer{security: e.security}) + + cc := te.clientConn() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { + t.Fatalf("stream.SendMsg(_) = %v, want ", err) + } + + resp := &testpb.SimpleResponse{} + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + + if err = stream.RecvMsg(resp); err != io.EOF { + t.Fatalf("stream.RecvMsg() = %v, want error %s", err, io.EOF) + } +} + func (s) TestExceedMaxStreamsLimit(t *testing.T) { for _, e := range listTestEnv() { testExceedMaxStreamsLimit(t, e) From a21fcc4689edde57e06e762bb344d3a8bbcd56b8 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 28 May 2025 10:28:34 +0000 Subject: [PATCH 12/18] add test for unary RPC --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 45f50388cc5d..c5892afb7203 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3817,7 +3817,7 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } // Tests for a successful unary RPC, client will receive io.EOF for second call to RecvMsg(). -func (s) TestUnaryCall_ClientCallRecvMsgTwice(t *testing.T) { +func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { e := tcpTLSEnv te := newTest(t, e) defer te.tearDown() From 8107444f98662fe2b9460eeb0ce5aa670e252cb8 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 3 Jun 2025 06:39:13 +0000 Subject: [PATCH 13/18] remove extra variable --- stream.go | 11 +++---- test/end2end_test.go | 72 -------------------------------------------- 2 files changed, 5 insertions(+), 78 deletions(-) diff --git a/stream.go b/stream.go index 5bdb7b03f016..4aa3f40a95eb 100644 --- a/stream.go +++ b/stream.go @@ -542,8 +542,6 @@ type clientStream struct { sentLast bool // sent an end stream - recvFirstMsg bool // received first msg - methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1137,7 +1135,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return statusErr } // received no msg and status ok for non-server streaming rpcs. - if !cs.desc.ServerStreams && !cs.recvFirstMsg { + if !cs.desc.ServerStreams { return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") } return io.EOF // indicates successful end of stream. @@ -1145,9 +1143,6 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return toRPCErr(err) } - if !cs.desc.ServerStreams { - cs.recvFirstMsg = true - } if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { @@ -1483,6 +1478,10 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr } + // received no msg and status ok for non-server streaming rpcs. + if !as.desc.ServerStreams { + return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) diff --git a/test/end2end_test.go b/test/end2end_test.go index c5892afb7203..d0a10c0f6994 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3736,42 +3736,6 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests for a successful RPC, client will receive io.EOF for second call to RecvMsg(). -func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { - ss := stubserver.StubServer{ - StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { - if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { - t.Errorf("stream.SendAndClose(_) = %v, want ", err) - } - return nil - }, - } - if err := ss.Start(nil); err != nil { - t.Fatal("Error starting server:", err) - } - defer ss.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - stream, err := ss.Client.StreamingInputCall(ctx) - if err != nil { - t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) - } - if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { - t.Fatalf("stream.Send(_) = %v, want ", err) - } - if err := stream.CloseSend(); err != nil { - t.Fatalf("stream.CloseSend() = %v, want ", err) - } - resp := new(testpb.StreamingInputCallResponse) - if err := stream.RecvMsg(resp); err != nil { - t.Fatalf("stream.RecvMsg() = %v , want ", err) - } - if err := stream.RecvMsg(resp); err != io.EOF { - t.Fatalf("stream.RecvMsg() = %v, want error %s", err, io.EOF) - } -} - // Tests that a client receives a cardinality violation error for unary // RPCs if the server doesn't send a message before returning status OK. func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { @@ -3816,42 +3780,6 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } } -// Tests for a successful unary RPC, client will receive io.EOF for second call to RecvMsg(). -func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { - e := tcpTLSEnv - te := newTest(t, e) - defer te.tearDown() - - te.startServer(&testServer{security: e.security}) - - cc := te.clientConn() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - desc := &grpc.StreamDesc{ - StreamName: "UnaryCall", - ServerStreams: false, - ClientStreams: false, - } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") - if err != nil { - t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) - } - - if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { - t.Fatalf("stream.SendMsg(_) = %v, want ", err) - } - - resp := &testpb.SimpleResponse{} - if err := stream.RecvMsg(resp); err != nil { - t.Fatalf("stream.RecvMsg() = %v , want ", err) - } - - if err = stream.RecvMsg(resp); err != io.EOF { - t.Fatalf("stream.RecvMsg() = %v, want error %s", err, io.EOF) - } -} - func (s) TestExceedMaxStreamsLimit(t *testing.T) { for _, e := range listTestEnv() { testExceedMaxStreamsLimit(t, e) From 643998df2051acd7c0f9c25be94c65d26dfd2d4e Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 3 Jun 2025 07:51:37 +0000 Subject: [PATCH 14/18] vet errors --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 9837a8bf7f6f..b1c102fb4ce7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3778,7 +3778,7 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { client := testgrpc.NewTestServiceClient(cc) if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } + } } // Tests that a client receives a cardinality violation error for client-streaming From f85707bd2ac6b35df5105c417b045a2bf87a9883 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 3 Jun 2025 10:14:33 +0000 Subject: [PATCH 15/18] added tests back --- test/end2end_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/test/end2end_test.go b/test/end2end_test.go index b1c102fb4ce7..bd7b69bc806e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3781,6 +3781,78 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } } +// Tests for a successful unary RPC, client will receive cardinality violaiton for second call to RecvMsg(). +func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { + e := tcpTLSEnv + te := newTest(t, e) + defer te.tearDown() + + te.startServer(&testServer{security: e.security}) + + cc := te.clientConn() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { + t.Fatalf("stream.SendMsg(_) = %v, want ", err) + } + + resp := &testpb.SimpleResponse{} + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + + if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests for a successful RPC, client will receive io.EOF for second call to RecvMsg(). +func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() = %v, want ", err) + } + resp := new(testpb.StreamingInputCallResponse) + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server call SendMsg multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { From 4e5075b8477846acd96071dd3e5e113367e6947d Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 3 Jun 2025 10:18:21 +0000 Subject: [PATCH 16/18] refactor comments --- test/end2end_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index bd7b69bc806e..3e59b0445a42 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3781,7 +3781,7 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } } -// Tests for a successful unary RPC, client will receive cardinality violaiton for second call to RecvMsg(). +// Tests that client will receive cardinality violation in subsequent calls to RecvMsg(). func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { e := tcpTLSEnv te := newTest(t, e) @@ -3817,7 +3817,7 @@ func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { } } -// Tests for a successful RPC, client will receive io.EOF for second call to RecvMsg(). +// Tests that client will receive cardinality violation in the subsequent calls to RecvMsg(). func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { From b2ab207e71a971d93fe7a1c170b88bc64c0dbba9 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 6 Jun 2025 19:29:24 +0000 Subject: [PATCH 17/18] use unknownServiceHandler --- test/end2end_test.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 3e59b0445a42..4512bcf70883 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3739,31 +3739,18 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Tests that a client receives a cardinality violation error for unary // RPCs if the server doesn't send a message before returning status OK. -func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { +func TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) } defer lis.Close() - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "EmptyCall", - Handler: func(any, grpc.ServerStream) error { - return nil - }, - ClientStreams: false, - ServerStreams: false, - }, - }, - Metadata: "grpc/testing/test.proto", - } - s.RegisterService(&serviceDesc, &testServer{}) + ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { + return nil + }) + + s := grpc.NewServer(ss) go s.Serve(lis) defer s.Stop() From 85ab6ed437798156ff30076e23f33f7672a2b050 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 10 Jun 2025 08:16:26 +0000 Subject: [PATCH 18/18] resolved nits --- stream.go | 12 ++++++------ test/end2end_test.go | 10 ++++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/stream.go b/stream.go index 33c03ed9b88c..ccde41f8dbcb 100644 --- a/stream.go +++ b/stream.go @@ -1138,9 +1138,9 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } - // received no msg and status ok for non-server streaming rpcs. + // Received no msg and status OK for non-server streaming rpcs. if !cs.desc.ServerStreams { - return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") } return io.EOF // indicates successful end of stream. } @@ -1175,7 +1175,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1482,9 +1482,9 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr } - // received no msg and status ok for non-server streaming rpcs. + // Received no msg and status OK for non-server streaming rpcs. if !as.desc.ServerStreams { - return status.Errorf(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") } return io.EOF // indicates successful end of stream. } @@ -1503,7 +1503,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index 4512bcf70883..ab2517d5f18e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3739,7 +3739,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Tests that a client receives a cardinality violation error for unary // RPCs if the server doesn't send a message before returning status OK. -func TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { +func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3768,7 +3768,8 @@ func TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } } -// Tests that client will receive cardinality violation in subsequent calls to RecvMsg(). +// Tests that client will receive cardinality violations when calling +// RecvMsg() multiple times for non-streaming response streams. func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { e := tcpTLSEnv te := newTest(t, e) @@ -3804,7 +3805,8 @@ func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { } } -// Tests that client will receive cardinality violation in the subsequent calls to RecvMsg(). +// Tests that client will receive cardinality violations when calling +// RecvMsg() multiple times for non-streaming response streams. func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { @@ -3841,7 +3843,7 @@ func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendMsg multiple times. +// RPCs if the server call SendMsg() multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {