diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 12ce64bb28..e743aae1d6 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -85,7 +85,7 @@ impl Drop for Receiver { // - Err: unreachable while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() { // maybe in future, we pass the value along with the error? - let _ = cb.send(Err(::Error::new_canceled())); + let _ = cb.send(Err(::Error::new_canceled(None))); } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 899cf9670d..6b88998a31 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -243,11 +243,7 @@ where C: Connect, }, Err(_) => { error!("pooled connection was not ready, this is a hyper bug"); - let err = io::Error::new( - io::ErrorKind::BrokenPipe, - "pool selected dead connection", - ); - Either::B(future::err(::Error::Io(err))) + Either::B(future::err(::Error::new_canceled(None))) } } }); diff --git a/src/error.rs b/src/error.rs index a7f32c0d62..fc63f7d258 100644 --- a/src/error.rs +++ b/src/error.rs @@ -60,9 +60,9 @@ pub enum Error { } impl Error { - pub(crate) fn new_canceled() -> Error { + pub(crate) fn new_canceled(cause: Option) -> Error { Error::Cancel(Canceled { - _inner: (), + cause: cause.map(Box::new), }) } } @@ -73,10 +73,9 @@ impl Error { /// as the related connection gets closed by the remote. In that case, /// when the connection drops, the pending response future will be /// fulfilled with this error, signaling the `Request` was never started. +#[derive(Debug)] pub struct Canceled { - // maybe in the future this contains an optional value of - // what was canceled? - _inner: (), + cause: Option>, } impl Canceled { @@ -85,13 +84,6 @@ impl Canceled { } } -impl fmt::Debug for Canceled { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Canceled") - .finish() - } -} - impl fmt::Display for Canceled { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.pad(self.description()) @@ -142,6 +134,7 @@ impl StdError for Error { Io(ref error) => Some(error), Uri(ref error) => Some(error), Utf8(ref error) => Some(error), + Cancel(ref e) => e.cause.as_ref().map(|e| &**e as &StdError), Error::__Nonexhaustive(..) => unreachable!(), _ => None, } diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 6cae1b2445..329279bb66 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -119,7 +119,7 @@ where I: AsyncRead + AsyncWrite, } else { trace!("poll when on keep-alive"); if !T::should_read_first() { - self.try_empty_read()?; + self.require_empty_read()?; if self.is_read_closed() { return Ok(Async::Ready(None)); } @@ -281,17 +281,23 @@ where I: AsyncRead + AsyncWrite, pub fn read_keep_alive(&mut self) -> Result<(), ::Error> { debug_assert!(!self.can_read_head() && !self.can_read_body()); - trace!("Conn::read_keep_alive"); + trace!("read_keep_alive; is_mid_message={}", self.is_mid_message()); - if T::should_read_first() || !self.state.is_idle() { + if self.is_mid_message() { self.maybe_park_read(); } else { - self.try_empty_read()?; + self.require_empty_read()?; } - Ok(()) } + fn is_mid_message(&self) -> bool { + match (&self.state.reading, &self.state.writing) { + (&Reading::Init, &Writing::Init) => false, + _ => true, + } + } + fn maybe_park_read(&mut self) { if !self.io.is_read_blocked() { // the Io object is ready to read, which means it will never alert @@ -312,40 +318,68 @@ where I: AsyncRead + AsyncWrite, // // This should only be called for Clients wanting to enter the idle // state. - fn try_empty_read(&mut self) -> io::Result<()> { + fn require_empty_read(&mut self) -> io::Result<()> { assert!(!self.can_read_head() && !self.can_read_body()); if !self.io.read_buf().is_empty() { debug!("received an unexpected {} bytes", self.io.read_buf().len()); Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended")) } else { - match self.io.read_from_io() { - Ok(Async::Ready(0)) => { - trace!("try_empty_read; found EOF on connection: {:?}", self.state); - let must_error = self.should_error_on_eof(); - // order is important: must_error needs state BEFORE close_read - self.state.close_read(); - if must_error { - Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF waiting for response")) - } else { - Ok(()) - } + match self.try_io_read()? { + Async::Ready(0) => { + // case handled in try_io_read + Ok(()) }, - Ok(Async::Ready(n)) => { + Async::Ready(n) => { debug!("received {} bytes on an idle connection", n); - Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended")) + let desc = if self.state.is_idle() { + "unexpected bytes after message ended" + } else { + "unexpected bytes before writing message" + }; + Err(io::Error::new(io::ErrorKind::InvalidData, desc)) }, - Ok(Async::NotReady) => { + Async::NotReady => { Ok(()) }, - Err(e) => { - self.state.close(); - Err(e) - } } } } + fn try_io_read(&mut self) -> Poll { + match self.io.read_from_io() { + Ok(Async::Ready(0)) => { + trace!("try_io_read; found EOF on connection: {:?}", self.state); + let must_error = self.should_error_on_eof(); + let ret = if must_error { + let desc = if self.is_mid_message() { + "unexpected EOF waiting for response" + } else { + "unexpected EOF before writing message" + }; + Err(io::Error::new(io::ErrorKind::UnexpectedEof, desc)) + } else { + Ok(Async::Ready(0)) + }; + + // order is important: must_error needs state BEFORE close_read + self.state.close_read(); + ret + }, + Ok(Async::Ready(n)) => { + Ok(Async::Ready(n)) + }, + Ok(Async::NotReady) => { + Ok(Async::NotReady) + }, + Err(e) => { + self.state.close(); + Err(e) + } + } + } + + fn maybe_notify(&mut self) { // its possible that we returned NotReady from poll() without having // exhausted the underlying Io. We would have done this when we diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 4b9c5aa1e3..12c9e977e1 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -401,7 +401,9 @@ where let _ = cb.send(Err(err)); Ok(()) } else if let Ok(Async::Ready(Some((_, cb)))) = self.rx.poll() { - let _ = cb.send(Err(err)); + // in this case, the message was never even started, so it's safe to tell + // the user that the request was completely canceled + let _ = cb.send(Err(::Error::new_canceled(Some(err)))); Ok(()) } else { Err(err) @@ -431,13 +433,14 @@ where #[cfg(test)] mod tests { + extern crate pretty_env_logger; + use super::*; use mock::AsyncIo; use proto::ClientTransaction; #[test] - fn client_read_response_before_writing_request() { - extern crate pretty_env_logger; + fn client_read_bytes_before_writing_request() { let _ = pretty_env_logger::try_init(); ::futures::lazy(|| { let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 100); @@ -452,11 +455,16 @@ mod tests { }; let res_rx = tx.send((req, None::<::Body>)).unwrap(); - dispatcher.poll().expect("dispatcher poll 1"); - dispatcher.poll().expect("dispatcher poll 2"); - let _res = res_rx.wait() + let a1 = dispatcher.poll().expect("error should be sent on channel"); + assert!(a1.is_ready(), "dispatcher should be closed"); + let err = res_rx.wait() .expect("callback poll") - .expect("callback response"); + .expect_err("callback response"); + + match err { + ::Error::Cancel(_) => (), + other => panic!("expected Cancel(_), got {:?}", other), + } Ok::<(), ()>(()) }).wait().unwrap(); } diff --git a/tests/client.rs b/tests/client.rs index 523f1bcff0..0fdde5abc5 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -14,9 +14,10 @@ use hyper::client::{Client, Request, HttpConnector}; use hyper::{Method, StatusCode}; use futures::{Future, Stream}; +use futures::future::Either; use futures::sync::oneshot; -use tokio_core::reactor::{Core, Handle}; +use tokio_core::reactor::{Core, Handle, Timeout}; fn client(handle: &Handle) -> Client { Client::new(handle) @@ -523,7 +524,6 @@ test! { } - #[test] fn client_keep_alive() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -562,10 +562,72 @@ fn client_keep_alive() { core.run(res.join(rx).map(|r| r.0)).unwrap(); } +#[test] +fn client_keep_alive_connreset() { + use std::sync::mpsc; + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + // This one seems to hang forever + let client = client(&handle); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = mpsc::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); + + // Wait for client to indicate it is done processing the first request + // This is what seem to trigger the race condition -- without it client notices + // connection is closed while processing the first request. + let _ = rx2.recv(); + let _r = sock.shutdown(std::net::Shutdown::Both); + + // Let client know it can try to reuse the connection + let _ = tx1.send(()); + }); + + + let res = client.get(format!("http://{}/a", addr).parse().unwrap()); + core.run(res).unwrap(); + + let _ = tx2.send(()); + + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + core.run(rx).unwrap(); + + let t = Timeout::new(Duration::from_millis(100), &handle).unwrap(); + let res = client.get(format!("http://{}/b", addr).parse().unwrap()); + let fut = res.select2(t).then(|result| match result { + Ok(Either::A((resp, _))) => Ok(resp), + Err(Either::A((err, _))) => Err(err), + Ok(Either::B(_)) | + Err(Either::B(_)) => Err(hyper::Error::Timeout), + }); + + // for now, the 2nd request is just canceled, since the connection is found to be dead + // at the same time the request is scheduled. + // + // in the future, it'd be nice to auto retry the request, but can't really be done yet + // as the `connector` isn't clone so we can't use it "later", when the future resolves. + let err = core.run(fut).unwrap_err(); + match err { + hyper::Error::Cancel(..) => (), + other => panic!("expected Cancel error, got {:?}", other), + } +} -/* TODO: re-enable once retry works, its currently a flaky test #[test] -fn client_pooled_socket_disconnected() { +fn client_keep_alive_extra_body() { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); @@ -581,51 +643,31 @@ fn client_pooled_socket_disconnected() { sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); let mut buf = [0; 4096]; sock.read(&mut buf).expect("read 1"); - let remote_addr = sock.peer_addr().unwrap().to_string(); - let out = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", remote_addr.len(), remote_addr); - sock.write_all(out.as_bytes()).expect("write 1"); - drop(sock); - tx1.send(()); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello").expect("write 1"); + // the body "hello", while ignored because its a HEAD request, should mean the connection + // cannot be put back in the pool + let _ = tx1.send(()); - let mut sock = server.accept().unwrap().0; - sock.read(&mut buf).expect("read 2"); - let second_get = b"GET /b HTTP/1.1\r\n"; - assert_eq!(&buf[..second_get.len()], second_get); - let remote_addr = sock.peer_addr().unwrap().to_string(); - let out = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", remote_addr.len(), remote_addr); - sock.write_all(out.as_bytes()).expect("write 2"); - tx2.send(()); + let mut sock2 = server.accept().unwrap().0; + let n2 = sock2.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock2.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); + let _ = tx2.send(()); }); - // spin shortly so we receive the hangup on the client socket - let sleep = Timeout::new(Duration::from_millis(500), &core.handle()).unwrap(); - core.run(sleep).unwrap(); + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let res = client.get(format!("http://{}/a", addr).parse().unwrap()) - .and_then(|res| { - res.body() - .map(|chunk| chunk.to_vec()) - .collect() - .map(|vec| vec.concat()) - .map(|vec| String::from_utf8(vec).unwrap()) - }); - let addr1 = core.run(res.join(rx).map(|r| r.0)).unwrap(); + let req = Request::new(Method::Head, format!("http://{}/a", addr).parse().unwrap()); + let res = client.request(req); + core.run(res.join(rx).map(|r| r.0)).unwrap(); let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let res = client.get(format!("http://{}/b", addr).parse().unwrap()) - .and_then(|res| { - res.body() - .map(|chunk| chunk.to_vec()) - .collect() - .map(|vec| vec.concat()) - .map(|vec| String::from_utf8(vec).unwrap()) - }); - let addr2 = core.run(res.join(rx).map(|r| r.0)).unwrap(); - - assert_ne!(addr1, addr2); + let res = client.get(format!("http://{}/b", addr).parse().unwrap()); + core.run(res.join(rx).map(|r| r.0)).unwrap(); } -*/ mod dispatch_impl { use super::*;