Skip to content

Commit

Permalink
perf(h1): convert buffer to flatten strategy with auto detection
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Feb 6, 2018
1 parent 3a12446 commit d2fdf1f
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 6 deletions.
11 changes: 11 additions & 0 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct AsyncIo<T> {
flushed: bool,
inner: T,
max_read_vecs: usize,
num_writes: usize,
}

impl<T> AsyncIo<T> {
Expand All @@ -81,6 +82,7 @@ impl<T> AsyncIo<T> {
flushed: false,
inner: inner,
max_read_vecs: READ_VECS_CNT,
num_writes: 0,
}
}

Expand All @@ -107,6 +109,10 @@ impl<T> AsyncIo<T> {
pub fn blocked(&self) -> bool {
self.blocked
}

pub fn num_writes(&self) -> usize {
self.num_writes
}
}

impl AsyncIo<Buf> {
Expand Down Expand Up @@ -160,6 +166,7 @@ impl<T: Read> Read for AsyncIo<T> {

impl<T: Write> Write for AsyncIo<T> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.num_writes += 1;
if let Some(err) = self.error.take() {
Err(err)
} else if self.bytes_until_block == 0 {
Expand Down Expand Up @@ -198,6 +205,9 @@ impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]);
let mut n = 0;
let mut ret = Ok(0);
// each call to write() will increase our count, but we assume
// that if iovecs are used, its really only 1 write call.
let num_writes = self.num_writes;
for iovec in &bufs[..i] {
match self.write(iovec) {
Ok(num) => {
Expand All @@ -216,6 +226,7 @@ impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
}
}
}
self.num_writes = num_writes + 1;
ret
};
match r {
Expand Down
195 changes: 189 additions & 6 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::io;
Expand Down Expand Up @@ -55,7 +56,7 @@ where
self.write_buf.set_strategy(if enabled {
Strategy::Flatten
} else {
Strategy::Queue
Strategy::Auto
});
}

Expand All @@ -68,6 +69,11 @@ where
self.read_buf.as_ref()
}

//TODO(perf): don't return a `&mut Vec<u8>`, but a wrapper
//that protects the Vec when growing. Specifically, if this
//Vec couldn't be reset, as it's position isn't at the end,
//any new reserves will copy the bytes before the position,
//which is unnecessary.
pub fn write_buf_mut(&mut self) -> &mut Vec<u8> {
let buf = self.write_buf.head_mut();
buf.maybe_reset();
Expand Down Expand Up @@ -154,7 +160,7 @@ where
try_nb!(self.io.flush());
} else {
loop {
let n = try_ready!(self.io.write_buf(&mut self.write_buf));
let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto()));
debug!("flushed {} bytes", n);
if self.write_buf.remaining() == 0 {
break;
Expand Down Expand Up @@ -263,7 +269,7 @@ impl<B> WriteBuf<B> {
WriteBuf {
buf: BufDeque::new(),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
strategy: Strategy::Queue,
strategy: Strategy::Auto,
}
}
}
Expand All @@ -277,14 +283,19 @@ where
self.strategy = strategy;
}

#[inline]
fn auto(&mut self) -> WriteBufAuto<B> {
WriteBufAuto::new(self)
}

fn buffer(&mut self, buf: B) {
match self.strategy {
Strategy::Flatten => {
let head = self.head_mut();
head.maybe_reset();
head.bytes.put(buf);
},
Strategy::Queue => {
Strategy::Auto | Strategy::Queue => {
self.buf.bufs.push_back(VecOrBuf::Buf(buf));
},
}
Expand All @@ -295,8 +306,7 @@ where
Strategy::Flatten => {
self.remaining() < self.max_buf_size
},
Strategy::Queue => {
// for now, the simplest of heuristics
Strategy::Auto | Strategy::Queue => {
self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS
&& self.remaining() < self.max_buf_size
},
Expand Down Expand Up @@ -355,8 +365,68 @@ impl<B: Buf> Buf for WriteBuf<B> {
}
}

/// Detects when wrapped `WriteBuf` is used for vectored IO, and
/// adjusts the `WriteBuf` strategy if not.
struct WriteBufAuto<'a, B: Buf + 'a> {
bytes_called: Cell<bool>,
bytes_vec_called: Cell<bool>,
inner: &'a mut WriteBuf<B>,
}

impl<'a, B: Buf> WriteBufAuto<'a, B> {
fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
WriteBufAuto {
bytes_called: Cell::new(false),
bytes_vec_called: Cell::new(false),
inner: inner,
}
}
}

impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
#[inline]
fn remaining(&self) -> usize {
self.inner.remaining()
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bytes_called.set(true);
self.inner.bytes()
}

#[inline]
fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt)
}

#[inline]
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
self.bytes_vec_called.set(true);
self.inner.bytes_vec(dst)
}
}

impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
fn drop(&mut self) {
if let Strategy::Auto = self.inner.strategy {
if self.bytes_vec_called.get() {
self.inner.strategy = Strategy::Queue;
} else if self.bytes_called.get() {
trace!("detected no usage of vectored write, flattening");
self.inner.strategy = Strategy::Flatten;
let mut vec = Vec::new();
vec.put(&mut self.inner.buf);
self.inner.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec)));
}
}
}
}


#[derive(Debug)]
enum Strategy {
Auto,
Flatten,
Queue,
}
Expand Down Expand Up @@ -536,4 +606,117 @@ mod tests {
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello");
}

#[test]
fn write_buf_queue() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();

let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();

assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
}

#[test]
fn write_buf_reclaim_vec() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();

let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

buffered.write_buf_mut().extend(b"hello ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.write_buf_mut().extend(b"world, ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);

// after flushing, reclaim the Vec
buffered.flush().unwrap();
assert_eq!(buffered.write_buf.remaining(), 0);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);

// add a user buf in the way
buffered.buffer(Cursor::new(b"it's ".to_vec()));
// and then add more hyper bytes
buffered.write_buf_mut().extend(b"hyper!");
buffered.flush().unwrap();
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);

assert_eq!(buffered.io, b"hello world, it's hyper!");
}

#[test]
fn write_buf_flatten() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();

let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Flatten);

buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);

buffered.flush().unwrap();

assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
}

#[test]
fn write_buf_auto_flatten() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();

let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

// we have 4 buffers, but hope to detect that vectored IO isn't
// being used, and switch to flattening automatically,
// resulting in only 2 writes
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's hyper!");
//buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();

assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 2);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
}

#[test]
fn write_buf_queue_does_not_auto() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();

let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Queue);

// we have 4 buffers, and vec IO disabled, but explicitly said
// don't try to auto detect (via setting strategy above)
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();

assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 4);
}
}

0 comments on commit d2fdf1f

Please sign in to comment.