diff --git a/src/client/connect.rs b/src/client/connect.rs index cfe6fd96a5..296c06d161 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -1,3 +1,10 @@ +//! The `Connect` trait, and supporting types. +//! +//! This module contains: +//! +//! - A default [`HttpConnector`](HttpConnector) that does DNS resolution and +//! establishes connections over TCP. +//! - The [`Connect`](Connect) trait and related types to build custom connectors. use std::error::Error as StdError; use std::fmt; use std::io; @@ -14,38 +21,121 @@ use http::uri::Scheme; use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::Handle; use tokio::net::{TcpStream, TcpStreamNew}; -use tokio_service::Service; use super::dns; +use self::http_connector::HttpConnectorBlockingTask; -/// A connector creates an Io to a remote address.. +/// Connect to a destination, returning an IO transport. /// -/// This trait is not implemented directly, and only exists to make -/// the intent clearer. A connector should implement `Service` with -/// `Request=Uri` and `Response: Io` instead. -pub trait Connect: Service + 'static { - /// The connected Io Stream. - type Output: AsyncRead + AsyncWrite + 'static; - /// A Future that will resolve to the connected Stream. - type Future: Future + 'static; - /// Connect to a remote address. - fn connect(&self, Uri) -> ::Future; +/// A connector receives a [`Destination`](Destination) describing how a +/// connection should be estabilished, and returns a `Future` of the +/// ready connection. +pub trait Connect { + /// The connected IO Stream. + type Transport: AsyncRead + AsyncWrite + 'static; + /// An error occured when trying to connect. + type Error; + /// A Future that will resolve to the connected Transport. + type Future: Future; + /// Connect to a destination. + fn connect(&self, dst: Destination) -> Self::Future; } -impl Connect for T -where T: Service + 'static, - T::Response: AsyncRead + AsyncWrite, - T::Future: Future, -{ - type Output = T::Response; - type Future = T::Future; +/// A set of properties to describe where and how to try to connect. +#[derive(Debug)] +pub struct Destination { + //pub(super) alpn: Alpn, + pub(super) uri: Uri, +} + +/// Extra information about the connected transport. +/// +/// This can be used to inform recipients about things like if ALPN +/// was used, or if connected to an HTTP proxy. +#[derive(Debug)] +pub struct Connected { + //alpn: Alpn, + pub(super) is_proxied: bool, +} + +/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed +#[derive(Debug)] +pub(super) enum Alpn { + Http1, + //H2, + //Http1OrH2 +} +*/ + +impl Destination { + /// Get the protocol scheme. + #[inline] + pub fn scheme(&self) -> &str { + self.uri + .scheme_part() + .expect("destination uri has scheme") + .as_str() + } + + /// Get the hostname. + #[inline] + pub fn host(&self) -> &str { + self.uri + .host() + .expect("destination uri has host") + } + + /// Get the port, if specified. + #[inline] + pub fn port(&self) -> Option { + self.uri.port() + } + + /* + /// Returns whether this connection must negotiate HTTP/2 via ALPN. + pub fn must_h2(&self) -> bool { + match self.alpn { + Alpn::Http1 => false, + Alpn::H2 => true, + } + } + */ +} - fn connect(&self, url: Uri) -> ::Future { - self.call(url) +impl Connected { + /// Create new `Connected` type with empty metadata. + pub fn new() -> Connected { + Connected { + //alpn: Alpn::Http1, + is_proxied: false, + } } + + /// Set whether the connected transport is to an HTTP proxy. + /// + /// This setting will affect if HTTP/1 requests written on the transport + /// will have the request-target in absolute-form or origin-form (such as + /// `GET http://hyper.rs/guide HTTP/1.1` or `GET /guide HTTP/1.1`). + /// + /// Default is `false`. + pub fn proxy(mut self, is_proxied: bool) -> Connected { + self.is_proxied = is_proxied; + self + } + + /* + /// Set that the connected transport negotiated HTTP/2 as it's + /// next protocol. + pub fn h2(mut self) -> Connected { + self.alpn = Alpn::H2; + self + } + */ } /// A connector for the `http` scheme. +/// +/// Performs DNS resolution in a thread pool, and then connects over TCP. #[derive(Clone)] pub struct HttpConnector { executor: HttpConnectExecutor, @@ -109,30 +199,29 @@ impl fmt::Debug for HttpConnector { } } -impl Service for HttpConnector { - type Request = Uri; - type Response = TcpStream; +impl Connect for HttpConnector { + type Transport = TcpStream; type Error = io::Error; type Future = HttpConnecting; - fn call(&self, uri: Uri) -> Self::Future { - trace!("Http::connect({:?})", uri); + fn connect(&self, dst: Destination) -> Self::Future { + trace!("Http::connect({:?})", dst.uri); if self.enforce_http { - if uri.scheme_part() != Some(&Scheme::HTTP) { + if dst.uri.scheme_part() != Some(&Scheme::HTTP) { return invalid_url(InvalidUrl::NotHttp, &self.handle); } - } else if uri.scheme_part().is_none() { + } else if dst.uri.scheme_part().is_none() { return invalid_url(InvalidUrl::MissingScheme, &self.handle); } - let host = match uri.host() { + let host = match dst.uri.host() { Some(s) => s, None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle), }; - let port = match uri.port() { + let port = match dst.uri.port() { Some(port) => port, - None => if uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 }, + None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 }, }; HttpConnecting { @@ -191,7 +280,7 @@ enum State { } impl Future for HttpConnecting { - type Item = TcpStream; + type Item = (TcpStream, Connected); type Error = io::Error; fn poll(&mut self) -> Poll { @@ -230,7 +319,7 @@ impl Future for HttpConnecting { sock.set_keepalive(Some(dur))?; } - return Ok(Async::Ready(sock)); + return Ok(Async::Ready((sock, Connected::new()))); }, State::Error(ref mut e) => return Err(e.take().expect("polled more than once")), } @@ -279,23 +368,27 @@ impl ConnectingTcp { } } -/// Blocking task to be executed on a thread pool. -pub struct HttpConnectorBlockingTask { - work: oneshot::Execute -} +// Make this Future unnameable outside of this crate. +mod http_connector { + use super::*; + // Blocking task to be executed on a thread pool. + pub struct HttpConnectorBlockingTask { + pub(super) work: oneshot::Execute + } -impl fmt::Debug for HttpConnectorBlockingTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("HttpConnectorBlockingTask") + impl fmt::Debug for HttpConnectorBlockingTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("HttpConnectorBlockingTask") + } } -} -impl Future for HttpConnectorBlockingTask { - type Item = (); - type Error = (); + impl Future for HttpConnectorBlockingTask { + type Item = (); + type Error = (); - fn poll(&mut self) -> Poll<(), ()> { - self.work.poll() + fn poll(&mut self) -> Poll<(), ()> { + self.work.poll() + } } } @@ -311,35 +404,45 @@ impl Executor> for HttpConnectExecutor { #[cfg(test)] mod tests { + #![allow(deprecated)] use std::io; use tokio::reactor::Core; - use super::{Connect, HttpConnector}; + use super::{Connect, Destination, HttpConnector}; #[test] fn test_errors_missing_authority() { let mut core = Core::new().unwrap(); - let url = "/foo/bar?baz".parse().unwrap(); + let uri = "/foo/bar?baz".parse().unwrap(); + let dst = Destination { + uri, + }; let connector = HttpConnector::new(1, &core.handle()); - assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput); } #[test] fn test_errors_enforce_http() { let mut core = Core::new().unwrap(); - let url = "https://example.domain/foo/bar?baz".parse().unwrap(); + let uri = "https://example.domain/foo/bar?baz".parse().unwrap(); + let dst = Destination { + uri, + }; let connector = HttpConnector::new(1, &core.handle()); - assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput); } #[test] fn test_errors_missing_scheme() { let mut core = Core::new().unwrap(); - let url = "example.domain".parse().unwrap(); + let uri = "example.domain".parse().unwrap(); + let dst = Destination { + uri, + }; let connector = HttpConnector::new(1, &core.handle()); - assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput); } } diff --git a/src/client/mod.rs b/src/client/mod.rs index d8a808e318..87f34f2bde 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -11,6 +11,7 @@ use futures::{Async, Future, Poll}; use futures::future::{self, Executor}; use http::{Method, Request, Response, Uri, Version}; use http::header::{Entry, HeaderValue, HOST}; +use http::uri::Scheme; use tokio::reactor::Handle; pub use tokio_service::Service; @@ -18,12 +19,13 @@ use proto::body::{Body, Entity}; use proto; use self::pool::Pool; -pub use self::connect::{HttpConnector, Connect}; +pub use self::connect::{Connect, HttpConnector}; use self::background::{bg, Background}; +use self::connect::Destination; pub mod conn; -mod connect; +pub mod connect; //TODO(easy): move cancel and dispatch into common instead pub(crate) mod dispatch; mod dns; @@ -101,7 +103,9 @@ impl Client { } impl Client -where C: Connect, +where C: Connect + 'static, + C::Transport: 'static, + C::Future: 'static, B: Entity + 'static, { @@ -180,13 +184,11 @@ where C: Connect, let client = self.clone(); - //TODO: let is_proxy = req.is_proxy(); let uri = req.uri().clone(); let fut = RetryableSendRequest { client: client, future: self.send_request(req, &domain), domain: domain, - //is_proxy: is_proxy, uri: uri, }; FutureResponse(Box::new(fut)) @@ -195,19 +197,6 @@ where C: Connect, //TODO: replace with `impl Future` when stable fn send_request(&self, mut req: Request, domain: &str) -> Box, Error=ClientError>> { let url = req.uri().clone(); - - let path = match url.path_and_query() { - Some(path) => { - let mut parts = ::http::uri::Parts::default(); - parts.path_and_query = Some(path.clone()); - Uri::from_parts(parts).expect("path is valid uri") - }, - None => { - "/".parse().expect("/ is valid path") - } - }; - *req.uri_mut() = path; - let checkout = self.pool.checkout(domain); let connect = { let executor = self.executor.clone(); @@ -215,18 +204,23 @@ where C: Connect, let pool_key = Arc::new(domain.to_string()); let h1_writev = self.h1_writev; let connector = self.connector.clone(); + let dst = Destination { + uri: url, + }; future::lazy(move || { - connector.connect(url) + connector.connect(dst) .from_err() - .and_then(move |io| { + .and_then(move |(io, connected)| { conn::Builder::new() .h1_writev(h1_writev) .handshake_no_upgrades(io) - }).and_then(move |(tx, conn)| { - executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?; - Ok(pool.pooled(pool_key, PoolClient { - tx: tx, - })) + .and_then(move |(tx, conn)| { + executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?; + Ok(pool.pooled(pool_key, PoolClient { + is_proxied: connected.is_proxied, + tx: tx, + })) + }) }) }) }; @@ -245,13 +239,14 @@ where C: Connect, let executor = self.executor.clone(); let resp = race.and_then(move |mut pooled| { let conn_reused = pooled.is_reused(); + set_relative_uri(req.uri_mut(), pooled.is_proxied); let fut = pooled.tx.send_request_retryable(req) .map_err(move |(err, orig_req)| { if let Some(req) = orig_req { ClientError::Canceled { connection_reused: conn_reused, reason: err, - req: req, + req, } } else { ClientError::Normal(err) @@ -292,7 +287,8 @@ where C: Connect, } impl Service for Client -where C: Connect, +where C: Connect + 'static, + C::Future: 'static, B: Entity + 'static, { type Request = Request; @@ -348,13 +344,13 @@ struct RetryableSendRequest { client: Client, domain: String, future: Box, Error=ClientError>>, - //is_proxy: bool, uri: Uri, } impl Future for RetryableSendRequest where - C: Connect, + C: Connect + 'static, + C::Future: 'static, B: Entity + 'static, { type Item = Response; @@ -387,6 +383,7 @@ where } struct PoolClient { + is_proxied: bool, tx: conn::SendRequest, } @@ -399,7 +396,7 @@ where } } -pub(crate) enum ClientError { +enum ClientError { Normal(::Error), Canceled { connection_reused: bool, @@ -408,6 +405,23 @@ pub(crate) enum ClientError { } } +fn set_relative_uri(uri: &mut Uri, is_proxied: bool) { + if is_proxied && uri.scheme_part() != Some(&Scheme::HTTPS) { + return; + } + let path = match uri.path_and_query() { + Some(path) => { + let mut parts = ::http::uri::Parts::default(); + parts.path_and_query = Some(path.clone()); + Uri::from_parts(parts).expect("path is valid uri") + }, + None => { + "/".parse().expect("/ is valid path") + } + }; + *uri = path; +} + /// Configuration for a Client pub struct Config { _body_type: PhantomData, @@ -545,7 +559,9 @@ impl Config { } impl Config -where C: Connect, +where C: Connect, + C::Transport: 'static, + C::Future: 'static, B: Entity, { /// Construct the Client with this configuration. diff --git a/src/client/tests.rs b/src/client/tests.rs index 1f40e81671..ad2a16d779 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -14,8 +14,8 @@ fn retryable_request() { let mut connector = MockConnector::new(); - let sock1 = connector.mock("http://mock.local/a"); - let sock2 = connector.mock("http://mock.local/b"); + let sock1 = connector.mock("http://mock.local"); + let sock2 = connector.mock("http://mock.local"); let client = Client::configure() .connector(connector) @@ -62,7 +62,7 @@ fn conn_reset_after_write() { let mut connector = MockConnector::new(); - let sock1 = connector.mock("http://mock.local/a"); + let sock1 = connector.mock("http://mock.local"); let client = Client::configure() .connector(connector) diff --git a/src/mock.rs b/src/mock.rs index b8b8a55ea9..4bf49f2835 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -8,9 +8,8 @@ use bytes::Buf; use futures::{Async, Poll}; use futures::task::{self, Task}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_service::Service; -use ::Uri; +use ::client::connect::{Connect, Connected, Destination}; #[derive(Debug)] pub struct MockCursor { @@ -410,19 +409,23 @@ impl MockConnector { } } -impl Service for MockConnector { - type Request = Uri; - type Response = Duplex; +impl Connect for MockConnector { + type Transport = Duplex; type Error = io::Error; - type Future = ::futures::future::FutureResult; + type Future = ::futures::future::FutureResult<(Self::Transport, Connected), Self::Error>; - fn call(&self, uri: Uri) -> Self::Future { + fn connect(&self, dst: Destination) -> Self::Future { use futures::future; - trace!("mock connect: {}", uri); + trace!("mock connect: {:?}", dst); + let key = format!("{}://{}{}", dst.scheme(), dst.host(), if let Some(port) = dst.port() { + format!(":{}", port) + } else { + "".to_owned() + }); let mut mocks = self.mocks.borrow_mut(); - let mocks = mocks.get_mut(&uri.to_string()) - .expect(&format!("unknown mocks uri: {}", uri)); - assert!(!mocks.is_empty(), "no additional mocks for {}", uri); - future::ok(mocks.remove(0)) + let mocks = mocks.get_mut(&key) + .expect(&format!("unknown mocks uri: {}", key)); + assert!(!mocks.is_empty(), "no additional mocks for {}", key); + future::ok((mocks.remove(0), Connected::new())) } } diff --git a/tests/client.rs b/tests/client.rs index f3bfd5626d..baf23379de 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -638,9 +638,8 @@ mod dispatch_impl { use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; - use hyper::client::HttpConnector; - use hyper::server::Service; - use hyper::{Client, Uri}; + use hyper::client::connect::{Connect, Connected, Destination, HttpConnector}; + use hyper::Client; use hyper; @@ -1264,11 +1263,51 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 2); } + #[test] + fn connect_proxy_sends_absolute_uri() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let connector = DebugConnector::new(&handle) + .proxy(); + + let client = Client::configure() + .connector(connector) + .build(&handle); + + let (tx1, rx1) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + let n = sock.read(&mut buf).expect("read 1"); + let expected = format!("GET http://{addr}/foo/bar HTTP/1.1\r\nhost: {addr}\r\n\r\n", addr=addr); + assert_eq!(s(&buf[..n]), expected); + + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); + let _ = tx1.send(()); + }); + + + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let req = Request::builder() + .uri(&*format!("http://{}/foo/bar", addr)) + .body(Body::empty()) + .unwrap(); + let res = client.request(req); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + } + struct DebugConnector { http: HttpConnector, closes: mpsc::Sender<()>, connects: Arc, + is_proxy: bool, } impl DebugConnector { @@ -1283,21 +1322,27 @@ mod dispatch_impl { http: http, closes: closes, connects: Arc::new(AtomicUsize::new(0)), + is_proxy: false, } } + + fn proxy(mut self) -> Self { + self.is_proxy = true; + self + } } - impl Service for DebugConnector { - type Request = Uri; - type Response = DebugStream; + impl Connect for DebugConnector { + type Transport = DebugStream; type Error = io::Error; - type Future = Box>; + type Future = Box>; - fn call(&self, uri: Uri) -> Self::Future { + fn connect(&self, dst: Destination) -> Self::Future { self.connects.fetch_add(1, Ordering::SeqCst); let closes = self.closes.clone(); - Box::new(self.http.call(uri).map(move |s| { - DebugStream(s, closes) + let is_proxy = self.is_proxy; + Box::new(self.http.connect(dst).map(move |(s, c)| { + (DebugStream(s, closes), c.proxy(is_proxy)) })) } }