From 399218d6bcdde008df7f43cf82a92b69e842c049 Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Mon, 30 Oct 2023 15:07:39 -0700 Subject: [PATCH] quic: implement stream flush Do not commit data written to a stream to the network until the user explicitly flushes the stream, the stream output buffer fills, or the output buffer contains enough data to fill a packet. We could write data immediately (as net.TCPConn does), but this can require the user to put their own buffer in front of the stream. Since we necessarily need to maintain a retransmit buffer in the stream, this is redundant. We could do something like Nagle's algorithm, but nobody wants that. So make flushes explicit. For golang/go#58547 Change-Id: I29dc9d79556c7a358a360ef79beb38b45040b6bc Reviewed-on: https://go-review.googlesource.com/c/net/+/543083 Auto-Submit: Damien Neil LUCI-TryBot-Result: Go LUCI Reviewed-by: Jonathan Amsterdam --- internal/quic/conn.go | 4 +- internal/quic/conn_flow_test.go | 7 ++ internal/quic/conn_loss_test.go | 5 +- internal/quic/conn_streams_test.go | 16 ++--- internal/quic/quic.go | 6 +- internal/quic/stream.go | 55 +++++++++++---- internal/quic/stream_test.go | 108 ++++++++++++++++++++++++++++- 7 files changed, 171 insertions(+), 30 deletions(-) diff --git a/internal/quic/conn.go b/internal/quic/conn.go index b2b6a0877a..ff96ff7600 100644 --- a/internal/quic/conn.go +++ b/internal/quic/conn.go @@ -136,12 +136,10 @@ func newConn(now time.Time, side connSide, cids newServerConnIDs, peerAddr netip } } - // The smallest allowed maximum QUIC datagram size is 1200 bytes. // TODO: PMTU discovery. - const maxDatagramSize = 1200 c.logConnectionStarted(cids.originalDstConnID, peerAddr) c.keysAppData.init() - c.loss.init(c.side, maxDatagramSize, now) + c.loss.init(c.side, smallestMaxDatagramSize, now) c.streamsInit() c.lifetimeInit() c.restartIdleTimer(now) diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go index 03e0757a6d..39c879346c 100644 --- a/internal/quic/conn_flow_test.go +++ b/internal/quic/conn_flow_test.go @@ -262,6 +262,7 @@ func TestConnOutflowBlocked(t *testing.T) { if n != len(data) || err != nil { t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data)) } + s.Flush() tc.wantFrame("stream writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ @@ -310,6 +311,7 @@ func TestConnOutflowMaxDataDecreases(t *testing.T) { if n != len(data) || err != nil { t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data)) } + s.Flush() tc.wantFrame("stream writes data up to MAX_DATA limit", packetType1RTT, debugFrameStream{ @@ -337,7 +339,9 @@ func TestConnOutflowMaxDataRoundRobin(t *testing.T) { } s1.Write(make([]byte, 10)) + s1.Flush() s2.Write(make([]byte, 10)) + s2.Flush() tc.writeFrames(packetType1RTT, debugFrameMaxData{ max: 1, @@ -378,6 +382,7 @@ func TestConnOutflowMetaAndData(t *testing.T) { data := makeTestData(32) s.Write(data) + s.Flush() s.CloseRead() tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled", @@ -405,6 +410,7 @@ func TestConnOutflowResentData(t *testing.T) { data := makeTestData(15) s.Write(data[:8]) + s.Flush() tc.wantFrame("data is under MAX_DATA limit, all sent", packetType1RTT, debugFrameStream{ id: s.id, @@ -421,6 +427,7 @@ func TestConnOutflowResentData(t *testing.T) { }) s.Write(data[8:]) + s.Flush() tc.wantFrame("new data is sent up to the MAX_DATA limit", packetType1RTT, debugFrameStream{ id: s.id, diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go index 5144be6acc..818816335b 100644 --- a/internal/quic/conn_loss_test.go +++ b/internal/quic/conn_loss_test.go @@ -183,7 +183,7 @@ func TestLostStreamFrameEmpty(t *testing.T) { if err != nil { t.Fatalf("NewStream: %v", err) } - c.Write(nil) // open the stream + c.Flush() // open the stream tc.wantFrame("created bidirectional stream 0", packetType1RTT, debugFrameStream{ id: newStreamID(clientSide, bidiStream, 0), @@ -213,6 +213,7 @@ func TestLostStreamWithData(t *testing.T) { p.initialMaxStreamDataUni = 1 << 20 }) s.Write(data[:4]) + s.Flush() tc.wantFrame("send [0,4)", packetType1RTT, debugFrameStream{ id: s.id, @@ -220,6 +221,7 @@ func TestLostStreamWithData(t *testing.T) { data: data[:4], }) s.Write(data[4:8]) + s.Flush() tc.wantFrame("send [4,8)", packetType1RTT, debugFrameStream{ id: s.id, @@ -263,6 +265,7 @@ func TestLostStreamPartialLoss(t *testing.T) { }) for i := range data { s.Write(data[i : i+1]) + s.Flush() tc.wantFrame(fmt.Sprintf("send STREAM frame with byte %v", i), packetType1RTT, debugFrameStream{ id: s.id, diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go index 69f982c3a6..c90354db8a 100644 --- a/internal/quic/conn_streams_test.go +++ b/internal/quic/conn_streams_test.go @@ -19,33 +19,33 @@ func TestStreamsCreate(t *testing.T) { tc := newTestConn(t, clientSide, permissiveTransportParameters) tc.handshake() - c, err := tc.conn.NewStream(ctx) + s, err := tc.conn.NewStream(ctx) if err != nil { t.Fatalf("NewStream: %v", err) } - c.Write(nil) // open the stream + s.Flush() // open the stream tc.wantFrame("created bidirectional stream 0", packetType1RTT, debugFrameStream{ id: 0, // client-initiated, bidi, number 0 data: []byte{}, }) - c, err = tc.conn.NewSendOnlyStream(ctx) + s, err = tc.conn.NewSendOnlyStream(ctx) if err != nil { t.Fatalf("NewStream: %v", err) } - c.Write(nil) // open the stream + s.Flush() // open the stream tc.wantFrame("created unidirectional stream 0", packetType1RTT, debugFrameStream{ id: 2, // client-initiated, uni, number 0 data: []byte{}, }) - c, err = tc.conn.NewStream(ctx) + s, err = tc.conn.NewStream(ctx) if err != nil { t.Fatalf("NewStream: %v", err) } - c.Write(nil) // open the stream + s.Flush() // open the stream tc.wantFrame("created bidirectional stream 1", packetType1RTT, debugFrameStream{ id: 4, // client-initiated, uni, number 4 @@ -177,11 +177,11 @@ func TestStreamsStreamSendOnly(t *testing.T) { tc := newTestConn(t, serverSide, permissiveTransportParameters) tc.handshake() - c, err := tc.conn.NewSendOnlyStream(ctx) + s, err := tc.conn.NewSendOnlyStream(ctx) if err != nil { t.Fatalf("NewStream: %v", err) } - c.Write(nil) // open the stream + s.Flush() // open the stream tc.wantFrame("created unidirectional stream 0", packetType1RTT, debugFrameStream{ id: 3, // server-initiated, uni, number 0 diff --git a/internal/quic/quic.go b/internal/quic/quic.go index 6b60db869a..e4d0d77c7f 100644 --- a/internal/quic/quic.go +++ b/internal/quic/quic.go @@ -64,10 +64,14 @@ const defaultKeepAlivePeriod = 0 // https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2-6 const timerGranularity = 1 * time.Millisecond +// The smallest allowed maximum datagram size. +// https://www.rfc-editor.org/rfc/rfc9000#section-14 +const smallestMaxDatagramSize = 1200 + // Minimum size of a UDP datagram sent by a client carrying an Initial packet, // or a server containing an ack-eliciting Initial packet. // https://www.rfc-editor.org/rfc/rfc9000#section-14.1 -const paddedInitialDatagramSize = 1200 +const paddedInitialDatagramSize = smallestMaxDatagramSize // Maximum number of streams of a given type which may be created. // https://www.rfc-editor.org/rfc/rfc9000.html#section-4.6-2 diff --git a/internal/quic/stream.go b/internal/quic/stream.go index 58d84ed1b0..36c80f6af0 100644 --- a/internal/quic/stream.go +++ b/internal/quic/stream.go @@ -38,10 +38,11 @@ type Stream struct { // the write will fail. outgate gate out pipe // buffered data to send + outflushed int64 // offset of last flush call outwin int64 // maximum MAX_STREAM_DATA received from the peer outmaxsent int64 // maximum data offset we've sent to the peer outmaxbuf int64 // maximum amount of data we will buffer - outunsent rangeset[int64] // ranges buffered but not yet sent + outunsent rangeset[int64] // ranges buffered but not yet sent (only flushed data) outacked rangeset[int64] // ranges sent and acknowledged outopened sentVal // set if we should open the stream outclosed sentVal // set by CloseWrite @@ -240,8 +241,6 @@ func (s *Stream) Write(b []byte) (n int, err error) { // WriteContext writes data to the stream write buffer. // Buffered data is only sent when the buffer is sufficiently full. // Call the Flush method to ensure buffered data is sent. -// -// TODO: Implement Flush. func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) { if s.IsReadOnly() { return 0, errors.New("write to read-only stream") @@ -269,10 +268,6 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) s.outUnlock() return n, errors.New("write to closed stream") } - // We set outopened here rather than below, - // so if this is a zero-length write we still - // open the stream despite not writing any data to it. - s.outopened.set() if len(b) == 0 { break } @@ -282,13 +277,26 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) // Amount to write is min(the full buffer, data up to the write limit). // This is a number of bytes. nn := min(int64(len(b)), lim-s.out.end) - // Copy the data into the output buffer and mark it as unsent. - if s.out.end <= s.outwin { - s.outunsent.add(s.out.end, min(s.out.end+nn, s.outwin)) - } + // Copy the data into the output buffer. s.out.writeAt(b[:nn], s.out.end) b = b[nn:] n += int(nn) + // Possibly flush the output buffer. + // We automatically flush if: + // - We have enough data to consume the send window. + // Sending this data may cause the peer to extend the window. + // - We have buffered as much data as we're willing do. + // We need to send data to clear out buffer space. + // - We have enough data to fill a 1-RTT packet using the smallest + // possible maximum datagram size (1200 bytes, less header byte, + // connection ID, packet number, and AEAD overhead). + const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead + shouldFlush := s.out.end >= s.outwin || // peer send window is full + s.out.end >= lim || // local send buffer is full + (s.out.end-s.outflushed) >= autoFlushSize // enough data buffered + if shouldFlush { + s.flushLocked() + } if s.out.end > s.outwin { // We're blocked by flow control. // Send a STREAM_DATA_BLOCKED frame to let the peer know. @@ -301,6 +309,23 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) return n, nil } +// Flush flushes data written to the stream. +// It does not wait for the peer to acknowledge receipt of the data. +// Use CloseContext to wait for the peer's acknowledgement. +func (s *Stream) Flush() { + s.outgate.lock() + defer s.outUnlock() + s.flushLocked() +} + +func (s *Stream) flushLocked() { + s.outopened.set() + if s.outflushed < s.outwin { + s.outunsent.add(s.outflushed, min(s.outwin, s.out.end)) + } + s.outflushed = s.out.end +} + // Close closes the stream. // See CloseContext for more details. func (s *Stream) Close() error { @@ -363,6 +388,7 @@ func (s *Stream) CloseWrite() { s.outgate.lock() defer s.outUnlock() s.outclosed.set() + s.flushLocked() } // Reset aborts writes on the stream and notifies the peer @@ -612,8 +638,8 @@ func (s *Stream) handleMaxStreamData(maxStreamData int64) error { if maxStreamData <= s.outwin { return nil } - if s.out.end > s.outwin { - s.outunsent.add(s.outwin, min(maxStreamData, s.out.end)) + if s.outflushed > s.outwin { + s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed)) } s.outwin = maxStreamData if s.out.end > s.outwin { @@ -741,10 +767,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b } for { // STREAM - off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto) + off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto) if end := off + size; end > s.outmaxsent { // This will require connection-level flow control to send. end = min(end, s.outmaxsent+s.conn.streams.outflow.avail()) + end = max(end, off) size = end - off } fin := s.outclosed.isSet() && off+size == s.out.end diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go index 9bf2b5871d..93c8839fff 100644 --- a/internal/quic/stream_test.go +++ b/internal/quic/stream_test.go @@ -38,6 +38,7 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) { if n != writeBufferSize || err != context.Canceled { t.Fatalf("s.WriteContext() = %v, %v; want %v, context.Canceled", n, err, writeBufferSize) } + s.Flush() tc.wantFrame("first write buffer of data sent", packetType1RTT, debugFrameStream{ id: s.id, @@ -47,7 +48,9 @@ func TestStreamWriteBlockedByOutputBuffer(t *testing.T) { // Blocking write, which must wait for buffer space. w := runAsync(tc, func(ctx context.Context) (int, error) { - return s.WriteContext(ctx, want[writeBufferSize:]) + n, err := s.WriteContext(ctx, want[writeBufferSize:]) + s.Flush() + return n, err }) tc.wantIdle("write buffer is full, no more data can be sent") @@ -170,6 +173,7 @@ func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) { t.Fatal(err) } s.WriteContext(ctx, want[:1]) + s.Flush() tc.wantFrame("sent data (1 byte) fits within flow control limit", packetType1RTT, debugFrameStream{ id: s.id, @@ -723,7 +727,7 @@ func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFra if err != nil { t.Fatal(err) } - s.Write(nil) // open the stream + s.Flush() // open the stream tc.wantFrame("new stream is opened", packetType1RTT, debugFrameStream{ id: sid, @@ -968,7 +972,9 @@ func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) { want := make([]byte, 4096) rand.Read(want) // doesn't need to be crypto/rand, but non-deprecated and harmless w := runAsync(tc, func(ctx context.Context) (int, error) { - return s.WriteContext(ctx, want) + n, err := s.WriteContext(ctx, want) + s.Flush() + return n, err }) got := make([]byte, 0, len(want)) for { @@ -998,6 +1004,7 @@ func TestStreamCloseWaitsForAcks(t *testing.T) { tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters) data := make([]byte, 100) s.WriteContext(ctx, data) + s.Flush() tc.wantFrame("conn sends data for the stream", packetType1RTT, debugFrameStream{ id: s.id, @@ -1064,6 +1071,7 @@ func TestStreamCloseUnblocked(t *testing.T) { tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters) data := make([]byte, 100) s.WriteContext(ctx, data) + s.Flush() tc.wantFrame("conn sends data for the stream", packetType1RTT, debugFrameStream{ id: s.id, @@ -1228,6 +1236,7 @@ func TestStreamPeerStopSendingForActiveStream(t *testing.T) { tc, s := newTestConnAndLocalStream(t, serverSide, styp, permissiveTransportParameters) for i := 0; i < 4; i++ { s.Write([]byte{byte(i)}) + s.Flush() tc.wantFrame("write sends a STREAM frame to peer", packetType1RTT, debugFrameStream{ id: s.id, @@ -1271,6 +1280,99 @@ func TestStreamReceiveDataBlocked(t *testing.T) { tc.wantIdle("no response to STREAM_DATA_BLOCKED and DATA_BLOCKED") } +func TestStreamFlushExplicit(t *testing.T) { + testStreamTypes(t, "", func(t *testing.T, styp streamType) { + tc, s := newTestConnAndLocalStream(t, clientSide, styp, permissiveTransportParameters) + want := []byte{0, 1, 2, 3} + n, err := s.Write(want) + if n != len(want) || err != nil { + t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want)) + } + tc.wantIdle("unflushed data is not sent") + s.Flush() + tc.wantFrame("data is sent after flush", + packetType1RTT, debugFrameStream{ + id: s.id, + data: want, + }) + }) +} + +func TestStreamFlushImplicitExact(t *testing.T) { + testStreamTypes(t, "", func(t *testing.T, styp streamType) { + const writeBufferSize = 4 + tc, s := newTestConnAndLocalStream(t, clientSide, styp, + permissiveTransportParameters, + func(c *Config) { + c.MaxStreamWriteBufferSize = writeBufferSize + }) + want := []byte{0, 1, 2, 3, 4, 5, 6} + + // This write doesn't quite fill the output buffer. + n, err := s.Write(want[:3]) + if n != 3 || err != nil { + t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want)) + } + tc.wantIdle("unflushed data is not sent") + + // This write fills the output buffer exactly. + n, err = s.Write(want[3:4]) + if n != 1 || err != nil { + t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want)) + } + tc.wantFrame("data is sent after write buffer fills", + packetType1RTT, debugFrameStream{ + id: s.id, + data: want[0:4], + }) + + }) +} + +func TestStreamFlushImplicitLargerThanBuffer(t *testing.T) { + testStreamTypes(t, "", func(t *testing.T, styp streamType) { + const writeBufferSize = 4 + tc, s := newTestConnAndLocalStream(t, clientSide, styp, + permissiveTransportParameters, + func(c *Config) { + c.MaxStreamWriteBufferSize = writeBufferSize + }) + want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + + w := runAsync(tc, func(ctx context.Context) (int, error) { + n, err := s.WriteContext(ctx, want) + return n, err + }) + + tc.wantFrame("data is sent after write buffer fills", + packetType1RTT, debugFrameStream{ + id: s.id, + data: want[0:4], + }) + tc.writeAckForAll() + tc.wantFrame("ack permits sending more data", + packetType1RTT, debugFrameStream{ + id: s.id, + off: 4, + data: want[4:8], + }) + tc.writeAckForAll() + + tc.wantIdle("write buffer is not full") + if n, err := w.result(); n != len(want) || err != nil { + t.Fatalf("Write() = %v, %v; want %v, nil", n, err, len(want)) + } + + s.Flush() + tc.wantFrame("flush sends last buffer of data", + packetType1RTT, debugFrameStream{ + id: s.id, + off: 8, + data: want[8:], + }) + }) +} + type streamSide string const (