Skip to content

Commit

Permalink
feat(server): add Server::with_graceful_shutdown method
Browse files Browse the repository at this point in the history
This adds a "combinator" method to `Server`, which accepts a user's
future to "select" on. All connections received by the `Server` will
be tracked, and if the user's future finishes, graceful shutdown will
begin.

- The listener will be closed immediately.
- The currently active connections will all be notified to start a
  graceful shutdown. For HTTP/1, that means finishing the existing
  response and using `connection: clone`. For HTTP/2, the graceful
  `GOAWAY` process is started.
- Once all active connections have terminated, the graceful future
  will return.

Closes #1575
  • Loading branch information
seanmonstar authored Aug 23, 2018
1 parent a3c44de commit 168c7d2
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 8 deletions.
115 changes: 115 additions & 0 deletions src/common/drain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::mem;

use futures::{Async, Future, Poll, Stream};
use futures::future::Shared;
use futures::sync::{mpsc, oneshot};

use super::Never;

pub fn channel() -> (Signal, Watch) {
let (tx, rx) = oneshot::channel();
let (drained_tx, drained_rx) = mpsc::channel(0);
(
Signal {
drained_rx,
tx,
},
Watch {
drained_tx,
rx: rx.shared(),
},
)
}

pub struct Signal {
drained_rx: mpsc::Receiver<Never>,
tx: oneshot::Sender<()>,
}

pub struct Draining {
drained_rx: mpsc::Receiver<Never>,
}

#[derive(Clone)]
pub struct Watch {
drained_tx: mpsc::Sender<Never>,
rx: Shared<oneshot::Receiver<()>>,
}

pub struct Watching<F, FN> {
future: F,
state: State<FN>,
watch: Watch,
}

enum State<F> {
Watch(F),
Draining,
}

impl Signal {
pub fn drain(self) -> Draining {
let _ = self.tx.send(());
Draining {
drained_rx: self.drained_rx,
}
}
}

impl Future for Draining {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match try_ready!(self.drained_rx.poll()) {
Some(never) => match never {},
None => Ok(Async::Ready(())),
}
}
}

impl Watch {
pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
where
F: Future,
FN: FnOnce(&mut F),
{
Watching {
future,
state: State::Watch(on_drain),
watch: self,
}
}
}

impl<F, FN> Future for Watching<F, FN>
where
F: Future,
FN: FnOnce(&mut F),
{
type Item = F::Item;
type Error = F::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.state, State::Draining) {
State::Watch(on_drain) => {
match self.watch.rx.poll() {
Ok(Async::Ready(_)) | Err(_) => {
// Drain has been triggered!
on_drain(&mut self.future);
},
Ok(Async::NotReady) => {
self.state = State::Watch(on_drain);
return self.future.poll();
},
}
},
State::Draining => {
return self.future.poll();
},
}
}
}
}

1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod buf;
pub(crate) mod drain;
mod exec;
pub(crate) mod io;
mod lazy;
Expand Down
17 changes: 10 additions & 7 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub struct Connecting<I, F> {
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S> {
serve: Serve<I, S>,
pub(super) serve: Serve<I, S>,
}

/// A future binding a connection with a Service.
Expand Down Expand Up @@ -618,7 +618,7 @@ impl<I, S> SpawnAll<I, S> {
}
}

impl<I, S, B> Future for SpawnAll<I, S>
impl<I, S, B> SpawnAll<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
Expand All @@ -630,16 +630,19 @@ where
<S::Service as Service>::Future: Send + 'static,
B: Payload,
{
type Item = ();
type Error = ::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
pub(super) fn poll_with<F1, F2, R>(&mut self, per_connection: F1) -> Poll<(), ::Error>
where
F1: Fn() -> F2,
F2: FnOnce(UpgradeableConnection<I::Item, S::Service>) -> R + Send + 'static,
R: Future<Item=(), Error=::Error> + Send + 'static,
{
loop {
if let Some(connecting) = try_ready!(self.serve.poll()) {
let and_then = per_connection();
let fut = connecting
.map_err(::Error::new_user_new_service)
// flatten basically
.and_then(|conn| conn.with_upgrades())
.and_then(|conn| and_then(conn.with_upgrades()))
.map_err(|err| debug!("conn error: {}", err));
self.serve.protocol.exec.execute(fut)?;
} else {
Expand Down
63 changes: 62 additions & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
//! ```
pub mod conn;
mod shutdown;
#[cfg(feature = "runtime")] mod tcp;

use std::fmt;
Expand All @@ -67,6 +68,7 @@ use service::{NewService, Service};
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
// error that `hyper::server::Http` is private...
use self::conn::{Http as Http_, SpawnAll};
use self::shutdown::Graceful;
#[cfg(feature = "runtime")] use self::tcp::AddrIncoming;

/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
Expand Down Expand Up @@ -136,6 +138,65 @@ impl<S> Server<AddrIncoming, S> {
}
}

impl<I, S, B> Server<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B> + Send + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Service: Send,
S::Future: Send + 'static,
<S::Service as Service>::Future: Send + 'static,
B: Payload,
{
/// Prepares a server to handle graceful shutdown when the provided future
/// completes.
///
/// # Example
///
/// ```
/// # extern crate hyper;
/// # extern crate futures;
/// # use futures::Future;
/// # fn main() {}
/// # #[cfg(feature = "runtime")]
/// # fn run() {
/// # use hyper::{Body, Response, Server};
/// # use hyper::service::service_fn_ok;
/// # let new_service = || {
/// # service_fn_ok(|_req| {
/// # Response::new(Body::from("Hello World"))
/// # })
/// # };
///
/// // Make a server from the previous examples...
/// let server = Server::bind(&([127, 0, 0, 1], 3000).into())
/// .serve(new_service);
///
/// // Prepare some signal for when the server should start
/// // shutting down...
/// let (tx, rx) = futures::sync::oneshot::channel::<()>();
///
/// let graceful = server
/// .with_graceful_shutdown(rx)
/// .map_err(|err| eprintln!("server error: {}", err));
///
/// // Spawn `server` onto an Executor...
/// hyper::rt::spawn(graceful);
///
/// // And later, trigger the signal by calling `tx.send(())`.
/// let _ = tx.send(());
/// # }
/// ```
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F>
where
F: Future<Item=()>
{
Graceful::new(self.spawn_all, signal)
}
}

impl<I, S, B> Future for Server<I, S>
where
I: Stream,
Expand All @@ -152,7 +213,7 @@ where
type Error = ::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.spawn_all.poll()
self.spawn_all.poll_with(|| |conn| conn)
}
}

Expand Down
93 changes: 93 additions & 0 deletions src/server/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use futures::{Async, Future, Stream, Poll};
use tokio_io::{AsyncRead, AsyncWrite};

use body::{Body, Payload};
use common::drain::{self, Draining, Signal, Watch};
use service::{Service, NewService};
use super::SpawnAll;

#[allow(missing_debug_implementations)]
pub struct Graceful<I, S, F> {
state: State<I, S, F>,
}

enum State<I, S, F> {
Running {
drain: Option<(Signal, Watch)>,
spawn_all: SpawnAll<I, S>,
signal: F,
},
Draining(Draining),
}

impl<I, S, F> Graceful<I, S, F> {
pub(super) fn new(spawn_all: SpawnAll<I, S>, signal: F) -> Self {
let drain = Some(drain::channel());
Graceful {
state: State::Running {
drain,
spawn_all,
signal,
},
}
}
}


impl<I, S, B, F> Future for Graceful<I, S, F>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B> + Send + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Service: Send,
S::Future: Send + 'static,
<S::Service as Service>::Future: Send + 'static,
B: Payload,
F: Future<Item=()>,
{
type Item = ();
type Error = ::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.state {
State::Running {
ref mut drain,
ref mut spawn_all,
ref mut signal,
} => match signal.poll() {
Ok(Async::Ready(())) | Err(_) => {
debug!("signal received, starting graceful shutdown");
let sig = drain
.take()
.expect("drain channel")
.0;
State::Draining(sig.drain())
},
Ok(Async::NotReady) => {
let watch = &drain
.as_ref()
.expect("drain channel")
.1;
return spawn_all.poll_with(|| {
let watch = watch.clone();
move |conn| {
watch.watch(conn, |conn| {
// on_drain, start conn graceful shutdown
conn.graceful_shutdown()
})
}
});
},
},
State::Draining(ref mut draining) => {
return draining.poll()
.map_err(|()| unreachable!("drain mpsc rx never errors"));
}
};
self.state = next;
}
}
}

0 comments on commit 168c7d2

Please sign in to comment.