diff --git a/src/client/mod.rs b/src/client/mod.rs index b026e5cbea..8cdbc0395e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -182,22 +182,25 @@ where C: Connect, let pool = self.pool.clone(); let pool_key = Rc::new(domain.to_string()); let h1_writev = self.h1_writev; - self.connector.connect(url) - .and_then(move |io| { - let (tx, rx) = dispatch::channel(); - let tx = HyperClient { - tx: tx, - should_close: Cell::new(true), - }; - let pooled = pool.pooled(pool_key, tx); - let mut conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); - if !h1_writev { - conn.set_write_strategy_flatten(); - } - let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); - executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?; - Ok(pooled) - }) + let connector = self.connector.clone(); + future::lazy(move || { + connector.connect(url) + .and_then(move |io| { + let (tx, rx) = dispatch::channel(); + let tx = HyperClient { + tx: tx, + should_close: Cell::new(true), + }; + let pooled = pool.pooled(pool_key, tx); + let mut conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); + if !h1_writev { + conn.set_write_strategy_flatten(); + } + let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); + executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?; + Ok(pooled) + }) + }) }; let race = checkout.select(connect) diff --git a/tests/client.rs b/tests/client.rs index c935e5e792..bbdb13320f 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -725,7 +725,7 @@ mod dispatch_impl { let handle = core.handle(); let closes = Arc::new(AtomicUsize::new(0)); let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &core.handle()), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &core.handle()), closes.clone())) .build(&handle); let (tx1, rx1) = oneshot::channel(); @@ -784,7 +784,7 @@ mod dispatch_impl { let res = { let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .build(&handle); client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); @@ -834,7 +834,7 @@ mod dispatch_impl { let uri = format!("http://{}/a", addr).parse().unwrap(); let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .build(&handle); let res = client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); @@ -883,7 +883,7 @@ mod dispatch_impl { let res = { let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .build(&handle); client.get(uri) }; @@ -927,7 +927,7 @@ mod dispatch_impl { let res = { let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .build(&handle); // notably, havent read body yet client.get(uri) @@ -966,7 +966,7 @@ mod dispatch_impl { let uri = format!("http://{}/a", addr).parse().unwrap(); let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .keep_alive(false) .build(&handle); let res = client.get(uri).and_then(move |res| { @@ -1005,7 +1005,7 @@ mod dispatch_impl { let uri = format!("http://{}/a", addr).parse().unwrap(); let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .build(&handle); let res = client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); @@ -1095,7 +1095,7 @@ mod dispatch_impl { let uri = format!("http://{}/a", addr).parse().unwrap(); let client = Client::configure() - .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone())) .executor(handle.clone()); let res = client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); @@ -1110,7 +1110,79 @@ mod dispatch_impl { assert_eq!(closes.load(Ordering::Relaxed), 1); } - struct DebugConnector(HttpConnector, Arc); + #[test] + fn idle_conn_prevents_connect_call() { + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let connector = DebugConnector::new(&handle); + let connects = connector.connects.clone(); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); + let _ = tx1.send(()); + + sock.read(&mut buf).expect("read 2"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); + let _ = tx2.send(()); + }); + + let uri: hyper::Uri = format!("http://{}/a", addr).parse().unwrap(); + + let client = Client::configure() + .connector(connector) + .build(&handle); + + let res = client.get(uri.clone()).and_then(move |res| { + assert_eq!(res.status(), hyper::StatusCode::Ok); + res.body().concat2() + }); + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + assert_eq!(connects.load(Ordering::Relaxed), 1); + + let res2 = client.get(uri).and_then(move |res| { + assert_eq!(res.status(), hyper::StatusCode::Ok); + res.body().concat2() + }); + let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + core.run(res2.join(rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 1); + } + + + struct DebugConnector { + http: HttpConnector, + closes: Arc, + connects: Arc, + } + + impl DebugConnector { + fn new(handle: &Handle) -> DebugConnector { + let http = HttpConnector::new(1, handle); + DebugConnector::with_http_and_closes(http, Arc::new(AtomicUsize::new(0))) + } + + fn with_http_and_closes(http: HttpConnector, closes: Arc) -> DebugConnector { + DebugConnector { + http: http, + closes: closes, + connects: Arc::new(AtomicUsize::new(0)), + } + } + } impl Service for DebugConnector { type Request = Uri; @@ -1119,8 +1191,11 @@ mod dispatch_impl { type Future = Box>; fn call(&self, uri: Uri) -> Self::Future { - let counter = self.1.clone(); - Box::new(self.0.call(uri).map(move |s| DebugStream(s, counter))) + self.connects.fetch_add(1, Ordering::SeqCst); + let closes = self.closes.clone(); + Box::new(self.http.call(uri).map(move |s| { + DebugStream(s, closes) + })) } }