Skip to content

Commit

Permalink
Fix EOF handling in client side streaming (grpc-ecosystem#962)
Browse files Browse the repository at this point in the history
* Add integration test for grpc-ecosystem#961

Co-authored-by: Jonas Arilho <[email protected]>

* Add verification for io.EOF after stream.Send() on generated code template (grpc-ecosystem#961)

Co-authored-by: Jonas Arilho <[email protected]>

* Add more values on testABEBulkCreateWithError, run go mod tidy

Fixes grpc-ecosystem#961
  • Loading branch information
gustavocovas authored and johanbrandhorst committed Jun 28, 2019
1 parent 803f56d commit 00a0106
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 1 deletion.
61 changes: 61 additions & 0 deletions examples/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -263,6 +264,7 @@ func TestABE(t *testing.T) {
testABECreate(t, 8080)
testABECreateBody(t, 8080)
testABEBulkCreate(t, 8080)
testABEBulkCreateWithError(t, 8080)
testABELookup(t, 8080)
testABELookupNotFound(t, 8080)
testABEList(t, 8080)
Expand Down Expand Up @@ -549,6 +551,65 @@ func testABEBulkCreate(t *testing.T, port int) {
}
}

func testABEBulkCreateWithError(t *testing.T, port int) {
count := 0
r, w := io.Pipe()
go func(w io.WriteCloser) {
defer func() {
if cerr := w.Close(); cerr != nil {
t.Errorf("w.Close() failed with %v; want success", cerr)
}
}()
for _, val := range []string{
"foo", "bar", "baz", "qux", "quux",
} {
time.Sleep(1 * time.Millisecond)

want := gw.ABitOfEverything{
StringValue: fmt.Sprintf("strprefix/%s", val),
}
var m jsonpb.Marshaler
if err := m.Marshal(w, &want); err != nil {
t.Fatalf("m.Marshal(%#v, w) failed with %v; want success", want, err)
}
if _, err := io.WriteString(w, "\n"); err != nil {
t.Errorf("w.Write(%q) failed with %v; want success", "\n", err)
return
}
count++
}
}(w)

apiURL := fmt.Sprintf("http://localhost:%d/v1/example/a_bit_of_everything/bulk", port)
request, err := http.NewRequest("POST", apiURL, r)
if err != nil {
t.Fatalf("http.NewRequest(%q, %q, nil) failed with %v; want success", "POST", apiURL, err)
}
request.Header.Add("Grpc-Metadata-error", "some error")

resp, err := http.DefaultClient.Do(request)
if err != nil {
t.Errorf("http.Post(%q) failed with %v; want success", apiURL, err)
return
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("ioutil.ReadAll(resp.Body) failed with %v; want success", err)
return
}

if got, want := resp.StatusCode, http.StatusBadRequest; got != want {
t.Errorf("resp.StatusCode = %d; want %d", got, want)
t.Logf("%s", buf)
}

var msg errorBody
if err := json.Unmarshal(buf, &msg); err != nil {
t.Fatalf("json.Unmarshal(%s, &msg) failed with %v; want success", buf, err)
}
}

func testABELookup(t *testing.T, port int) {
apiURL := fmt.Sprintf("http://localhost:%d/v1/example/a_bit_of_everything", port)
cresp, err := http.Post(apiURL, "application/json", strings.NewReader(`
Expand Down
3 changes: 3 additions & 0 deletions examples/proto/examplepb/flow_combination.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions examples/proto/examplepb/stream.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion examples/server/a_bit_of_everything.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,15 @@ func (s *_ABitOfEverythingServer) CreateBody(ctx context.Context, msg *examples.
}

func (s *_ABitOfEverythingServer) BulkCreate(stream examples.StreamService_BulkCreateServer) error {
count := 0
ctx := stream.Context()

if header, ok := metadata.FromIncomingContext(ctx); ok {
if v, ok := header["error"]; ok {
return status.Errorf(codes.InvalidArgument, "error metadata: %v", v)
}
}

count := 0
for {
msg, err := stream.Recv()
if err == io.EOF {
Expand Down
3 changes: 3 additions & 0 deletions protoc-gen-grpc-gateway/gengateway/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func request_{{.Method.Service.GetName}}_{{.Method.GetName}}_{{.Index}}(ctx cont
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err = stream.Send(&protoReq); err != nil {
if err == io.EOF {
break
}
grpclog.Infof("Failed to send request: %v", err)
return nil, metadata, err
}
Expand Down

0 comments on commit 00a0106

Please sign in to comment.