Skip to content

Commit a314e77

Browse files
committed
fix(client): send an error back to client when dispatch misbehaves (fixes #2649)
1 parent f193ea0 commit a314e77

File tree

3 files changed

+89
-14
lines changed

3 files changed

+89
-14
lines changed

Diff for: src/client/dispatch.rs

+40-14
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl<T, U> Sender<T, U> {
8686
}
8787
let (tx, rx) = oneshot::channel();
8888
self.inner
89-
.send(Envelope(Some((val, Callback::Retry(tx)))))
89+
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
9090
.map(move |_| rx)
9191
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
9292
}
@@ -97,7 +97,7 @@ impl<T, U> Sender<T, U> {
9797
}
9898
let (tx, rx) = oneshot::channel();
9999
self.inner
100-
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
100+
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
101101
.map(move |_| rx)
102102
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
103103
}
@@ -124,7 +124,7 @@ impl<T, U> UnboundedSender<T, U> {
124124
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
125125
let (tx, rx) = oneshot::channel();
126126
self.inner
127-
.send(Envelope(Some((val, Callback::Retry(tx)))))
127+
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
128128
.map(move |_| rx)
129129
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
130130
}
@@ -198,33 +198,59 @@ impl<T, U> Drop for Envelope<T, U> {
198198
}
199199

200200
pub(crate) enum Callback<T, U> {
201-
Retry(oneshot::Sender<Result<U, (crate::Error, Option<T>)>>),
202-
NoRetry(oneshot::Sender<Result<U, crate::Error>>),
201+
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
202+
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
203+
}
204+
205+
impl<T, U> Drop for Callback<T, U> {
206+
fn drop(&mut self) {
207+
// FIXME(nox): What errors do we want here?
208+
let error = crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
209+
"user code panicked"
210+
} else {
211+
"runtime dropped the dispatch task"
212+
});
213+
214+
match self {
215+
Callback::Retry(tx) => {
216+
if let Some(tx) = tx.take() {
217+
let _ = tx.send(Err((error, None)));
218+
}
219+
}
220+
Callback::NoRetry(tx) => {
221+
if let Some(tx) = tx.take() {
222+
let _ = tx.send(Err(error));
223+
}
224+
}
225+
}
226+
}
203227
}
204228

205229
impl<T, U> Callback<T, U> {
206230
#[cfg(feature = "http2")]
207231
pub(crate) fn is_canceled(&self) -> bool {
208232
match *self {
209-
Callback::Retry(ref tx) => tx.is_closed(),
210-
Callback::NoRetry(ref tx) => tx.is_closed(),
233+
Callback::Retry(Some(ref tx)) => tx.is_closed(),
234+
Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
235+
_ => unreachable!(),
211236
}
212237
}
213238

214239
pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
215240
match *self {
216-
Callback::Retry(ref mut tx) => tx.poll_closed(cx),
217-
Callback::NoRetry(ref mut tx) => tx.poll_closed(cx),
241+
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
242+
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
243+
_ => unreachable!(),
218244
}
219245
}
220246

221-
pub(crate) fn send(self, val: Result<U, (crate::Error, Option<T>)>) {
247+
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
222248
match self {
223-
Callback::Retry(tx) => {
224-
let _ = tx.send(val);
249+
Callback::Retry(ref mut tx) => {
250+
let _ = tx.take().unwrap().send(val);
225251
}
226-
Callback::NoRetry(tx) => {
227-
let _ = tx.send(val.map_err(|e| e.0));
252+
Callback::NoRetry(ref mut tx) => {
253+
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
228254
}
229255
}
230256
}

Diff for: src/error.rs

+11
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ pub(super) enum User {
137137
#[cfg(feature = "server")]
138138
WithoutShutdownNonHttp1,
139139

140+
/// The dispatch task is gone.
141+
#[cfg(feature = "client")]
142+
DispatchGone,
143+
140144
/// User aborted in an FFI callback.
141145
#[cfg(feature = "ffi")]
142146
AbortedByCallback,
@@ -387,6 +391,11 @@ impl Error {
387391
Error::new_user(User::AbortedByCallback)
388392
}
389393

394+
#[cfg(feature = "client")]
395+
pub(super) fn new_user_dispatch_gone() -> Error {
396+
Error::new(Kind::User(User::DispatchGone))
397+
}
398+
390399
#[cfg(feature = "http2")]
391400
pub(super) fn new_h2(cause: ::h2::Error) -> Error {
392401
if cause.is_io() {
@@ -483,6 +492,8 @@ impl Error {
483492
Kind::User(User::WithoutShutdownNonHttp1) => {
484493
"without_shutdown() called on a non-HTTP/1 connection"
485494
}
495+
#[cfg(feature = "client")]
496+
Kind::User(User::DispatchGone) => "dispatch task is gone",
486497
#[cfg(feature = "ffi")]
487498
Kind::User(User::AbortedByCallback) => "operation aborted by an application callback",
488499
}

Diff for: tests/client.rs

+38
Original file line numberDiff line numberDiff line change
@@ -3114,6 +3114,44 @@ mod conn {
31143114
done_tx.send(()).unwrap();
31153115
}
31163116

3117+
#[tokio::test]
3118+
async fn test_body_panics() {
3119+
use hyper::body::HttpBody;
3120+
3121+
let _ = pretty_env_logger::try_init();
3122+
3123+
let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
3124+
.await
3125+
.unwrap();
3126+
let addr = listener.local_addr().unwrap();
3127+
3128+
// spawn a server that reads but doesn't write
3129+
tokio::spawn(async move {
3130+
let sock = listener.accept().await.unwrap().0;
3131+
drain_til_eof(sock).await.expect("server read");
3132+
});
3133+
3134+
let io = tcp_connect(&addr).await.expect("tcp connect");
3135+
3136+
let (mut client, conn) = conn::Builder::new().handshake(io).await.expect("handshake");
3137+
3138+
tokio::spawn(async move {
3139+
conn.await.expect("client conn shouldn't error");
3140+
});
3141+
3142+
let req = Request::post("/a")
3143+
.body(Body::from("baguette").map_data::<_, &[u8]>(|_| panic!("oopsie")))
3144+
.unwrap();
3145+
3146+
let error = client.send_request(req).await.unwrap_err();
3147+
3148+
assert!(error.is_user());
3149+
assert_eq!(
3150+
error.to_string(),
3151+
"dispatch task is gone: user code panicked"
3152+
);
3153+
}
3154+
31173155
async fn drain_til_eof<T: AsyncRead + Unpin>(mut sock: T) -> io::Result<()> {
31183156
let mut buf = [0u8; 1024];
31193157
loop {

0 commit comments

Comments
 (0)