From 5bf89d85bfa08db0d86d487f0721fc86b20ff586 Mon Sep 17 00:00:00 2001 From: Alok Parlikar Date: Wed, 6 Mar 2019 22:03:44 +0530 Subject: [PATCH] bugfix: disable IOReaderFactory for streaming requests An IOReaderFactory was being used to wrap request body for client/bidi streaming requests. This was causing the requests to be fully buffered before being sent to the grpc server, thereby breaking streaming. This commit changes that to directly use request body. --- .../proto/examplepb/flow_combination.pb.gw.go | 12 ++---------- examples/proto/examplepb/stream.pb.gw.go | 12 ++---------- protoc-gen-grpc-gateway/gengateway/template.go | 16 ++++------------ .../gengateway/template_test.go | 9 +++------ 4 files changed, 11 insertions(+), 38 deletions(-) diff --git a/examples/proto/examplepb/flow_combination.pb.gw.go b/examples/proto/examplepb/flow_combination.pb.gw.go index 70844205596..b34314fe7d1 100644 --- a/examples/proto/examplepb/flow_combination.pb.gw.go +++ b/examples/proto/examplepb/flow_combination.pb.gw.go @@ -61,11 +61,7 @@ func request_FlowCombination_StreamEmptyRpc_0(ctx context.Context, marshaler run grpclog.Infof("Failed to start streaming: %v", err) return nil, metadata, err } - newReader, berr := utilities.IOReaderFactory(req.Body) - if berr != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) - } - dec := marshaler.NewDecoder(newReader()) + dec := marshaler.NewDecoder(req.Body) for { var protoReq EmptyProto err = dec.Decode(&protoReq) @@ -106,11 +102,7 @@ func request_FlowCombination_StreamEmptyStream_0(ctx context.Context, marshaler grpclog.Infof("Failed to start streaming: %v", err) return nil, metadata, err } - newReader, berr := utilities.IOReaderFactory(req.Body) - if berr != nil { - return nil, metadata, berr - } - dec := marshaler.NewDecoder(newReader()) + dec := marshaler.NewDecoder(req.Body) handleSend := func() error { var protoReq EmptyProto err := dec.Decode(&protoReq) diff --git a/examples/proto/examplepb/stream.pb.gw.go b/examples/proto/examplepb/stream.pb.gw.go index fb4c8fcae45..afda301f0ca 100644 --- a/examples/proto/examplepb/stream.pb.gw.go +++ b/examples/proto/examplepb/stream.pb.gw.go @@ -37,11 +37,7 @@ func request_StreamService_BulkCreate_0(ctx context.Context, marshaler runtime.M grpclog.Infof("Failed to start streaming: %v", err) return nil, metadata, err } - newReader, berr := utilities.IOReaderFactory(req.Body) - if berr != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) - } - dec := marshaler.NewDecoder(newReader()) + dec := marshaler.NewDecoder(req.Body) for { var protoReq ABitOfEverything err = dec.Decode(&protoReq) @@ -99,11 +95,7 @@ func request_StreamService_BulkEcho_0(ctx context.Context, marshaler runtime.Mar grpclog.Infof("Failed to start streaming: %v", err) return nil, metadata, err } - newReader, berr := utilities.IOReaderFactory(req.Body) - if berr != nil { - return nil, metadata, berr - } - dec := marshaler.NewDecoder(newReader()) + dec := marshaler.NewDecoder(req.Body) handleSend := func() error { var protoReq sub.StringMessage err := dec.Decode(&protoReq) diff --git a/protoc-gen-grpc-gateway/gengateway/template.go b/protoc-gen-grpc-gateway/gengateway/template.go index 12c02ae9f9f..d376ccdb255 100644 --- a/protoc-gen-grpc-gateway/gengateway/template.go +++ b/protoc-gen-grpc-gateway/gengateway/template.go @@ -236,11 +236,7 @@ func request_{{.Method.Service.GetName}}_{{.Method.GetName}}_{{.Index}}(ctx cont grpclog.Infof("Failed to start streaming: %v", err) return nil, metadata, err } - newReader, berr := utilities.IOReaderFactory(req.Body) - if berr != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) - } - dec := marshaler.NewDecoder(newReader()) + dec := marshaler.NewDecoder(req.Body) for { var protoReq {{.Method.RequestType.GoType .Method.Service.File.GoPkg.Path}} err = dec.Decode(&protoReq) @@ -303,8 +299,8 @@ var ( return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } else { protoReq.{{.FieldMaskField}} = fieldMask - } - } {{end}} + } + } {{end}} {{end}} {{end}} {{if .PathParams}} @@ -378,11 +374,7 @@ var ( grpclog.Infof("Failed to start streaming: %v", err) return nil, metadata, err } - newReader, berr := utilities.IOReaderFactory(req.Body) - if berr != nil { - return nil, metadata, berr - } - dec := marshaler.NewDecoder(newReader()) + dec := marshaler.NewDecoder(req.Body) handleSend := func() error { var protoReq {{.Method.RequestType.GoType .Method.Service.File.GoPkg.Path}} err := dec.Decode(&protoReq) diff --git a/protoc-gen-grpc-gateway/gengateway/template_test.go b/protoc-gen-grpc-gateway/gengateway/template_test.go index ba941d095dd..5e287a680d1 100644 --- a/protoc-gen-grpc-gateway/gengateway/template_test.go +++ b/protoc-gen-grpc-gateway/gengateway/template_test.go @@ -391,9 +391,6 @@ func TestApplyTemplateRequestWithClientStreaming(t *testing.T) { if want := spec.sigWant; !strings.Contains(got, want) { t.Errorf("applyTemplate(%#v) = %s; want to contain %s", file, got, want) } - if want := `marshaler.NewDecoder(newReader()`; !strings.Contains(got, want) { - t.Errorf("applyTemplate(%#v) = %s; want to contain %s", file, got, want) - } if want := `func RegisterExampleServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {`; !strings.Contains(got, want) { t.Errorf("applyTemplate(%#v) = %s; want to contain %s", file, got, want) } @@ -482,7 +479,7 @@ func TestAllowPatchFeature(t *testing.T) { } } -func TestIdentifierCapitalization(t *testing.T){ +func TestIdentifierCapitalization(t *testing.T) { msgdesc1 := &protodescriptor.DescriptorProto{ Name: proto.String("Exam_pleRequest"), } @@ -492,12 +489,12 @@ func TestIdentifierCapitalization(t *testing.T){ meth1 := &protodescriptor.MethodDescriptorProto{ Name: proto.String("ExampleGe2t"), InputType: proto.String("Exam_pleRequest"), - OutputType: proto.String("example_response"), + OutputType: proto.String("example_response"), } meth2 := &protodescriptor.MethodDescriptorProto{ Name: proto.String("Exampl_eGet"), InputType: proto.String("Exam_pleRequest"), - OutputType: proto.String("example_response"), + OutputType: proto.String("example_response"), } svc := &protodescriptor.ServiceDescriptorProto{ Name: proto.String("Example"),