From 729617b4947581c71647e7f7c1034075e8d969b1 Mon Sep 17 00:00:00 2001 From: Phoenix Kahlo Date: Tue, 3 Jun 2025 18:21:06 -0500 Subject: [PATCH 1/4] proto: Internally use SendStream::new Makes it so that SendStream is always constructed with SendStream::new. This is in preparation for subsequent commit which will make constructor non-trivial. Previously, `new` was `#[cfg(fuzzing)] pub`. Now, the `#[cfg(fuzzing)]` is removed and `pub` is changed to `pub(super)`. Fuzz code is modified to call a new `#[cfg(fuzzing)] pub fn new_for_fuzzing` delegate method. --- fuzz/fuzz_targets/streams.rs | 7 +- quinn-proto/src/connection/mod.rs | 10 +-- quinn-proto/src/connection/streams/mod.rs | 11 ++- quinn-proto/src/connection/streams/state.rs | 98 +++------------------ 4 files changed, 33 insertions(+), 93 deletions(-) diff --git a/fuzz/fuzz_targets/streams.rs b/fuzz/fuzz_targets/streams.rs index 340078e5f1..29bc87bf77 100644 --- a/fuzz/fuzz_targets/streams.rs +++ b/fuzz/fuzz_targets/streams.rs @@ -50,7 +50,8 @@ fuzz_target!(|input: (StreamParams, Vec)| { Streams::new(&mut state, &conn_state).accept(dir); } Operation::Finish(id) => { - let _ = SendStream::new(id, &mut state, &mut pending, &conn_state).finish(); + let _ = + SendStream::new_for_fuzzing(id, &mut state, &mut pending, &conn_state).finish(); } Operation::ReceivedStopSending(sid, err_code) => { Streams::new(&mut state, &conn_state) @@ -63,8 +64,8 @@ fuzz_target!(|input: (StreamParams, Vec)| { .received_reset(rs); } Operation::Reset(id) => { - let _ = - SendStream::new(id, &mut state, &mut pending, &conn_state).reset(0u32.into()); + let _ = SendStream::new_for_fuzzing(id, &mut state, &mut pending, &conn_state) + .reset(0u32.into()); } } } diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 10a4fc0efd..f42c765018 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -426,12 +426,12 @@ impl Connection { #[must_use] pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> { assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side()); - SendStream { + SendStream::new( id, - state: &mut self.streams, - pending: &mut self.spaces[SpaceId::Data].pending, - conn_state: &self.state, - } + &mut self.streams, + &mut self.spaces[SpaceId::Data].pending, + &self.state, + ) } /// Returns packets to transmit diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 53e42815ee..9911706ff7 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -203,7 +203,16 @@ pub struct SendStream<'a> { #[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing) impl<'a> SendStream<'a> { #[cfg(fuzzing)] - pub fn new( + pub fn new_for_fuzzing( + id: StreamId, + state: &'a mut StreamsState, + pending: &'a mut Retransmits, + conn_state: &'a super::State, + ) -> Self { + Self::new(id, state, pending, conn_state) + } + + pub(super) fn new( id: StreamId, state: &'a mut StreamsState, pending: &'a mut Retransmits, diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index e05311423d..ec699f140a 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -1290,12 +1290,7 @@ mod tests { .open(Dir::Uni) .unwrap(); - let mut stream = SendStream { - id, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream = SendStream::new(id, &mut server, &mut pending, &state); let error_code = 0u32.into(); stream.state.received_stop_sending(id, error_code); @@ -1353,29 +1348,14 @@ mod tests { let id_mid = streams.open(Dir::Bi).unwrap(); let id_low = streams.open(Dir::Bi).unwrap(); - let mut mid = SendStream { - id: id_mid, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut mid = SendStream::new(id_mid, &mut server, &mut pending, &state); mid.write(b"mid").unwrap(); - let mut low = SendStream { - id: id_low, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut low = SendStream::new(id_low, &mut server, &mut pending, &state); low.set_priority(-1).unwrap(); low.write(b"low").unwrap(); - let mut high = SendStream { - id: id_high, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut high = SendStream::new(id_high, &mut server, &mut pending, &state); high.set_priority(1).unwrap(); high.write(b"high").unwrap(); @@ -1408,21 +1388,11 @@ mod tests { let id_high = streams.open(Dir::Bi).unwrap(); let id_mid = streams.open(Dir::Bi).unwrap(); - let mut mid = SendStream { - id: id_mid, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut mid = SendStream::new(id_mid, &mut server, &mut pending, &state); assert_eq!(mid.write(b"mid").unwrap(), 3); assert_eq!(server.pending.len(), 1); - let mut high = SendStream { - id: id_high, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut high = SendStream::new(id_high, &mut server, &mut pending, &state); high.set_priority(1).unwrap(); assert_eq!(high.write(&[0; 200]).unwrap(), 200); assert_eq!(server.pending.len(), 2); @@ -1430,12 +1400,7 @@ mod tests { // Requeue the high priority stream to lowest priority. The initial send // still uses high priority since it's queued that way. After that it will // switch to low priority - let mut high = SendStream { - id: id_high, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut high = SendStream::new(id_high, &mut server, &mut pending, &state); high.set_priority(-1).unwrap(); let mut buf = Vec::with_capacity(1000); @@ -1478,28 +1443,13 @@ mod tests { let id_b = streams.open(Dir::Bi).unwrap(); let id_c = streams.open(Dir::Bi).unwrap(); - let mut stream_a = SendStream { - id: id_a, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_a = SendStream::new(id_a, &mut server, &mut pending, &state); stream_a.write(&[b'a'; 100]).unwrap(); - let mut stream_b = SendStream { - id: id_b, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_b = SendStream::new(id_b, &mut server, &mut pending, &state); stream_b.write(&[b'b'; 100]).unwrap(); - let mut stream_c = SendStream { - id: id_c, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_c = SendStream::new(id_c, &mut server, &mut pending, &state); stream_c.write(&[b'c'; 100]).unwrap(); let mut metas = vec![]; @@ -1558,20 +1508,10 @@ mod tests { let id_b = streams.open(Dir::Bi).unwrap(); let id_c = streams.open(Dir::Bi).unwrap(); - let mut stream_a = SendStream { - id: id_a, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_a = SendStream::new(id_a, &mut server, &mut pending, &state); stream_a.write(&[b'a'; 100]).unwrap(); - let mut stream_b = SendStream { - id: id_b, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_b = SendStream::new(id_b, &mut server, &mut pending, &state); stream_b.write(&[b'b'; 100]).unwrap(); let mut metas = vec![]; @@ -1584,12 +1524,7 @@ mod tests { metas.extend(meta); // Queue stream_c which has higher priority - let mut stream_c = SendStream { - id: id_c, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_c = SendStream::new(id_c, &mut server, &mut pending, &state); stream_c.set_priority(1).unwrap(); stream_c.write(&[b'b'; 100]).unwrap(); @@ -1658,12 +1593,7 @@ mod tests { }; let id = streams.open(Dir::Uni).unwrap(); - let mut stream = SendStream { - id, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream = SendStream::new(id, &mut server, &mut pending, &state); stream.write(b"hello").unwrap(); stream.reset(0u32.into()).unwrap(); From d676d4dbc0e4745b5feaa205ed3a88f5967fc711 Mon Sep 17 00:00:00 2001 From: Phoenix Kahlo Date: Tue, 3 Jun 2025 18:27:28 -0500 Subject: [PATCH 2/4] proto: Change StreamsState.send to IndexMap Adds dependency on indexmap crate, and changes StreamState.send from a HashMap to an IndexMap. Updates some usages to be idiomatic / performant. This is in preparation for a subsequent commit which will exploit useful properties of IndexMap. --- Cargo.lock | 23 ++++++++++++++++++ Cargo.toml | 1 + quinn-proto/Cargo.toml | 1 + quinn-proto/src/connection/streams/state.rs | 27 +++++++++------------ 4 files changed, 37 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86ba2a4351..d167b3bee9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -740,6 +740,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.11" @@ -914,6 +920,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -1090,6 +1102,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1559,6 +1581,7 @@ dependencies = [ "fastbloom", "getrandom 0.3.3", "hex-literal", + "indexmap", "lazy_static", "lru-slab", "rand", diff --git a/Cargo.toml b/Cargo.toml index 5b622a37e8..18ca6b678e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ futures-io = "0.3.19" getrandom = { version = "0.3", default-features = false } hdrhistogram = { version = "7.2", default-features = false } hex-literal = "0.4" +indexmap = "2.9.0" lru-slab = "0.1.2" lazy_static = "1" log = "0.4" diff --git a/quinn-proto/Cargo.toml b/quinn-proto/Cargo.toml index e1d88a45b7..ae077b1e21 100644 --- a/quinn-proto/Cargo.toml +++ b/quinn-proto/Cargo.toml @@ -39,6 +39,7 @@ arbitrary = { workspace = true, optional = true } aws-lc-rs = { workspace = true, optional = true } bytes = { workspace = true } fastbloom = { workspace = true, optional = true } +indexmap = { workspace = true } lru-slab = { workspace = true } rustc-hash = { workspace = true } rand = { workspace = true } diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index ec699f140a..d0723f5084 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -1,11 +1,8 @@ -use std::{ - collections::{VecDeque, hash_map}, - convert::TryFrom, - mem, -}; +use std::{collections::VecDeque, convert::TryFrom, mem}; use bytes::BufMut; -use rustc_hash::FxHashMap; +use indexmap::IndexMap; +use rustc_hash::{FxBuildHasher, FxHashMap}; use tracing::{debug, trace}; use super::{ @@ -69,7 +66,7 @@ impl StreamRecv { pub struct StreamsState { pub(super) side: Side, // Set of streams that are currently open, or could be immediately opened by the peer - pub(super) send: FxHashMap>>, + pub(super) send: IndexMap>, FxBuildHasher>, pub(super) recv: FxHashMap>, pub(super) free_recv: Vec, pub(super) next: [u64; 2], @@ -149,7 +146,7 @@ impl StreamsState { ) -> Self { let mut this = Self { side, - send: FxHashMap::default(), + send: IndexMap::default(), recv: FxHashMap::default(), free_recv: Vec::new(), next: [0, 0], @@ -225,7 +222,7 @@ impl StreamsState { // We don't bother calling `stream_freed` here because we explicitly reset affected // counters below. let id = StreamId::new(self.side, dir, i); - self.send.remove(&id).unwrap(); + self.send.swap_remove(&id).unwrap(); if let Dir::Bi = dir { self.recv.remove(&id).unwrap(); } @@ -379,10 +376,10 @@ impl StreamsState { pub(crate) fn reset_acked(&mut self, id: StreamId) { match self.send.entry(id) { - hash_map::Entry::Vacant(_) => {} - hash_map::Entry::Occupied(e) => { + indexmap::map::Entry::Vacant(_) => {} + indexmap::map::Entry::Occupied(e) => { if let Some(SendState::ResetSent) = e.get().as_ref().map(|s| s.state) { - e.remove_entry(); + e.swap_remove_entry(); self.stream_freed(id, StreamHalf::Send); } } @@ -636,8 +633,8 @@ impl StreamsState { pub(crate) fn received_ack_of(&mut self, frame: frame::StreamMeta) { let mut entry = match self.send.entry(frame.id) { - hash_map::Entry::Vacant(_) => return, - hash_map::Entry::Occupied(e) => e, + indexmap::map::Entry::Vacant(_) => return, + indexmap::map::Entry::Occupied(e) => e, }; let stream = match entry.get_mut().as_mut() { @@ -662,7 +659,7 @@ impl StreamsState { return; } - entry.remove_entry(); + entry.swap_remove_entry(); self.stream_freed(id, StreamHalf::Send); self.events.push_back(StreamEvent::Finished { id }); } From 5d78e29170728c3632824c9dea0a82085afaa338 Mon Sep 17 00:00:00 2001 From: Phoenix Kahlo Date: Tue, 3 Jun 2025 18:35:42 -0500 Subject: [PATCH 3/4] proto: Preemptive SendStream hashmap lookup Previously, all 5 primitive methods on SendStream (write_source, stopped finish, set_priority, and priority) performed a self.state.send[self.id] hashmap lookup. Now that self.state.send is an IndexMap, this commit replaces these individual lookups with a single key-to-index lookup upon construction, and saves the Option result to a new field. This removes certain tradeoffs between performance and borrowing complexity, and thereby facilitates subsequent refactor commits. --- quinn-proto/src/connection/streams/mod.rs | 56 ++++++++--------------- 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 9911706ff7..22ce34aab5 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -10,7 +10,7 @@ use tracing::trace; use super::spaces::{Retransmits, ThinRetransmits}; use crate::{ Dir, StreamId, VarInt, - connection::streams::state::{get_or_insert_recv, get_or_insert_send}, + connection::streams::state::get_or_insert_recv, frame, }; @@ -195,6 +195,7 @@ impl RecvStream<'_> { /// Access to streams pub struct SendStream<'a> { pub(super) id: StreamId, + pub(super) index: Option, pub(super) state: &'a mut StreamsState, pub(super) pending: &'a mut Retransmits, pub(super) conn_state: &'a super::State, @@ -218,8 +219,10 @@ impl<'a> SendStream<'a> { pending: &'a mut Retransmits, conn_state: &'a super::State, ) -> Self { + let index = state.send.get_index_of(&id); Self { id, + index, state, pending, conn_state, @@ -253,12 +256,8 @@ impl<'a> SendStream<'a> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(WriteError::ClosedStream)?; + let index = self.index.ok_or(WriteError::ClosedStream)?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); if limit == 0 { trace!( @@ -285,11 +284,8 @@ impl<'a> SendStream<'a> { /// Check if this stream was stopped, get the reason if it was pub fn stopped(&self) -> Result, ClosedStream> { - match self.state.send.get(&self.id).as_ref() { - Some(Some(s)) => Ok(s.stop_reason), - Some(None) => Ok(None), - None => Err(ClosedStream { _private: () }), - } + let index = self.index.ok_or(ClosedStream { _private: () })?; + Ok(self.state.send[index].as_ref().and_then(|s| s.stop_reason)) } /// Finish a send stream, signalling that no more data will be sent. @@ -299,12 +295,9 @@ impl<'a> SendStream<'a> { /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished pub fn finish(&mut self) -> Result<(), FinishError> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(FinishError::ClosedStream)?; + + let index = self.index.ok_or(FinishError::ClosedStream)?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); let was_pending = stream.is_pending(); stream.finish()?; @@ -321,12 +314,8 @@ impl<'a> SendStream<'a> { /// - when applied to a receive stream pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(ClosedStream { _private: () })?; + let index = self.index.ok_or(ClosedStream { _private: () })?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); if matches!(stream.state, SendState::ResetSent) { // Redundant reset call @@ -350,12 +339,8 @@ impl<'a> SendStream<'a> { /// - when applied to a receive stream pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(ClosedStream { _private: () })?; + let index = self.index.ok_or(ClosedStream { _private: () })?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); stream.priority = priority; Ok(()) @@ -366,13 +351,12 @@ impl<'a> SendStream<'a> { /// # Panics /// - when applied to a receive stream pub fn priority(&self) -> Result { - let stream = self - .state - .send - .get(&self.id) - .ok_or(ClosedStream { _private: () })?; + let index = self.index.ok_or(ClosedStream { _private: () })?; - Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default()) + Ok(self.state.send[index] + .as_ref() + .map(|s| s.priority) + .unwrap_or_default()) } } From df184d15ea089808b5eaa11c4216fe3ab453b1e8 Mon Sep 17 00:00:00 2001 From: Phoenix Kahlo Date: Tue, 3 Jun 2025 18:52:08 -0500 Subject: [PATCH 4/4] proto: Remove write_source Prior to this commit, SendStream had several different write methods each of which had to: 1. Perform some shared logic to check how much can be written. 2. Do source-specific logic to iterate over a series of `Bytes` chunk to write 3. For each one of those, perform some shared logic to write the bytes. Prior to this commit, this this was achieved with the WriteSource trait. Previous attempts to simplify this and remove the WriteSource trait were complicated by the fact that it is desirable to cache a single StreamId hashmap lookup over the course of this whole process. However, that is no longer an obstacle due to previous commits. Thus, this commit basically splits the SendStream::write_source method into two methods (both of which are still private): 1. A `write_limit` method, which checks how many bytes can be written. 2. A `write_unchecked` method, which writes bytes under the assumption that the write limit is being expected, and does not itself check limits or error conditions. As such, the `write` and `write_chunks` methods are rewritten to call these methods directly, and the `WriteSource` trait system is removed entirely, achieving significant simplification. --- quinn-proto/src/connection/streams/mod.rs | 49 +++-- quinn-proto/src/connection/streams/send.rs | 222 +-------------------- 2 files changed, 36 insertions(+), 235 deletions(-) diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 22ce34aab5..fd6b2a05f0 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -8,20 +8,15 @@ use thiserror::Error; use tracing::trace; use super::spaces::{Retransmits, ThinRetransmits}; -use crate::{ - Dir, StreamId, VarInt, - connection::streams::state::get_or_insert_recv, - frame, -}; +use crate::{Dir, StreamId, VarInt, connection::streams::state::get_or_insert_recv, frame}; mod recv; use recv::Recv; pub use recv::{Chunks, ReadError, ReadableError}; mod send; -pub(crate) use send::{ByteSlice, BytesArray}; -use send::{BytesSource, Send, SendState}; pub use send::{FinishError, WriteError, Written}; +use send::{Send, SendState}; mod state; #[allow(unreachable_pub)] // fuzzing only @@ -233,7 +228,9 @@ impl<'a> SendStream<'a> { /// /// Returns the number of bytes successfully written. pub fn write(&mut self, data: &[u8]) -> Result { - Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes) + let prefix_len = self.write_limit()?.min(data.len()); + self.write_unchecked(data[..prefix_len].to_vec().into()); + Ok(prefix_len) } /// Send data on the given stream @@ -243,10 +240,27 @@ impl<'a> SendStream<'a> { /// [`Written::chunks`] will not count this chunk as fully written. However /// the chunk will be advanced and contain only non-written data after the call. pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result { - self.write_source(&mut BytesArray::from_chunks(data)) + let limit = self.write_limit()?; + let mut written = Written::default(); + for chunk in data { + let prefix = chunk.split_to(chunk.len().min(limit - written.bytes)); + written.bytes += prefix.len(); + self.write_unchecked(prefix); + + if chunk.is_empty() { + written.chunks += 1; + } + + debug_assert!(written.bytes <= limit); + if written.bytes == limit { + break; + } + } + Ok(written) } - fn write_source(&mut self, source: &mut B) -> Result { + /// Get how many bytes could be written immediately, or mark as blocked if zero + fn write_limit(&mut self) -> Result { if self.conn_state.is_closed() { trace!(%self.id, "write blocked; connection draining"); return Err(WriteError::Blocked); @@ -271,15 +285,20 @@ impl<'a> SendStream<'a> { return Err(WriteError::Blocked); } + stream.write_limit(limit) + } + + /// Write bytes under the assumption that `write_limit().unwrap() <= chunk.len()` + fn write_unchecked(&mut self, chunk: Bytes) { + let stream = self.state.send[self.index.unwrap()].as_mut().unwrap(); let was_pending = stream.is_pending(); - let written = stream.write(source, limit)?; - self.state.data_sent += written.bytes as u64; - self.state.unacked_data += written.bytes as u64; - trace!(stream = %self.id, "wrote {} bytes", written.bytes); + self.state.data_sent += chunk.len() as u64; + self.state.unacked_data += chunk.len() as u64; + trace!(stream = %self.id, "wrote {} bytes", chunk.len()); + stream.pending.write(chunk); if !was_pending { self.state.pending.push_pending(self.id, stream.priority); } - Ok(written) } /// Check if this stream was stopped, get the reason if it was diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs index 7b3db809a1..759c04c48d 100644 --- a/quinn-proto/src/connection/streams/send.rs +++ b/quinn-proto/src/connection/streams/send.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use thiserror::Error; use crate::{VarInt, connection::send_buffer::SendBuffer, frame}; @@ -49,11 +48,7 @@ impl Send { } } - pub(super) fn write( - &mut self, - source: &mut S, - limit: u64, - ) -> Result { + pub(super) fn write_limit(&self, limit: u64) -> Result { if !self.is_writable() { return Err(WriteError::ClosedStream); } @@ -64,23 +59,7 @@ impl Send { if budget == 0 { return Err(WriteError::Blocked); } - let mut limit = limit.min(budget) as usize; - - let mut result = Written::default(); - loop { - let (chunk, chunks_consumed) = source.pop_chunk(limit); - result.chunks += chunks_consumed; - result.bytes += chunk.len(); - - if chunk.is_empty() { - break; - } - - limit -= chunk.len(); - self.pending.write(chunk); - } - - Ok(result) + Ok(limit.min(budget) as usize) } /// Update stream state due to a reset sent by the local application @@ -143,106 +122,6 @@ impl Send { } } -/// A [`BytesSource`] implementation for `&'a mut [Bytes]` -/// -/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to -/// a configured limit. -pub(crate) struct BytesArray<'a> { - /// The wrapped slice of `Bytes` - chunks: &'a mut [Bytes], - /// The amount of chunks consumed from this source - consumed: usize, -} - -impl<'a> BytesArray<'a> { - pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self { - Self { - chunks, - consumed: 0, - } - } -} - -impl BytesSource for BytesArray<'_> { - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) { - // The loop exists to skip empty chunks while still marking them as - // consumed - let mut chunks_consumed = 0; - - while self.consumed < self.chunks.len() { - let chunk = &mut self.chunks[self.consumed]; - - if chunk.len() <= limit { - let chunk = std::mem::take(chunk); - self.consumed += 1; - chunks_consumed += 1; - if chunk.is_empty() { - continue; - } - return (chunk, chunks_consumed); - } else if limit > 0 { - let chunk = chunk.split_to(limit); - return (chunk, chunks_consumed); - } else { - break; - } - } - - (Bytes::new(), chunks_consumed) - } -} - -/// A [`BytesSource`] implementation for `&[u8]` -/// -/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily -/// created from a reference. This allows to defer the allocation until it is -/// known how much data needs to be copied. -pub(crate) struct ByteSlice<'a> { - /// The wrapped byte slice - data: &'a [u8], -} - -impl<'a> ByteSlice<'a> { - pub(crate) fn from_slice(data: &'a [u8]) -> Self { - Self { data } - } -} - -impl BytesSource for ByteSlice<'_> { - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) { - let limit = limit.min(self.data.len()); - if limit == 0 { - return (Bytes::new(), 0); - } - - let chunk = Bytes::from(self.data[..limit].to_owned()); - self.data = &self.data[chunk.len()..]; - - let chunks_consumed = usize::from(self.data.is_empty()); - (chunk, chunks_consumed) - } -} - -/// A source of one or more buffers which can be converted into `Bytes` buffers on demand -/// -/// The purpose of this data type is to defer conversion as long as possible, -/// so that no heap allocation is required in case no data is writable. -pub(super) trait BytesSource { - /// Returns the next chunk from the source of owned chunks. - /// - /// This method will consume parts of the source. - /// Calling it will yield `Bytes` elements up to the configured `limit`. - /// - /// The method returns a tuple: - /// - The first item is the yielded `Bytes` element. The element will be - /// empty if the limit is zero or no more data is available. - /// - The second item returns how many complete chunks inside the source had - /// had been consumed. This can be less than 1, if a chunk inside the - /// source had been truncated in order to adhere to the limit. It can also - /// be more than 1, if zero-length chunks had been skipped. - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize); -} - /// Indicates how many bytes and chunks had been transferred in a write operation #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct Written { @@ -303,100 +182,3 @@ pub enum FinishError { #[error("closed stream")] ClosedStream, } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn bytes_array() { - let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned(); - for limit in 0..full.len() { - let mut chunks = [ - Bytes::from_static(b""), - Bytes::from_static(b"Hello "), - Bytes::from_static(b"Wo"), - Bytes::from_static(b""), - Bytes::from_static(b"r"), - Bytes::from_static(b"ld"), - Bytes::from_static(b""), - Bytes::from_static(b" 12345678"), - Bytes::from_static(b"9 ABCDE"), - Bytes::from_static(b"F"), - Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"), - ]; - let num_chunks = chunks.len(); - let last_chunk_len = chunks[chunks.len() - 1].len(); - - let mut array = BytesArray::from_chunks(&mut chunks); - - let mut buf = Vec::new(); - let mut chunks_popped = 0; - let mut chunks_consumed = 0; - let mut remaining = limit; - loop { - let (chunk, consumed) = array.pop_chunk(remaining); - chunks_consumed += consumed; - - if !chunk.is_empty() { - buf.extend_from_slice(&chunk); - remaining -= chunk.len(); - chunks_popped += 1; - } else { - break; - } - } - - assert_eq!(&buf[..], &full[..limit]); - - if limit == full.len() { - // Full consumption of the last chunk - assert_eq!(chunks_consumed, num_chunks); - // Since there are empty chunks, we consume more than there are popped - assert_eq!(chunks_consumed, chunks_popped + 3); - } else if limit > full.len() - last_chunk_len { - // Partial consumption of the last chunk - assert_eq!(chunks_consumed, num_chunks - 1); - assert_eq!(chunks_consumed, chunks_popped + 2); - } - } - } - - #[test] - fn byte_slice() { - let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned(); - for limit in 0..full.len() { - let mut array = ByteSlice::from_slice(&full[..]); - - let mut buf = Vec::new(); - let mut chunks_popped = 0; - let mut chunks_consumed = 0; - let mut remaining = limit; - loop { - let (chunk, consumed) = array.pop_chunk(remaining); - chunks_consumed += consumed; - - if !chunk.is_empty() { - buf.extend_from_slice(&chunk); - remaining -= chunk.len(); - chunks_popped += 1; - } else { - break; - } - } - - assert_eq!(&buf[..], &full[..limit]); - if limit != 0 { - assert_eq!(chunks_popped, 1); - } else { - assert_eq!(chunks_popped, 0); - } - - if limit == full.len() { - assert_eq!(chunks_consumed, 1); - } else { - assert_eq!(chunks_consumed, 0); - } - } - } -}