From ced949cb6b798f25c2ffbdb3ebda6858c18393a7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 16 Oct 2018 12:42:24 -0700 Subject: [PATCH] feat(server): allow `!Send` Servers Until this commit, servers have required that `Service` and their `Future` to be `Send`, since the server needs to spawn some internal tasks to an executor, and by default, that is `tokio::spawn`, which could be spawning to a threadpool. This was true even if the user were certain there was no threadpool involved, and was instead using a different single-threaded runtime, like `tokio::runtime::current_thread`. This changes makes all the server pieces generic over an `E`, which is essentially `Executor>`. There's a new set of internal traits, `H2Exec` and `NewSvcExec`, which allow for the type signature to only show the generics that the user is providing. The traits cannot be implemented explicitly, but there are blanket implementations for `E: Executor`. If the user provides their own executor, it simply needs to have a generic `impl Executor for MyExec`. That impl can have bounds deciding whether to require `F: Send`. If the executor does require `Send`, and the `Service` futures are `!Send`, there will be compiler errors. To prevent a breaking change, all the types that gained the `E` generic have a default type set, which is the original `tokio::spawn` executor. --- Cargo.toml | 5 + examples/README.md | 2 + examples/single_threaded.rs | 51 +++++++ src/common/drain.rs | 1 + src/common/exec.rs | 75 ++++++++++- src/common/mod.rs | 2 +- src/proto/h2/mod.rs | 2 +- src/proto/h2/server.rs | 27 ++-- src/server/conn.rs | 257 ++++++++++++++++++++++++++---------- src/server/mod.rs | 63 +++++---- src/server/shutdown.rs | 74 +++++++---- 11 files changed, 426 insertions(+), 133 deletions(-) create mode 100644 examples/single_threaded.rs diff --git a/Cargo.toml b/Cargo.toml index cca9fc9d3e..48535f56d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,11 @@ name = "send_file" path = "examples/send_file.rs" required-features = ["runtime"] +[[example]] +name = "single_threaded" +path = "examples/single_threaded.rs" +required-features = ["runtime"] + [[example]] name = "state" path = "examples/state.rs" diff --git a/examples/README.md b/examples/README.md index e8e23b3555..6635d04adc 100644 --- a/examples/README.md +++ b/examples/README.md @@ -21,6 +21,8 @@ parses it with serde and outputs the result. * [`send_file`](send_file.rs) - A server that sends back content of files using tokio_fs to read the files asynchronously. +* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter). + * [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count. * [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets or `CONNECT` tunneling). diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs new file mode 100644 index 0000000000..d39158bc45 --- /dev/null +++ b/examples/single_threaded.rs @@ -0,0 +1,51 @@ +#![deny(warnings)] +extern crate futures; +extern crate hyper; +extern crate pretty_env_logger; +extern crate tokio; + +use std::cell::Cell; +use std::rc::Rc; + +use hyper::{Body, Response, Server}; +use hyper::service::service_fn_ok; +use hyper::rt::Future; +use tokio::runtime::current_thread; + +fn main() { + pretty_env_logger::init(); + + let addr = ([127, 0, 0, 1], 3000).into(); + + // Using a !Send request counter is fine on 1 thread... + let counter = Rc::new(Cell::new(0)); + + let new_service = move || { + // For each connection, clone the counter to use in our service... + let cnt = counter.clone(); + + service_fn_ok(move |_| { + let prev = cnt.get(); + cnt.set(prev + 1); + Response::new(Body::from(format!("Request count: {}", prev + 1))) + }) + }; + + // Since the Server needs to spawn some background tasks, we needed + // to configure an Executor that can spawn !Send futures... + let exec = current_thread::TaskExecutor::current(); + + let server = Server::bind(&addr) + .executor(exec) + .serve(new_service) + .map_err(|e| eprintln!("server error: {}", e)); + + println!("Listening on http://{}", addr); + + current_thread::Runtime::new() + .expect("rt new") + .spawn(server) + .run() + .expect("rt run"); +} + diff --git a/src/common/drain.rs b/src/common/drain.rs index 0222c2ec41..4f7a539a55 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -36,6 +36,7 @@ pub struct Watch { rx: Shared>, } +#[allow(missing_debug_implementations)] pub struct Watching { future: F, state: State, diff --git a/src/common/exec.rs b/src/common/exec.rs index 231939b4c7..a5ea66564d 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -3,14 +3,28 @@ use std::sync::Arc; use futures::future::{Executor, Future}; -/// Either the user provides an executor for background tasks, or we use -/// `tokio::spawn`. +use body::Payload; +use proto::h2::server::H2Stream; +use server::conn::spawn_all::{NewSvcTask, Watcher}; +use service::Service; + +pub trait H2Exec: Clone { + fn execute_h2stream(&self, fut: H2Stream) -> ::Result<()>; +} + +pub trait NewSvcExec>: Clone { + fn execute_new_svc(&self, fut: NewSvcTask) -> ::Result<()>; +} + +// Either the user provides an executor for background tasks, or we use +// `tokio::spawn`. #[derive(Clone)] -pub(crate) enum Exec { +pub enum Exec { Default, Executor(Arc + Send>> + Send + Sync>), } +// ===== impl Exec ===== impl Exec { pub(crate) fn execute(&self, fut: F) -> ::Result<()> @@ -52,3 +66,58 @@ impl fmt::Debug for Exec { .finish() } } + + +impl H2Exec for Exec +where + H2Stream: Future + Send + 'static, + B: Payload, +{ + fn execute_h2stream(&self, fut: H2Stream) -> ::Result<()> { + self.execute(fut) + } +} + +impl NewSvcExec for Exec +where + NewSvcTask: Future + Send + 'static, + S: Service, + W: Watcher, +{ + fn execute_new_svc(&self, fut: NewSvcTask) -> ::Result<()> { + self.execute(fut) + } +} + +// ==== impl Executor ===== + +impl H2Exec for E +where + E: Executor> + Clone, + H2Stream: Future, + B: Payload, +{ + fn execute_h2stream(&self, fut: H2Stream) -> ::Result<()> { + self.execute(fut) + .map_err(|err| { + warn!("executor error: {:?}", err.kind()); + ::Error::new_execute() + }) + } +} + +impl NewSvcExec for E +where + E: Executor> + Clone, + NewSvcTask: Future, + S: Service, + W: Watcher, +{ + fn execute_new_svc(&self, fut: NewSvcTask) -> ::Result<()> { + self.execute(fut) + .map_err(|err| { + warn!("executor error: {:?}", err.kind()); + ::Error::new_execute() + }) + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 7e2a170d2b..400836ad3f 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,6 +1,6 @@ mod buf; pub(crate) mod drain; -mod exec; +pub(crate) mod exec; pub(crate) mod io; mod lazy; mod never; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 6620877210..a1863b2377 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -10,7 +10,7 @@ use http::HeaderMap; use body::Payload; mod client; -mod server; +pub(crate) mod server; pub(crate) use self::client::Client; pub(crate) use self::server::Server; diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 0dc8e7a866..fdd2727252 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,7 +5,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use ::headers::content_length_parse_all; use ::body::Payload; -use ::common::Exec; +use ::common::exec::H2Exec; use ::headers; use ::service::Service; use ::proto::Dispatched; @@ -13,12 +13,12 @@ use super::{PipeToSendStream, SendBuf}; use ::{Body, Response}; -pub(crate) struct Server +pub(crate) struct Server where S: Service, B: Payload, { - exec: Exec, + exec: E, service: S, state: State, } @@ -40,15 +40,16 @@ where } -impl Server +impl Server where T: AsyncRead + AsyncWrite, S: Service, S::Error: Into>, - S::Future: Send + 'static, + //S::Future: Send + 'static, B: Payload, + E: H2Exec, { - pub(crate) fn new(io: T, service: S, exec: Exec) -> Server { + pub(crate) fn new(io: T, service: S, exec: E) -> Server { let handshake = Builder::new() .handshake(io); Server { @@ -76,13 +77,14 @@ where } } -impl Future for Server +impl Future for Server where T: AsyncRead + AsyncWrite, S: Service, S::Error: Into>, - S::Future: Send + 'static, + //S::Future: Send + 'static, B: Payload, + E: H2Exec, { type Item = Dispatched; type Error = ::Error; @@ -116,14 +118,14 @@ where T: AsyncRead + AsyncWrite, B: Payload, { - fn poll_server(&mut self, service: &mut S, exec: &Exec) -> Poll<(), ::Error> + fn poll_server(&mut self, service: &mut S, exec: &E) -> Poll<(), ::Error> where S: Service< ReqBody=Body, ResBody=B, >, S::Error: Into>, - S::Future: Send + 'static, + E: H2Exec, { while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { trace!("incoming request"); @@ -132,7 +134,7 @@ where ::Body::h2(stream, content_length) }); let fut = H2Stream::new(service.call(req), respond); - exec.execute(fut)?; + exec.execute_h2stream(fut)?; } // no more incoming streams... @@ -141,7 +143,8 @@ where } } -struct H2Stream +#[allow(missing_debug_implementations)] +pub struct H2Stream where B: Payload, { diff --git a/src/server/conn.rs b/src/server/conn.rs index 551dccb969..2198aa5a31 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -21,14 +21,17 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] use tokio_reactor::Handle; use body::{Body, Payload}; -use common::Exec; +use common::exec::{Exec, H2Exec, NewSvcExec}; use common::io::Rewind; use error::{Kind, Parse}; use proto; use service::{NewService, Service}; use upgrade::Upgraded; -use self::upgrades::UpgradeableConnection; +pub(super) use self::spawn_all::NoopWatcher; +use self::spawn_all::NewSvcTask; +pub(super) use self::spawn_all::Watcher; +pub(super) use self::upgrades::UpgradeableConnection; #[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming; @@ -39,8 +42,8 @@ use self::upgrades::UpgradeableConnection; /// If you don't have need to manage connections yourself, consider using the /// higher-level [Server](super) API. #[derive(Clone, Debug)] -pub struct Http { - exec: Exec, +pub struct Http { + exec: E, h1_writev: bool, mode: ConnectionMode, keep_alive: bool, @@ -64,10 +67,10 @@ enum ConnectionMode { /// Yields `Connecting`s that are futures that should be put on a reactor. #[must_use = "streams do nothing unless polled"] #[derive(Debug)] -pub struct Serve { +pub struct Serve { incoming: I, new_service: S, - protocol: Http, + protocol: Http, } /// A future building a new `Service` to a `Connection`. @@ -76,23 +79,23 @@ pub struct Serve { /// a `Connection`. #[must_use = "futures do nothing unless polled"] #[derive(Debug)] -pub struct Connecting { +pub struct Connecting { future: F, io: Option, - protocol: Http, + protocol: Http, } #[must_use = "futures do nothing unless polled"] #[derive(Debug)] -pub(super) struct SpawnAll { - pub(super) serve: Serve, +pub(super) struct SpawnAll { + pub(super) serve: Serve, } /// A future binding a connection with a Service. /// /// Polling this future will drive HTTP forward. #[must_use = "futures do nothing unless polled"] -pub struct Connection +pub struct Connection where S: Service, { @@ -108,18 +111,19 @@ where Rewind, S, S::ResBody, + E, >, >>, - fallback: Fallback, + fallback: Fallback, } #[derive(Clone, Debug)] -enum Fallback { - ToHttp2(Exec), +enum Fallback { + ToHttp2(E), Http1Only, } -impl Fallback { +impl Fallback { fn to_h2(&self) -> bool { match *self { Fallback::ToHttp2(_) => true, @@ -166,6 +170,18 @@ impl Http { } } + #[doc(hidden)] + #[deprecated(note = "use Http::with_executor instead")] + pub fn executor(&mut self, exec: E) -> &mut Self + where + E: Executor + Send>> + Send + Sync + 'static + { + self.exec = Exec::Executor(Arc::new(exec)); + self + } +} + +impl Http { /// Sets whether HTTP1 is required. /// /// Default is false @@ -241,12 +257,15 @@ impl Http { /// Set the executor used to spawn background tasks. /// /// Default uses implicit default (like `tokio::spawn`). - pub fn executor(&mut self, exec: E) -> &mut Self - where - E: Executor + Send>> + Send + Sync + 'static - { - self.exec = Exec::Executor(Arc::new(exec)); - self + pub fn with_executor(self, exec: E2) -> Http { + Http { + exec, + h1_writev: self.h1_writev, + mode: self.mode, + keep_alive: self.keep_alive, + max_buf_size: self.max_buf_size, + pipeline_flush: self.pipeline_flush, + } } /// Bind a connection together with a [`Service`](::service::Service). @@ -285,13 +304,14 @@ impl Http { /// # } /// # fn main() {} /// ``` - pub fn serve_connection(&self, io: I, service: S) -> Connection + pub fn serve_connection(&self, io: I, service: S) -> Connection where S: Service, S::Error: Into>, - S::Future: Send + 'static, + //S::Future: Send + 'static, Bd: Payload, I: AsyncRead + AsyncWrite, + E: H2Exec//Box + Send>>, { let either = match self.mode { ConnectionMode::H1Only | ConnectionMode::Fallback => { @@ -333,11 +353,12 @@ impl Http { /// `new_service` object provided, creating a new service per /// connection. #[cfg(feature = "runtime")] - pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> + pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> where S: NewService, S::Error: Into>, Bd: Payload, + E: H2Exec<::Future, Bd>, { let mut incoming = AddrIncoming::new(addr, None)?; if self.keep_alive { @@ -353,11 +374,12 @@ impl Http { /// `new_service` object provided, creating a new service per /// connection. #[cfg(feature = "runtime")] - pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> + pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> where S: NewService, S::Error: Into>, Bd: Payload, + E: H2Exec<::Future, Bd>, { let mut incoming = AddrIncoming::new(addr, Some(handle))?; if self.keep_alive { @@ -367,7 +389,7 @@ impl Http { } /// Bind the provided stream of incoming IO objects with a `NewService`. - pub fn serve_incoming(&self, incoming: I, new_service: S) -> Serve + pub fn serve_incoming(&self, incoming: I, new_service: S) -> Serve where I: Stream, I::Error: Into>, @@ -375,6 +397,7 @@ impl Http { S: NewService, S::Error: Into>, Bd: Payload, + E: H2Exec<::Future, Bd>, { Serve { incoming: incoming, @@ -387,13 +410,13 @@ impl Http { // ===== impl Connection ===== -impl Connection +impl Connection where - S: Service + 'static, + S: Service, S::Error: Into>, - S::Future: Send, - I: AsyncRead + AsyncWrite + 'static, + I: AsyncRead + AsyncWrite, B: Payload + 'static, + E: H2Exec, { /// Start a graceful shutdown process for this connection. /// @@ -497,7 +520,7 @@ where /// Enable this connection to support higher-level HTTP upgrades. /// /// See [the `upgrade` module](::upgrade) for more. - pub fn with_upgrades(self) -> UpgradeableConnection + pub fn with_upgrades(self) -> UpgradeableConnection where I: Send, { @@ -507,13 +530,13 @@ where } } -impl Future for Connection +impl Future for Connection where S: Service + 'static, S::Error: Into>, - S::Future: Send, I: AsyncRead + AsyncWrite + 'static, B: Payload + 'static, + E: H2Exec, { type Item = (); type Error = ::Error; @@ -556,9 +579,9 @@ where } // ===== impl Serve ===== -impl Serve { +impl Serve { /// Spawn all incoming connections onto the executor in `Http`. - pub(super) fn spawn_all(self) -> SpawnAll { + pub(super) fn spawn_all(self) -> SpawnAll { SpawnAll { serve: self, } @@ -577,17 +600,17 @@ impl Serve { } } -impl Stream for Serve +impl Stream for Serve where I: Stream, I::Item: AsyncRead + AsyncWrite, I::Error: Into>, S: NewService, S::Error: Into>, - ::Future: Send + 'static, B: Payload, + E: H2Exec<::Future, B>, { - type Item = Connecting; + type Item = Connecting; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -606,15 +629,15 @@ where // ===== impl Connecting ===== -impl Future for Connecting +impl Future for Connecting where I: AsyncRead + AsyncWrite, F: Future, S: Service, - S::Future: Send + 'static, B: Payload, + E: H2Exec, { - type Item = Connection; + type Item = Connection; type Error = F::Error; fn poll(&mut self) -> Poll { @@ -627,45 +650,37 @@ where // ===== impl SpawnAll ===== #[cfg(feature = "runtime")] -impl SpawnAll { +impl SpawnAll { pub(super) fn local_addr(&self) -> SocketAddr { self.serve.incoming.local_addr() } } -impl SpawnAll { +impl SpawnAll { pub(super) fn incoming_ref(&self) -> &I { self.serve.incoming_ref() } } -impl SpawnAll +impl SpawnAll where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService + Send + 'static, + S: NewService, S::Error: Into>, - S::Service: Send, - S::Future: Send + 'static, - ::Future: Send + 'static, B: Payload, + E: H2Exec<::Future, B>, { - pub(super) fn poll_with(&mut self, per_connection: F1) -> Poll<(), ::Error> + pub(super) fn poll_watch(&mut self, watcher: &W) -> Poll<(), ::Error> where - F1: Fn() -> F2, - F2: FnOnce(UpgradeableConnection) -> R + Send + 'static, - R: Future + Send + 'static, + E: NewSvcExec, + W: Watcher, { loop { if let Some(connecting) = try_ready!(self.serve.poll()) { - let and_then = per_connection(); - let fut = connecting - .map_err(::Error::new_user_new_service) - // flatten basically - .and_then(|conn| and_then(conn.with_upgrades())) - .map_err(|err| debug!("conn error: {}", err)); - self.serve.protocol.exec.execute(fut)?; + let fut = NewSvcTask::new(connecting, watcher.clone()); + self.serve.protocol.exec.execute_new_svc(fut)?; } else { return Ok(Async::Ready(())) } @@ -673,6 +688,114 @@ where } } +pub(crate) mod spawn_all { + use futures::{Future, Poll}; + use tokio_io::{AsyncRead, AsyncWrite}; + + use body::{Body, Payload}; + use common::exec::H2Exec; + use service::Service; + use super::{Connecting, UpgradeableConnection}; + + // Used by `SpawnAll` to optionally watch a `Connection` future. + // + // The regular `hyper::Server` just uses a `NoopWatcher`, which does + // not need to watch anything, and so returns the `Connection` untouched. + // + // The `Server::with_graceful_shutdown` needs to keep track of all active + // connections, and signal that they start to shutdown when prompted, so + // it has a `GracefulWatcher` implementation to do that. + pub trait Watcher: Clone { + type Future: Future; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future; + } + + #[allow(missing_debug_implementations)] + #[derive(Copy, Clone)] + pub struct NoopWatcher; + + impl Watcher for NoopWatcher + where + I: AsyncRead + AsyncWrite + Send + 'static, + S: Service + 'static, + E: H2Exec, + { + type Future = UpgradeableConnection; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future { + conn + } + } + + // This is a `Future` spawned to an `Executor` inside + // the `SpawnAll`. By being a nameable type, we can be generic over the + // user's `Service::Future`, and thus an `Executor` can execute it. + // + // Doing this allows for the server to conditionally require `Send` futures, + // depending on the `Executor` configured. + // + // Users cannot import this type, nor the associated `NewSvcExec`. Instead, + // a blanket implementation for `Executor` is sufficient. + #[allow(missing_debug_implementations)] + pub struct NewSvcTask> { + state: State, + } + + enum State> { + Connecting(Connecting, W), + Connected(W::Future), + } + + impl> NewSvcTask { + pub(super) fn new(connecting: Connecting, watcher: W) -> Self { + NewSvcTask { + state: State::Connecting(connecting, watcher), + } + } + } + + impl Future for NewSvcTask + where + I: AsyncRead + AsyncWrite + Send + 'static, + N: Future, + N::Error: Into>, + S: Service, + B: Payload, + E: H2Exec, + W: Watcher, + { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + State::Connecting(ref mut connecting, ref watcher) => { + let conn = try_ready!(connecting + .poll() + .map_err(|err| { + let err = ::Error::new_user_new_service(err); + debug!("connection error: {}", err); + })); + let connected = watcher.watch(conn.with_upgrades()); + State::Connected(connected) + }, + State::Connected(ref mut future) => { + return future + .poll() + .map_err(|err| { + debug!("connection error: {}", err); + }); + } + }; + + self.state = next; + } + } + } +} + mod upgrades { use super::*; @@ -682,20 +805,20 @@ mod upgrades { // `impl Future`, without requiring Rust 1.26. #[must_use = "futures do nothing unless polled"] #[allow(missing_debug_implementations)] - pub struct UpgradeableConnection + pub struct UpgradeableConnection where S: Service, { - pub(super) inner: Connection, + pub(super) inner: Connection, } - impl UpgradeableConnection + impl UpgradeableConnection where - S: Service + 'static, + S: Service,// + 'static, S::Error: Into>, - S::Future: Send, - I: AsyncRead + AsyncWrite + Send + 'static, + I: AsyncRead + AsyncWrite, B: Payload + 'static, + E: H2Exec, { /// Start a graceful shutdown process for this connection. /// @@ -706,13 +829,13 @@ mod upgrades { } } - impl Future for UpgradeableConnection + impl Future for UpgradeableConnection where S: Service + 'static, S::Error: Into>, - S::Future: Send, I: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, + E: super::H2Exec, { type Item = (); type Error = ::Error; diff --git a/src/server/mod.rs b/src/server/mod.rs index ddb80c0a1c..a02afc4557 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -64,11 +64,12 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] use tokio_reactor; use body::{Body, Payload}; +use common::exec::{Exec, H2Exec, NewSvcExec}; use service::{NewService, Service}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... -use self::conn::{Http as Http_, SpawnAll}; -use self::shutdown::Graceful; +use self::conn::{Http as Http_, NoopWatcher, SpawnAll}; +use self::shutdown::{Graceful, GracefulWatcher}; #[cfg(feature = "runtime")] use self::tcp::AddrIncoming; /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. @@ -77,15 +78,15 @@ use self::shutdown::Graceful; /// handlers. It is built using the [`Builder`](Builder), and the future /// completes when the server has been shutdown. It should be run by an /// `Executor`. -pub struct Server { - spawn_all: SpawnAll, +pub struct Server { + spawn_all: SpawnAll, } /// A builder for a [`Server`](Server). #[derive(Debug)] -pub struct Builder { +pub struct Builder { incoming: I, - protocol: Http_, + protocol: Http_, } // ===== impl Server ===== @@ -138,17 +139,17 @@ impl Server { } } -impl Server +impl Server where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService + Send + 'static, + S: NewService, S::Error: Into>, - S::Service: Send, - S::Future: Send + 'static, - ::Future: Send + 'static, + S::Service: 'static, B: Payload, + E: H2Exec<::Future, B>, + E: NewSvcExec, { /// Prepares a server to handle graceful shutdown when the provided future /// completes. @@ -189,7 +190,7 @@ where /// let _ = tx.send(()); /// # } /// ``` - pub fn with_graceful_shutdown(self, signal: F) -> Graceful + pub fn with_graceful_shutdown(self, signal: F) -> Graceful where F: Future { @@ -197,23 +198,23 @@ where } } -impl Future for Server +impl Future for Server where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService + Send + 'static, + S: NewService, S::Error: Into>, - S::Service: Send, - S::Future: Send + 'static, - ::Future: Send + 'static, + S::Service: 'static, B: Payload, + E: H2Exec<::Future, B>, + E: NewSvcExec, { type Item = (); type Error = ::Error; fn poll(&mut self) -> Poll { - self.spawn_all.poll_with(|| |conn| conn) + self.spawn_all.poll_watch(&NoopWatcher) } } @@ -227,11 +228,11 @@ impl fmt::Debug for Server { // ===== impl Builder ===== -impl Builder { +impl Builder { /// Start a new builder, wrapping an incoming stream and low-level options. /// /// For a more convenient constructor, see [`Server::bind`](Server::bind). - pub fn new(incoming: I, protocol: Http_) -> Self { + pub fn new(incoming: I, protocol: Http_) -> Self { Builder { incoming, protocol, @@ -287,6 +288,16 @@ impl Builder { self } + /// Sets the `Executor` to deal with connection tasks. + /// + /// Default is `tokio::spawn`. + pub fn executor(self, executor: E2) -> Builder { + Builder { + incoming: self.incoming, + protocol: self.protocol.with_executor(executor), + } + } + /// Consume this `Builder`, creating a [`Server`](Server). /// /// # Example @@ -316,16 +327,17 @@ impl Builder { /// // Finally, spawn `server` onto an Executor... /// # } /// ``` - pub fn serve(self, new_service: S) -> Server + pub fn serve(self, new_service: S) -> Server where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService + Send + 'static, + S: NewService, S::Error: Into>, - S::Service: Send, - ::Future: Send + 'static, + S::Service: 'static, B: Payload, + E: NewSvcExec, + E: H2Exec<::Future, B>, { let serve = self.protocol.serve_incoming(self.incoming, new_service); let spawn_all = serve.spawn_all(); @@ -336,7 +348,7 @@ impl Builder { } #[cfg(feature = "runtime")] -impl Builder { +impl Builder { /// Set whether TCP keepalive messages are enabled on accepted connections. /// /// If `None` is specified, keepalive is disabled, otherwise the duration @@ -353,3 +365,4 @@ impl Builder { self } } + diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 1240f41f63..6d6c4db3a6 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -2,26 +2,27 @@ use futures::{Async, Future, Stream, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use body::{Body, Payload}; -use common::drain::{self, Draining, Signal, Watch}; +use common::drain::{self, Draining, Signal, Watch, Watching}; +use common::exec::{H2Exec, NewSvcExec}; use service::{Service, NewService}; -use super::SpawnAll; +use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; #[allow(missing_debug_implementations)] -pub struct Graceful { - state: State, +pub struct Graceful { + state: State, } -enum State { +enum State { Running { drain: Option<(Signal, Watch)>, - spawn_all: SpawnAll, + spawn_all: SpawnAll, signal: F, }, Draining(Draining), } -impl Graceful { - pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { +impl Graceful { + pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { let drain = Some(drain::channel()); Graceful { state: State::Running { @@ -34,18 +35,18 @@ impl Graceful { } -impl Future for Graceful +impl Future for Graceful where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService + Send + 'static, + S: NewService, + S::Service: 'static, S::Error: Into>, - S::Service: Send, - S::Future: Send + 'static, - ::Future: Send + 'static, B: Payload, F: Future, + E: H2Exec<::Future, B>, + E: NewSvcExec, { type Item = (); type Error = ::Error; @@ -67,19 +68,12 @@ where State::Draining(sig.drain()) }, Ok(Async::NotReady) => { - let watch = &drain + let watch = drain .as_ref() .expect("drain channel") - .1; - return spawn_all.poll_with(|| { - let watch = watch.clone(); - move |conn| { - watch.watch(conn, |conn| { - // on_drain, start conn graceful shutdown - conn.graceful_shutdown() - }) - } - }); + .1 + .clone(); + return spawn_all.poll_watch(&GracefulWatcher(watch)); }, }, State::Draining(ref mut draining) => { @@ -91,3 +85,35 @@ where } } } + +#[allow(missing_debug_implementations)] +#[derive(Clone)] +pub struct GracefulWatcher(Watch); + +impl Watcher for GracefulWatcher +where + I: AsyncRead + AsyncWrite + Send + 'static, + S: Service + 'static, + E: H2Exec, +{ + type Future = Watching, fn(&mut UpgradeableConnection)>; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future { + self + .0 + .clone() + .watch(conn, on_drain) + } +} + +fn on_drain(conn: &mut UpgradeableConnection) +where + S: Service, + S::Error: Into>, + I: AsyncRead + AsyncWrite, + S::ResBody: Payload + 'static, + E: H2Exec, +{ + conn.graceful_shutdown() +} +