From 6ade21aa7f16dfeb6c0c53fe39c3f168f5f8aec1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 28 Dec 2017 18:56:15 -0800 Subject: [PATCH] feat(server): change default dispatcher - Deprecates the `no_proto` configuration on `Server`. It is always enabled. - Deprecates all pieces related to tokio-proto. - Makes the tokio-proto crate optional, and the `server-proto` feature can be used to completely remove the dependency. It is enabled by default. --- .travis.yml | 7 ++-- Cargo.toml | 4 +- examples/hello.rs | 3 +- examples/params.rs | 3 +- examples/send_file.rs | 3 +- examples/server.rs | 3 +- src/client/mod.rs | 2 +- src/lib.rs | 17 ++++---- src/mock.rs | 9 ++++ src/proto/body.rs | 96 ++++++++++++++++++++++++++----------------- src/proto/conn.rs | 23 +++++++---- src/proto/dispatch.rs | 2 + src/proto/mod.rs | 4 +- src/server/mod.rs | 47 ++++++++------------- tests/server.rs | 19 ++------- 15 files changed, 124 insertions(+), 118 deletions(-) diff --git a/.travis.yml b/.travis.yml index f7f4840132..3021d84f6f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,9 @@ matrix: env: FEATURES="--features nightly" - rust: beta - rust: stable + env: HYPER_DOCS=1 - rust: stable - env: HYPER_NO_PROTO=1 + env: FEATURES="--no-default-features" - rust: stable env: FEATURES="--features compat" - rust: 1.17.0 @@ -37,7 +38,7 @@ addons: after_success: - - '[ $TRAVIS_RUST_VERSION = stable ] && + - '[ "$HYPER_DOCS" = "1" ] && LOCAL="~/.local" && export PATH=$LOCAL/bin:$PATH && wget https://github.com/SimonKagstrom/kcov/archive/master.tar.gz && tar xzf master.tar.gz && mkdir kcov-master/build && cd kcov-master/build && @@ -51,7 +52,7 @@ after_success: fi; done && kcov --coveralls-id=$TRAVIS_JOB_ID --merge target/cov target/cov/*' - - '[ $TRAVIS_PULL_REQUEST = false ] && [ $TRAVIS_RUST_VERSION = stable ] && + - '[ $TRAVIS_PULL_REQUEST = false ] && [ "$HYPER_DOCS" = "1" ] && { [ "$TRAVIS_TAG" != "" ] || [ "$TRAVIS_BRANCH" == "master" ]; } && ./.travis/docs.sh' diff --git a/Cargo.toml b/Cargo.toml index de78f1ef37..62e2bfc680 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ percent-encoding = "1.0" relay = "0.1" time = "0.1" tokio-core = "0.1.6" -tokio-proto = "0.1" +tokio-proto = { version = "0.1", optional = true } tokio-service = "0.1" tokio-io = "0.1" unicase = "2.0" @@ -48,4 +48,4 @@ default = ["server-proto"] nightly = [] raw_status = [] compat = [ "http" ] -server-proto = [] +server-proto = ["tokio-proto"] diff --git a/examples/hello.rs b/examples/hello.rs index 2286b60dc8..14b9035152 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -19,8 +19,7 @@ fn main() { .with_body(PHRASE)) })); - let mut server = Http::new().bind(&addr, new_service).unwrap(); - server.no_proto(); + let server = Http::new().bind(&addr, new_service).unwrap(); println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); server.run().unwrap(); } diff --git a/examples/params.rs b/examples/params.rs index 86c00cbc5a..a21fea619e 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -99,8 +99,7 @@ fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:1337".parse().unwrap(); - let mut server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap(); - server.no_proto(); + let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap(); println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); server.run().unwrap(); } diff --git a/examples/send_file.rs b/examples/send_file.rs index e0ce7cee1e..071fde025b 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -135,8 +135,7 @@ fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:1337".parse().unwrap(); - let mut server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap(); - server.no_proto(); + let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap(); println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); server.run().unwrap(); } diff --git a/examples/server.rs b/examples/server.rs index 4c790a7775..4e164bf906 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -47,8 +47,7 @@ fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:1337".parse().unwrap(); - let mut server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); - server.no_proto(); + let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); server.run().unwrap(); } diff --git a/src/client/mod.rs b/src/client/mod.rs index 6b9c8bacb6..4aac289670 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -198,7 +198,7 @@ where C: Connect, let pooled = pool.pooled(pool_key, tx); let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); - handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err))); + handle.spawn(dispatch.map_err(|err| error!("client connection error: {}", err))); pooled }) }; diff --git a/src/lib.rs b/src/lib.rs index 2963df80ae..40cab25c24 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ extern crate relay; extern crate time; extern crate tokio_core as tokio; #[macro_use] extern crate tokio_io; +#[cfg(feature = "tokio-proto")] extern crate tokio_proto; extern crate tokio_service; extern crate unicase; @@ -55,17 +56,13 @@ pub use proto::RawStatus; macro_rules! feat_server_proto { ($($i:item)*) => ($( - #[cfg_attr( - not(feature = "server-proto"), - deprecated( - since="0.11.7", - note="server-proto was recently added to default features, but you have disabled default features. A future version will remove these types if the server-proto feature is not enabled." - ) - )] - #[cfg_attr( - not(feature = "server-proto"), - allow(deprecated) + #[cfg(feature = "server-proto")] + #[deprecated( + since="0.11.11", + note="All usage of the tokio-proto crate is going away." )] + #[doc(hidden)] + #[allow(deprecated)] $i )*) } diff --git a/src/mock.rs b/src/mock.rs index a459348521..39c0312a31 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -87,14 +87,23 @@ impl AsyncIo { } impl AsyncIo { + #[cfg(feature = "tokio-proto")] + //TODO: fix proto::conn::tests to not use tokio-proto API, + //and then this cfg flag go away pub fn new_buf>>(buf: T, bytes: usize) -> AsyncIo { AsyncIo::new(Buf::wrap(buf.into()), bytes) } + #[cfg(feature = "tokio-proto")] + //TODO: fix proto::conn::tests to not use tokio-proto API, + //and then this cfg flag go away pub fn new_eof() -> AsyncIo { AsyncIo::new(Buf::wrap(Vec::new().into()), 1) } + #[cfg(feature = "tokio-proto")] + //TODO: fix proto::conn::tests to not use tokio-proto API, + //and then this cfg flag go away pub fn flushed(&self) -> bool { self.flushed } diff --git a/src/proto/body.rs b/src/proto/body.rs index 5a6c773d7e..fedd9b0938 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -1,11 +1,13 @@ use bytes::Bytes; use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; use futures::sync::{mpsc, oneshot}; +#[cfg(feature = "tokio-proto")] use tokio_proto; use std::borrow::Cow; use super::Chunk; +#[cfg(feature = "tokio-proto")] pub type TokioBody = tokio_proto::streaming::Body; pub type BodySender = mpsc::Sender>; @@ -16,17 +18,21 @@ pub struct Body(Inner); #[derive(Debug)] enum Inner { + #[cfg(feature = "tokio-proto")] Tokio(TokioBody), - Hyper { - close_tx: oneshot::Sender<()>, + Chan { + close_tx: oneshot::Sender, rx: mpsc::Receiver>, - } + }, + Once(Option), + Empty, } //pub(crate) #[derive(Debug)] pub struct ChunkSender { - close_rx: oneshot::Receiver<()>, + close_rx: oneshot::Receiver, + close_rx_check: bool, tx: BodySender, } @@ -34,15 +40,14 @@ impl Body { /// Return an empty body stream #[inline] pub fn empty() -> Body { - Body(Inner::Tokio(TokioBody::empty())) + Body(Inner::Empty) } /// Return a body stream with an associated sender half #[inline] pub fn pair() -> (mpsc::Sender>, Body) { - let (tx, rx) = TokioBody::pair(); - let rx = Body(Inner::Tokio(rx)); - (tx, rx) + let (tx, rx) = channel(); + (tx.tx, rx) } } @@ -60,13 +65,16 @@ impl Stream for Body { #[inline] fn poll(&mut self) -> Poll, ::Error> { match self.0 { + #[cfg(feature = "tokio-proto")] Inner::Tokio(ref mut rx) => rx.poll(), - Inner::Hyper { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { + Inner::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), Async::Ready(Some(Err(err))) => Err(err), Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), }, + Inner::Once(ref mut val) => Ok(Async::Ready(val.take())), + Inner::Empty => Ok(Async::Ready(None)), } } } @@ -78,9 +86,10 @@ pub fn channel() -> (ChunkSender, Body) { let tx = ChunkSender { close_rx: close_rx, + close_rx_check: true, tx: tx, }; - let rx = Body(Inner::Hyper { + let rx = Body(Inner::Chan { close_tx: close_tx, rx: rx, }); @@ -90,9 +99,16 @@ pub fn channel() -> (ChunkSender, Body) { impl ChunkSender { pub fn poll_ready(&mut self) -> Poll<(), ()> { - match self.close_rx.poll() { - Ok(Async::Ready(())) | Err(_) => return Err(()), - Ok(Async::NotReady) => (), + if self.close_rx_check { + match self.close_rx.poll() { + Ok(Async::Ready(true)) | Err(_) => return Err(()), + Ok(Async::Ready(false)) => { + // needed to allow converting into a plain mpsc::Receiver + // if it has been, the tx will send false to disable this check + self.close_rx_check = false; + } + Ok(Async::NotReady) => (), + } } self.tx.poll_ready().map_err(|_| ()) @@ -107,63 +123,67 @@ impl ChunkSender { } } -// deprecate soon, but can't really deprecate trait impls -#[doc(hidden)] -impl From for tokio_proto::streaming::Body { - #[inline] - fn from(b: Body) -> tokio_proto::streaming::Body { - match b.0 { - Inner::Tokio(b) => b, - Inner::Hyper { close_tx, rx } => { - warn!("converting hyper::Body into a tokio_proto Body is deprecated"); - ::std::mem::forget(close_tx); - rx.into() +feat_server_proto! { + impl From for tokio_proto::streaming::Body { + fn from(b: Body) -> tokio_proto::streaming::Body { + match b.0 { + Inner::Tokio(b) => b, + Inner::Chan { close_tx, rx } => { + // disable knowing if the Rx gets dropped, since we cannot + // pass this tx along. + let _ = close_tx.send(false); + rx.into() + }, + Inner::Once(Some(chunk)) => TokioBody::from(chunk), + Inner::Once(None) | + Inner::Empty => TokioBody::empty(), } } } -} -// deprecate soon, but can't really deprecate trait impls -#[doc(hidden)] -impl From> for Body { - #[inline] - fn from(tokio_body: tokio_proto::streaming::Body) -> Body { - Body(Inner::Tokio(tokio_body)) + impl From> for Body { + fn from(tokio_body: tokio_proto::streaming::Body) -> Body { + Body(Inner::Tokio(tokio_body)) + } } } impl From>> for Body { #[inline] fn from(src: mpsc::Receiver>) -> Body { - TokioBody::from(src).into() + let (tx, _) = oneshot::channel(); + Body(Inner::Chan { + close_tx: tx, + rx: src, + }) } } impl From for Body { #[inline] fn from (chunk: Chunk) -> Body { - TokioBody::from(chunk).into() + Body(Inner::Once(Some(chunk))) } } impl From for Body { #[inline] fn from (bytes: Bytes) -> Body { - Body::from(TokioBody::from(Chunk::from(bytes))) + Body::from(Chunk::from(bytes)) } } impl From> for Body { #[inline] fn from (vec: Vec) -> Body { - Body::from(TokioBody::from(Chunk::from(vec))) + Body::from(Chunk::from(vec)) } } impl From<&'static [u8]> for Body { #[inline] fn from (slice: &'static [u8]) -> Body { - Body::from(TokioBody::from(Chunk::from(slice))) + Body::from(Chunk::from(slice)) } } @@ -180,14 +200,14 @@ impl From> for Body { impl From for Body { #[inline] fn from (s: String) -> Body { - Body::from(TokioBody::from(Chunk::from(s.into_bytes()))) + Body::from(Chunk::from(s.into_bytes())) } } impl From<&'static str> for Body { #[inline] fn from(slice: &'static str) -> Body { - Body::from(TokioBody::from(Chunk::from(slice.as_bytes()))) + Body::from(Chunk::from(slice.as_bytes())) } } diff --git a/src/proto/conn.rs b/src/proto/conn.rs index 2bdcef758e..d03fe1e39b 100644 --- a/src/proto/conn.rs +++ b/src/proto/conn.rs @@ -2,9 +2,12 @@ use std::fmt; use std::io::{self, Write}; use std::marker::PhantomData; -use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend}; +use futures::{Async, AsyncSink, Poll, StartSend}; +#[cfg(feature = "tokio-proto")] +use futures::{Sink, Stream}; use futures::task::Task; use tokio_io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "tokio-proto")] use tokio_proto::streaming::pipeline::{Frame, Transport}; use proto::Http1Transaction; @@ -51,6 +54,7 @@ where I: AsyncRead + AsyncWrite, self.io.set_flush_pipeline(enabled); } + #[cfg(feature = "tokio-proto")] fn poll_incoming(&mut self) -> Poll, super::Chunk, ::Error>>, io::Error> { trace!("Conn::poll_incoming()"); @@ -123,7 +127,7 @@ where I: AsyncRead + AsyncWrite, } } - fn can_write_continue(&self) -> bool { + pub fn can_write_continue(&self) -> bool { match self.state.writing { Writing::Continue(..) => true, _ => false, @@ -511,11 +515,6 @@ where I: AsyncRead + AsyncWrite, } - pub fn close_and_shutdown(&mut self) -> Poll<(), io::Error> { - try_ready!(self.flush()); - self.shutdown() - } - pub fn shutdown(&mut self) -> Poll<(), io::Error> { match self.io.io_mut().shutdown() { Ok(Async::NotReady) => Ok(Async::NotReady), @@ -549,6 +548,7 @@ where I: AsyncRead + AsyncWrite, // ==== tokio_proto impl ==== +#[cfg(feature = "tokio-proto")] impl Stream for Conn where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, @@ -567,6 +567,7 @@ where I: AsyncRead + AsyncWrite, } } +#[cfg(feature = "tokio-proto")] impl Sink for Conn where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, @@ -630,10 +631,12 @@ where I: AsyncRead + AsyncWrite, #[inline] fn close(&mut self) -> Poll<(), Self::SinkError> { - self.close_and_shutdown() + try_ready!(self.flush()); + self.shutdown() } } +#[cfg(feature = "tokio-proto")] impl Transport for Conn where I: AsyncRead + AsyncWrite + 'static, B: AsRef<[u8]> + 'static, @@ -838,8 +841,10 @@ impl State { // The DebugFrame and DebugChunk are simple Debug implementations that allow // us to dump the frame into logs, without logging the entirety of the bytes. +#[cfg(feature = "tokio-proto")] struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame, B, ::Error>); +#[cfg(feature = "tokio-proto")] impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, T, B> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self.0 { @@ -868,6 +873,8 @@ impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, } #[cfg(test)] +#[cfg(feature = "tokio-proto")] +//TODO: rewrite these using dispatch instead of tokio-proto API mod tests { use futures::{Async, Future, Stream, Sink}; use futures::future; diff --git a/src/proto/dispatch.rs b/src/proto/dispatch.rs index 88b16dd003..cc345eda44 100644 --- a/src/proto/dispatch.rs +++ b/src/proto/dispatch.rs @@ -108,6 +108,8 @@ where return Ok(Async::Ready(())); } } + } else if self.conn.can_write_continue() { + try_nb!(self.conn.flush()); } else if let Some(mut body) = self.body_tx.take() { let can_read_body = self.conn.can_read_body(); match body.poll_ready() { diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 15de1345a9..b165492855 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -13,7 +13,9 @@ use version::HttpVersion; use version::HttpVersion::{Http10, Http11}; pub use self::conn::{Conn, KeepAlive, KA}; -pub use self::body::{Body, TokioBody}; +pub use self::body::Body; +#[cfg(feature = "tokio-proto")] +pub use self::body::TokioBody; pub use self::chunk::Chunk; mod body; diff --git a/src/server/mod.rs b/src/server/mod.rs index b06f90a4f8..c4d6329cdc 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -49,8 +49,7 @@ feat_server_proto! { pub use self::service::{const_service, service_fn}; -/// An instance of the HTTP protocol, and implementation of tokio-proto's -/// `ServerProto` trait. +/// A configuration of the HTTP protocol. /// /// This structure is used to create instances of `Server` or to spawn off tasks /// which handle a connection to an HTTP server. Each instance of `Http` can be @@ -74,7 +73,6 @@ where B: Stream, reactor: Core, listener: TcpListener, shutdown_timeout: Duration, - no_proto: bool, } /// A stream mapping incoming IOs to new services. @@ -127,24 +125,17 @@ where // ===== impl Http ===== -// This is wrapped in this macro because using `Http` as a `ServerProto` will -// never trigger a deprecation warning, so we have to annoy more people to -// protect some others. -feat_server_proto! { - impl + 'static> Http { - /// Creates a new instance of the HTTP protocol, ready to spawn a server or - /// start accepting connections. - pub fn new() -> Http { - Http { - keep_alive: true, - pipeline: false, - _marker: PhantomData, - } +impl + 'static> Http { + /// Creates a new instance of the HTTP protocol, ready to spawn a server or + /// start accepting connections. + pub fn new() -> Http { + Http { + keep_alive: true, + pipeline: false, + _marker: PhantomData, } } -} -impl + 'static> Http { /// Enables or disables HTTP keep-alive. /// /// Default is true. @@ -187,7 +178,6 @@ impl + 'static> Http { listener: listener, protocol: self.clone(), shutdown_timeout: Duration::new(1, 0), - no_proto: false, }) } @@ -320,9 +310,9 @@ impl Server self } - /// Configure this server to not use tokio-proto infrastructure internally. + #[doc(hidden)] + #[deprecated(since="0.11.11", note="no_proto is always enabled")] pub fn no_proto(&mut self) -> &mut Self { - self.no_proto = true; self } @@ -350,7 +340,7 @@ impl Server pub fn run_until(self, shutdown_signal: F) -> ::Result<()> where F: Future, { - let Server { protocol, new_service, mut reactor, listener, shutdown_timeout, no_proto } = self; + let Server { protocol, new_service, mut reactor, listener, shutdown_timeout } = self; let handle = reactor.handle(); @@ -367,15 +357,10 @@ impl Server info: Rc::downgrade(&info), }; info.borrow_mut().active += 1; - if no_proto { - let fut = protocol.serve_connection(socket, s) - .map(|_| ()) - .map_err(|err| error!("no_proto error: {}", err)); - handle.spawn(fut); - } else { - #[allow(deprecated)] - protocol.bind_connection(&handle, socket, addr, s); - } + let fut = protocol.serve_connection(socket, s) + .map(|_| ()) + .map_err(move |err| error!("server connection error: ({}) {}", addr, err)); + handle.spawn(fut); Ok(()) }); diff --git a/tests/server.rs b/tests/server.rs index e043515900..b328b5acaf 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -669,7 +669,7 @@ fn disable_keep_alive_post_request() { } #[test] -fn no_proto_empty_parse_eof_does_not_return_error() { +fn empty_parse_eof_does_not_return_error() { let mut core = Core::new().unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -690,7 +690,7 @@ fn no_proto_empty_parse_eof_does_not_return_error() { } #[test] -fn no_proto_nonempty_parse_eof_returns_error() { +fn nonempty_parse_eof_returns_error() { let mut core = Core::new().unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -868,7 +868,6 @@ fn serve() -> Serve { struct ServeOptions { keep_alive_disabled: bool, - no_proto: bool, pipeline: bool, timeout: Option, } @@ -877,20 +876,12 @@ impl Default for ServeOptions { fn default() -> Self { ServeOptions { keep_alive_disabled: false, - no_proto: env("HYPER_NO_PROTO", "1"), pipeline: false, timeout: None, } } } -fn env(name: &str, val: &str) -> bool { - match ::std::env::var(name) { - Ok(var) => var == val, - Err(_) => false, - } -} - fn serve_with_options(options: ServeOptions) -> Serve { let _ = pretty_env_logger::init(); @@ -902,13 +893,12 @@ fn serve_with_options(options: ServeOptions) -> Serve { let addr = "127.0.0.1:0".parse().unwrap(); let keep_alive = !options.keep_alive_disabled; - let no_proto = !options.no_proto; let pipeline = options.pipeline; let dur = options.timeout; let thread_name = format!("test-server-{:?}", dur); let thread = thread::Builder::new().name(thread_name).spawn(move || { - let mut srv = Http::new() + let srv = Http::new() .keep_alive(keep_alive) .pipeline(pipeline) .bind(&addr, TestService { @@ -916,9 +906,6 @@ fn serve_with_options(options: ServeOptions) -> Serve { _timeout: dur, reply: reply_rx, }).unwrap(); - if no_proto { - srv.no_proto(); - } addr_tx.send(srv.local_addr().unwrap()).unwrap(); srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); }).unwrap();