@@ -90,7 +90,7 @@ impl<T, U> Sender<T, U> {
90
90
}
91
91
let ( tx, rx) = oneshot:: channel ( ) ;
92
92
self . inner
93
- . send ( Envelope ( Some ( ( val, Callback :: Retry ( tx ) ) ) ) )
93
+ . send ( Envelope ( Some ( ( val, Callback :: Retry ( Some ( tx ) ) ) ) ) )
94
94
. map ( move |_| rx)
95
95
. map_err ( |mut e| ( e. 0 ) . 0 . take ( ) . expect ( "envelope not dropped" ) . 0 )
96
96
}
@@ -101,7 +101,7 @@ impl<T, U> Sender<T, U> {
101
101
}
102
102
let ( tx, rx) = oneshot:: channel ( ) ;
103
103
self . inner
104
- . send ( Envelope ( Some ( ( val, Callback :: NoRetry ( tx ) ) ) ) )
104
+ . send ( Envelope ( Some ( ( val, Callback :: NoRetry ( Some ( tx ) ) ) ) ) )
105
105
. map ( move |_| rx)
106
106
. map_err ( |mut e| ( e. 0 ) . 0 . take ( ) . expect ( "envelope not dropped" ) . 0 )
107
107
}
@@ -131,7 +131,7 @@ impl<T, U> UnboundedSender<T, U> {
131
131
pub ( crate ) fn try_send ( & mut self , val : T ) -> Result < RetryPromise < T , U > , T > {
132
132
let ( tx, rx) = oneshot:: channel ( ) ;
133
133
self . inner
134
- . send ( Envelope ( Some ( ( val, Callback :: Retry ( tx ) ) ) ) )
134
+ . send ( Envelope ( Some ( ( val, Callback :: Retry ( Some ( tx ) ) ) ) ) )
135
135
. map ( move |_| rx)
136
136
. map_err ( |mut e| ( e. 0 ) . 0 . take ( ) . expect ( "envelope not dropped" ) . 0 )
137
137
}
@@ -215,33 +215,60 @@ impl<T, U> Drop for Envelope<T, U> {
215
215
216
216
pub ( crate ) enum Callback < T , U > {
217
217
#[ allow( unused) ]
218
- Retry ( oneshot:: Sender < Result < U , ( crate :: Error , Option < T > ) > > ) ,
219
- NoRetry ( oneshot:: Sender < Result < U , crate :: Error > > ) ,
218
+ Retry ( Option < oneshot:: Sender < Result < U , ( crate :: Error , Option < T > ) > > > ) ,
219
+ NoRetry ( Option < oneshot:: Sender < Result < U , crate :: Error > > > ) ,
220
+ }
221
+
222
+ impl < T , U > Drop for Callback < T , U > {
223
+ fn drop ( & mut self ) {
224
+ // FIXME(nox): What errors do we want here?
225
+ let error = crate :: Error :: new_user_dispatch_gone ( ) . with ( if std:: thread:: panicking ( ) {
226
+ "user code panicked"
227
+ } else {
228
+ "runtime dropped the dispatch task"
229
+ } ) ;
230
+
231
+ match self {
232
+ Callback :: Retry ( tx) => {
233
+ if let Some ( tx) = tx. take ( ) {
234
+ let _ = tx. send ( Err ( ( error, None ) ) ) ;
235
+ }
236
+ }
237
+ Callback :: NoRetry ( tx) => {
238
+ if let Some ( tx) = tx. take ( ) {
239
+ let _ = tx. send ( Err ( error) ) ;
240
+ }
241
+ }
242
+ }
243
+ }
244
+ >>>>>>> 9 fa36382 ( fix( client) : send an error back to client when dispatch misbehaves ( fixes #2649 ) )
220
245
}
221
246
222
247
impl < T , U > Callback < T , U > {
223
248
#[ cfg( feature = "http2" ) ]
224
249
pub ( crate ) fn is_canceled ( & self ) -> bool {
225
250
match * self {
226
- Callback :: Retry ( ref tx) => tx. is_closed ( ) ,
227
- Callback :: NoRetry ( ref tx) => tx. is_closed ( ) ,
251
+ Callback :: Retry ( Some ( ref tx) ) => tx. is_closed ( ) ,
252
+ Callback :: NoRetry ( Some ( ref tx) ) => tx. is_closed ( ) ,
253
+ _ => unreachable ! ( ) ,
228
254
}
229
255
}
230
256
231
257
pub ( crate ) fn poll_canceled ( & mut self , cx : & mut task:: Context < ' _ > ) -> Poll < ( ) > {
232
258
match * self {
233
- Callback :: Retry ( ref mut tx) => tx. poll_closed ( cx) ,
234
- Callback :: NoRetry ( ref mut tx) => tx. poll_closed ( cx) ,
259
+ Callback :: Retry ( Some ( ref mut tx) ) => tx. poll_closed ( cx) ,
260
+ Callback :: NoRetry ( Some ( ref mut tx) ) => tx. poll_closed ( cx) ,
261
+ _ => unreachable ! ( ) ,
235
262
}
236
263
}
237
264
238
- pub ( crate ) fn send ( self , val : Result < U , ( crate :: Error , Option < T > ) > ) {
265
+ pub ( crate ) fn send ( mut self , val : Result < U , ( crate :: Error , Option < T > ) > ) {
239
266
match self {
240
- Callback :: Retry ( tx) => {
241
- let _ = tx. send ( val) ;
267
+ Callback :: Retry ( ref mut tx) => {
268
+ let _ = tx. take ( ) . unwrap ( ) . send ( val) ;
242
269
}
243
- Callback :: NoRetry ( tx) => {
244
- let _ = tx. send ( val. map_err ( |e| e. 0 ) ) ;
270
+ Callback :: NoRetry ( ref mut tx) => {
271
+ let _ = tx. take ( ) . unwrap ( ) . send ( val. map_err ( |e| e. 0 ) ) ;
245
272
}
246
273
}
247
274
}
0 commit comments