Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple transport flow control from application read. #1265

Merged
merged 5 commits into from
Jun 1, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,3 @@ func (f *inFlow) onRead(n uint32) uint32 {
}
return 0
}

func (f *inFlow) resetPendingData() uint32 {
f.mu.Lock()
defer f.mu.Unlock()
n := f.pendingData
f.pendingData = 0
return n
}
22 changes: 4 additions & 18 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
s.mu.Lock()
rstStream = s.rstStream
rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
}
}
if s.state == streamDone {
s.mu.Unlock()
return
Expand Down Expand Up @@ -831,9 +826,6 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
if s.state == streamDone {
return
}
if w := t.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand All @@ -845,22 +837,19 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
// Decouple connection's flow control form application's read.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/form/from
Also, explain a bit more here?
We won't remember what we decoupled when we get used to the new flow control structure.

if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// The stream has been closed. Release the corresponding quota.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
Expand All @@ -872,9 +861,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand Down
21 changes: 4 additions & 17 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,6 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
if s.state == streamDone {
return
}
if w := t.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand All @@ -477,22 +474,20 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.Close()
return
}
// Decouple connection's flow control from application's read.
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// The stream has been closed. Release the corresponding quota.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this comment

if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
Expand All @@ -502,9 +497,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
Expand Down Expand Up @@ -1066,11 +1058,6 @@ func (t *http2Server) closeStream(s *Stream) {
// called to interrupt the potential blocking on other goroutines.
s.cancel()
s.mu.Lock()
if q := s.fc.resetPendingData(); q > 0 {
if w := t.fc.onRead(q); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
if s.state == streamDone {
s.mu.Unlock()
return
Expand Down
126 changes: 90 additions & 36 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,90 @@ func TestServerContextCanceledOnClosedConnection(t *testing.T) {
server.stop()
}

func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
serverConfig := &ServerConfig{
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()

if len(server.conns) == 0 {
return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
}
return false, nil
})
var st *http2Server
server.mu.Lock()
for k := range server.conns {
st = k.(*http2Server)
}
server.mu.Unlock()
cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create 1st stream. Err: %v", err)
}
// Exhaust server's connection window.
if err := client.Write(cstream1, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
//Client should be able to create another stream and send data on it.
cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create 2nd stream. Err: %v", err)
}
if err := client.Write(cstream2, make([]byte, defaultWindowSize), &Options{}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
// Get the streams on server.
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()

if len(st.activeStreams) != 2 {
return true, fmt.Errorf("timed-out while waiting for server to have created the streams")
}
return false, nil
})
var sstream1 *Stream
st.mu.Lock()
for _, v := range st.activeStreams {
if v.id == 1 {
sstream1 = v
}
}
st.mu.Unlock()
// Trying to write more on a max-ed out stream should result in a RST_STREAM from the server.
ct := client.(*http2Client)
<-ct.writableChan
if err := ct.framer.writeData(true, cstream2.id, true, make([]byte, 1)); err != nil {
t.Fatalf("Client failed to write. Err: %v", err)
}
ct.writableChan <- 0
code := http2ErrConvTab[http2.ErrCodeFlowControl]
waitWhileTrue(t, func() (bool, error) {
cstream2.mu.Lock()
defer cstream2.mu.Unlock()
if cstream2.status.Code() != code {
return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code())
}
return false, nil
})
// Reading from the stream on server should succeed.
if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil {
t.Fatalf("_.Read(_) = %v, want <nil>", err)
}

if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("_.Read(_) = %v, want io.EOF", err)
}

}

func TestServerWithMisbehavedClient(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{
Expand Down Expand Up @@ -1224,7 +1308,7 @@ func TestServerWithMisbehavedClient(t *testing.T) {
}
ss.fc.mu.Unlock()
}
if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != http2MaxFrameLen || sc.fc.pendingUpdate != 0 {
if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen {
t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0)
}
// Keep sending until the server inbound window is drained for that stream.
Expand All @@ -1245,24 +1329,10 @@ func TestServerWithMisbehavedClient(t *testing.T) {
t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
}

if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
}
ct.CloseStream(s, nil)
// Test server behavior for violation of connection flow control window size restriction.
//
// Keep creating new streams until the connection window is drained on the server and
// the server tears down the connection.
for {
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
// The server tears down the connection.
break
}
<-cc.writableChan
cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
cc.writableChan <- 0
}
ct.Close()
server.stop()
}
Expand Down Expand Up @@ -1293,7 +1363,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
break
}
}
if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData <= initialWindowSize || conn.fc.pendingUpdate != 0 {
if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0)
}

Expand All @@ -1305,25 +1375,9 @@ func TestClientWithMisbehavedServer(t *testing.T) {
}

conn.CloseStream(s, err)
if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
}
// Test the logic for the violation of the connection flow control window size restriction.
//
// Generate enough streams to drain the connection window. Make the server flood the traffic
// to violate flow control window size of the connection.
callHdr.Method = "foo.Connection"
for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ {
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
break
}
if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil {
break
}
if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
}
// http2Client.errChan is closed due to connection flow control window size violation.
<-conn.Error()
ct.Close()
server.stop()
}
Expand Down