From eb0c64639503bbd4f6e3b1ce3a02bff8eeea7ee8 Mon Sep 17 00:00:00 2001
From: Sean McArthur <sean@seanmonstar.com>
Date: Wed, 26 May 2021 16:47:36 -0700
Subject: [PATCH] fix(http1): reduce memory used with flatten write strategy

If the write buffer was filled with large bufs from the user, such that
it couldn't be fully written to the transport, the write buffer could
start to grow significantly as it moved its cursor without shifting over
the unwritten bytes. This will now try to shift over the unwritten
bytes if the next buf wouldn't fit in the already allocated space.
---
 src/proto/h1/io.rs | 70 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 62 insertions(+), 8 deletions(-)

diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs
index 5cf670991b..48c6d87885 100644
--- a/src/proto/h1/io.rs
+++ b/src/proto/h1/io.rs
@@ -56,7 +56,12 @@ where
     B: Buf,
 {
     pub(crate) fn new(io: T) -> Buffered<T, B> {
-        let write_buf = WriteBuf::new(&io);
+        let strategy = if io.is_write_vectored() {
+            WriteStrategy::Queue
+        } else {
+            WriteStrategy::Flatten
+        };
+        let write_buf = WriteBuf::new(strategy);
         Buffered {
             flush_pipeline: false,
             io,
@@ -419,6 +424,24 @@ impl<T: AsRef<[u8]>> Cursor<T> {
 }
 
 impl Cursor<Vec<u8>> {
+    /// If we've advanced the position a bit in this cursor, and wish to
+    /// extend the underlying vector, we may wish to unshift the "read" bytes
+    /// off, and move everything else over.
+    fn maybe_unshift(&mut self, additional: usize) {
+        if self.pos == 0 {
+            // nothing to do
+            return;
+        }
+
+        if self.bytes.capacity() - self.bytes.len() >= additional {
+            // there's room!
+            return;
+        }
+
+        self.bytes.drain(0..self.pos);
+        self.pos = 0;
+    }
+
     fn reset(&mut self) {
         self.pos = 0;
         self.bytes.clear();
@@ -463,12 +486,7 @@ pub(super) struct WriteBuf<B> {
 }
 
 impl<B: Buf> WriteBuf<B> {
-    fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
-        let strategy = if io.is_write_vectored() {
-            WriteStrategy::Queue
-        } else {
-            WriteStrategy::Flatten
-        };
+    fn new(strategy: WriteStrategy) -> WriteBuf<B> {
         WriteBuf {
             headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
@@ -492,6 +510,8 @@ where
         match self.strategy {
             WriteStrategy::Flatten => {
                 let head = self.headers_mut();
+
+                head.maybe_unshift(buf.remaining());
                 //perf: This is a little faster than <Vec as BufMut>::put,
                 //but accomplishes the same result.
                 loop {
@@ -804,7 +824,6 @@ mod tests {
         let _ = pretty_env_logger::try_init();
 
         let mock = Mock::new()
-            // Just a single write
             .write(b"hello world, it's hyper!")
             .build();
 
@@ -820,6 +839,41 @@ mod tests {
         buffered.flush().await.expect("flush");
     }
 
+    #[test]
+    fn write_buf_flatten_partially_flushed() {
+        let _ = pretty_env_logger::try_init();
+
+        let b = |s: &str| Cursor::new(s.as_bytes().to_vec());
+
+        let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
+
+        write_buf.buffer(b("hello "));
+        write_buf.buffer(b("world, "));
+
+        assert_eq!(write_buf.chunk(), b"hello world, ");
+
+        // advance most of the way, but not all
+        write_buf.advance(11);
+
+        assert_eq!(write_buf.chunk(), b", ");
+        assert_eq!(write_buf.headers.pos, 11);
+        assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);
+
+        // there's still room in the headers buffer, so just push on the end
+        write_buf.buffer(b("it's hyper!"));
+
+        assert_eq!(write_buf.chunk(), b", it's hyper!");
+        assert_eq!(write_buf.headers.pos, 11);
+
+        let rem1 = write_buf.remaining();
+        let cap = write_buf.headers.bytes.capacity();
+
+        // but when this would go over capacity, don't copy the old bytes
+        write_buf.buffer(Cursor::new(vec![b'X'; cap]));
+        assert_eq!(write_buf.remaining(), cap + rem1);
+        assert_eq!(write_buf.headers.pos, 0);
+    }
+
     #[tokio::test]
     async fn write_buf_queue_disable_auto() {
         let _ = pretty_env_logger::try_init();