Skip to content

Commit

Permalink
Add bidirectional streaming support by interleaving Send() and Recv()…
Browse files Browse the repository at this point in the history
… calls.
  • Loading branch information
tmc committed May 27, 2016
1 parent edcd9b0 commit cb23d41
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 43 deletions.
61 changes: 40 additions & 21 deletions examples/examplepb/a_bit_of_everything.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 40 additions & 21 deletions examples/examplepb/flow_combination.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 63 additions & 1 deletion protoc-gen-grpc-gateway/gengateway/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ var _ = utilities.NewDoubleArray
`))

handlerTemplate = template.Must(template.New("handler").Parse(`
{{if .Method.GetClientStreaming}}
{{if and .Method.GetClientStreaming .Method.GetServerStreaming}}
{{template "bidi-streaming-request-func" .}}
{{else if .Method.GetClientStreaming}}
{{template "client-streaming-request-func" .}}
{{else}}
{{template "client-rpc-request-func" .}}
Expand Down Expand Up @@ -234,6 +236,66 @@ var (
{{end}}
}`))

_ = template.Must(handlerTemplate.New("bidi-streaming-request-func").Parse(`
{{template "request-func-signature" .}} {
var metadata runtime.ServerMetadata
stream, err := client.{{.Method.GetName}}(ctx)
if err != nil {
grpclog.Printf("Failed to start streaming: %v", err)
return nil, metadata, err
}
dec := marshaler.NewDecoder(req.Body)
sendErrs := make(chan error, 1)
go func(errs chan<- error) {
for {
var protoReq {{.Method.RequestType.GoType .Method.Service.File.GoPkg.Path}}
err = dec.Decode(&protoReq)
if err == nil {
select {
case errs <- err:
default:
}
}
if err == io.EOF {
select {
case errs <- err:
default:
}
return
}
if err != nil {
grpclog.Printf("Failed to decode request: %v", err)
select {
case errs <- grpc.Errorf(codes.InvalidArgument, "%v", err):
default:
}
}
if err = stream.Send(&protoReq); err != nil {
grpclog.Printf("Failed to send request: %v", err)
select {
case errs <- err:
default:
}
}
}
if err := stream.CloseSend(); err != nil {
grpclog.Printf("Failed to terminate client stream: %v", err)
select {
case errs <- err:
default:
}
}
}(sendErrs)
header, err := stream.Header()
if err != nil {
grpclog.Printf("Failed to get header from client: %v", err)
return nil, metadata, err
}
metadata.HeaderMD = header
return stream, metadata, <-sendErrs
}
`))

trailerTemplate = template.Must(template.New("trailer").Parse(`
{{range $svc := .}}
// Register{{$svc.GetName}}HandlerFromEndpoint is same as Register{{$svc.GetName}}Handler but
Expand Down

0 comments on commit cb23d41

Please sign in to comment.