diff --git a/http2/pipe.go b/http2/pipe.go index c15b8a771..684d984fd 100644 --- a/http2/pipe.go +++ b/http2/pipe.go @@ -88,13 +88,9 @@ func (p *pipe) Write(d []byte) (n int, err error) { p.c.L = &p.mu } defer p.c.Signal() - if p.err != nil { + if p.err != nil || p.breakErr != nil { return 0, errClosedPipeWrite } - if p.breakErr != nil { - p.unread += len(d) - return len(d), nil // discard when there is no reader - } return p.b.Write(d) } diff --git a/http2/pipe_test.go b/http2/pipe_test.go index 83d2dfd27..67562a92a 100644 --- a/http2/pipe_test.go +++ b/http2/pipe_test.go @@ -125,14 +125,14 @@ func TestPipeBreakWithError(t *testing.T) { if p.Len() != 3 { t.Errorf("pipe should have 3 unread bytes") } - // Write should succeed silently. - if n, err := p.Write([]byte("abc")); err != nil || n != 3 { - t.Errorf("Write(abc) after break\ngot %v, %v\nwant 0, nil", n, err) + // Write should fail. + if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 { + t.Errorf("Write(abc) after break\ngot %v, %v\nwant 0, errClosedPipeWrite", n, err) } if p.b != nil { t.Errorf("buffer should be nil after Write") } - if p.Len() != 6 { + if p.Len() != 3 { t.Errorf("pipe should have 6 unread bytes") } // Read should fail. diff --git a/http2/server.go b/http2/server.go index 8cb14f3c9..cd057f398 100644 --- a/http2/server.go +++ b/http2/server.go @@ -1822,15 +1822,18 @@ func (sc *serverConn) processData(f *DataFrame) error { } if len(data) > 0 { + st.bodyBytes += int64(len(data)) wrote, err := st.body.Write(data) if err != nil { + // The handler has closed the request body. + // Return the connection-level flow control for the discarded data, + // but not the stream-level flow control. sc.sendWindowUpdate(nil, int(f.Length)-wrote) - return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed)) + return nil } if wrote != len(data) { panic("internal error: bad Writer") } - st.bodyBytes += int64(len(data)) } // Return any padded flow control now, since we won't diff --git a/http2/server_test.go b/http2/server_test.go index d32b2d85b..40ab750fc 100644 --- a/http2/server_test.go +++ b/http2/server_test.go @@ -3906,6 +3906,32 @@ func TestUnreadFlowControlReturned_Server(t *testing.T) { } } +func TestServerReturnsStreamAndConnFlowControlOnBodyClose(t *testing.T) { + unblockHandler := make(chan struct{}) + defer close(unblockHandler) + + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) { + r.Body.Close() + w.WriteHeader(200) + w.(http.Flusher).Flush() + <-unblockHandler + }) + defer st.Close() + + st.greet() + st.writeHeaders(HeadersFrameParam{ + StreamID: 1, + BlockFragment: st.encodeHeader(), + EndHeaders: true, + }) + st.wantHeaders() + const size = inflowMinRefresh // enough to trigger flow control return + st.writeData(1, false, make([]byte, size)) + st.wantWindowUpdate(0, size) // conn-level flow control is returned + unblockHandler <- struct{}{} + st.wantData() +} + func TestServerIdleTimeout(t *testing.T) { if testing.Short() { t.Skip("skipping in short mode") diff --git a/http2/transport.go b/http2/transport.go index 05ba23d3d..c9e1115a5 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -2555,6 +2555,9 @@ func (b transportResponseBody) Close() error { cs := b.cs cc := cs.cc + cs.bufPipe.BreakWithError(errClosedResponseBody) + cs.abortStream(errClosedResponseBody) + unread := cs.bufPipe.Len() if unread > 0 { cc.mu.Lock() @@ -2573,9 +2576,6 @@ func (b transportResponseBody) Close() error { cc.wmu.Unlock() } - cs.bufPipe.BreakWithError(errClosedResponseBody) - cs.abortStream(errClosedResponseBody) - select { case <-cs.donec: case <-cs.ctx.Done():