Skip to content

Commit

Permalink
http2: track unread bytes when the pipe is broken
Browse files Browse the repository at this point in the history
Once the pipe is broken, any remaining data needs to be reported as well
as any data that is written but dropped.

The client side flow control can eventually run out of available bytes
to be sent since no WINDOW_UPDATE is sent to reflect the data that is
never read in the pipe.

Updates golang/go#28634

Change-Id: I83f3c9d3614cd92517af2687489d2ccbf3a65456
Reviewed-on: https://go-review.googlesource.com/c/net/+/187377
Reviewed-by: Brad Fitzpatrick <[email protected]>
Run-TryBot: Brad Fitzpatrick <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
  • Loading branch information
fraenkel authored and bradfitz committed Oct 14, 2019
1 parent 491137f commit 2ba7206
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 28 deletions.
7 changes: 6 additions & 1 deletion http2/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type pipe struct {
mu sync.Mutex
c sync.Cond // c.L lazily initialized to &p.mu
b pipeBuffer // nil when done reading
unread int // bytes unread when done
err error // read error once empty. non-nil means closed.
breakErr error // immediate read error (caller doesn't see rest of b)
donec chan struct{} // closed on error
Expand All @@ -33,7 +34,7 @@ func (p *pipe) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
if p.b == nil {
return 0
return p.unread
}
return p.b.Len()
}
Expand Down Expand Up @@ -80,6 +81,7 @@ func (p *pipe) Write(d []byte) (n int, err error) {
return 0, errClosedPipeWrite
}
if p.breakErr != nil {
p.unread += len(d)
return len(d), nil // discard when there is no reader
}
return p.b.Write(d)
Expand Down Expand Up @@ -117,6 +119,9 @@ func (p *pipe) closeWithError(dst *error, err error, fn func()) {
}
p.readFn = fn
if dst == &p.breakErr {
if p.b != nil {
p.unread += p.b.Len()
}
p.b = nil
}
*dst = err
Expand Down
12 changes: 12 additions & 0 deletions http2/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,19 @@ func TestPipeCloseWithError(t *testing.T) {
if err != a {
t.Logf("read error = %v, %v", err, a)
}
if p.Len() != 0 {
t.Errorf("pipe should have 0 unread bytes")
}
// Read and Write should fail.
if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 {
t.Errorf("Write(abc) after close\ngot %v, %v\nwant 0, %v", n, err, errClosedPipeWrite)
}
if n, err := p.Read(make([]byte, 1)); err == nil || n != 0 {
t.Errorf("Read() after close\ngot %v, nil\nwant 0, %v", n, errClosedPipeWrite)
}
if p.Len() != 0 {
t.Errorf("pipe should have 0 unread bytes")
}
}

func TestPipeBreakWithError(t *testing.T) {
Expand All @@ -116,13 +122,19 @@ func TestPipeBreakWithError(t *testing.T) {
if p.b != nil {
t.Errorf("buffer should be nil after BreakWithError")
}
if p.Len() != 3 {
t.Errorf("pipe should have 3 unread bytes")
}
// Write should succeed silently.
if n, err := p.Write([]byte("abc")); err != nil || n != 3 {
t.Errorf("Write(abc) after break\ngot %v, %v\nwant 0, nil", n, err)
}
if p.b != nil {
t.Errorf("buffer should be nil after Write")
}
if p.Len() != 6 {
t.Errorf("pipe should have 6 unread bytes")
}
// Read should fail.
if n, err := p.Read(make([]byte, 1)); err == nil || n != 0 {
t.Errorf("Read() after close\ngot %v, nil\nwant 0, not nil", n)
Expand Down
90 changes: 63 additions & 27 deletions http2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3658,37 +3658,73 @@ func (f funcReader) Read(p []byte) (n int, err error) { return f(p) }
// golang.org/issue/16481 -- return flow control when streams close with unread data.
// (The Server version of the bug. See also TestUnreadFlowControlReturned_Transport)
func TestUnreadFlowControlReturned_Server(t *testing.T) {
unblock := make(chan bool, 1)
defer close(unblock)
for _, tt := range []struct {
name string
reqFn func(r *http.Request)
}{
{
"body-open",
func(r *http.Request) {},
},
{
"body-closed",
func(r *http.Request) {
r.Body.Close()
},
},
{
"read-1-byte-and-close",
func(r *http.Request) {
b := make([]byte, 1)
r.Body.Read(b)
r.Body.Close()
},
},
} {
t.Run(tt.name, func(t *testing.T) {
unblock := make(chan bool, 1)
defer close(unblock)

st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
// Don't read the 16KB request body. Wait until the client's
// done sending it and then return. This should cause the Server
// to then return those 16KB of flow control to the client.
<-unblock
}, optOnlyServer)
defer st.Close()
timeOut := time.NewTimer(5 * time.Second)
defer timeOut.Stop()
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
// Don't read the 16KB request body. Wait until the client's
// done sending it and then return. This should cause the Server
// to then return those 16KB of flow control to the client.
tt.reqFn(r)
select {
case <-unblock:
case <-timeOut.C:
t.Fatal(tt.name, "timedout")
}
}, optOnlyServer)
defer st.Close()

tr := &Transport{TLSClientConfig: tlsConfigInsecure}
defer tr.CloseIdleConnections()
tr := &Transport{TLSClientConfig: tlsConfigInsecure}
defer tr.CloseIdleConnections()

// This previously hung on the 4th iteration.
for i := 0; i < 6; i++ {
body := io.MultiReader(
io.LimitReader(neverEnding('A'), 16<<10),
funcReader(func([]byte) (n int, err error) {
unblock <- true
return 0, io.EOF
}),
)
req, _ := http.NewRequest("POST", st.ts.URL, body)
res, err := tr.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
res.Body.Close()
// This previously hung on the 4th iteration.
iters := 100
if testing.Short() {
iters = 20
}
for i := 0; i < iters; i++ {
body := io.MultiReader(
io.LimitReader(neverEnding('A'), 16<<10),
funcReader(func([]byte) (n int, err error) {
unblock <- true
return 0, io.EOF
}),
)
req, _ := http.NewRequest("POST", st.ts.URL, body)
res, err := tr.RoundTrip(req)
if err != nil {
t.Fatal(tt.name, err)
}
res.Body.Close()
}
})
}

}

func TestServerIdleTimeout(t *testing.T) {
Expand Down

0 comments on commit 2ba7206

Please sign in to comment.