diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index e743aae1d6..b61ea735d4 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -4,8 +4,8 @@ use futures::sync::{mpsc, oneshot}; use common::Never; use super::cancel::{Cancel, Canceled}; -pub type Callback = oneshot::Sender<::Result>; -pub type Promise = oneshot::Receiver<::Result>; +pub type Callback = oneshot::Sender)>>; +pub type Promise = oneshot::Receiver)>>; pub fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); @@ -23,7 +23,7 @@ pub fn channel() -> (Sender, Receiver) { pub struct Sender { cancel: Cancel, - inner: mpsc::UnboundedSender<(T, Callback)>, + inner: mpsc::UnboundedSender<(T, Callback)>, } impl Sender { @@ -35,7 +35,7 @@ impl Sender { self.cancel.cancel(); } - pub fn send(&self, val: T) -> Result, T> { + pub fn send(&self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); self.inner.unbounded_send((val, tx)) .map(move |_| rx) @@ -54,11 +54,11 @@ impl Clone for Sender { pub struct Receiver { canceled: Canceled, - inner: mpsc::UnboundedReceiver<(T, Callback)>, + inner: mpsc::UnboundedReceiver<(T, Callback)>, } impl Stream for Receiver { - type Item = (T, Callback); + type Item = (T, Callback); type Error = Never; fn poll(&mut self) -> Poll, Self::Error> { @@ -83,9 +83,9 @@ impl Drop for Receiver { // - Ready(None): the end. we want to stop looping // - NotReady: unreachable // - Err: unreachable - while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() { + while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() { // maybe in future, we pass the value along with the error? - let _ = cb.send(Err(::Error::new_canceled(None))); + let _ = cb.send(Err((::Error::new_canceled(None), Some(val)))); } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 6b88998a31..4dbf0fbabc 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -38,12 +38,12 @@ mod pool; pub mod compat; /// A Client to make outgoing HTTP requests. -// If the Connector is clone, then the Client can be clone easily. pub struct Client { - connector: C, + connector: Rc, executor: Exec, h1_writev: bool, pool: Pool>, + retry_canceled_requests: bool, } impl Client { @@ -95,10 +95,11 @@ impl Client { #[inline] fn configured(config: Config, exec: Exec) -> Client { Client { - connector: config.connector, + connector: Rc::new(config.connector), executor: exec, h1_writev: config.h1_writev, - pool: Pool::new(config.keep_alive, config.keep_alive_timeout) + pool: Pool::new(config.keep_alive, config.keep_alive_timeout), + retry_canceled_requests: config.retry_canceled_requests, } } } @@ -116,54 +117,7 @@ where C: Connect, /// Send a constructed Request using this Client. #[inline] - pub fn request(&self, req: Request) -> FutureResponse { - self.call(req) - } - - /// Send an `http::Request` using this Client. - #[inline] - #[cfg(feature = "compat")] - pub fn request_compat(&self, req: http::Request) -> compat::CompatFutureResponse { - self::compat::future(self.call(req.into())) - } - - /// Convert into a client accepting `http::Request`. - #[cfg(feature = "compat")] - pub fn into_compat(self) -> compat::CompatClient { - self::compat::client(self) - } -} - -/// A `Future` that will resolve to an HTTP Response. -#[must_use = "futures do nothing unless polled"] -pub struct FutureResponse(Box + 'static>); - -impl fmt::Debug for FutureResponse { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("Future") - } -} - -impl Future for FutureResponse { - type Item = Response; - type Error = ::Error; - - fn poll(&mut self) -> Poll { - self.0.poll() - } -} - -impl Service for Client -where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, -{ - type Request = Request; - type Response = Response; - type Error = ::Error; - type Future = FutureResponse; - - fn call(&self, req: Self::Request) -> Self::Future { + pub fn request(&self, mut req: Request) -> FutureResponse { match req.version() { HttpVersion::Http10 | HttpVersion::Http11 => (), @@ -173,8 +127,7 @@ where C: Connect, } } - let url = req.uri().clone(); - let domain = match uri::scheme_and_authority(&url) { + let domain = match uri::scheme_and_authority(req.uri()) { Some(uri) => uri, None => { return FutureResponse(Box::new(future::err(::Error::Io( @@ -185,15 +138,44 @@ where C: Connect, )))); } }; - let (mut head, body) = request::split(req); - if !head.headers.has::() { + if !req.headers().has::() { let host = Host::new( domain.host().expect("authority implies host").to_owned(), domain.port(), ); - head.headers.set_pos(0, host); + req.headers_mut().set_pos(0, host); } + let client = self.clone(); + let is_proxy = req.is_proxy(); + let uri = req.uri().clone(); + let fut = RetryableSendRequest { + client: client, + future: self.send_request(req, &domain), + domain: domain, + is_proxy: is_proxy, + uri: uri, + }; + FutureResponse(Box::new(fut)) + } + + /// Send an `http::Request` using this Client. + #[inline] + #[cfg(feature = "compat")] + pub fn request_compat(&self, req: http::Request) -> compat::CompatFutureResponse { + self::compat::future(self.call(req.into())) + } + + /// Convert into a client accepting `http::Request`. + #[cfg(feature = "compat")] + pub fn into_compat(self) -> compat::CompatClient { + self::compat::client(self) + } + + //TODO: replace with `impl Future` when stable + fn send_request(&self, req: Request, domain: &Uri) -> Box>> { + let url = req.uri().clone(); + let (head, body) = request::split(req); let checkout = self.pool.checkout(domain.as_ref()); let connect = { let executor = self.executor.clone(); @@ -220,53 +202,147 @@ where C: Connect, let race = checkout.select(connect) .map(|(client, _work)| client) - .map_err(|(e, _work)| { + .map_err(|(e, _checkout)| { // the Pool Checkout cannot error, so the only error // is from the Connector // XXX: should wait on the Checkout? Problem is // that if the connector is failing, it may be that we // never had a pooled stream at all - e.into() + ClientError::Normal(e.into()) }); let resp = race.and_then(move |client| { + let conn_reused = client.is_reused(); match client.tx.send((head, body)) { Ok(rx) => { client.should_close.set(false); - Either::A(rx.then(|res| { + Either::A(rx.then(move |res| { match res { Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(err), + Ok(Err((err, orig_req))) => Err(match orig_req { + Some(req) => ClientError::Canceled { + connection_reused: conn_reused, + reason: err, + req: req, + }, + None => ClientError::Normal(err), + }), + // this is definite bug if it happens, but it shouldn't happen! Err(_) => panic!("dispatch dropped without returning error"), } })) }, - Err(_) => { - error!("pooled connection was not ready, this is a hyper bug"); - Either::B(future::err(::Error::new_canceled(None))) + Err(req) => { + debug!("pooled connection was not ready"); + let err = ClientError::Canceled { + connection_reused: conn_reused, + reason: ::Error::new_canceled(None), + req: req, + }; + Either::B(future::err(err)) } } }); - FutureResponse(Box::new(resp)) + Box::new(resp) } +} + +impl Service for Client +where C: Connect, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + type Request = Request; + type Response = Response; + type Error = ::Error; + type Future = FutureResponse; + fn call(&self, req: Self::Request) -> Self::Future { + self.request(req) + } } -impl Clone for Client { +impl Clone for Client { fn clone(&self) -> Client { Client { connector: self.connector.clone(), executor: self.executor.clone(), h1_writev: self.h1_writev, pool: self.pool.clone(), + retry_canceled_requests: self.retry_canceled_requests, } } } impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("Client") + f.debug_struct("Client") + .finish() + } +} + +/// A `Future` that will resolve to an HTTP Response. +#[must_use = "futures do nothing unless polled"] +pub struct FutureResponse(Box + 'static>); + +impl fmt::Debug for FutureResponse { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("Future") + } +} + +impl Future for FutureResponse { + type Item = Response; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + self.0.poll() + } +} + +struct RetryableSendRequest { + client: Client, + domain: Uri, + future: Box>>, + is_proxy: bool, + uri: Uri, +} + +impl Future for RetryableSendRequest +where + C: Connect, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + type Item = Response; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + match self.future.poll() { + Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(ClientError::Normal(err)) => return Err(err), + Err(ClientError::Canceled { + connection_reused, + req, + reason, + }) => { + if !self.client.retry_canceled_requests || !connection_reused { + // if client disabled, don't retry + // a fresh connection means we definitely can't retry + return Err(reason); + } + + trace!("unstarted request canceled, trying again (reason={:?})", reason); + let mut req = request::join(req); + req.set_proxy(self.is_proxy); + req.set_uri(self.uri.clone()); + self.future = self.client.send_request(req, &self.domain); + } + } + } } } @@ -303,6 +379,15 @@ impl Drop for HyperClient { } } +pub(crate) enum ClientError { + Normal(::Error), + Canceled { + connection_reused: bool, + req: (::proto::RequestHead, Option), + reason: ::Error, + } +} + /// Configuration for a Client pub struct Config { _body_type: PhantomData, @@ -313,6 +398,7 @@ pub struct Config { h1_writev: bool, //TODO: make use of max_idle config max_idle: usize, + retry_canceled_requests: bool, } /// Phantom type used to signal that `Config` should create a `HttpConnector`. @@ -323,12 +409,12 @@ impl Default for Config { fn default() -> Config { Config { _body_type: PhantomData::, - //connect_timeout: Duration::from_secs(10), connector: UseDefaultConnector(()), keep_alive: true, keep_alive_timeout: Some(Duration::from_secs(90)), h1_writev: true, max_idle: 5, + retry_canceled_requests: true, } } } @@ -347,12 +433,12 @@ impl Config { pub fn body(self) -> Config { Config { _body_type: PhantomData::, - //connect_timeout: self.connect_timeout, connector: self.connector, keep_alive: self.keep_alive, keep_alive_timeout: self.keep_alive_timeout, h1_writev: self.h1_writev, max_idle: self.max_idle, + retry_canceled_requests: self.retry_canceled_requests, } } @@ -361,12 +447,12 @@ impl Config { pub fn connector(self, val: CC) -> Config { Config { _body_type: self._body_type, - //connect_timeout: self.connect_timeout, connector: val, keep_alive: self.keep_alive, keep_alive_timeout: self.keep_alive_timeout, h1_writev: self.h1_writev, max_idle: self.max_idle, + retry_canceled_requests: self.retry_canceled_requests, } } @@ -390,17 +476,6 @@ impl Config { self } - /* - /// Set the timeout for connecting to a URL. - /// - /// Default is 10 seconds. - #[inline] - pub fn connect_timeout(mut self, val: Duration) -> Config { - self.connect_timeout = val; - self - } - */ - /// Set whether HTTP/1 connections should try to use vectored writes, /// or always flatten into a single buffer. /// @@ -408,13 +483,30 @@ impl Config { /// but may also improve performance when an IO transport doesn't /// support vectored writes well, such as most TLS implementations. /// - /// Default is true. + /// Default is `true`. #[inline] pub fn http1_writev(mut self, val: bool) -> Config { self.h1_writev = val; self } + /// Set whether to retry requests that get disrupted before ever starting + /// to write. + /// + /// This means a request that is queued, and gets given an idle, reused + /// connection, and then encounters an error immediately as the idle + /// connection was found to be unusable. + /// + /// When this is set to `false`, the related `FutureResponse` would instead + /// resolve to an `Error::Cancel`. + /// + /// Default is `true`. + #[inline] + pub fn retry_canceled_requests(mut self, val: bool) -> Config { + self.retry_canceled_requests = val; + self + } + #[doc(hidden)] #[deprecated(since="0.11.11", note="no_proto is always enabled")] pub fn no_proto(self) -> Config { diff --git a/src/client/pool.rs b/src/client/pool.rs index 4c7d90221d..095b79ad62 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -211,6 +211,12 @@ pub struct Pooled { pool: Weak>>, } +impl Pooled { + pub fn is_reused(&self) -> bool { + self.entry.is_reused + } +} + impl Deref for Pooled { type Target = T; fn deref(&self) -> &T { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 12c9e977e1..88f24eccd2 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -32,7 +32,7 @@ pub struct Server { } pub struct Client { - callback: Option>>, + callback: Option>)>>>, rx: ClientRx, } @@ -398,12 +398,13 @@ where }, Err(err) => { if let Some(cb) = self.callback.take() { - let _ = cb.send(Err(err)); + let _ = cb.send(Err((err, None))); Ok(()) - } else if let Ok(Async::Ready(Some((_, cb)))) = self.rx.poll() { + } else if let Ok(Async::Ready(Some((req, cb)))) = self.rx.poll() { + trace!("canceling queued request with connection error: {}", err); // in this case, the message was never even started, so it's safe to tell // the user that the request was completely canceled - let _ = cb.send(Err(::Error::new_canceled(Some(err)))); + let _ = cb.send(Err((::Error::new_canceled(Some(err)), Some(req)))); Ok(()) } else { Err(err) diff --git a/src/proto/request.rs b/src/proto/request.rs index adeebdb688..a344138c75 100644 --- a/src/proto/request.rs +++ b/src/proto/request.rs @@ -105,6 +105,8 @@ impl Request { /// protected by TLS. #[inline] pub fn set_proxy(&mut self, is_proxy: bool) { self.is_proxy = is_proxy; } + + pub(crate) fn is_proxy(&self) -> bool { self.is_proxy } } impl Request { @@ -165,16 +167,9 @@ impl From> for Request { /// Constructs a request using a received ResponseHead and optional body pub fn from_wire(addr: Option, incoming: RequestHead, body: Option) -> Request { - let MessageHead { version, subject: RequestLine(method, uri), headers } = incoming; - Request { - method: method, - uri: uri, - headers: headers, - version: version, remote_addr: addr, - body: body, - is_proxy: false, + ..join((incoming, body)) } } @@ -192,6 +187,20 @@ pub fn split(req: Request) -> (RequestHead, Option) { (head, req.body) } +pub fn join((head, body): (RequestHead, Option)) -> Request { + let MessageHead { version, subject: RequestLine(method, uri), headers } = head; + + Request { + method: method, + uri: uri, + headers: headers, + version: version, + remote_addr: None, + body: body, + is_proxy: false, + } +} + pub fn addr(req: &mut Request, addr: SocketAddr) { req.remote_addr = Some(addr); } diff --git a/tests/client.rs b/tests/client.rs index 0fdde5abc5..39fab3d2d7 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -573,7 +573,6 @@ fn client_keep_alive_connreset() { let mut core = Core::new().unwrap(); let handle = core.handle(); - // This one seems to hang forever let client = client(&handle); let (tx1, rx1) = oneshot::channel(); @@ -594,6 +593,14 @@ fn client_keep_alive_connreset() { // Let client know it can try to reuse the connection let _ = tx1.send(()); + + // use sock2 so that sock isn't dropped yet + let mut sock2 = server.accept().unwrap().0; + sock2.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock2.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock2.read(&mut buf).expect("read 2"); + sock2.write_all(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); }); @@ -606,7 +613,11 @@ fn client_keep_alive_connreset() { core.run(rx).unwrap(); let t = Timeout::new(Duration::from_millis(100), &handle).unwrap(); - let res = client.get(format!("http://{}/b", addr).parse().unwrap()); + let res = client.get(format!("http://{}/b", addr).parse().unwrap()) + .map(|res| { + assert_eq!(res.status().as_u16(), 222); + }); + let fut = res.select2(t).then(|result| match result { Ok(Either::A((resp, _))) => Ok(resp), Err(Either::A((err, _))) => Err(err), @@ -614,16 +625,7 @@ fn client_keep_alive_connreset() { Err(Either::B(_)) => Err(hyper::Error::Timeout), }); - // for now, the 2nd request is just canceled, since the connection is found to be dead - // at the same time the request is scheduled. - // - // in the future, it'd be nice to auto retry the request, but can't really be done yet - // as the `connector` isn't clone so we can't use it "later", when the future resolves. - let err = core.run(fut).unwrap_err(); - match err { - hyper::Error::Cancel(..) => (), - other => panic!("expected Cancel error, got {:?}", other), - } + core.run(fut).expect("req 2"); } #[test]