From d0d1fba1611262b0f1bab2c36ee4c47a2eb23004 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Thu, 24 Jan 2019 18:01:56 -0800 Subject: [PATCH 1/7] client: handle HTTP fallback correctly --- internal/transport/http2_client.go | 2 +- internal/transport/http_util.go | 94 ++++++++++++----- test/end2end_test.go | 156 ++++++++++++++++++++++++++++- test/rawConnWrapper.go | 42 ++++++++ 4 files changed, 269 insertions(+), 25 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ff8f4db08ba4..e0037d5ba2a4 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1143,7 +1143,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { atomic.StoreUint32(&s.bytesReceived, 1) var state decodeState if err := state.decodeHeader(frame); err != nil { - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, false) // Something wrong. Stops reading even when there is remaining. return } diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 77a2cfaaef33..bcaf1ce0c438 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -244,44 +244,92 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { if frame.Truncated { return status.Error(codes.Internal, "peer header list size exceeded limit") } + + // isGRPC indicates whether the peer is speaking gRPC (otherwise HTTP). + // If the header contains valid a content-type, i.e. a string starts with "application/grpc", then + // we are in gRPC mode and should handle gRPC specific error. + // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we + // are in HTTP fallback mode, and should handle error specific to HTTP. + var isGRPC bool + var grpcErr, httpErr, contentTypeErr error for _, hf := range frame.Fields { - if err := d.processHeaderField(hf); err != nil { - return err + err := d.processHeaderField(hf) + switch hf.Name { + case "content-type": + if err == nil { + // gRPC mode + isGRPC = true + } else { + contentTypeErr = err + } + case ":status": + httpErr = err // In gRPC mode, we don't care about HTTP field parsing error. + default: + if err != nil && grpcErr == nil { + grpcErr = err // store the first encountered gRPC field parsing error. + } } } - if d.serverSide { + // gRPC mode + if isGRPC { + if grpcErr != nil { + return grpcErr + } + if d.serverSide { + return nil + } + if d.rawStatusCode == nil && d.statusGen == nil { + // gRPC status doesn't exist and content-type indicates gRPC peer. + // Set rawStatusCode to be unknown and return nil error. + // So that, if the stream has ended this Unknown status + // will be propagated to the user. + // Otherwise, it will be ignored. In which case, status from + // a later trailer, that has StreamEnded flag set, is propagated. + code := int(codes.Unknown) + d.rawStatusCode = &code + } return nil } - // If grpc status exists, no need to check further. - if d.rawStatusCode != nil || d.statusGen != nil { - return nil + // HTTP fallback mode + if httpErr != nil { + return httpErr } - // If grpc status doesn't exist and http status doesn't exist, - // then it's a malformed header. - if d.httpStatus == nil { - return status.Error(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)") - } + var ( + code = codes.Internal // when header does not include HTTP status, return INTERNAL + ok bool + ) - if *(d.httpStatus) != http.StatusOK { - code, ok := httpStatusConvTab[*(d.httpStatus)] + if d.httpStatus != nil { + code, ok = httpStatusConvTab[*(d.httpStatus)] if !ok { code = codes.Unknown } - return status.Error(code, http.StatusText(*(d.httpStatus))) } - // gRPC status doesn't exist and http status is OK. - // Set rawStatusCode to be unknown and return nil error. - // So that, if the stream has ended this Unknown status - // will be propagated to the user. - // Otherwise, it will be ignored. In which case, status from - // a later trailer, that has StreamEnded flag set, is propagated. - code := int(codes.Unknown) - d.rawStatusCode = &code - return nil + return status.Error(code, d.constructHTTPErrMsg(contentTypeErr)) +} + +// constructErrMsg constructs error message to be returned in HTTP fallback mode. +// Format: HTTP status code and its corresponding message + content-type error message. +func (d *decodeState) constructHTTPErrMsg(contentTypeErr error) string { + var errMsgs []string + + if d.httpStatus == nil { + errMsgs = append(errMsgs, "malformed header: in HTTP fallback mode, but doesn't contain HTTP status") + } else { + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.httpStatus)), *d.httpStatus)) + } + + if contentTypeErr == nil { + errMsgs = append(errMsgs, "transport: missing content-type field") + } else { + errMsgs = append(errMsgs, status.Convert(contentTypeErr).Message()) + } + + return strings.Join(errMsgs, "\t") } func (d *decodeState) addMetadata(k, v string) { diff --git a/test/end2end_test.go b/test/end2end_test.go index 2ffa2e334e88..b1e0683134a7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -29,6 +29,7 @@ import ( "flag" "fmt" "io" + "log" "math" "net" "net/http" @@ -149,6 +150,7 @@ type testServer struct { earlyFail bool // whether to error out the execution of a service handler prematurely. setAndSendHeader bool // whether to call setHeader and sendHeader. setHeaderOnly bool // whether to only call setHeader, not sendHeader. + notSetAndNotSend bool // whether to not set and send header. multipleSetTrailer bool // whether to call setTrailer multiple times. unaryCallSleepTime time.Duration } @@ -6892,7 +6894,7 @@ func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) time.Sleep(100 * time.Millisecond) rcw.writeHeaders(http2.HeadersFrameParam{ StreamID: tc.getCurrentStreamID(), - BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")), + BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")), EndStream: false, EndHeaders: true, }) @@ -7125,3 +7127,155 @@ func (s) TestRPCWaitsForResolver(t *testing.T) { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err) } } + +func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { + e := tcpClearRREnv + te := newTest(t, e) + lis := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true}) + defer te.tearDown() + + for _, test := range []struct { + header []string + errCode codes.Code + }{ + { + header: []string{ + // non-grpc content-type, fallback to HTTP mode + ":status", "400", + "content-type", "text/html", + }, + errCode: codes.Internal, + }, + { + header: []string{ + // non-grpc content-type, fallback to HTTP mode + ":status", "401", + "content-type", "text/html", + }, + errCode: codes.Unauthenticated, + }, + { + header: []string{ + // non-grpc content-type, fallback to HTTP mode + ":status", "404", + "content-type", "text/html", + }, + errCode: codes.Unimplemented, + }, + { + header: []string{ + // non-grpc content-type, fallback to HTTP mode + ":status", "502", + "content-type", "text/html", + }, + errCode: codes.Unavailable, + }, + { + // non-grpc content-type, fallback to HTTP mode + // missing HTTP status + header: []string{ + "content-type", "text/html", + }, + errCode: codes.Internal, + }, + { + // non-grpc content-type, fallback to HTTP mode + // malformed HTTP status + header: []string{ + ":status", "abc", + "content-type", "text/html", + }, + errCode: codes.Internal, + }, + { + // gRPC mode, but missing gRPC status (trailer, as EndStream is set). + header: []string{ + ":status", "403", + "content-type", "application/grpc", + }, + errCode: codes.Unknown, + }, + { + // gRPC mode, but missing gRPC status (trailer, as EndStream is set). + header: []string{ + ":status", "403", + "content-type", "application/grpc", + }, + errCode: codes.Unknown, + }, + { + // gRPC mode, with malformed grpc-status. + header: []string{ + ":status", "502", + "content-type", "application/grpc", + "grpc-status", "abc", + }, + errCode: codes.Internal, + }, + { + // gRPC mode, with malformed grpc-tags-bin field. + header: []string{ + ":status", "502", + "content-type", "application/grpc", + "grpc-status", "0", + "grpc-tags-bin", "???", + }, + errCode: codes.Internal, + }, + { + header: []string{ + ":status", "502", + "content-type", "application/grpc", + "grpc-status", "3", + }, + errCode: codes.InvalidArgument, + }, + } { + cc := te.clientConn() + tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want ", err) + } + + // do a round trip of request and response to make sure the stream is up. + const smallSize = 1 + smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) + if err != nil { + t.Fatal(err) + } + + sreq := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseParameters: []*testpb.ResponseParameters{ + {Size: smallSize}, + }, + Payload: smallPayload, + } + + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = %v, want ", stream, err) + } + + rcw := lis.getLastConn() + hdr := http2.HeadersFrameParam{ + StreamID: tc.getCurrentStreamID(), + BlockFragment: rcw.encodeRawHeader(test.header...), + EndStream: true, + EndHeaders: true, + } + if err := rcw.writeHeaders(hdr); err != nil { + log.Fatalf("failed to write header due to %v", err) + } + if _, err := stream.Recv(); err == nil || status.Code(err) != test.errCode { + t.Fatalf("%v.Recv() = %v, want error code to be %v", stream, err, test.errCode) + } + cancel() + cc.Close() + te.cc = nil // clear te.cc to force te.clientConn() to make new connection + } +} diff --git a/test/rawConnWrapper.go b/test/rawConnWrapper.go index 5d991cf013ac..124b10e09e7b 100644 --- a/test/rawConnWrapper.go +++ b/test/rawConnWrapper.go @@ -227,6 +227,47 @@ func (rcw *rawConnWrapper) encodeHeaderField(k, v string) error { return nil } +// encodeRawHeader is for usage on both client and server side to construct header based on the input +// key, value pairs. +func (rcw *rawConnWrapper) encodeRawHeader(headers ...string) []byte { + if len(headers)%2 == 1 { + panic("odd number of kv args") + } + + rcw.headerBuf.Reset() + + pseudoCount := map[string]int{} + var keys []string + vals := map[string][]string{} + + for len(headers) > 0 { + k, v := headers[0], headers[1] + headers = headers[2:] + if _, ok := vals[k]; !ok { + keys = append(keys, k) + } + if strings.HasPrefix(k, ":") { + pseudoCount[k]++ + if pseudoCount[k] == 1 { + vals[k] = []string{v} + } else { + // Allows testing of invalid headers w/ dup pseudo fields. + vals[k] = append(vals[k], v) + } + } else { + vals[k] = append(vals[k], v) + } + } + for _, k := range keys { + for _, v := range vals[k] { + rcw.encodeHeaderField(k, v) + } + } + return rcw.headerBuf.Bytes() +} + +// encodeHeader is for usage on client side to write request header. +// // encodeHeader encodes headers and returns their HPACK bytes. headers // must contain an even number of key/value pairs. There may be // multiple pairs for keys (e.g. "cookie"). The :method, :path, and @@ -288,6 +329,7 @@ func (rcw *rawConnWrapper) encodeHeader(headers ...string) []byte { return rcw.headerBuf.Bytes() } +// writeHeadersGRPC is for usage on client side to write request header. func (rcw *rawConnWrapper) writeHeadersGRPC(streamID uint32, path string) { rcw.writeHeaders(http2.HeadersFrameParam{ StreamID: streamID, From 824a7c0f28c73657f765cac39215b99d7bd00afe Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 25 Jan 2019 11:14:10 -0800 Subject: [PATCH 2/7] try fix --- internal/transport/http2_client.go | 5 +- internal/transport/http2_server.go | 2 +- internal/transport/http_util.go | 12 +- internal/transport/transport_test.go | 163 --------------------------- test/end2end_test.go | 137 ++++++++++++++-------- 5 files changed, 105 insertions(+), 214 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index e0037d5ba2a4..3366ab2181f4 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1142,7 +1142,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } atomic.StoreUint32(&s.bytesReceived, 1) var state decodeState - if err := state.decodeHeader(frame); err != nil { + // If HEADER frame has been received before, then this header frame is a Trailer. + // isTrailer should be false if the HEADER frame is Response-Headers or Trailers-Only. + isTrailer := atomic.LoadUint32(&s.headerDone) == 1 + if err := state.decodeHeader(frame, isTrailer); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, false) // Something wrong. Stops reading even when there is remaining. return diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 2b996f641416..85533761b469 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -287,7 +287,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { streamID := frame.Header().StreamID state := decodeState{serverSide: true} - if err := state.decodeHeader(frame); err != nil { + if err := state.decodeHeader(frame, false); err != nil { if se, ok := status.FromError(err); ok { t.controlBuf.put(&cleanupStream{ streamID: streamID, diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index bcaf1ce0c438..5eaf73d55c74 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -238,7 +238,7 @@ func decodeMetadataHeader(k, v string) (string, error) { return v, nil } -func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { +func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool) error { // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. if frame.Truncated { @@ -252,8 +252,12 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { // are in HTTP fallback mode, and should handle error specific to HTTP. var isGRPC bool var grpcErr, httpErr, contentTypeErr error + var str string for _, hf := range frame.Fields { err := d.processHeaderField(hf) + str += hf.Name + str += ":" + str += " " + hf.Value + " " switch hf.Name { case "content-type": if err == nil { @@ -272,7 +276,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { } // gRPC mode - if isGRPC { + if isGRPC || isTrailer { if grpcErr != nil { return grpcErr } @@ -280,7 +284,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { return nil } if d.rawStatusCode == nil && d.statusGen == nil { - // gRPC status doesn't exist and content-type indicates gRPC peer. + // gRPC status doesn't exist. // Set rawStatusCode to be unknown and return nil error. // So that, if the stream has ended this Unknown status // will be propagated to the user. @@ -309,7 +313,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { } } - return status.Error(code, d.constructHTTPErrMsg(contentTypeErr)) + return status.Error(code, d.constructHTTPErrMsg(contentTypeErr)+" "+str) } // constructErrMsg constructs error message to be returned in HTTP fallback mode. diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index e3857356a2c1..baea6befdcde 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -19,7 +19,6 @@ package transport import ( - "bufio" "bytes" "context" "encoding/binary" @@ -28,7 +27,6 @@ import ( "io" "math" "net" - "net/http" "reflect" "runtime" "strconv" @@ -1943,167 +1941,6 @@ func waitWhileTrue(t *testing.T, condition func() (bool, error)) { } } -// A function of type writeHeaders writes out -// http status with the given stream ID using the given framer. -type writeHeaders func(*http2.Framer, uint32, int) error - -func writeOneHeader(framer *http2.Framer, sid uint32, httpStatus int) error { - var buf bytes.Buffer - henc := hpack.NewEncoder(&buf) - henc.WriteField(hpack.HeaderField{Name: ":status", Value: fmt.Sprint(httpStatus)}) - return framer.WriteHeaders(http2.HeadersFrameParam{ - StreamID: sid, - BlockFragment: buf.Bytes(), - EndStream: true, - EndHeaders: true, - }) -} - -func writeTwoHeaders(framer *http2.Framer, sid uint32, httpStatus int) error { - var buf bytes.Buffer - henc := hpack.NewEncoder(&buf) - henc.WriteField(hpack.HeaderField{ - Name: ":status", - Value: fmt.Sprint(http.StatusOK), - }) - if err := framer.WriteHeaders(http2.HeadersFrameParam{ - StreamID: sid, - BlockFragment: buf.Bytes(), - EndHeaders: true, - }); err != nil { - return err - } - buf.Reset() - henc.WriteField(hpack.HeaderField{ - Name: ":status", - Value: fmt.Sprint(httpStatus), - }) - return framer.WriteHeaders(http2.HeadersFrameParam{ - StreamID: sid, - BlockFragment: buf.Bytes(), - EndStream: true, - EndHeaders: true, - }) -} - -type httpServer struct { - httpStatus int - wh writeHeaders -} - -func (s *httpServer) start(t *testing.T, lis net.Listener) { - // Launch an HTTP server to send back header with httpStatus. - go func() { - conn, err := lis.Accept() - if err != nil { - t.Errorf("Error accepting connection: %v", err) - return - } - defer conn.Close() - // Read preface sent by client. - if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil { - t.Errorf("Error at server-side while reading preface from client. Err: %v", err) - return - } - reader := bufio.NewReaderSize(conn, defaultWriteBufSize) - writer := bufio.NewWriterSize(conn, defaultReadBufSize) - framer := http2.NewFramer(writer, reader) - if err = framer.WriteSettingsAck(); err != nil { - t.Errorf("Error at server-side while sending Settings ack. Err: %v", err) - return - } - var sid uint32 - // Read frames until a header is received. - for { - frame, err := framer.ReadFrame() - if err != nil { - t.Errorf("Error at server-side while reading frame. Err: %v", err) - return - } - if hframe, ok := frame.(*http2.HeadersFrame); ok { - sid = hframe.Header().StreamID - break - } - } - if err = s.wh(framer, sid, s.httpStatus); err != nil { - t.Errorf("Error at server-side while writing headers. Err: %v", err) - return - } - writer.Flush() - }() -} - -func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (*Stream, func()) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to listen. Err: %v", err) - } - server := &httpServer{ - httpStatus: httpStatus, - wh: wh, - } - server.start(t, lis) - connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - defer cancel() - client, err := newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {}) - if err != nil { - lis.Close() - t.Fatalf("Error creating client. Err: %v", err) - } - stream, err := client.NewStream(context.Background(), &CallHdr{Method: "bogus/method"}) - if err != nil { - client.Close() - lis.Close() - t.Fatalf("Error creating stream at client-side. Err: %v", err) - } - return stream, func() { - client.Close() - lis.Close() - } -} - -func TestHTTPToGRPCStatusMapping(t *testing.T) { - for k := range httpStatusConvTab { - testHTTPToGRPCStatusMapping(t, k, writeOneHeader) - } -} - -func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) { - stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh) - defer cleanUp() - want := httpStatusConvTab[httpStatus] - buf := make([]byte, 8) - _, err := stream.Read(buf) - if err == nil { - t.Fatalf("Stream.Read(_) unexpectedly returned no error. Expected stream error with code %v", want) - } - serr, ok := status.FromError(err) - if !ok { - t.Fatalf("err.(Type) = %T, want status error", err) - } - if want != serr.Code() { - t.Fatalf("Want error code: %v, got: %v", want, serr.Code()) - } -} - -func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) { - stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader) - defer cleanUp() - buf := make([]byte, 8) - _, err := stream.Read(buf) - if err != io.EOF { - t.Fatalf("stream.Read(_) = _, %v, want _, io.EOF", err) - } - want := codes.Unknown - if stream.status.Code() != want { - t.Fatalf("Status code of stream: %v, want: %v", stream.status.Code(), want) - } -} - -func TestHTTPStatusNottOKAndMissingGRPCStatusInSecondHeader(t *testing.T) { - testHTTPToGRPCStatusMapping(t, http.StatusUnauthorized, writeTwoHeaders) -} - // If any error occurs on a call to Stream.Read, future calls // should continue to return that same error. func TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) { diff --git a/test/end2end_test.go b/test/end2end_test.go index b1e0683134a7..d3781d0502b1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -22,6 +22,7 @@ package test import ( + "bufio" "bytes" "context" "crypto/tls" @@ -29,7 +30,6 @@ import ( "flag" "fmt" "io" - "log" "math" "net" "net/http" @@ -43,6 +43,8 @@ import ( "testing" "time" + "golang.org/x/net/http2/hpack" + "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" "golang.org/x/net/http2" @@ -150,7 +152,6 @@ type testServer struct { earlyFail bool // whether to error out the execution of a service handler prematurely. setAndSendHeader bool // whether to call setHeader and sendHeader. setHeaderOnly bool // whether to only call setHeader, not sendHeader. - notSetAndNotSend bool // whether to not set and send header. multipleSetTrailer bool // whether to call setTrailer multiple times. unaryCallSleepTime time.Duration } @@ -7129,11 +7130,6 @@ func (s) TestRPCWaitsForResolver(t *testing.T) { } func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { - e := tcpClearRREnv - te := newTest(t, e) - lis := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true}) - defer te.tearDown() - for _, test := range []struct { header []string errCode codes.Code @@ -7231,51 +7227,102 @@ func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { errCode: codes.InvalidArgument, }, } { - cc := te.clientConn() - tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - stream, err := tc.FullDuplexCall(ctx) - if err != nil { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want ", err) - } + hTTPStatusTest(t, test.header, test.errCode) + } +} - // do a round trip of request and response to make sure the stream is up. - const smallSize = 1 - smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) - if err != nil { - t.Fatal(err) - } +type httpServer struct { + headerFields []string +} - sreq := &testpb.StreamingOutputCallRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseParameters: []*testpb.ResponseParameters{ - {Size: smallSize}, - }, - Payload: smallPayload, - } +func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string) error { + if len(headerFields)%2 == 1 { + panic("odd number of kv args") + } + + var buf bytes.Buffer + henc := hpack.NewEncoder(&buf) + for len(headerFields) > 0 { + k, v := headerFields[0], headerFields[1] + headerFields = headerFields[2:] + henc.WriteField(hpack.HeaderField{Name: k, Value: v}) + } + + return framer.WriteHeaders(http2.HeadersFrameParam{ + StreamID: sid, + BlockFragment: buf.Bytes(), + EndStream: true, + EndHeaders: true, + }) +} - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) +func (s *httpServer) start(t *testing.T, lis net.Listener) { + // Launch an HTTP server to send back header with httpStatus. + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("Error accepting connection: %v", err) + return } - if _, err := stream.Recv(); err != nil { - t.Fatalf("%v.Recv() = %v, want ", stream, err) + defer conn.Close() + // Read preface sent by client. + if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil { + t.Errorf("Error at server-side while reading preface from client. Err: %v", err) + return } - - rcw := lis.getLastConn() - hdr := http2.HeadersFrameParam{ - StreamID: tc.getCurrentStreamID(), - BlockFragment: rcw.encodeRawHeader(test.header...), - EndStream: true, - EndHeaders: true, + reader := bufio.NewReader(conn) + writer := bufio.NewWriter(conn) + framer := http2.NewFramer(writer, reader) + if err = framer.WriteSettingsAck(); err != nil { + t.Errorf("Error at server-side while sending Settings ack. Err: %v", err) + return } - if err := rcw.writeHeaders(hdr); err != nil { - log.Fatalf("failed to write header due to %v", err) + writer.Flush() // necessary since client is expecting preface before declaring connection fully setup. + + var sid uint32 + // Read frames until a header is received. + for { + frame, err := framer.ReadFrame() + if err != nil { + t.Errorf("Error at server-side while reading frame. Err: %v", err) + return + } + if hframe, ok := frame.(*http2.HeadersFrame); ok { + sid = hframe.Header().StreamID + break + } } - if _, err := stream.Recv(); err == nil || status.Code(err) != test.errCode { - t.Fatalf("%v.Recv() = %v, want error code to be %v", stream, err, test.errCode) + if err = s.writeHeader(framer, sid, s.headerFields); err != nil { + t.Errorf("Error at server-side while writing headers. Err: %v", err) + return } - cancel() - cc.Close() - te.cc = nil // clear te.cc to force te.clientConn() to make new connection + writer.Flush() + }() +} + +func hTTPStatusTest(t *testing.T, headerFields []string, errCode codes.Code) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen. Err: %v", err) + } + defer lis.Close() + server := &httpServer{ + headerFields: headerFields, + } + server.start(t, lis) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + cc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("failed to dial due to err: %v", err) + } + defer cc.Close() + client := testpb.NewTestServiceClient(cc) + stream, err := client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("error creating stream due to err: %v", err) + } + if _, err := stream.Recv(); err == nil || status.Code(err) != errCode { + t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode) } } From 5ccd80f36f858ee88dd4a8528ce222ee86b53a0e Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Mon, 28 Jan 2019 15:11:39 -0800 Subject: [PATCH 3/7] fixed --- internal/transport/http2_client.go | 4 +- internal/transport/http_util.go | 8 +- test/end2end_test.go | 175 +++++++++++++++++------------ 3 files changed, 106 insertions(+), 81 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3366ab2181f4..cd7a66a9726e 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1142,8 +1142,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } atomic.StoreUint32(&s.bytesReceived, 1) var state decodeState - // If HEADER frame has been received before, then this header frame is a Trailer. - // isTrailer should be false if the HEADER frame is Response-Headers or Trailers-Only. + // If HEADER frame has been received before, then this header frame contains a Trailer. + // isTrailer should be false if the HEADER frame contains Response-Headers or Trailers-Only. isTrailer := atomic.LoadUint32(&s.headerDone) == 1 if err := state.decodeHeader(frame, isTrailer); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, false) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 5eaf73d55c74..b6f8ba42153e 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -252,12 +252,8 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool // are in HTTP fallback mode, and should handle error specific to HTTP. var isGRPC bool var grpcErr, httpErr, contentTypeErr error - var str string for _, hf := range frame.Fields { err := d.processHeaderField(hf) - str += hf.Name - str += ":" - str += " " + hf.Value + " " switch hf.Name { case "content-type": if err == nil { @@ -267,7 +263,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool contentTypeErr = err } case ":status": - httpErr = err // In gRPC mode, we don't care about HTTP field parsing error. + httpErr = err // In gRPC mode, we don't care about HTTP field parsing error, so we store it separately. default: if err != nil && grpcErr == nil { grpcErr = err // store the first encountered gRPC field parsing error. @@ -313,7 +309,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool } } - return status.Error(code, d.constructHTTPErrMsg(contentTypeErr)+" "+str) + return status.Error(code, d.constructHTTPErrMsg(contentTypeErr)) } // constructErrMsg constructs error message to be returned in HTTP fallback mode. diff --git a/test/end2end_test.go b/test/end2end_test.go index d3781d0502b1..04141f3d6541 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -43,11 +43,10 @@ import ( "testing" "time" - "golang.org/x/net/http2/hpack" - "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/balancer/roundrobin" @@ -7129,70 +7128,63 @@ func (s) TestRPCWaitsForResolver(t *testing.T) { } } -func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { +// A copy from http_util.go for testing. +var httpStatusConvTab = map[int]codes.Code{ + // 400 Bad Request - INTERNAL. + http.StatusBadRequest: codes.Internal, + // 401 Unauthorized - UNAUTHENTICATED. + http.StatusUnauthorized: codes.Unauthenticated, + // 403 Forbidden - PERMISSION_DENIED. + http.StatusForbidden: codes.PermissionDenied, + // 404 Not Found - UNIMPLEMENTED. + http.StatusNotFound: codes.Unimplemented, + // 429 Too Many Requests - UNAVAILABLE. + http.StatusTooManyRequests: codes.Unavailable, + // 502 Bad Gateway - UNAVAILABLE. + http.StatusBadGateway: codes.Unavailable, + // 503 Service Unavailable - UNAVAILABLE. + http.StatusServiceUnavailable: codes.Unavailable, + // 504 Gateway timeout - UNAVAILABLE. + http.StatusGatewayTimeout: codes.Unavailable, +} + +func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { + // Non-gRPC content-type fallback path. + for httpCode := range httpStatusConvTab { + doHTTPHeaderTest(t, httpStatusConvTab[int(httpCode)], []string{ + ":status", fmt.Sprintf("%d", httpCode), + "content-type", "text/html", // non-gRPC content type to switch to HTTP mode. + "grpc-status", "1", // Make up a gRPC status error + "grpc-status-details-bin", "???", // Make up a gRPC field parsing error + }) + } + + // Missing content-type fallback path. + for httpCode := range httpStatusConvTab { + doHTTPHeaderTest(t, httpStatusConvTab[int(httpCode)], []string{ + ":status", fmt.Sprintf("%d", httpCode), + // Omitting content type to switch to HTTP mode. + "grpc-status", "1", // Make up a gRPC status error + "grpc-status-details-bin", "???", // Make up a gRPC field parsing error + }) + } + + // Malformed HTTP status when fallback. + doHTTPHeaderTest(t, codes.Internal, []string{ + ":status", "abc", + // Omitting content type to switch to HTTP mode. + "grpc-status", "1", // Make up a gRPC status error + "grpc-status-details-bin", "???", // Make up a gRPC field parsing error + }) +} + +func (s) TestHTTPHeaderFrameErrorHandlingGRPCNonTrailer(t *testing.T) { for _, test := range []struct { header []string errCode codes.Code }{ { - header: []string{ - // non-grpc content-type, fallback to HTTP mode - ":status", "400", - "content-type", "text/html", - }, - errCode: codes.Internal, - }, - { - header: []string{ - // non-grpc content-type, fallback to HTTP mode - ":status", "401", - "content-type", "text/html", - }, - errCode: codes.Unauthenticated, - }, - { - header: []string{ - // non-grpc content-type, fallback to HTTP mode - ":status", "404", - "content-type", "text/html", - }, - errCode: codes.Unimplemented, - }, - { - header: []string{ - // non-grpc content-type, fallback to HTTP mode - ":status", "502", - "content-type", "text/html", - }, - errCode: codes.Unavailable, - }, - { - // non-grpc content-type, fallback to HTTP mode - // missing HTTP status - header: []string{ - "content-type", "text/html", - }, - errCode: codes.Internal, - }, - { - // non-grpc content-type, fallback to HTTP mode - // malformed HTTP status - header: []string{ - ":status", "abc", - "content-type", "text/html", - }, - errCode: codes.Internal, - }, - { - // gRPC mode, but missing gRPC status (trailer, as EndStream is set). - header: []string{ - ":status", "403", - "content-type", "application/grpc", - }, - errCode: codes.Unknown, - }, - { - // gRPC mode, but missing gRPC status (trailer, as EndStream is set). + // missing gRPC status. header: []string{ ":status", "403", "content-type", "application/grpc", @@ -7200,7 +7192,7 @@ func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { errCode: codes.Unknown, }, { - // gRPC mode, with malformed grpc-status. + // malformed grpc-status. header: []string{ ":status", "502", "content-type", "application/grpc", @@ -7209,7 +7201,7 @@ func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { errCode: codes.Internal, }, { - // gRPC mode, with malformed grpc-tags-bin field. + // Malformed grpc-tags-bin field. header: []string{ ":status", "502", "content-type", "application/grpc", @@ -7219,6 +7211,7 @@ func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { errCode: codes.Internal, }, { + // gRPC status error. header: []string{ ":status", "502", "content-type", "application/grpc", @@ -7227,15 +7220,49 @@ func (s) TestHTTPHeaderFrameErrorHandling(t *testing.T) { errCode: codes.InvalidArgument, }, } { - hTTPStatusTest(t, test.header, test.errCode) + doHTTPHeaderTest(t, test.errCode, test.header) + } +} + +func (s) TestHTTPHeaderFrameErrorHandlingTrailer(t *testing.T) { + for _, test := range []struct { + responseHeader []string + trailer []string + errCode codes.Code + }{ + { + responseHeader: []string{ + ":status", "200", + "content-type", "application/grpc", + }, + trailer: []string{ + // trailer missing grpc-status + ":status", "502", + }, + errCode: codes.Unknown, + }, + { + responseHeader: []string{ + ":status", "404", + "content-type", "application/grpc", + }, + trailer: []string{ + // malformed grpc-status-details-bin field + "grpc-status", "0", + "grpc-status-details-bin", "????", + }, + errCode: codes.Internal, + }, + } { + doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer) } } type httpServer struct { - headerFields []string + headerFields [][]string } -func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string) error { +func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error { if len(headerFields)%2 == 1 { panic("odd number of kv args") } @@ -7251,13 +7278,13 @@ func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields return framer.WriteHeaders(http2.HeadersFrameParam{ StreamID: sid, BlockFragment: buf.Bytes(), - EndStream: true, + EndStream: endStream, EndHeaders: true, }) } func (s *httpServer) start(t *testing.T, lis net.Listener) { - // Launch an HTTP server to send back header with httpStatus. + // Launch an HTTP server to send back header. go func() { conn, err := lis.Accept() if err != nil { @@ -7292,15 +7319,17 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) { break } } - if err = s.writeHeader(framer, sid, s.headerFields); err != nil { - t.Errorf("Error at server-side while writing headers. Err: %v", err) - return + for i, headers := range s.headerFields { + if err = s.writeHeader(framer, sid, headers, i == len(s.headerFields)-1); err != nil { + t.Errorf("Error at server-side while writing headers. Err: %v", err) + return + } + writer.Flush() } - writer.Flush() }() } -func hTTPStatusTest(t *testing.T, headerFields []string, errCode codes.Code) { +func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) { lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to listen. Err: %v", err) From 3b71bc340545961226bce8d214d9393a6792cd2d Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Mon, 28 Jan 2019 18:30:52 -0800 Subject: [PATCH 4/7] fix some offline comments --- internal/transport/http2_client.go | 15 ++-- internal/transport/http2_server.go | 26 +++---- internal/transport/http_util.go | 113 ++++++++++++++++++----------- 3 files changed, 88 insertions(+), 66 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index cd7a66a9726e..200367080453 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1141,11 +1141,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } atomic.StoreUint32(&s.bytesReceived, 1) - var state decodeState - // If HEADER frame has been received before, then this header frame contains a Trailer. - // isTrailer should be false if the HEADER frame contains Response-Headers or Trailers-Only. - isTrailer := atomic.LoadUint32(&s.headerDone) == 1 - if err := state.decodeHeader(frame, isTrailer); err != nil { + state := decodeState{isTrailer: atomic.LoadUint32(&s.headerDone) == 1} + if err := state.decodeHeader(frame); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, false) // Something wrong. Stops reading even when there is remaining. return @@ -1178,9 +1175,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed // headerChan which we'll close after setting this. - s.recvCompress = state.encoding - if len(state.mdata) > 0 { - s.header = state.mdata + s.recvCompress = state.data.encoding + if len(state.data.mdata) > 0 { + s.header = state.data.mdata } } else { s.noHeaders = true @@ -1192,7 +1189,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive - t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true) + t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) } // reader runs as a separate goroutine in charge of reading data from network diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 85533761b469..9efd6071ebce 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -287,7 +287,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { streamID := frame.Header().StreamID state := decodeState{serverSide: true} - if err := state.decodeHeader(frame, false); err != nil { + if err := state.decodeHeader(frame); err != nil { if se, ok := status.FromError(err); ok { t.controlBuf.put(&cleanupStream{ streamID: streamID, @@ -305,16 +305,16 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( st: t, buf: buf, fc: &inFlow{limit: uint32(t.initialWindowSize)}, - recvCompress: state.encoding, - method: state.method, - contentSubtype: state.contentSubtype, + recvCompress: state.data.encoding, + method: state.data.method, + contentSubtype: state.data.contentSubtype, } if frame.StreamEnded() { // s is just created by the caller. No lock needed. s.state = streamReadDone } - if state.timeoutSet { - s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) + if state.data.timeoutSet { + s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout) } else { s.ctx, s.cancel = context.WithCancel(t.ctx) } @@ -327,19 +327,19 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } s.ctx = peer.NewContext(s.ctx, pr) // Attach the received metadata to the context. - if len(state.mdata) > 0 { - s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) + if len(state.data.mdata) > 0 { + s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata) } - if state.statsTags != nil { - s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags) + if state.data.statsTags != nil { + s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags) } - if state.statsTrace != nil { - s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace) + if state.data.statsTrace != nil { + s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace) } if t.inTapHandle != nil { var err error info := &tap.Info{ - FullMethodName: state.method, + FullMethodName: state.data.method, } s.ctx, err = t.inTapHandle(s.ctx, info) if err != nil { diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index b6f8ba42153e..e51fe3655f09 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -98,9 +98,7 @@ var ( } ) -// Records the states during HPACK decoding. Must be reset once the -// decoding of the entire headers are finished. -type decodeState struct { +type parsedHeaderData struct { encoding string // statusGen caches the stream status received from the trailer the server // sent. Client side only. Do not access directly. After all trailers are @@ -120,8 +118,21 @@ type decodeState struct { statsTags []byte statsTrace []byte contentSubtype string +} + +// Records the states during HPACK decoding. Must be reset once the +// decoding of the entire headers are finished. +type decodeState struct { // whether decoding on server side or not serverSide bool + // whether the header is Trailers or not (Note Trailers != Trailers-Only, see + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md for definition). + // For server, this field is always false. + isTrailer bool + + // data struct will be filled with info parsed from HTTP HEADERS frame once decodeHeader function + // has been invoked and returned. + data parsedHeaderData } // isReservedHeader checks whether hdr belongs to HTTP2 headers @@ -202,11 +213,11 @@ func contentType(contentSubtype string) string { } func (d *decodeState) status() *status.Status { - if d.statusGen == nil { + if d.data.statusGen == nil { // No status-details were provided; generate status using code/msg. - d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg) + d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) } - return d.statusGen + return d.data.statusGen } const binHdrSuffix = "-bin" @@ -238,7 +249,7 @@ func decodeMetadataHeader(k, v string) (string, error) { return v, nil } -func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool) error { +func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. if frame.Truncated { @@ -246,40 +257,54 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool } // isGRPC indicates whether the peer is speaking gRPC (otherwise HTTP). - // If the header contains valid a content-type, i.e. a string starts with "application/grpc", then - // we are in gRPC mode and should handle gRPC specific error. + // + // We are in gRPC mode (peer speaking gRPC) if: + // * We are client side and have already received a HEADER frame that indicates gRPC peer. + // * The header contains valid a content-type, i.e. a string starts with "application/grpc" + // And we should handle error specific to gRPC. + // // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we // are in HTTP fallback mode, and should handle error specific to HTTP. - var isGRPC bool + // + // d.isTrailer is only set on client side after a gRPC ResponseHeader has been received (indicating + // peer speaking gRPC). Therefore, we can initialized isGRPC to d.isTrailer. + isGRPC := d.isTrailer var grpcErr, httpErr, contentTypeErr error for _, hf := range frame.Fields { - err := d.processHeaderField(hf) - switch hf.Name { - case "content-type": - if err == nil { - // gRPC mode - isGRPC = true - } else { + if hf.Name != "content-type" && hf.Name != ":status" && grpcErr != nil { + // if we've already encountered grpc related field parsing error, then we skip processing + // all following grpc related field. + continue + } + + if err := d.processHeaderField(hf); err != nil { + switch hf.Name { + case "content-type": contentTypeErr = err - } - case ":status": - httpErr = err // In gRPC mode, we don't care about HTTP field parsing error, so we store it separately. - default: - if err != nil && grpcErr == nil { + case ":status": + httpErr = err // In gRPC mode, we don't care about HTTP field parsing error, so we store it separately. + default: grpcErr = err // store the first encountered gRPC field parsing error. } + continue + } + + // we got a valid content-type that starts with "applicatin/grpc", so we are operating in grpc + // mode. + if hf.Name == "content-type" { + isGRPC = true } } - // gRPC mode - if isGRPC || isTrailer { + // + if isGRPC { if grpcErr != nil { return grpcErr } if d.serverSide { return nil } - if d.rawStatusCode == nil && d.statusGen == nil { + if d.data.rawStatusCode == nil && d.data.statusGen == nil { // gRPC status doesn't exist. // Set rawStatusCode to be unknown and return nil error. // So that, if the stream has ended this Unknown status @@ -287,7 +312,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool // Otherwise, it will be ignored. In which case, status from // a later trailer, that has StreamEnded flag set, is propagated. code := int(codes.Unknown) - d.rawStatusCode = &code + d.data.rawStatusCode = &code } return nil } @@ -302,8 +327,8 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool ok bool ) - if d.httpStatus != nil { - code, ok = httpStatusConvTab[*(d.httpStatus)] + if d.data.httpStatus != nil { + code, ok = httpStatusConvTab[*(d.data.httpStatus)] if !ok { code = codes.Unknown } @@ -317,10 +342,10 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame, isTrailer bool func (d *decodeState) constructHTTPErrMsg(contentTypeErr error) string { var errMsgs []string - if d.httpStatus == nil { + if d.data.httpStatus == nil { errMsgs = append(errMsgs, "malformed header: in HTTP fallback mode, but doesn't contain HTTP status") } else { - errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.httpStatus)), *d.httpStatus)) + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) } if contentTypeErr == nil { @@ -333,10 +358,10 @@ func (d *decodeState) constructHTTPErrMsg(contentTypeErr error) string { } func (d *decodeState) addMetadata(k, v string) { - if d.mdata == nil { - d.mdata = make(map[string][]string) + if d.data.mdata == nil { + d.data.mdata = make(map[string][]string) } - d.mdata[k] = append(d.mdata[k], v) + d.data.mdata[k] = append(d.data.mdata[k], v) } func (d *decodeState) processHeaderField(f hpack.HeaderField) error { @@ -346,22 +371,22 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { if !validContentType { return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value) } - d.contentSubtype = contentSubtype + d.data.contentSubtype = contentSubtype // TODO: do we want to propagate the whole content-type in the metadata, // or come up with a way to just propagate the content-subtype if it was set? // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} // in the metadata? d.addMetadata(f.Name, f.Value) case "grpc-encoding": - d.encoding = f.Value + d.data.encoding = f.Value case "grpc-status": code, err := strconv.Atoi(f.Value) if err != nil { return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) } - d.rawStatusCode = &code + d.data.rawStatusCode = &code case "grpc-message": - d.rawStatusMsg = decodeGrpcMessage(f.Value) + d.data.rawStatusMsg = decodeGrpcMessage(f.Value) case "grpc-status-details-bin": v, err := decodeBinHeader(f.Value) if err != nil { @@ -371,34 +396,34 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { if err := proto.Unmarshal(v, s); err != nil { return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) } - d.statusGen = status.FromProto(s) + d.data.statusGen = status.FromProto(s) case "grpc-timeout": - d.timeoutSet = true + d.data.timeoutSet = true var err error - if d.timeout, err = decodeTimeout(f.Value); err != nil { + if d.data.timeout, err = decodeTimeout(f.Value); err != nil { return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err) } case ":path": - d.method = f.Value + d.data.method = f.Value case ":status": code, err := strconv.Atoi(f.Value) if err != nil { return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) } - d.httpStatus = &code + d.data.httpStatus = &code case "grpc-tags-bin": v, err := decodeBinHeader(f.Value) if err != nil { return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) } - d.statsTags = v + d.data.statsTags = v d.addMetadata(f.Name, string(v)) case "grpc-trace-bin": v, err := decodeBinHeader(f.Value) if err != nil { return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) } - d.statsTrace = v + d.data.statsTrace = v d.addMetadata(f.Name, string(v)) default: if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) { From 291add43a9e406c8b188893288bed06096260ee7 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Tue, 29 Jan 2019 16:41:56 -0800 Subject: [PATCH 5/7] include logic to check more than two HEADERS frames --- internal/transport/http2_client.go | 32 ++++++++++++++++++++---------- internal/transport/http2_server.go | 2 +- internal/transport/http_util.go | 24 +++++++++++++++------- internal/transport/transport.go | 3 +-- test/end2end_test.go | 14 +++++++++++-- 5 files changed, 52 insertions(+), 23 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 200367080453..61b077450872 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1140,15 +1140,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if !ok { return } + endStream := frame.StreamEnded() atomic.StoreUint32(&s.bytesReceived, 1) - state := decodeState{isTrailer: atomic.LoadUint32(&s.headerDone) == 1} + initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0 + + if !initialHeader && !endStream { + // As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear + // at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set. + st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream") + t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false) + return + } + + state := newDecodeState(false, !initialHeader) if err := state.decodeHeader(frame); err != nil { - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, false) - // Something wrong. Stops reading even when there is remaining. + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } - endStream := frame.StreamEnded() var isHeader bool defer func() { if t.statsHandler != nil { @@ -1167,10 +1176,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } } }() + // If headers haven't been received yet. - if atomic.SwapUint32(&s.headerDone, 1) == 0 { + if initialHeader { if !endStream { - // Headers frame is not actually a trailers-only frame. + // Headers frame is ResponseHeader. isHeader = true // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed @@ -1179,14 +1189,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if len(state.data.mdata) > 0 { s.header = state.data.mdata } - } else { - s.noHeaders = true + close(s.headerChan) + return } + // Headers frame is Trailers-only. + s.noHeaders = true close(s.headerChan) } - if !endStream { - return - } + // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 9efd6071ebce..dde68de93679 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -286,7 +286,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err // operateHeader takes action on the decoded headers. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { streamID := frame.Header().StreamID - state := decodeState{serverSide: true} + state := newDecodeState(true, false) if err := state.decodeHeader(frame); err != nil { if se, ok := status.FromError(err); ok { t.controlBuf.put(&cleanupStream{ diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index e51fe3655f09..f5330c2673ad 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -120,15 +120,26 @@ type parsedHeaderData struct { contentSubtype string } +func newDecodeState(serverSide bool, ignoreContentType bool) *decodeState { + return &decodeState{ + serverSide: serverSide, + ignoreContentType: ignoreContentType, + } +} + // Records the states during HPACK decoding. Must be reset once the // decoding of the entire headers are finished. type decodeState struct { // whether decoding on server side or not serverSide bool - // whether the header is Trailers or not (Note Trailers != Trailers-Only, see - // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md for definition). + // ignoreContentType indicates whether when processing the HEADERS frame, ignoring checking the + // content-type is grpc or not. + // + // If we've already received a HEADERS frame which indicates gRPC peer, then we can ignore + // content-type for the following HEADERS frame (i.e. Trailers). + // // For server, this field is always false. - isTrailer bool + ignoreContentType bool // data struct will be filled with info parsed from HTTP HEADERS frame once decodeHeader function // has been invoked and returned. @@ -266,9 +277,9 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we // are in HTTP fallback mode, and should handle error specific to HTTP. // - // d.isTrailer is only set on client side after a gRPC ResponseHeader has been received (indicating - // peer speaking gRPC). Therefore, we can initialized isGRPC to d.isTrailer. - isGRPC := d.isTrailer + // d.ignoreContentType is only set on client side after a gRPC ResponseHeader has been received (indicating + // peer speaking gRPC). Therefore, we can initialized isGRPC to d.ignoreContentType. + isGRPC := d.ignoreContentType var grpcErr, httpErr, contentTypeErr error for _, hf := range frame.Fields { if hf.Name != "content-type" && hf.Name != ":status" && grpcErr != nil { @@ -296,7 +307,6 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { } } - // if isGRPC { if grpcErr != nil { return grpcErr diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 2580aa7d3ba1..e0501c99826a 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -327,8 +327,7 @@ func (s *Stream) TrailersOnly() (bool, error) { if err != nil { return false, err } - // if !headerDone, some other connection error occurred. - return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil + return s.noHeaders, nil } // Trailer returns the cached trailer metedata. Note that if it is not called diff --git a/test/end2end_test.go b/test/end2end_test.go index 04141f3d6541..aee567630776 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -7178,7 +7178,8 @@ func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { }) } -func (s) TestHTTPHeaderFrameErrorHandlingGRPCNonTrailer(t *testing.T) { +// Testing erroneous ReponseHeader or Trailers-only (delivered in the first HEADERS frame). +func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) { for _, test := range []struct { header []string errCode codes.Code @@ -7224,7 +7225,8 @@ func (s) TestHTTPHeaderFrameErrorHandlingGRPCNonTrailer(t *testing.T) { } } -func (s) TestHTTPHeaderFrameErrorHandlingTrailer(t *testing.T) { +// Testing non-Trailers-only Trailers (delievered in second HEADERS frame) +func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { for _, test := range []struct { responseHeader []string trailer []string @@ -7258,6 +7260,14 @@ func (s) TestHTTPHeaderFrameErrorHandlingTrailer(t *testing.T) { } } +func (s) TestHTTPHeaderFrameErrorHandlingMoreThanTwoHeaders(t *testing.T) { + header := []string{ + ":status", "200", + "content-type", "application/grpc", + } + doHTTPHeaderTest(t, codes.Internal, header, header, header) +} + type httpServer struct { headerFields [][]string } From 54a6601aa4e24f36c4cef62f5090a2c308b88049 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Tue, 19 Feb 2019 15:27:56 -0800 Subject: [PATCH 6/7] fix reviews --- internal/transport/http2_client.go | 8 +- internal/transport/http2_server.go | 5 +- internal/transport/http_util.go | 120 +++++++++++++---------------- test/end2end_test.go | 29 ++----- 4 files changed, 68 insertions(+), 94 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 61b077450872..746423ff7d28 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1152,7 +1152,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - state := newDecodeState(false, !initialHeader) + state := &decodeState{ + serverSide: false, + ignoreContentType: !initialHeader, + } + // Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received + // which indicates peer speaking gRPC, we are in gRPC mode. + state.data.isGRPC = !initialHeader if err := state.decodeHeader(frame); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index dde68de93679..19ff6edcc7a5 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -286,7 +286,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err // operateHeader takes action on the decoded headers. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { streamID := frame.Header().StreamID - state := newDecodeState(true, false) + state := &decodeState{ + serverSide: true, + ignoreContentType: false, + } if err := state.decodeHeader(frame); err != nil { if se, ok := status.FromError(err); ok { t.controlBuf.put(&cleanupStream{ diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index f5330c2673ad..ff9b653c2f0e 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -78,7 +78,8 @@ var ( codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm, codes.PermissionDenied: http2.ErrCodeInadequateSecurity, } - httpStatusConvTab = map[int]codes.Code{ + // HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table. + HTTPStatusConvTab = map[int]codes.Code{ // 400 Bad Request - INTERNAL. http.StatusBadRequest: codes.Internal, // 401 Unauthorized - UNAUTHENTICATED. @@ -118,31 +119,37 @@ type parsedHeaderData struct { statsTags []byte statsTrace []byte contentSubtype string -} -func newDecodeState(serverSide bool, ignoreContentType bool) *decodeState { - return &decodeState{ - serverSide: serverSide, - ignoreContentType: ignoreContentType, - } + // isGRPC field indicates whether the peer is speaking gRPC (otherwise HTTP). + // + // We are in gRPC mode (peer speaking gRPC) if: + // * We are client side and have already received a HEADER frame that indicates gRPC peer. + // * The header contains valid a content-type, i.e. a string starts with "application/grpc" + // And we should handle error specific to gRPC. + // + // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we + // are in HTTP fallback mode, and should handle error specific to HTTP. + isGRPC bool + grpcErr error + httpErr error + contentTypeErr string } -// Records the states during HPACK decoding. Must be reset once the -// decoding of the entire headers are finished. +// decodeState configures decoding criteria and records the decoded data. type decodeState struct { // whether decoding on server side or not serverSide bool // ignoreContentType indicates whether when processing the HEADERS frame, ignoring checking the // content-type is grpc or not. // - // If we've already received a HEADERS frame which indicates gRPC peer, then we can ignore - // content-type for the following HEADERS frame (i.e. Trailers). + // Trailers (after headers) should not have a content-type. And thus we will ignore checking the + // content-type. // // For server, this field is always false. ignoreContentType bool - // data struct will be filled with info parsed from HTTP HEADERS frame once decodeHeader function - // has been invoked and returned. + // Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS + // frame once decodeHeader function has been invoked and returned. data parsedHeaderData } @@ -267,49 +274,19 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { return status.Error(codes.Internal, "peer header list size exceeded limit") } - // isGRPC indicates whether the peer is speaking gRPC (otherwise HTTP). - // - // We are in gRPC mode (peer speaking gRPC) if: - // * We are client side and have already received a HEADER frame that indicates gRPC peer. - // * The header contains valid a content-type, i.e. a string starts with "application/grpc" - // And we should handle error specific to gRPC. - // - // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we - // are in HTTP fallback mode, and should handle error specific to HTTP. - // - // d.ignoreContentType is only set on client side after a gRPC ResponseHeader has been received (indicating - // peer speaking gRPC). Therefore, we can initialized isGRPC to d.ignoreContentType. - isGRPC := d.ignoreContentType - var grpcErr, httpErr, contentTypeErr error for _, hf := range frame.Fields { - if hf.Name != "content-type" && hf.Name != ":status" && grpcErr != nil { + if hf.Name != "content-type" && hf.Name != ":status" && d.data.grpcErr != nil { // if we've already encountered grpc related field parsing error, then we skip processing // all following grpc related field. continue } - if err := d.processHeaderField(hf); err != nil { - switch hf.Name { - case "content-type": - contentTypeErr = err - case ":status": - httpErr = err // In gRPC mode, we don't care about HTTP field parsing error, so we store it separately. - default: - grpcErr = err // store the first encountered gRPC field parsing error. - } - continue - } - - // we got a valid content-type that starts with "applicatin/grpc", so we are operating in grpc - // mode. - if hf.Name == "content-type" { - isGRPC = true - } + d.processHeaderField(hf) } - if isGRPC { - if grpcErr != nil { - return grpcErr + if d.data.isGRPC { + if d.data.grpcErr != nil { + return d.data.grpcErr } if d.serverSide { return nil @@ -328,8 +305,8 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { } // HTTP fallback mode - if httpErr != nil { - return httpErr + if d.data.httpErr != nil { + return d.data.httpErr } var ( @@ -338,33 +315,33 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { ) if d.data.httpStatus != nil { - code, ok = httpStatusConvTab[*(d.data.httpStatus)] + code, ok = HTTPStatusConvTab[*(d.data.httpStatus)] if !ok { code = codes.Unknown } } - return status.Error(code, d.constructHTTPErrMsg(contentTypeErr)) + return status.Error(code, d.constructHTTPErrMsg()) } // constructErrMsg constructs error message to be returned in HTTP fallback mode. // Format: HTTP status code and its corresponding message + content-type error message. -func (d *decodeState) constructHTTPErrMsg(contentTypeErr error) string { +func (d *decodeState) constructHTTPErrMsg() string { var errMsgs []string if d.data.httpStatus == nil { - errMsgs = append(errMsgs, "malformed header: in HTTP fallback mode, but doesn't contain HTTP status") + errMsgs = append(errMsgs, "malformed header: missing HTTP status") } else { errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) } - if contentTypeErr == nil { + if d.data.contentTypeErr == "" { errMsgs = append(errMsgs, "transport: missing content-type field") } else { - errMsgs = append(errMsgs, status.Convert(contentTypeErr).Message()) + errMsgs = append(errMsgs, d.data.contentTypeErr) } - return strings.Join(errMsgs, "\t") + return strings.Join(errMsgs, "; ") } func (d *decodeState) addMetadata(k, v string) { @@ -374,12 +351,13 @@ func (d *decodeState) addMetadata(k, v string) { d.data.mdata[k] = append(d.data.mdata[k], v) } -func (d *decodeState) processHeaderField(f hpack.HeaderField) error { +func (d *decodeState) processHeaderField(f hpack.HeaderField) { switch f.Name { case "content-type": contentSubtype, validContentType := contentSubtype(f.Value) if !validContentType { - return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value) + d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value) + return } d.data.contentSubtype = contentSubtype // TODO: do we want to propagate the whole content-type in the metadata, @@ -387,12 +365,14 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} // in the metadata? d.addMetadata(f.Name, f.Value) + d.data.isGRPC = true case "grpc-encoding": d.data.encoding = f.Value case "grpc-status": code, err := strconv.Atoi(f.Value) if err != nil { - return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + return } d.data.rawStatusCode = &code case "grpc-message": @@ -400,38 +380,43 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { case "grpc-status-details-bin": v, err := decodeBinHeader(f.Value) if err != nil { - return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + return } s := &spb.Status{} if err := proto.Unmarshal(v, s); err != nil { - return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + return } d.data.statusGen = status.FromProto(s) case "grpc-timeout": d.data.timeoutSet = true var err error if d.data.timeout, err = decodeTimeout(f.Value); err != nil { - return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err) + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err) } case ":path": d.data.method = f.Value case ":status": code, err := strconv.Atoi(f.Value) if err != nil { - return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + d.data.httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + return } d.data.httpStatus = &code case "grpc-tags-bin": v, err := decodeBinHeader(f.Value) if err != nil { - return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + return } d.data.statsTags = v d.addMetadata(f.Name, string(v)) case "grpc-trace-bin": v, err := decodeBinHeader(f.Value) if err != nil { - return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + return } d.data.statsTrace = v d.addMetadata(f.Name, string(v)) @@ -442,11 +427,10 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { v, err := decodeMetadataHeader(f.Name, f.Value) if err != nil { errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err) - return nil + return } d.addMetadata(f.Name, v) } - return nil } type timeoutUnit uint8 diff --git a/test/end2end_test.go b/test/end2end_test.go index aee567630776..80a72d6495b2 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -64,6 +64,7 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -7128,30 +7129,10 @@ func (s) TestRPCWaitsForResolver(t *testing.T) { } } -// A copy from http_util.go for testing. -var httpStatusConvTab = map[int]codes.Code{ - // 400 Bad Request - INTERNAL. - http.StatusBadRequest: codes.Internal, - // 401 Unauthorized - UNAUTHENTICATED. - http.StatusUnauthorized: codes.Unauthenticated, - // 403 Forbidden - PERMISSION_DENIED. - http.StatusForbidden: codes.PermissionDenied, - // 404 Not Found - UNIMPLEMENTED. - http.StatusNotFound: codes.Unimplemented, - // 429 Too Many Requests - UNAVAILABLE. - http.StatusTooManyRequests: codes.Unavailable, - // 502 Bad Gateway - UNAVAILABLE. - http.StatusBadGateway: codes.Unavailable, - // 503 Service Unavailable - UNAVAILABLE. - http.StatusServiceUnavailable: codes.Unavailable, - // 504 Gateway timeout - UNAVAILABLE. - http.StatusGatewayTimeout: codes.Unavailable, -} - func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { // Non-gRPC content-type fallback path. - for httpCode := range httpStatusConvTab { - doHTTPHeaderTest(t, httpStatusConvTab[int(httpCode)], []string{ + for httpCode := range transport.HTTPStatusConvTab { + doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ ":status", fmt.Sprintf("%d", httpCode), "content-type", "text/html", // non-gRPC content type to switch to HTTP mode. "grpc-status", "1", // Make up a gRPC status error @@ -7160,8 +7141,8 @@ func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { } // Missing content-type fallback path. - for httpCode := range httpStatusConvTab { - doHTTPHeaderTest(t, httpStatusConvTab[int(httpCode)], []string{ + for httpCode := range transport.HTTPStatusConvTab { + doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ ":status", fmt.Sprintf("%d", httpCode), // Omitting content type to switch to HTTP mode. "grpc-status", "1", // Make up a gRPC status error From 6d948d062b33fac68009dd41e137ba1a15964377 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Wed, 27 Feb 2019 14:30:41 -0800 Subject: [PATCH 7/7] fix reviews --- internal/transport/http_util.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index ff9b653c2f0e..de0e7264bb3f 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -275,12 +275,6 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error { } for _, hf := range frame.Fields { - if hf.Name != "content-type" && hf.Name != ":status" && d.data.grpcErr != nil { - // if we've already encountered grpc related field parsing error, then we skip processing - // all following grpc related field. - continue - } - d.processHeaderField(hf) }