From 482a5f589ea2bdb798f01645653975089f40ef44 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 18 Jun 2018 16:01:01 -0700 Subject: [PATCH] fix(lib): return an error instead of panic if execute fails If executing an internal task fails, a new variant of `hyper::Error` is returned to the user, with improved messaging. If a non-critical task fails to spawn, it no longer panics, instead just logging a warning. Closes #1566 --- src/client/mod.rs | 41 +++++++++++++++++++++++++++-------------- src/client/pool.rs | 12 ++++++++---- src/common/exec.rs | 17 ++++++++++++----- src/error.rs | 13 +++++++++++-- src/proto/h2/client.rs | 6 +++--- src/proto/h2/server.rs | 2 +- src/server/conn.rs | 2 +- 7 files changed, 63 insertions(+), 30 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index d822ec9f77..1542c99a48 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -270,13 +270,20 @@ where C: Connect + Sync + 'static, .http2_only(pool_key.1 == Ver::Http2) .handshake(io) .and_then(move |(tx, conn)| { - executor.execute(conn.map_err(|e| { + let bg = executor.execute(conn.map_err(|e| { debug!("client connection error: {}", e) })); + // This task is critical, so an execute error + // should be returned. + if let Err(err) = bg { + warn!("error spawning critical client task: {}", err); + return Either::A(future::err(err)); + } + // Wait for 'conn' to ready up before we // declare this tx as usable - tx.when_ready() + Either::B(tx.when_ready()) }) .map(move |tx| { pool.pooled(connecting, PoolClient { @@ -373,26 +380,32 @@ where C: Connect + Sync + 'static, } else if !res.body().is_end_stream() { let (delayed_tx, delayed_rx) = oneshot::channel(); res.body_mut().delayed_eof(delayed_rx); - executor.execute( - future::poll_fn(move || { - pooled.poll_ready() - }) + let on_idle = future::poll_fn(move || { + pooled.poll_ready() + }) .then(move |_| { // At this point, `pooled` is dropped, and had a chance // to insert into the pool (if conn was idle) drop(delayed_tx); Ok(()) - }) - ); + }); + + if let Err(err) = executor.execute(on_idle) { + // This task isn't critical, so just log and ignore. + warn!("error spawning task to insert idle connection: {}", err); + } } else { // There's no body to delay, but the connection isn't // ready yet. Only re-insert when it's ready - executor.execute( - future::poll_fn(move || { - pooled.poll_ready() - }) - .then(|_| Ok(())) - ); + let on_idle = future::poll_fn(move || { + pooled.poll_ready() + }) + .then(|_| Ok(())); + + if let Err(err) = executor.execute(on_idle) { + // This task isn't critical, so just log and ignore. + warn!("error spawning task to insert idle connection: {}", err); + } } Ok(res) }); diff --git a/src/client/pool.rs b/src/client/pool.rs index 74eb7d6361..11277fd56c 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -407,12 +407,16 @@ impl Connections { let start = Instant::now() + dur; - let interval = Interval::new(start, dur); - self.exec.execute(IdleInterval { - interval: interval, + let interval = IdleInterval { + interval: Interval::new(start, dur), pool: WeakOpt::downgrade(pool_ref), pool_drop_notifier: rx, - }); + }; + + if let Err(err) = self.exec.execute(interval) { + // This task isn't critical, so simply log and ignore. + warn!("error spawning connection pool idle interval: {}", err); + } } } diff --git a/src/common/exec.rs b/src/common/exec.rs index 78cb023386..231939b4c7 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -13,7 +13,7 @@ pub(crate) enum Exec { impl Exec { - pub(crate) fn execute(&self, fut: F) + pub(crate) fn execute(&self, fut: F) -> ::Result<()> where F: Future + Send + 'static, { @@ -21,7 +21,13 @@ impl Exec { Exec::Default => { #[cfg(feature = "runtime")] { - ::tokio_executor::spawn(fut) + use ::tokio_executor::Executor; + ::tokio_executor::DefaultExecutor::current() + .spawn(Box::new(fut)) + .map_err(|err| { + warn!("executor error: {:?}", err); + ::Error::new_execute() + }) } #[cfg(not(feature = "runtime"))] { @@ -30,10 +36,11 @@ impl Exec { } }, Exec::Executor(ref e) => { - let _ = e.execute(Box::new(fut)) + e.execute(Box::new(fut)) .map_err(|err| { - panic!("executor error: {:?}", err.kind()); - }); + warn!("executor error: {:?}", err.kind()); + ::Error::new_execute() + }) }, } } diff --git a/src/error.rs b/src/error.rs index 45feec53d7..90e0a43214 100644 --- a/src/error.rs +++ b/src/error.rs @@ -67,6 +67,9 @@ pub(crate) enum Kind { /// User polled for an upgrade, but low-level API is not using upgrades. ManualUpgrade, + + /// Error trying to call `Executor::execute`. + Execute, } #[derive(Debug, PartialEq)] @@ -114,7 +117,8 @@ impl Error { Kind::Closed | Kind::UnsupportedVersion | Kind::UnsupportedRequestMethod | - Kind::NoUpgrade => true, + Kind::NoUpgrade | + Kind::Execute => true, _ => false, } } @@ -130,7 +134,7 @@ impl Error { } /// Returns the error's cause. - /// + /// /// This is identical to `Error::cause` except that it provides extra /// bounds required to be able to downcast the error. pub fn cause2(&self) -> Option<&(StdError + 'static + Sync + Send)> { @@ -244,6 +248,10 @@ impl Error { Error::new(Kind::Shutdown, Some(Box::new(cause))) } + pub(crate) fn new_execute() -> Error { + Error::new(Kind::Execute, None) + } + pub(crate) fn new_h2(cause: ::h2::Error) -> Error { Error::new(Kind::Http2, Some(Box::new(cause))) } @@ -297,6 +305,7 @@ impl StdError for Error { Kind::UnsupportedRequestMethod => "request has unsupported HTTP method", Kind::NoUpgrade => "no upgrade available", Kind::ManualUpgrade => "upgrade expected but low level API in use", + Kind::Execute => "executor failed to spawn task", Kind::Io => "an IO error occurred", } diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index a445eb4124..1570d2ee0a 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -94,7 +94,7 @@ where } Err(Either::B((never, _))) => match never {}, }); - self.executor.execute(fut); + self.executor.execute(fut)?; State::Ready(request_tx, tx) }, State::Ready(ref mut tx, ref conn_dropper) => { @@ -129,7 +129,7 @@ where drop(conn_drop_ref); x }); - self.executor.execute(pipe); + self.executor.execute(pipe)?; } let fut = fut @@ -148,7 +148,7 @@ where } Ok(()) }); - self.executor.execute(fut); + self.executor.execute(fut)?; continue; }, diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index ad5e2d2e6c..6777bfc109 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -132,7 +132,7 @@ where ::Body::h2(stream, content_length) }); let fut = H2Stream::new(service.call(req), respond); - exec.execute(fut); + exec.execute(fut)?; } // no more incoming streams... diff --git a/src/server/conn.rs b/src/server/conn.rs index 5a4d24bee7..fda17ab1f9 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -641,7 +641,7 @@ where // flatten basically .and_then(|conn| conn.with_upgrades()) .map_err(|err| debug!("conn error: {}", err)); - self.serve.protocol.exec.execute(fut); + self.serve.protocol.exec.execute(fut)?; } else { return Ok(Async::Ready(())) }