From dd54f20b55420e258f73927700ca56dc4b2b4542 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 21 Sep 2017 18:09:28 -0700 Subject: [PATCH] feat(server): add experimental pipeline flush aggregation option to Http By enabling `Http::pipeline`, the connection will aggregate response writes to try to improve sending more responses in a single syscall. --- src/http/conn.rs | 3 +++ src/http/io.rs | 10 ++++++- src/server/mod.rs | 17 +++++++++++- tests/server.rs | 67 +++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 90 insertions(+), 7 deletions(-) diff --git a/src/http/conn.rs b/src/http/conn.rs index 4c8d63b967..f6c8b8575a 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -47,6 +47,9 @@ where I: AsyncRead + AsyncWrite, } } + pub fn set_flush_pipeline(&mut self, enabled: bool) { + self.io.set_flush_pipeline(enabled); + } fn poll2(&mut self) -> Poll, http::Chunk, ::Error>>, io::Error> { trace!("Conn::poll()"); diff --git a/src/http/io.rs b/src/http/io.rs index fec172302c..6e31ba0a40 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -13,6 +13,7 @@ const INIT_BUFFER_SIZE: usize = 8192; pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; pub struct Buffered { + flush_pipeline: bool, io: T, read_blocked: bool, read_buf: BytesMut, @@ -31,6 +32,7 @@ impl fmt::Debug for Buffered { impl Buffered { pub fn new(io: T) -> Buffered { Buffered { + flush_pipeline: false, io: io, read_buf: BytesMut::with_capacity(0), write_buf: WriteBuf::new(), @@ -38,6 +40,10 @@ impl Buffered { } } + pub fn set_flush_pipeline(&mut self, enabled: bool) { + self.flush_pipeline = enabled; + } + pub fn read_buf(&self) -> &[u8] { self.read_buf.as_ref() } @@ -139,7 +145,9 @@ impl Write for Buffered { } fn flush(&mut self) -> io::Result<()> { - if self.write_buf.remaining() == 0 { + if self.flush_pipeline && self.read_buf.is_empty() { + Ok(()) + } else if self.write_buf.remaining() == 0 { self.io.flush() } else { loop { diff --git a/src/server/mod.rs b/src/server/mod.rs index 6ab80458a3..9960294427 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -49,6 +49,7 @@ pub use http::request::Request; /// configured with various protocol-level options such as keepalive. pub struct Http { keep_alive: bool, + pipeline: bool, _marker: PhantomData, } @@ -73,6 +74,7 @@ impl + 'static> Http { pub fn new() -> Http { Http { keep_alive: true, + pipeline: false, _marker: PhantomData, } } @@ -85,6 +87,16 @@ impl + 'static> Http { self } + /// Aggregates flushes to better support pipelined responses. + /// + /// Experimental, may be have bugs. + /// + /// Default is false. + pub fn pipeline(&mut self, enabled: bool) -> &mut Self { + self.pipeline = enabled; + self + } + /// Bind the provided `addr` and return a server ready to handle /// connections. /// @@ -185,6 +197,7 @@ impl fmt::Debug for Http { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Http") .field("keep_alive", &self.keep_alive) + .field("pipeline", &self.pipeline) .finish() } } @@ -223,8 +236,10 @@ impl ServerProto for Http } else { http::KA::Disabled }; + let mut conn = http::Conn::new(io, ka); + conn.set_flush_pipeline(self.pipeline); __ProtoBindTransport { - inner: future::ok(http::Conn::new(io, ka)), + inner: future::ok(conn), } } } diff --git a/tests/server.rs b/tests/server.rs index 834925be18..bf9dc40672 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -437,6 +437,58 @@ fn expect_continue() { assert_eq!(body, msg); } +#[test] +fn pipline_disabled() { + let server = serve(); + let mut req = connect(server.addr()); + server.reply().status(hyper::Ok); + server.reply().status(hyper::Ok); + + req.write_all(b"\ + GET / HTTP/1.1\r\n\ + Host: example.domain\r\n\ + \r\n\ + GET / HTTP/1.1\r\n\ + Host: example.domain\r\n\ + \r\n\ + ").expect("write 1"); + + let mut buf = vec![0; 4096]; + let n = req.read(&mut buf).expect("read 1"); + assert_ne!(n, 0); + let n = req.read(&mut buf).expect("read 2"); + assert_ne!(n, 0); +} + +#[test] +fn pipeline_enabled() { + let server = serve_with_options(ServeOptions { + pipeline: true, + .. Default::default() + }); + let mut req = connect(server.addr()); + server.reply().status(hyper::Ok); + server.reply().status(hyper::Ok); + + req.write_all(b"\ + GET / HTTP/1.1\r\n\ + Host: example.domain\r\n\ + \r\n\ + GET / HTTP/1.1\r\n\ + Host: example.domain\r\n\ + Connection: close\r\n\ + \r\n\ + ").expect("write 1"); + + let mut buf = vec![0; 4096]; + let n = req.read(&mut buf).expect("read 1"); + assert_ne!(n, 0); + // with pipeline enabled, both responses should have been in the first read + // so a second read should be EOF + let n = req.read(&mut buf).expect("read 2"); + assert_eq!(n, 0); +} + // ------------------------------------------------- // the Server that is used to run all the tests with // ------------------------------------------------- @@ -577,6 +629,7 @@ fn serve() -> Serve { #[derive(Default)] struct ServeOptions { keep_alive_disabled: bool, + pipeline: bool, timeout: Option, } @@ -591,15 +644,19 @@ fn serve_with_options(options: ServeOptions) -> Serve { let addr = "127.0.0.1:0".parse().unwrap(); let keep_alive = !options.keep_alive_disabled; + let pipeline = options.pipeline; let dur = options.timeout; let thread_name = format!("test-server-{:?}", dur); let thread = thread::Builder::new().name(thread_name).spawn(move || { - let srv = Http::new().keep_alive(keep_alive).bind(&addr, TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), - _timeout: dur, - reply: reply_rx, - }).unwrap(); + let srv = Http::new() + .keep_alive(keep_alive) + .pipeline(pipeline) + .bind(&addr, TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + _timeout: dur, + reply: reply_rx, + }).unwrap(); addr_tx.send(srv.local_addr().unwrap()).unwrap(); srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); }).unwrap();