From 5da4f328b0f981b65e16b926fa561759a5864569 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 15 May 2025 17:41:27 +0000 Subject: [PATCH 1/4] return error code Internal instead of EOF --- stream.go | 2 +- test/end2end_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index 01e66c1ed88f..285ffa11464d 100644 --- a/stream.go +++ b/stream.go @@ -1167,7 +1167,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) + return status.Errorf(codes.Internal, "client streaming cardinality violation: get , want ") } func (a *csAttempt) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index a425877155e8..92d4a2f36ff8 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3739,6 +3739,39 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests that a client receives a cardinality violation error for client-streaming +// RPCs if the server call SendAndClose after calling SendAndClose. +func (s) TestClientStreaming_ServerHandlerSendAndCloseAfterSendAndClose(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if _, err := stream.CloseAndRecv(); status.Code(err) != codes.Internal { + t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, codes.Internal) + } +} + func (s) TestExceedMaxStreamsLimit(t *testing.T) { for _, e := range listTestEnv() { testExceedMaxStreamsLimit(t, e) From 0810217ef81f3e0bda919b98dab4f96f94d074bf Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Sun, 25 May 2025 18:44:38 +0000 Subject: [PATCH 2/4] minor changes --- test/end2end_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 92d4a2f36ff8..bfbb76b8f9c0 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3740,7 +3740,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendAndClose after calling SendAndClose. +// RPCs if the server call SendAndClose multiple times. func (s) TestClientStreaming_ServerHandlerSendAndCloseAfterSendAndClose(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { @@ -3768,7 +3768,7 @@ func (s) TestClientStreaming_ServerHandlerSendAndCloseAfterSendAndClose(t *testi t.Fatalf("stream.Send(_) = %v, want ", err) } if _, err := stream.CloseAndRecv(); status.Code(err) != codes.Internal { - t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, codes.Internal) + t.Fatalf("stream.CloseAndRecv() = %v, want error with status code %s", err, codes.Internal) } } From c3f1f0454d8f7e2bfbe95097b9f9496c153913e7 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 27 May 2025 07:33:48 +0000 Subject: [PATCH 3/4] update error message --- stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 285ffa11464d..b9cc6c7c760b 100644 --- a/stream.go +++ b/stream.go @@ -1167,7 +1167,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "client streaming cardinality violation: get , want ") + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but get ") } func (a *csAttempt) finish(err error) { @@ -1491,7 +1491,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but get ") } func (as *addrConnStream) finish(err error) { From 0441021c790ceba60ae7a866effcacb5b72a3f31 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 27 May 2025 15:06:51 +0000 Subject: [PATCH 4/4] Replace SendAndClose() with SendMsg() in the test --- stream.go | 4 ++-- test/end2end_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/stream.go b/stream.go index b9cc6c7c760b..3e98c79f0f15 100644 --- a/stream.go +++ b/stream.go @@ -1167,7 +1167,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but get ") + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1491,7 +1491,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but get ") + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index bfbb76b8f9c0..1fbde8b9df5f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3740,15 +3740,15 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendAndClose multiple times. -func (s) TestClientStreaming_ServerHandlerSendAndCloseAfterSendAndClose(t *testing.T) { +// RPCs if the server call SendMsg multiple times. +func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { - if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { - t.Errorf("stream.SendAndClose(_) = %v, want ", err) + if err := stream.SendMsg(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendMsg(_) = %v, want ", err) } - if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { - t.Errorf("stream.SendAndClose(_) = %v, want ", err) + if err := stream.SendMsg(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendMsg(_) = %v, want ", err) } return nil },