http2: add a more full-featured test net.Conn
Add a net.Conn implementation that plays nicely with synctestGroup,
implements read/write timeouts, and gives control over buffering
to let us write tests that cause writes to a Conn to block at
specific points in time.

Change-Id: I9d870b211ac9d938a8c4a221277981cdb821a6e4
Reviewed-on: https://go-review.googlesource.com/c/net/+/586246
Reviewed-by: Jonathan Amsterdam <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
neild committed May 21, 2024
1 parent 410d19e commit 022530c
Showing 4 changed files with 376 additions and 101 deletions.
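
The new conn described in the commit message replaces the ad-hoc testClientConnNetConn removed below. As a rough illustration of the read-deadline behavior the tests rely on — not the actual synctestNetConn added by this change, whose implementation is in one of the other changed files and is not shown in this capture — here is a self-contained sketch of a deadline-aware in-memory read end. All names in it (toyConn, feed) are hypothetical; the real conn additionally implements the full net.Conn interface, follows synctestGroup's fake clock rather than real time, and, per the commit message, gives tests control over buffering so writes can be made to block.

package main

import (
    "fmt"
    "os"
    "time"
)

// toyConn is a deadline-aware, in-memory read end: reads drain a buffer that
// the peer fills directly, and fail with os.ErrDeadlineExceeded instead of
// blocking when the buffer is empty and the deadline has passed.
type toyConn struct {
    buf      []byte
    deadline time.Time
}

func (c *toyConn) SetReadDeadline(t time.Time) error {
    c.deadline = t
    return nil
}

func (c *toyConn) Read(b []byte) (int, error) {
    for len(c.buf) == 0 {
        if !c.deadline.IsZero() && !time.Now().Before(c.deadline) {
            return 0, os.ErrDeadlineExceeded
        }
        time.Sleep(time.Millisecond) // a real implementation would block on a condition variable
    }
    n := copy(b, c.buf)
    c.buf = c.buf[n:]
    return n, nil
}

// feed plays the role of the peer writing data into the conn's buffer.
func (c *toyConn) feed(b []byte) { c.buf = append(c.buf, b...) }

func main() {
    c := &toyConn{}
    c.SetReadDeadline(time.Now()) // pin the deadline to "now", as the test setup below does with srv

    // Nothing buffered: the read fails fast instead of hanging the test.
    _, err := c.Read(make([]byte, 9))
    fmt.Println("empty read:", err) // empty read: i/o timeout

    // Buffered data is still returned; only an empty buffer yields the deadline error.
    c.feed([]byte("PRI * HTTP/2.0"))
    n, _ := c.Read(make([]byte, 9))
    fmt.Println("buffered read:", n) // buffered read: 9
}

In this sketch, a deadline error simply means "nothing buffered right now", which is why readFrame in the diff below can treat os.ErrDeadlineExceeded as the absence of a frame rather than as a test failure.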
116 changes: 17 additions & 99 deletions http2/clientconn_test.go
@@ -10,11 +10,10 @@ package http2
 import (
     "bytes"
     "context"
-    "errors"
     "fmt"
     "io"
-    "net"
     "net/http"
+    "os"
     "reflect"
     "runtime"
     "slices"
@@ -104,7 +103,7 @@ type testClientConn struct {
 
     roundtrips []*testRoundTrip
 
-    netconn testClientConnNetConn
+    netconn *synctestNetConn
 }
 
 func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientConn {
@@ -114,22 +113,21 @@ func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientConn {
         cc:    cc,
         group: cc.t.transportTestHooks.group.(*synctestGroup),
     }
+    cli, srv := synctestNetPipe(tc.group)
+    srv.SetReadDeadline(tc.group.Now())
+    tc.netconn = srv
     tc.enc = hpack.NewEncoder(&tc.encbuf)
-    tc.netconn.gate = newGate()
 
-    cc.tconn = &tc.netconn
-    tc.fr = NewFramer(
-        (*testClientConnNetConnWriteToClient)(&tc.netconn),
-        (*testClientConnNetConnReadFromClient)(&tc.netconn),
-    )
+    // all writes and reads are finished.
+    //
+    // cli is the ClientConn's side, srv is the side controlled by the test.
+    cc.tconn = cli
+    tc.fr = NewFramer(srv, srv)
 
     tc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
     tc.fr.SetMaxReadFrameSize(10 << 20)
     t.Cleanup(func() {
-        tc.closeWrite(io.EOF)
+        tc.closeWrite()
     })
     return tc
 }
@@ -138,8 +136,7 @@ func (tc *testClientConn) readClientPreface() {
     tc.t.Helper()
     // Read the client's HTTP/2 preface, sent prior to any HTTP/2 frames.
     buf := make([]byte, len(clientPreface))
-    r := (*testClientConnNetConnReadFromClient)(&tc.netconn)
-    if _, err := io.ReadFull(r, buf); err != nil {
+    if _, err := io.ReadFull(tc.netconn, buf); err != nil {
         tc.t.Fatalf("reading preface: %v", err)
     }
     if !bytes.Equal(buf, clientPreface) {
@@ -174,26 +171,23 @@ func (tc *testClientConn) advance(d time.Duration) {
 
 // hasFrame reports whether a frame is available to be read.
 func (tc *testClientConn) hasFrame() bool {
-    tc.netconn.lock()
-    defer tc.netconn.unlock()
-    return tc.netconn.fromConn.Len() > 0
+    return len(tc.netconn.Peek()) > 0
 }
 
 func (tc *testClientConn) isClosed() bool {
-    tc.netconn.lock()
-    defer tc.netconn.unlock()
-    return tc.netconn.fromConnClosed
+    return tc.netconn.IsClosedByPeer()
 }
 
 // readFrame reads the next frame from the conn.
 func (tc *testClientConn) readFrame() Frame {
     tc.t.Helper()
     tc.sync()
     fr, err := tc.fr.ReadFrame()
-    if err == io.EOF {
+    if err == io.EOF || err == os.ErrDeadlineExceeded {
         return nil
     }
     if err != nil {
-        return nil
+        tc.t.Fatalf("ReadFrame: %v", err)
     }
     return fr
 }
@@ -597,10 +591,8 @@ func (tc *testClientConn) writeWindowUpdate(streamID, incr uint32) {
 
 // closeWrite causes the net.Conn used by the ClientConn to return a error
 // from Read calls.
-func (tc *testClientConn) closeWrite(err error) {
-    tc.netconn.lock()
-    tc.netconn.toConnErr = err
-    tc.netconn.unlock()
+func (tc *testClientConn) closeWrite() {
+    tc.netconn.Close()
     tc.sync()
 }

@@ -746,80 +738,6 @@ func diffHeaders(got, want http.Header) string {
     return fmt.Sprintf("got: %v\nwant: %v", got, want)
 }
 
-// testClientConnNetConn implements net.Conn,
-// and is the Conn used by a ClientConn under test.
-type testClientConnNetConn struct {
-    gate           gate
-    toConn         bytes.Buffer
-    toConnErr      error
-    fromConn       bytes.Buffer
-    fromConnClosed bool
-}
-
-func (c *testClientConnNetConn) lock() {
-    c.gate.lock()
-}
-
-func (c *testClientConnNetConn) unlock() {
-    c.gate.unlock(c.toConn.Len() > 0 || c.toConnErr != nil)
-}
-
-func (c *testClientConnNetConn) Read(b []byte) (n int, err error) {
-    if err := c.gate.waitAndLock(context.Background()); err != nil {
-        return 0, err
-    }
-    defer c.unlock()
-    if c.toConn.Len() == 0 && c.toConnErr != nil {
-        return 0, c.toConnErr
-    }
-    return c.toConn.Read(b)
-}
-
-func (c *testClientConnNetConn) Write(b []byte) (n int, err error) {
-    c.lock()
-    defer c.unlock()
-    return c.fromConn.Write(b)
-}
-
-func (c *testClientConnNetConn) Close() error {
-    c.lock()
-    defer c.unlock()
-    c.fromConnClosed = true
-    c.toConn.Reset()
-    if c.toConnErr == nil {
-        c.toConnErr = errors.New("connection closed")
-    }
-    return nil
-}
-
-func (*testClientConnNetConn) LocalAddr() (_ net.Addr)            { return }
-func (*testClientConnNetConn) RemoteAddr() (_ net.Addr)           { return }
-func (*testClientConnNetConn) SetDeadline(t time.Time) error      { return nil }
-func (*testClientConnNetConn) SetReadDeadline(t time.Time) error  { return nil }
-func (*testClientConnNetConn) SetWriteDeadline(t time.Time) error { return nil }
-
-// testClientConnNetConnWriteToClient is a view on a testClientConnNetConn
-// that implements an io.Writer that sends to the client conn under test.
-type testClientConnNetConnWriteToClient testClientConnNetConn
-
-func (w *testClientConnNetConnWriteToClient) Write(b []byte) (n int, err error) {
-    c := (*testClientConnNetConn)(w)
-    c.gate.lock()
-    defer c.unlock()
-    return c.toConn.Write(b)
-}
-
-// testClientConnNetConnReadFromClient is a view on a testClientConnNetConn
-// that implements an io.Reader that reads data sent by the client conn under test.
-type testClientConnNetConnReadFromClient testClientConnNetConn
-
-func (w *testClientConnNetConnReadFromClient) Read(b []byte) (n int, err error) {
-    c := (*testClientConnNetConn)(w)
-    c.gate.lock()
-    defer c.unlock()
-    return c.fromConn.Read(b)
-}
-
 // A testTransport allows testing Transport.RoundTrip against fake servers.
 // Tests that aren't specifically exercising RoundTrip's retry loop or connection pooling
 // should use testClientConn instead.
Expand Down Expand Up @@ -861,7 +779,7 @@ func newTestTransport(t *testing.T, opts ...func(*Transport)) *testTransport {
buf := make([]byte, 16*1024)
n := runtime.Stack(buf, true)
t.Logf("stacks:\n%s", buf[:n])
t.Fatalf("%v goroutines still running after test completed, expect 1", count-1)
t.Fatalf("%v goroutines still running after test completed, expect 1", count)
}
})

Expand Down
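
For the NewFramer(srv, srv) wiring shown in newTestClientConnFromClientConn above, the test drives one end of the pipe as both the Framer's writer (frames sent to the client) and its reader (frames the client wrote). Below is a rough, self-contained analogue using the standard library's net.Pipe, which, unlike the pipe added by this change, is unbuffered and ignores the fake clock, so the two sides must run concurrently; it assumes only the exported golang.org/x/net/http2 API.

package main

import (
    "fmt"
    "net"

    "golang.org/x/net/http2"
)

func main() {
    // net.Pipe stands in for the test pipe: cli is the side a client would own,
    // srv is the side the test drives.
    cli, srv := net.Pipe()

    // The "client" side writes a SETTINGS frame through its own Framer.
    go func() {
        fr := http2.NewFramer(cli, cli)
        fr.WriteSettings(http2.Setting{ID: http2.SettingMaxFrameSize, Val: 1 << 20})
    }()

    // The test side reads it back, just as testClientConn reads frames with
    // NewFramer(srv, srv).
    fr := http2.NewFramer(srv, srv)
    f, err := fr.ReadFrame()
    if err != nil {
        panic(err)
    }
    fmt.Println(f.Header().Type) // SETTINGS
}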