diff --git a/Cargo.lock b/Cargo.lock index 86ba2a4351..d167b3bee9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -740,6 +740,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.11" @@ -914,6 +920,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -1090,6 +1102,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1559,6 +1581,7 @@ dependencies = [ "fastbloom", "getrandom 0.3.3", "hex-literal", + "indexmap", "lazy_static", "lru-slab", "rand", diff --git a/Cargo.toml b/Cargo.toml index 5b622a37e8..18ca6b678e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ futures-io = "0.3.19" getrandom = { version = "0.3", default-features = false } hdrhistogram = { version = "7.2", default-features = false } hex-literal = "0.4" +indexmap = "2.9.0" lru-slab = "0.1.2" lazy_static = "1" log = "0.4" diff --git a/fuzz/fuzz_targets/streams.rs b/fuzz/fuzz_targets/streams.rs index 340078e5f1..29bc87bf77 100644 --- a/fuzz/fuzz_targets/streams.rs +++ b/fuzz/fuzz_targets/streams.rs @@ -50,7 +50,8 @@ fuzz_target!(|input: (StreamParams, Vec)| { Streams::new(&mut state, &conn_state).accept(dir); } Operation::Finish(id) => { - let _ = SendStream::new(id, &mut state, &mut pending, &conn_state).finish(); + let _ = + SendStream::new_for_fuzzing(id, &mut state, &mut pending, &conn_state).finish(); } Operation::ReceivedStopSending(sid, err_code) => { Streams::new(&mut state, &conn_state) @@ -63,8 +64,8 @@ fuzz_target!(|input: (StreamParams, Vec)| { .received_reset(rs); } Operation::Reset(id) => { - let _ = - SendStream::new(id, &mut state, &mut pending, &conn_state).reset(0u32.into()); + let _ = SendStream::new_for_fuzzing(id, &mut state, &mut pending, &conn_state) + .reset(0u32.into()); } } } diff --git a/quinn-proto/Cargo.toml b/quinn-proto/Cargo.toml index e1d88a45b7..ae077b1e21 100644 --- a/quinn-proto/Cargo.toml +++ b/quinn-proto/Cargo.toml @@ -39,6 +39,7 @@ arbitrary = { workspace = true, optional = true } aws-lc-rs = { workspace = true, optional = true } bytes = { workspace = true } fastbloom = { workspace = true, optional = true } +indexmap = { workspace = true } lru-slab = { workspace = true } rustc-hash = { workspace = true } rand = { workspace = true } diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 10a4fc0efd..f42c765018 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -426,12 +426,12 @@ impl Connection { #[must_use] pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> { assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side()); - SendStream { + SendStream::new( id, - state: &mut self.streams, - pending: &mut 
self.spaces[SpaceId::Data].pending, - conn_state: &self.state, - } + &mut self.streams, + &mut self.spaces[SpaceId::Data].pending, + &self.state, + ) } /// Returns packets to transmit diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 53e42815ee..fd6b2a05f0 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -8,20 +8,15 @@ use thiserror::Error; use tracing::trace; use super::spaces::{Retransmits, ThinRetransmits}; -use crate::{ - Dir, StreamId, VarInt, - connection::streams::state::{get_or_insert_recv, get_or_insert_send}, - frame, -}; +use crate::{Dir, StreamId, VarInt, connection::streams::state::get_or_insert_recv, frame}; mod recv; use recv::Recv; pub use recv::{Chunks, ReadError, ReadableError}; mod send; -pub(crate) use send::{ByteSlice, BytesArray}; -use send::{BytesSource, Send, SendState}; pub use send::{FinishError, WriteError, Written}; +use send::{Send, SendState}; mod state; #[allow(unreachable_pub)] // fuzzing only @@ -195,6 +190,7 @@ impl RecvStream<'_> { /// Access to streams pub struct SendStream<'a> { pub(super) id: StreamId, + pub(super) index: Option, pub(super) state: &'a mut StreamsState, pub(super) pending: &'a mut Retransmits, pub(super) conn_state: &'a super::State, @@ -203,14 +199,25 @@ pub struct SendStream<'a> { #[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing) impl<'a> SendStream<'a> { #[cfg(fuzzing)] - pub fn new( + pub fn new_for_fuzzing( id: StreamId, state: &'a mut StreamsState, pending: &'a mut Retransmits, conn_state: &'a super::State, ) -> Self { + Self::new(id, state, pending, conn_state) + } + + pub(super) fn new( + id: StreamId, + state: &'a mut StreamsState, + pending: &'a mut Retransmits, + conn_state: &'a super::State, + ) -> Self { + let index = state.send.get_index_of(&id); Self { id, + index, state, pending, conn_state, @@ -221,7 +228,9 @@ impl<'a> SendStream<'a> { /// /// Returns the number of bytes successfully written. pub fn write(&mut self, data: &[u8]) -> Result { - Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes) + let prefix_len = self.write_limit()?.min(data.len()); + self.write_unchecked(data[..prefix_len].to_vec().into()); + Ok(prefix_len) } /// Send data on the given stream @@ -231,10 +240,27 @@ impl<'a> SendStream<'a> { /// [`Written::chunks`] will not count this chunk as fully written. However /// the chunk will be advanced and contain only non-written data after the call. 
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result { - self.write_source(&mut BytesArray::from_chunks(data)) + let limit = self.write_limit()?; + let mut written = Written::default(); + for chunk in data { + let prefix = chunk.split_to(chunk.len().min(limit - written.bytes)); + written.bytes += prefix.len(); + self.write_unchecked(prefix); + + if chunk.is_empty() { + written.chunks += 1; + } + + debug_assert!(written.bytes <= limit); + if written.bytes == limit { + break; + } + } + Ok(written) } - fn write_source(&mut self, source: &mut B) -> Result { + /// Get how many bytes could be written immediately, or mark as blocked if zero + fn write_limit(&mut self) -> Result { if self.conn_state.is_closed() { trace!(%self.id, "write blocked; connection draining"); return Err(WriteError::Blocked); @@ -244,12 +270,8 @@ impl<'a> SendStream<'a> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(WriteError::ClosedStream)?; + let index = self.index.ok_or(WriteError::ClosedStream)?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); if limit == 0 { trace!( @@ -263,24 +285,26 @@ impl<'a> SendStream<'a> { return Err(WriteError::Blocked); } + stream.write_limit(limit) + } + + /// Write bytes under the assumption that `write_limit().unwrap() <= chunk.len()` + fn write_unchecked(&mut self, chunk: Bytes) { + let stream = self.state.send[self.index.unwrap()].as_mut().unwrap(); let was_pending = stream.is_pending(); - let written = stream.write(source, limit)?; - self.state.data_sent += written.bytes as u64; - self.state.unacked_data += written.bytes as u64; - trace!(stream = %self.id, "wrote {} bytes", written.bytes); + self.state.data_sent += chunk.len() as u64; + self.state.unacked_data += chunk.len() as u64; + trace!(stream = %self.id, "wrote {} bytes", chunk.len()); + stream.pending.write(chunk); if !was_pending { self.state.pending.push_pending(self.id, stream.priority); } - Ok(written) } /// Check if this stream was stopped, get the reason if it was pub fn stopped(&self) -> Result, ClosedStream> { - match self.state.send.get(&self.id).as_ref() { - Some(Some(s)) => Ok(s.stop_reason), - Some(None) => Ok(None), - None => Err(ClosedStream { _private: () }), - } + let index = self.index.ok_or(ClosedStream { _private: () })?; + Ok(self.state.send[index].as_ref().and_then(|s| s.stop_reason)) } /// Finish a send stream, signalling that no more data will be sent. 
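
For readers following the reworked write path above: the splitting loop in the new `write_chunks` can be exercised in isolation. The sketch below is a standalone illustration, not quinn's internal API; it only assumes the `bytes` crate and mirrors how chunks are advanced in place against a flow-control limit, with a chunk counting toward `Written::chunks` only once it has been fully consumed.

```rust
// Standalone sketch, not quinn's internal API: mirrors the splitting loop
// that the reworked `write_chunks` performs against a flow-control limit.
use bytes::Bytes;

/// Bytes/chunks accounted for by a write, mirroring quinn's `Written`.
#[derive(Debug, Default, PartialEq, Eq)]
struct Written {
    bytes: usize,
    chunks: usize,
}

/// Move as much of `data` as fits under `limit` into `out`, advancing each
/// chunk in place so it retains only its unwritten tail. A chunk counts
/// toward `chunks` only once it has been fully consumed.
fn split_chunks(data: &mut [Bytes], limit: usize, out: &mut Vec<Bytes>) -> Written {
    let mut written = Written::default();
    for chunk in data {
        let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
        written.bytes += prefix.len();
        out.push(prefix);

        if chunk.is_empty() {
            written.chunks += 1;
        }

        debug_assert!(written.bytes <= limit);
        if written.bytes == limit {
            break;
        }
    }
    written
}

fn main() {
    let mut chunks = [Bytes::from_static(b"hello "), Bytes::from_static(b"world")];
    let mut out = Vec::new();
    let written = split_chunks(&mut chunks, 8, &mut out);
    // "hello " is fully consumed; "world" is split, leaving "rld" unwritten.
    assert_eq!(written, Written { bytes: 8, chunks: 1 });
    assert_eq!(chunks[1], Bytes::from_static(b"rld"));
    println!("{written:?}");
}
```

Partially written chunks are left holding only their unwritten tail, which matches the documented contract of `write_chunks` above.
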
@@ -290,12 +314,9 @@ impl<'a> SendStream<'a> { /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished pub fn finish(&mut self) -> Result<(), FinishError> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(FinishError::ClosedStream)?; + + let index = self.index.ok_or(FinishError::ClosedStream)?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); let was_pending = stream.is_pending(); stream.finish()?; @@ -312,12 +333,8 @@ impl<'a> SendStream<'a> { /// - when applied to a receive stream pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(ClosedStream { _private: () })?; + let index = self.index.ok_or(ClosedStream { _private: () })?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); if matches!(stream.state, SendState::ResetSent) { // Redundant reset call @@ -341,12 +358,8 @@ impl<'a> SendStream<'a> { /// - when applied to a receive stream pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> { let max_send_data = self.state.max_send_data(self.id); - let stream = self - .state - .send - .get_mut(&self.id) - .map(get_or_insert_send(max_send_data)) - .ok_or(ClosedStream { _private: () })?; + let index = self.index.ok_or(ClosedStream { _private: () })?; + let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); stream.priority = priority; Ok(()) @@ -357,13 +370,12 @@ impl<'a> SendStream<'a> { /// # Panics /// - when applied to a receive stream pub fn priority(&self) -> Result { - let stream = self - .state - .send - .get(&self.id) - .ok_or(ClosedStream { _private: () })?; + let index = self.index.ok_or(ClosedStream { _private: () })?; - Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default()) + Ok(self.state.send[index] + .as_ref() + .map(|s| s.priority) + .unwrap_or_default()) } } diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs index 7b3db809a1..759c04c48d 100644 --- a/quinn-proto/src/connection/streams/send.rs +++ b/quinn-proto/src/connection/streams/send.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use thiserror::Error; use crate::{VarInt, connection::send_buffer::SendBuffer, frame}; @@ -49,11 +48,7 @@ impl Send { } } - pub(super) fn write( - &mut self, - source: &mut S, - limit: u64, - ) -> Result { + pub(super) fn write_limit(&self, limit: u64) -> Result { if !self.is_writable() { return Err(WriteError::ClosedStream); } @@ -64,23 +59,7 @@ impl Send { if budget == 0 { return Err(WriteError::Blocked); } - let mut limit = limit.min(budget) as usize; - - let mut result = Written::default(); - loop { - let (chunk, chunks_consumed) = source.pop_chunk(limit); - result.chunks += chunks_consumed; - result.bytes += chunk.len(); - - if chunk.is_empty() { - break; - } - - limit -= chunk.len(); - self.pending.write(chunk); - } - - Ok(result) + Ok(limit.min(budget) as usize) } /// Update stream state due to a reset sent by the local application @@ -143,106 +122,6 @@ impl Send { } } -/// A [`BytesSource`] implementation for `&'a mut [Bytes]` -/// -/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to -/// a configured limit. 
-pub(crate) struct BytesArray<'a> { - /// The wrapped slice of `Bytes` - chunks: &'a mut [Bytes], - /// The amount of chunks consumed from this source - consumed: usize, -} - -impl<'a> BytesArray<'a> { - pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self { - Self { - chunks, - consumed: 0, - } - } -} - -impl BytesSource for BytesArray<'_> { - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) { - // The loop exists to skip empty chunks while still marking them as - // consumed - let mut chunks_consumed = 0; - - while self.consumed < self.chunks.len() { - let chunk = &mut self.chunks[self.consumed]; - - if chunk.len() <= limit { - let chunk = std::mem::take(chunk); - self.consumed += 1; - chunks_consumed += 1; - if chunk.is_empty() { - continue; - } - return (chunk, chunks_consumed); - } else if limit > 0 { - let chunk = chunk.split_to(limit); - return (chunk, chunks_consumed); - } else { - break; - } - } - - (Bytes::new(), chunks_consumed) - } -} - -/// A [`BytesSource`] implementation for `&[u8]` -/// -/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily -/// created from a reference. This allows to defer the allocation until it is -/// known how much data needs to be copied. -pub(crate) struct ByteSlice<'a> { - /// The wrapped byte slice - data: &'a [u8], -} - -impl<'a> ByteSlice<'a> { - pub(crate) fn from_slice(data: &'a [u8]) -> Self { - Self { data } - } -} - -impl BytesSource for ByteSlice<'_> { - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) { - let limit = limit.min(self.data.len()); - if limit == 0 { - return (Bytes::new(), 0); - } - - let chunk = Bytes::from(self.data[..limit].to_owned()); - self.data = &self.data[chunk.len()..]; - - let chunks_consumed = usize::from(self.data.is_empty()); - (chunk, chunks_consumed) - } -} - -/// A source of one or more buffers which can be converted into `Bytes` buffers on demand -/// -/// The purpose of this data type is to defer conversion as long as possible, -/// so that no heap allocation is required in case no data is writable. -pub(super) trait BytesSource { - /// Returns the next chunk from the source of owned chunks. - /// - /// This method will consume parts of the source. - /// Calling it will yield `Bytes` elements up to the configured `limit`. - /// - /// The method returns a tuple: - /// - The first item is the yielded `Bytes` element. The element will be - /// empty if the limit is zero or no more data is available. - /// - The second item returns how many complete chunks inside the source had - /// had been consumed. This can be less than 1, if a chunk inside the - /// source had been truncated in order to adhere to the limit. It can also - /// be more than 1, if zero-length chunks had been skipped. 
- fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize); -} - /// Indicates how many bytes and chunks had been transferred in a write operation #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct Written { @@ -303,100 +182,3 @@ pub enum FinishError { #[error("closed stream")] ClosedStream, } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn bytes_array() { - let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned(); - for limit in 0..full.len() { - let mut chunks = [ - Bytes::from_static(b""), - Bytes::from_static(b"Hello "), - Bytes::from_static(b"Wo"), - Bytes::from_static(b""), - Bytes::from_static(b"r"), - Bytes::from_static(b"ld"), - Bytes::from_static(b""), - Bytes::from_static(b" 12345678"), - Bytes::from_static(b"9 ABCDE"), - Bytes::from_static(b"F"), - Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"), - ]; - let num_chunks = chunks.len(); - let last_chunk_len = chunks[chunks.len() - 1].len(); - - let mut array = BytesArray::from_chunks(&mut chunks); - - let mut buf = Vec::new(); - let mut chunks_popped = 0; - let mut chunks_consumed = 0; - let mut remaining = limit; - loop { - let (chunk, consumed) = array.pop_chunk(remaining); - chunks_consumed += consumed; - - if !chunk.is_empty() { - buf.extend_from_slice(&chunk); - remaining -= chunk.len(); - chunks_popped += 1; - } else { - break; - } - } - - assert_eq!(&buf[..], &full[..limit]); - - if limit == full.len() { - // Full consumption of the last chunk - assert_eq!(chunks_consumed, num_chunks); - // Since there are empty chunks, we consume more than there are popped - assert_eq!(chunks_consumed, chunks_popped + 3); - } else if limit > full.len() - last_chunk_len { - // Partial consumption of the last chunk - assert_eq!(chunks_consumed, num_chunks - 1); - assert_eq!(chunks_consumed, chunks_popped + 2); - } - } - } - - #[test] - fn byte_slice() { - let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned(); - for limit in 0..full.len() { - let mut array = ByteSlice::from_slice(&full[..]); - - let mut buf = Vec::new(); - let mut chunks_popped = 0; - let mut chunks_consumed = 0; - let mut remaining = limit; - loop { - let (chunk, consumed) = array.pop_chunk(remaining); - chunks_consumed += consumed; - - if !chunk.is_empty() { - buf.extend_from_slice(&chunk); - remaining -= chunk.len(); - chunks_popped += 1; - } else { - break; - } - } - - assert_eq!(&buf[..], &full[..limit]); - if limit != 0 { - assert_eq!(chunks_popped, 1); - } else { - assert_eq!(chunks_popped, 0); - } - - if limit == full.len() { - assert_eq!(chunks_consumed, 1); - } else { - assert_eq!(chunks_consumed, 0); - } - } - } -} diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index e05311423d..d0723f5084 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -1,11 +1,8 @@ -use std::{ - collections::{VecDeque, hash_map}, - convert::TryFrom, - mem, -}; +use std::{collections::VecDeque, convert::TryFrom, mem}; use bytes::BufMut; -use rustc_hash::FxHashMap; +use indexmap::IndexMap; +use rustc_hash::{FxBuildHasher, FxHashMap}; use tracing::{debug, trace}; use super::{ @@ -69,7 +66,7 @@ impl StreamRecv { pub struct StreamsState { pub(super) side: Side, // Set of streams that are currently open, or could be immediately opened by the peer - pub(super) send: FxHashMap>>, + pub(super) send: IndexMap>, FxBuildHasher>, pub(super) recv: FxHashMap>, pub(super) free_recv: Vec, pub(super) next: [u64; 2], @@ -149,7 +146,7 @@ 
impl StreamsState { ) -> Self { let mut this = Self { side, - send: FxHashMap::default(), + send: IndexMap::default(), recv: FxHashMap::default(), free_recv: Vec::new(), next: [0, 0], @@ -225,7 +222,7 @@ impl StreamsState { // We don't bother calling `stream_freed` here because we explicitly reset affected // counters below. let id = StreamId::new(self.side, dir, i); - self.send.remove(&id).unwrap(); + self.send.swap_remove(&id).unwrap(); if let Dir::Bi = dir { self.recv.remove(&id).unwrap(); } @@ -379,10 +376,10 @@ impl StreamsState { pub(crate) fn reset_acked(&mut self, id: StreamId) { match self.send.entry(id) { - hash_map::Entry::Vacant(_) => {} - hash_map::Entry::Occupied(e) => { + indexmap::map::Entry::Vacant(_) => {} + indexmap::map::Entry::Occupied(e) => { if let Some(SendState::ResetSent) = e.get().as_ref().map(|s| s.state) { - e.remove_entry(); + e.swap_remove_entry(); self.stream_freed(id, StreamHalf::Send); } } @@ -636,8 +633,8 @@ impl StreamsState { pub(crate) fn received_ack_of(&mut self, frame: frame::StreamMeta) { let mut entry = match self.send.entry(frame.id) { - hash_map::Entry::Vacant(_) => return, - hash_map::Entry::Occupied(e) => e, + indexmap::map::Entry::Vacant(_) => return, + indexmap::map::Entry::Occupied(e) => e, }; let stream = match entry.get_mut().as_mut() { @@ -662,7 +659,7 @@ impl StreamsState { return; } - entry.remove_entry(); + entry.swap_remove_entry(); self.stream_freed(id, StreamHalf::Send); self.events.push_back(StreamEvent::Finished { id }); } @@ -1290,12 +1287,7 @@ mod tests { .open(Dir::Uni) .unwrap(); - let mut stream = SendStream { - id, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream = SendStream::new(id, &mut server, &mut pending, &state); let error_code = 0u32.into(); stream.state.received_stop_sending(id, error_code); @@ -1353,29 +1345,14 @@ mod tests { let id_mid = streams.open(Dir::Bi).unwrap(); let id_low = streams.open(Dir::Bi).unwrap(); - let mut mid = SendStream { - id: id_mid, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut mid = SendStream::new(id_mid, &mut server, &mut pending, &state); mid.write(b"mid").unwrap(); - let mut low = SendStream { - id: id_low, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut low = SendStream::new(id_low, &mut server, &mut pending, &state); low.set_priority(-1).unwrap(); low.write(b"low").unwrap(); - let mut high = SendStream { - id: id_high, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut high = SendStream::new(id_high, &mut server, &mut pending, &state); high.set_priority(1).unwrap(); high.write(b"high").unwrap(); @@ -1408,21 +1385,11 @@ mod tests { let id_high = streams.open(Dir::Bi).unwrap(); let id_mid = streams.open(Dir::Bi).unwrap(); - let mut mid = SendStream { - id: id_mid, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut mid = SendStream::new(id_mid, &mut server, &mut pending, &state); assert_eq!(mid.write(b"mid").unwrap(), 3); assert_eq!(server.pending.len(), 1); - let mut high = SendStream { - id: id_high, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut high = SendStream::new(id_high, &mut server, &mut pending, &state); high.set_priority(1).unwrap(); assert_eq!(high.write(&[0; 200]).unwrap(), 200); assert_eq!(server.pending.len(), 2); @@ -1430,12 +1397,7 @@ mod tests { // Requeue the high priority stream to lowest priority. 
The initial send // still uses high priority since it's queued that way. After that it will // switch to low priority - let mut high = SendStream { - id: id_high, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut high = SendStream::new(id_high, &mut server, &mut pending, &state); high.set_priority(-1).unwrap(); let mut buf = Vec::with_capacity(1000); @@ -1478,28 +1440,13 @@ mod tests { let id_b = streams.open(Dir::Bi).unwrap(); let id_c = streams.open(Dir::Bi).unwrap(); - let mut stream_a = SendStream { - id: id_a, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_a = SendStream::new(id_a, &mut server, &mut pending, &state); stream_a.write(&[b'a'; 100]).unwrap(); - let mut stream_b = SendStream { - id: id_b, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_b = SendStream::new(id_b, &mut server, &mut pending, &state); stream_b.write(&[b'b'; 100]).unwrap(); - let mut stream_c = SendStream { - id: id_c, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_c = SendStream::new(id_c, &mut server, &mut pending, &state); stream_c.write(&[b'c'; 100]).unwrap(); let mut metas = vec![]; @@ -1558,20 +1505,10 @@ mod tests { let id_b = streams.open(Dir::Bi).unwrap(); let id_c = streams.open(Dir::Bi).unwrap(); - let mut stream_a = SendStream { - id: id_a, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_a = SendStream::new(id_a, &mut server, &mut pending, &state); stream_a.write(&[b'a'; 100]).unwrap(); - let mut stream_b = SendStream { - id: id_b, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_b = SendStream::new(id_b, &mut server, &mut pending, &state); stream_b.write(&[b'b'; 100]).unwrap(); let mut metas = vec![]; @@ -1584,12 +1521,7 @@ mod tests { metas.extend(meta); // Queue stream_c which has higher priority - let mut stream_c = SendStream { - id: id_c, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream_c = SendStream::new(id_c, &mut server, &mut pending, &state); stream_c.set_priority(1).unwrap(); stream_c.write(&[b'b'; 100]).unwrap(); @@ -1658,12 +1590,7 @@ mod tests { }; let id = streams.open(Dir::Uni).unwrap(); - let mut stream = SendStream { - id, - state: &mut server, - pending: &mut pending, - conn_state: &state, - }; + let mut stream = SendStream::new(id, &mut server, &mut pending, &state); stream.write(b"hello").unwrap(); stream.reset(0u32.into()).unwrap();
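
Closing note on the container swap in state.rs: `send` moves from `FxHashMap` to `IndexMap` so that a `SendStream` can resolve its slot to a positional index once (in `SendStream::new`) and reuse it, while removals use `swap_remove` to stay O(1). The sketch below is a standalone illustration of those `IndexMap` operations; it is not quinn's types and only assumes the `indexmap` and `rustc-hash` crates already added to the workspace.

```rust
// Standalone sketch, not quinn's types: the IndexMap operations the patch
// relies on for `StreamsState::send`.
use indexmap::IndexMap;
use rustc_hash::FxBuildHasher;

fn main() {
    // Like `StreamsState::send`: keys map to lazily initialized slots.
    let mut send: IndexMap<u64, Option<String>, FxBuildHasher> = IndexMap::default();
    send.insert(0, None);
    send.insert(4, None);

    // Resolve the key to a positional index once, as `SendStream::new` does...
    let index = send.get_index_of(&4).expect("stream exists");
    // ...then reuse it for later accesses without re-hashing the key.
    let stream = send[index].get_or_insert_with(|| String::from("send state"));
    stream.push_str(" (written)");

    // `swap_remove` is O(1): it moves the last entry into the vacated slot,
    // so positions of other entries can change after a removal.
    send.swap_remove(&0);
    assert_eq!(send.get_index_of(&4), Some(0));
}
```

Note that `swap_remove` relocates the last entry, so any cached index for that entry becomes stale; in the patch this is safe because `SendStream` resolves its index afresh each time it is constructed.
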