diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 5cf670991b..48c6d87885 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -56,7 +56,12 @@ where B: Buf, { pub(crate) fn new(io: T) -> Buffered { - let write_buf = WriteBuf::new(&io); + let strategy = if io.is_write_vectored() { + WriteStrategy::Queue + } else { + WriteStrategy::Flatten + }; + let write_buf = WriteBuf::new(strategy); Buffered { flush_pipeline: false, io, @@ -419,6 +424,24 @@ impl> Cursor { } impl Cursor> { + /// If we've advanced the position a bit in this cursor, and wish to + /// extend the underlying vector, we may wish to unshift the "read" bytes + /// off, and move everything else over. + fn maybe_unshift(&mut self, additional: usize) { + if self.pos == 0 { + // nothing to do + return; + } + + if self.bytes.capacity() - self.bytes.len() >= additional { + // there's room! + return; + } + + self.bytes.drain(0..self.pos); + self.pos = 0; + } + fn reset(&mut self) { self.pos = 0; self.bytes.clear(); @@ -463,12 +486,7 @@ pub(super) struct WriteBuf { } impl WriteBuf { - fn new(io: &impl AsyncWrite) -> WriteBuf { - let strategy = if io.is_write_vectored() { - WriteStrategy::Queue - } else { - WriteStrategy::Flatten - }; + fn new(strategy: WriteStrategy) -> WriteBuf { WriteBuf { headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)), max_buf_size: DEFAULT_MAX_BUFFER_SIZE, @@ -492,6 +510,8 @@ where match self.strategy { WriteStrategy::Flatten => { let head = self.headers_mut(); + + head.maybe_unshift(buf.remaining()); //perf: This is a little faster than >::put, //but accomplishes the same result. loop { @@ -804,7 +824,6 @@ mod tests { let _ = pretty_env_logger::try_init(); let mock = Mock::new() - // Just a single write .write(b"hello world, it's hyper!") .build(); @@ -820,6 +839,41 @@ mod tests { buffered.flush().await.expect("flush"); } + #[test] + fn write_buf_flatten_partially_flushed() { + let _ = pretty_env_logger::try_init(); + + let b = |s: &str| Cursor::new(s.as_bytes().to_vec()); + + let mut write_buf = WriteBuf::>>::new(WriteStrategy::Flatten); + + write_buf.buffer(b("hello ")); + write_buf.buffer(b("world, ")); + + assert_eq!(write_buf.chunk(), b"hello world, "); + + // advance most of the way, but not all + write_buf.advance(11); + + assert_eq!(write_buf.chunk(), b", "); + assert_eq!(write_buf.headers.pos, 11); + assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE); + + // there's still room in the headers buffer, so just push on the end + write_buf.buffer(b("it's hyper!")); + + assert_eq!(write_buf.chunk(), b", it's hyper!"); + assert_eq!(write_buf.headers.pos, 11); + + let rem1 = write_buf.remaining(); + let cap = write_buf.headers.bytes.capacity(); + + // but when this would go over capacity, don't copy the old bytes + write_buf.buffer(Cursor::new(vec![b'X'; cap])); + assert_eq!(write_buf.remaining(), cap + rem1); + assert_eq!(write_buf.headers.pos, 0); + } + #[tokio::test] async fn write_buf_queue_disable_auto() { let _ = pretty_env_logger::try_init();