Skip to content

Commit

Permalink
fix(client): send an error back to client when dispatch misbehaves (f…
Browse files Browse the repository at this point in the history
…ixes #2649)
  • Loading branch information
nox authored and seanmonstar committed Oct 28, 2022
1 parent ef89ae8 commit d508ba9
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 15 deletions.
56 changes: 41 additions & 15 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand All @@ -101,7 +101,7 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand Down Expand Up @@ -131,15 +131,15 @@ impl<T, U> UnboundedSender<T, U> {
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}

pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand Down Expand Up @@ -215,33 +215,59 @@ impl<T, U> Drop for Envelope<T, U> {

pub(crate) enum Callback<T, U> {
#[allow(unused)]
Retry(oneshot::Sender<Result<U, (crate::Error, Option<T>)>>),
NoRetry(oneshot::Sender<Result<U, crate::Error>>),
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
}

impl<T, U> Drop for Callback<T, U> {
fn drop(&mut self) {
// FIXME(nox): What errors do we want here?
let error = crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
"user code panicked"
} else {
"runtime dropped the dispatch task"
});

match self {
Callback::Retry(tx) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Err((error, None)));
}
}
Callback::NoRetry(tx) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Err(error));
}
}
}
}
}

impl<T, U> Callback<T, U> {
#[cfg(feature = "http2")]
pub(crate) fn is_canceled(&self) -> bool {
match *self {
Callback::Retry(ref tx) => tx.is_closed(),
Callback::NoRetry(ref tx) => tx.is_closed(),
Callback::Retry(Some(ref tx)) => tx.is_closed(),
Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
_ => unreachable!(),
}
}

pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
match *self {
Callback::Retry(ref mut tx) => tx.poll_closed(cx),
Callback::NoRetry(ref mut tx) => tx.poll_closed(cx),
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
_ => unreachable!(),
}
}

pub(crate) fn send(self, val: Result<U, (crate::Error, Option<T>)>) {
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
match self {
Callback::Retry(tx) => {
let _ = tx.send(val);
Callback::Retry(ref mut tx) => {
let _ = tx.take().unwrap().send(val);
}
Callback::NoRetry(tx) => {
let _ = tx.send(val.map_err(|e| e.0));
Callback::NoRetry(ref mut tx) => {
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub(super) enum User {
#[cfg(feature = "http1")]
ManualUpgrade,

/// The dispatch task is gone.
#[cfg(feature = "client")]
DispatchGone,

/// User aborted in an FFI callback.
#[cfg(feature = "ffi")]
AbortedByCallback,
Expand Down Expand Up @@ -314,6 +318,11 @@ impl Error {
Error::new_user(User::AbortedByCallback)
}

#[cfg(feature = "client")]
pub(super) fn new_user_dispatch_gone() -> Error {
Error::new(Kind::User(User::DispatchGone))
}

#[cfg(feature = "http2")]
pub(super) fn new_h2(cause: ::h2::Error) -> Error {
if cause.is_io() {
Expand Down Expand Up @@ -390,6 +399,7 @@ impl Error {
Kind::User(User::NoUpgrade) => "no upgrade available",
#[cfg(feature = "http1")]
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",
Kind::User(User::DispatchGone) => "dispatch task is gone",
#[cfg(feature = "ffi")]
Kind::User(User::AbortedByCallback) => "operation aborted by an application callback",
}
Expand Down
38 changes: 38 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,44 @@ mod conn {
done_tx.send(()).unwrap();
}

#[tokio::test]
async fn test_body_panics() {
use hyper::body::HttpBody;

let _ = pretty_env_logger::try_init();

let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr = listener.local_addr().unwrap();

// spawn a server that reads but doesn't write
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
drain_til_eof(sock).await.expect("server read");
});

let io = tcp_connect(&addr).await.expect("tcp connect");

let (mut client, conn) = conn::Builder::new().handshake(io).await.expect("handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

let req = Request::post("/a")
.body(Body::from("baguette").map_data::<_, &[u8]>(|_| panic!("oopsie")))
.unwrap();

let error = client.send_request(req).await.unwrap_err();

assert!(error.is_user());
assert_eq!(
error.to_string(),
"dispatch task is gone: user code panicked"
);
}

async fn drain_til_eof<T: AsyncRead + Unpin>(mut sock: T) -> io::Result<()> {
let mut buf = [0u8; 1024];
loop {
Expand Down

0 comments on commit d508ba9

Please sign in to comment.