From 23406a6132dfe34e4bf8e65d1b0eff060bc26580 Mon Sep 17 00:00:00 2001 From: joybestourous Date: Fri, 12 Dec 2025 14:19:27 -0500 Subject: [PATCH 1/4] Check header size --- internal/transport/controlbuf.go | 18 ++++-- internal/transport/http2_server.go | 91 +++++++++++++++++------------- test/end2end_test.go | 37 ++++++++++++ test/servertester.go | 13 ++++- 4 files changed, 115 insertions(+), 44 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 2dcd1e63bdd2..ec21b5c4290e 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -147,11 +147,12 @@ type cleanupStream struct { func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM type earlyAbortStream struct { - httpStatus uint32 - streamID uint32 - contentSubtype string - status *status.Status - rst bool + httpStatus uint32 + streamID uint32 + contentSubtype string + status *status.Status + rst bool + maxSendHeaderListSize *uint32 } func (*earlyAbortStream) isTransportResponseFrame() bool { return false } @@ -854,6 +855,13 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error { {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())}, } + if !checkForHeaderListSize(headerFields, eas.maxSendHeaderListSize) { + if l.logger.V(logLevel) { + l.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *eas.maxSendHeaderListSize) + } + return l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeInternal) + } + if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil { return err } diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 6f78a6b0c8cc..6b0dbc9800f1 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -480,11 +480,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade t.logger.Infof("Aborting the stream early: %v", errMsg) } t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusBadRequest, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: status.New(codes.Internal, errMsg), - rst: !frame.StreamEnded(), + httpStatus: http.StatusBadRequest, + streamID: streamID, + contentSubtype: s.contentSubtype, + status: status.New(codes.Internal, errMsg), + rst: !frame.StreamEnded(), + maxSendHeaderListSize: t.maxSendHeaderListSize, }) return nil } @@ -500,21 +501,23 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade } if !isGRPC { t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusUnsupportedMediaType, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType), - rst: !frame.StreamEnded(), + httpStatus: http.StatusUnsupportedMediaType, + streamID: streamID, + contentSubtype: s.contentSubtype, + status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType), + rst: !frame.StreamEnded(), + maxSendHeaderListSize: t.maxSendHeaderListSize, }) return nil } if headerError != nil { t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusBadRequest, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: headerError, - rst: !frame.StreamEnded(), + httpStatus: http.StatusBadRequest, + streamID: streamID, + contentSubtype: s.contentSubtype, + status: headerError, + rst: !frame.StreamEnded(), + maxSendHeaderListSize: t.maxSendHeaderListSize, }) return nil } @@ -570,11 +573,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade t.logger.Infof("Aborting the stream early: %v", errMsg) } t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusMethodNotAllowed, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: status.New(codes.Internal, errMsg), - rst: !frame.StreamEnded(), + httpStatus: http.StatusMethodNotAllowed, + streamID: streamID, + contentSubtype: s.contentSubtype, + status: status.New(codes.Internal, errMsg), + rst: !frame.StreamEnded(), + maxSendHeaderListSize: t.maxSendHeaderListSize, }) s.cancel() return nil @@ -591,11 +595,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade stat = status.New(codes.PermissionDenied, err.Error()) } t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusOK, - streamID: s.id, - contentSubtype: s.contentSubtype, - status: stat, - rst: !frame.StreamEnded(), + httpStatus: http.StatusOK, + streamID: s.id, + contentSubtype: s.contentSubtype, + status: stat, + rst: !frame.StreamEnded(), + maxSendHeaderListSize: t.maxSendHeaderListSize, }) return nil } @@ -605,11 +610,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade t.mu.Unlock() // Early abort in case the timeout was zero or so low it already fired. t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusOK, - streamID: s.id, - contentSubtype: s.contentSubtype, - status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), - rst: !frame.StreamEnded(), + httpStatus: http.StatusOK, + streamID: s.id, + contentSubtype: s.contentSubtype, + status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), + rst: !frame.StreamEnded(), + maxSendHeaderListSize: t.maxSendHeaderListSize, }) return nil } @@ -969,23 +975,32 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) return headerFields } -func (t *http2Server) checkForHeaderListSize(it any) bool { - if t.maxSendHeaderListSize == nil { +// checkForHeaderListSize checks if the header list size exceeds the limit set +// by the peer. It returns false if the limit is exceeded. +func checkForHeaderListSize(hf []hpack.HeaderField, maxSendHeaderListSize *uint32) bool { + if maxSendHeaderListSize == nil { return true } - hdrFrame := it.(*headerFrame) var sz int64 - for _, f := range hdrFrame.hf { - if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { - if t.logger.V(logLevel) { - t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) - } + for _, f := range hf { + if sz += int64(f.Size()); sz > int64(*maxSendHeaderListSize) { return false } } return true } +func (t *http2Server) checkForHeaderListSize(it any) bool { + hdrFrame := it.(*headerFrame) + if !checkForHeaderListSize(hdrFrame.hf, t.maxSendHeaderListSize) { + if t.logger.V(logLevel) { + t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) + } + return false + } + return true +} + func (t *http2Server) streamContextErr(s *ServerStream) error { select { case <-t.done: diff --git a/test/end2end_test.go b/test/end2end_test.go index 9157c525c094..c670d704a9fe 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6090,6 +6090,43 @@ func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) } } +func (s) TestEarlyAbortStreamHeaderListSizeCheck(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + s := grpc.NewServer() + defer s.Stop() + go s.Serve(lis) + + conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + defer conn.Close() + st := newServerTesterFromConn(t, conn) + + // Set a very small MaxHeaderListSize that any response headers would violate. + st.greetWithSettings(http2.Setting{ID: http2.SettingMaxHeaderListSize, Val: 1}) + + // Send a request with an invalid content-type to trigger early abort. + st.writeHeaders(http2.HeadersFrameParam{ + StreamID: 1, + BlockFragment: st.encodeHeader( + ":method", "POST", + ":path", "/grpc.testing.TestService/UnaryCall", + "content-type", "text/plain", // Invalid content-type to trigger early abort + "te", "trailers", + ), + EndStream: true, + EndHeaders: true, + }) + + // We should receive a RST_STREAM with ErrCodeInternal because the response + // headers exceed the MaxHeaderListSize limit. + st.wantRSTStream(http2.ErrCodeInternal) +} + func (s) TestNetPipeConn(t *testing.T) { // This test will block indefinitely if grpc writes both client and server // prefaces without either reading from the Conn. diff --git a/test/servertester.go b/test/servertester.go index 3701a0e094d9..ba324e324530 100644 --- a/test/servertester.go +++ b/test/servertester.go @@ -91,8 +91,19 @@ func (st *serverTester) readFrame() (http2.Frame, error) { // greet initiates the client's HTTP/2 connection into a state where // frames may be sent. func (st *serverTester) greet() { + st.greetWithSettings() +} + +// greetWithSettings initiates the client's HTTP/2 connection with custom settings. +func (st *serverTester) greetWithSettings(settings ...http2.Setting) { st.writePreface() - st.writeInitialSettings() + if len(settings) > 0 { + if err := st.fr.WriteSettings(settings...); err != nil { + st.t.Fatalf("Error writing initial SETTINGS frame from client to server: %v", err) + } + } else { + st.writeInitialSettings() + } st.wantSettings() st.writeSettingsAck() for { From 472ec384760e246c7fdc7e49d8aa080d03991cc7 Mon Sep 17 00:00:00 2001 From: joybestourous Date: Mon, 15 Dec 2025 13:02:13 -0500 Subject: [PATCH 2/4] vet --- internal/transport/controlbuf.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index a617012da197..f99ba7e7a613 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -858,7 +858,6 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error { {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())}, } - if p := istatus.RawStatusProto(eas.status); len(p.GetDetails()) > 0 { stBytes, err := proto.Marshal(p) if err != nil { @@ -867,13 +866,13 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error { headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)}) } } - - if !checkForHeaderListSize(headerFields, eas.maxSendHeaderListSize) { + + if !checkForHeaderListSize(headerFields, eas.maxSendHeaderListSize) { if l.logger.V(logLevel) { l.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *eas.maxSendHeaderListSize) } return l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeInternal) - } + } if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil { return err From 2ceced5a7c089fba2b994b42416f45fd70a7868f Mon Sep 17 00:00:00 2001 From: joybestourous Date: Thu, 18 Dec 2025 15:57:04 -0500 Subject: [PATCH 3/4] use executeAndPut for early abort header list size --- internal/transport/controlbuf.go | 44 ++------------ internal/transport/http2_server.go | 92 ++++++++++++++---------------- 2 files changed, 48 insertions(+), 88 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index f99ba7e7a613..7efa524785b1 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -24,19 +24,13 @@ import ( "fmt" "net" "runtime" - "strconv" "sync" "sync/atomic" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/internal/grpclog" - "google.golang.org/grpc/internal/grpcutil" - "google.golang.org/grpc/internal/pretty" - istatus "google.golang.org/grpc/internal/status" "google.golang.org/grpc/mem" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" ) var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { @@ -150,12 +144,9 @@ type cleanupStream struct { func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM type earlyAbortStream struct { - httpStatus uint32 - streamID uint32 - contentSubtype string - status *status.Status - rst bool - maxSendHeaderListSize *uint32 + streamID uint32 + rst bool + hf []hpack.HeaderField // Pre-built header fields } func (*earlyAbortStream) isTransportResponseFrame() bool { return false } @@ -847,34 +838,7 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error { if l.side == clientSide { return errors.New("earlyAbortStream not handled on client") } - // In case the caller forgets to set the http status, default to 200. - if eas.httpStatus == 0 { - eas.httpStatus = 200 - } - headerFields := []hpack.HeaderField{ - {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))}, - {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)}, - {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))}, - {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())}, - } - - if p := istatus.RawStatusProto(eas.status); len(p.GetDetails()) > 0 { - stBytes, err := proto.Marshal(p) - if err != nil { - l.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err) - } else { - headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)}) - } - } - - if !checkForHeaderListSize(headerFields, eas.maxSendHeaderListSize) { - if l.logger.V(logLevel) { - l.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *eas.maxSendHeaderListSize) - } - return l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeInternal) - } - - if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil { + if err := l.writeHeader(eas.streamID, true, eas.hf, nil); err != nil { return err } if eas.rst { diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 6b0dbc9800f1..0d042082beb2 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -479,14 +479,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade if t.logger.V(logLevel) { t.logger.Infof("Aborting the stream early: %v", errMsg) } - t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusBadRequest, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: status.New(codes.Internal, errMsg), - rst: !frame.StreamEnded(), - maxSendHeaderListSize: t.maxSendHeaderListSize, - }) + t.writeEarlyAbort(streamID, s.contentSubtype, status.New(codes.Internal, errMsg), http.StatusBadRequest, !frame.StreamEnded()) return nil } @@ -500,25 +493,11 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade return nil } if !isGRPC { - t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusUnsupportedMediaType, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType), - rst: !frame.StreamEnded(), - maxSendHeaderListSize: t.maxSendHeaderListSize, - }) + t.writeEarlyAbort(streamID, s.contentSubtype, status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType), http.StatusUnsupportedMediaType, !frame.StreamEnded()) return nil } if headerError != nil { - t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusBadRequest, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: headerError, - rst: !frame.StreamEnded(), - maxSendHeaderListSize: t.maxSendHeaderListSize, - }) + t.writeEarlyAbort(streamID, s.contentSubtype, headerError, http.StatusBadRequest, !frame.StreamEnded()) return nil } @@ -572,14 +551,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade if t.logger.V(logLevel) { t.logger.Infof("Aborting the stream early: %v", errMsg) } - t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusMethodNotAllowed, - streamID: streamID, - contentSubtype: s.contentSubtype, - status: status.New(codes.Internal, errMsg), - rst: !frame.StreamEnded(), - maxSendHeaderListSize: t.maxSendHeaderListSize, - }) + t.writeEarlyAbort(streamID, s.contentSubtype, status.New(codes.Internal, errMsg), http.StatusMethodNotAllowed, !frame.StreamEnded()) s.cancel() return nil } @@ -594,14 +566,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade if !ok { stat = status.New(codes.PermissionDenied, err.Error()) } - t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusOK, - streamID: s.id, - contentSubtype: s.contentSubtype, - status: stat, - rst: !frame.StreamEnded(), - maxSendHeaderListSize: t.maxSendHeaderListSize, - }) + t.writeEarlyAbort(s.id, s.contentSubtype, stat, http.StatusOK, !frame.StreamEnded()) return nil } } @@ -609,14 +574,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade if s.ctx.Err() != nil { t.mu.Unlock() // Early abort in case the timeout was zero or so low it already fired. - t.controlBuf.put(&earlyAbortStream{ - httpStatus: http.StatusOK, - streamID: s.id, - contentSubtype: s.contentSubtype, - status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), - rst: !frame.StreamEnded(), - maxSendHeaderListSize: t.maxSendHeaderListSize, - }) + t.writeEarlyAbort(s.id, s.contentSubtype, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), http.StatusOK, !frame.StreamEnded()) return nil } @@ -1001,6 +959,44 @@ func (t *http2Server) checkForHeaderListSize(it any) bool { return true } +// buildEarlyAbortHF builds the header fields for an early abort response. +func buildEarlyAbortHF(httpStatus uint32, contentSubtype string, stat *status.Status) []hpack.HeaderField { + hf := []hpack.HeaderField{ + {Name: ":status", Value: strconv.Itoa(int(httpStatus))}, + {Name: "content-type", Value: grpcutil.ContentType(contentSubtype)}, + {Name: "grpc-status", Value: strconv.Itoa(int(stat.Code()))}, + {Name: "grpc-message", Value: encodeGrpcMessage(stat.Message())}, + } + if p := istatus.RawStatusProto(stat); len(p.GetDetails()) > 0 { + stBytes, err := proto.Marshal(p) + if err == nil { + hf = append(hf, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)}) + } + } + return hf +} + +// writeEarlyAbort sends an early abort response with the given HTTP status and gRPC status. +// If the header list size exceeds the peer's limit, it sends a RST_STREAM instead. +func (t *http2Server) writeEarlyAbort(streamID uint32, contentSubtype string, stat *status.Status, httpStatus uint32, rst bool) { + hf := buildEarlyAbortHF(httpStatus, contentSubtype, stat) + success, _ := t.controlBuf.executeAndPut(func() bool { + return checkForHeaderListSize(hf, t.maxSendHeaderListSize) + }, &earlyAbortStream{ + streamID: streamID, + rst: rst, + hf: hf, + }) + if !success { + t.controlBuf.put(&cleanupStream{ + streamID: streamID, + rst: true, + rstCode: http2.ErrCodeInternal, + onWrite: func() {}, + }) + } +} + func (t *http2Server) streamContextErr(s *ServerStream) error { select { case <-t.done: From fdf518cb88271555d0009281fbf91408ffc25815 Mon Sep 17 00:00:00 2001 From: joybestourous Date: Mon, 29 Dec 2025 15:08:30 -0500 Subject: [PATCH 4/4] arjan's comments --- internal/transport/http2_server.go | 47 ++++++++++++------------------ 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 0d042082beb2..a1a14e14fc8e 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -573,8 +573,9 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade if s.ctx.Err() != nil { t.mu.Unlock() + st := status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()) // Early abort in case the timeout was zero or so low it already fired. - t.writeEarlyAbort(s.id, s.contentSubtype, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), http.StatusOK, !frame.StreamEnded()) + t.writeEarlyAbort(s.id, s.contentSubtype, st, http.StatusOK, !frame.StreamEnded()) return nil } @@ -933,34 +934,26 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) return headerFields } -// checkForHeaderListSize checks if the header list size exceeds the limit set -// by the peer. It returns false if the limit is exceeded. -func checkForHeaderListSize(hf []hpack.HeaderField, maxSendHeaderListSize *uint32) bool { - if maxSendHeaderListSize == nil { +func (t *http2Server) checkForHeaderListSize(hf []hpack.HeaderField) bool { + if t.maxSendHeaderListSize == nil { return true } var sz int64 for _, f := range hf { - if sz += int64(f.Size()); sz > int64(*maxSendHeaderListSize) { + if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { + if t.logger.V(logLevel) { + t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) + } return false } } return true } -func (t *http2Server) checkForHeaderListSize(it any) bool { - hdrFrame := it.(*headerFrame) - if !checkForHeaderListSize(hdrFrame.hf, t.maxSendHeaderListSize) { - if t.logger.V(logLevel) { - t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) - } - return false - } - return true -} - -// buildEarlyAbortHF builds the header fields for an early abort response. -func buildEarlyAbortHF(httpStatus uint32, contentSubtype string, stat *status.Status) []hpack.HeaderField { +// writeEarlyAbort sends an early abort response with the given HTTP status and +// gRPC status. If the header list size exceeds the peer's limit, it sends a +// RST_STREAM instead. +func (t *http2Server) writeEarlyAbort(streamID uint32, contentSubtype string, stat *status.Status, httpStatus uint32, rst bool) { hf := []hpack.HeaderField{ {Name: ":status", Value: strconv.Itoa(int(httpStatus))}, {Name: "content-type", Value: grpcutil.ContentType(contentSubtype)}, @@ -969,19 +962,15 @@ func buildEarlyAbortHF(httpStatus uint32, contentSubtype string, stat *status.St } if p := istatus.RawStatusProto(stat); len(p.GetDetails()) > 0 { stBytes, err := proto.Marshal(p) + if err != nil { + t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err) + } if err == nil { hf = append(hf, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)}) } } - return hf -} - -// writeEarlyAbort sends an early abort response with the given HTTP status and gRPC status. -// If the header list size exceeds the peer's limit, it sends a RST_STREAM instead. -func (t *http2Server) writeEarlyAbort(streamID uint32, contentSubtype string, stat *status.Status, httpStatus uint32, rst bool) { - hf := buildEarlyAbortHF(httpStatus, contentSubtype, stat) success, _ := t.controlBuf.executeAndPut(func() bool { - return checkForHeaderListSize(hf, t.maxSendHeaderListSize) + return t.checkForHeaderListSize(hf) }, &earlyAbortStream{ streamID: streamID, rst: rst, @@ -1052,7 +1041,7 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error { endStream: false, onWrite: t.setResetPingStrikes, } - success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf) + success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf.hf) }, hf) if !success { if err != nil { return err @@ -1122,7 +1111,7 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error { } success, err := t.controlBuf.executeAndPut(func() bool { - return t.checkForHeaderListSize(trailingHeader) + return t.checkForHeaderListSize(trailingHeader.hf) }, nil) if !success { if err != nil {