From 68c6e38ab4e75e8e10889819f60c538a9e226e0d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 17 Oct 2018 16:01:28 -0700 Subject: [PATCH] feat(client): add `Resolve`, used by `HttpConnector` This introduces a `Resolve` trait to describe asynchronous DNS resolution. The `HttpConnector` can be configured with a resolver, allowing a user to still use all the functionality of the `HttpConnector`, while customizing the DNS resolution. To prevent a breaking change, the `HttpConnector` has its `Resolve` generic set by default to `GaiResolver`. This is same as the existing resolver, which uses `getaddrinfo` inside a thread pool. Closes #1517 --- src/client/connect/dns.rs | 118 ++++++++++++++++++++++++++----------- src/client/connect/http.rs | 108 +++++++++++++++++---------------- src/client/connect/mod.rs | 1 + tests/client.rs | 2 +- 4 files changed, 144 insertions(+), 85 deletions(-) diff --git a/src/client/connect/dns.rs b/src/client/connect/dns.rs index 5b4b439b62..c054a8ebad 100644 --- a/src/client/connect/dns.rs +++ b/src/client/connect/dns.rs @@ -9,52 +9,80 @@ use std::sync::Arc; use futures::{Async, Future, Poll}; use futures::future::{Executor, ExecuteError}; use futures::sync::oneshot; +use futures_cpupool::{Builder as CpuPoolBuilder}; use self::sealed::GaiTask; +/// Resolve a hostname to a set of IP addresses. pub trait Resolve { + /// The set of IP addresses to try to connect to. type Addrs: Iterator; + /// A Future of the resolved set of addresses. type Future: Future; - fn resolve(&self, host: &str) -> Self::Future; + /// Resolve a hostname. + fn resolve(&self, name: Name) -> Self::Future; } -pub struct DefaultResolver { - inner: GaiResolver, +/// A domain name to resolve into IP addresses. +pub struct Name { + host: String, } -pub struct DefaultAddrs; -pub struct DefaultFuture; - +/// A resolver using blocking `getaddrinfo` calls in a threadpool. +#[derive(Clone)] pub struct GaiResolver { executor: GaiExecutor, } -pub struct GaiAddrs; -pub struct GaiFuture; +pub struct GaiAddrs { + inner: IpAddrs, +} -impl Resolve for DefaultResolver { - type Addrs = DefaultAddrs; - type Future = DefaultFuture; +pub struct GaiFuture { + rx: oneshot::SpawnHandle, +} - fn resolve(&self, host: &str) -> Self::Future { - DefaultFuture +impl Name { + pub(super) fn new(host: String) -> Name { + Name { + host, + } } -} -impl Future for DefaultFuture { - type Item = DefaultAddrs; - type Error = io::Error; + /// View the hostname as a string slice. + pub fn as_str(&self) -> &str { + &self.host + } +} - fn poll(&mut self) -> Poll { - unimplemented!("DefaultFuture::poll"); +impl fmt::Debug for Name { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.host, f) } } -impl Iterator for DefaultAddrs { - type Item = IpAddr; +impl GaiResolver { + /// Construct a new `GaiResolver`. + /// + /// Takes number of DNS worker threads. + pub fn new(threads: usize) -> Self { + let pool = CpuPoolBuilder::new() + .name_prefix("hyper-dns") + .pool_size(threads) + .create(); + GaiResolver::new_with_executor(pool) + } - fn next(&mut self) -> Option { - unimplemented!("DefaultAddrs::next"); + /// Construct a new `GaiResolver` with a shared thread pool executor. + /// + /// Takes an executor to run blocking `getaddrinfo` tasks on. + pub fn new_with_executor(executor: E) -> Self + where + E: Executor + Send + Sync, + { + GaiResolver { + executor: GaiExecutor(Arc::new(executor)), + } } } @@ -62,8 +90,18 @@ impl Resolve for GaiResolver { type Addrs = GaiAddrs; type Future = GaiFuture; - fn resolve(&self, host: &str) -> Self::Future { - GaiFuture + fn resolve(&self, name: Name) -> Self::Future { + let blocking = GaiBlocking::new(name.host); + let rx = oneshot::spawn(blocking, &self.executor); + GaiFuture { + rx, + } + } +} + +impl fmt::Debug for GaiResolver { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiResolver") } } @@ -72,7 +110,16 @@ impl Future for GaiFuture { type Error = io::Error; fn poll(&mut self) -> Poll { - unimplemented!("GaiFuture::poll"); + let addrs = try_ready!(self.rx.poll()); + Ok(Async::Ready(GaiAddrs { + inner: addrs, + })) + } +} + +impl fmt::Debug for GaiFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiFuture") } } @@ -80,7 +127,13 @@ impl Iterator for GaiAddrs { type Item = IpAddr; fn next(&mut self) -> Option { - unimplemented!("GaiAddrs::next"); + self.inner.next().map(|sa| sa.ip()) + } +} + +impl fmt::Debug for GaiAddrs { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiAddrs") } } @@ -96,12 +149,11 @@ impl Executor> for GaiExecutor { pub(super) struct GaiBlocking { host: String, - port: u16 } impl GaiBlocking { - pub(super) fn new(host: String, port: u16) -> GaiBlocking { - GaiBlocking { host: host, port: port } + pub(super) fn new(host: String) -> GaiBlocking { + GaiBlocking { host } } } @@ -110,8 +162,8 @@ impl Future for GaiBlocking { type Error = io::Error; fn poll(&mut self) -> Poll { - debug!("resolving host={:?}, port={:?}", self.host, self.port); - (&*self.host, self.port).to_socket_addrs() + debug!("resolving host={:?}", self.host); + (&*self.host, 0).to_socket_addrs() .map(|i| Async::Ready(IpAddrs { iter: i })) } } @@ -164,7 +216,7 @@ impl Iterator for IpAddrs { } // Make this Future unnameable outside of this crate. -mod sealed { +pub(super) mod sealed { use super::*; // Blocking task to be executed on a thread pool. pub struct GaiTask { diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index e14e8f601a..2f92e5e5b6 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -4,12 +4,10 @@ use std::error::Error as StdError; use std::io; use std::mem; use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; use std::time::{Duration, Instant}; use futures::{Async, Future, Poll}; -use futures::future::{Executor, ExecuteError}; -use futures_cpupool::{Builder as CpuPoolBuilder}; +use futures::future::{Executor}; use http::uri::Scheme; use net2::TcpBuilder; use tokio_reactor::Handle; @@ -17,7 +15,7 @@ use tokio_tcp::{TcpStream, ConnectFuture}; use tokio_timer::Delay; use super::{Connect, Connected, Destination}; -use super::dns::{DefaultResolver, Resolve}; +use super::dns::{self, GaiResolver, Resolve}; /// A connector for the `http` scheme. /// @@ -28,7 +26,7 @@ use super::dns::{DefaultResolver, Resolve}; /// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes /// transport information such as the remote socket address used. #[derive(Clone)] -pub struct HttpConnector { +pub struct HttpConnector { enforce_http: bool, handle: Option, happy_eyeballs_timeout: Option, @@ -76,43 +74,49 @@ impl HttpConnector { /// Takes number of DNS worker threads. #[inline] pub fn new(threads: usize) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, None) + HttpConnector::new_with_resolver(GaiResolver::new(threads)) } - /// Construct a new HttpConnector with a specific Tokio handle. + #[doc(hidden)] + #[deprecated(note = "Use HttpConnector::set_reactor to set a reactor handle")] pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, Some(handle)) - } - - fn new_with_handle_opt(threads: usize, handle: Option) -> HttpConnector { - let pool = CpuPoolBuilder::new() - .name_prefix("hyper-dns") - .pool_size(threads) - .create(); - HttpConnector::new_with_executor(pool, handle) + let resolver = GaiResolver::new(threads); + let mut http = HttpConnector::new_with_resolver(resolver); + http.set_reactor(Some(handle)); + http } /// Construct a new HttpConnector. /// - /// Takes an executor to run blocking tasks on. + /// Takes an executor to run blocking `getaddrinfo` tasks on. pub fn new_with_executor(executor: E, handle: Option) -> HttpConnector - where E: Executor + Send + Sync + where E: Executor + Send + Sync { + let resolver = GaiResolver::new_with_executor(executor); + let mut http = HttpConnector::new_with_resolver(resolver); + http.set_reactor(handle); + http + } +} + + +impl HttpConnector { + /// Construct a new HttpConnector. + /// + /// Takes a `Resolve` to handle DNS lookups. + pub fn new_with_resolver(resolver: R) -> HttpConnector { HttpConnector { - executor: HttpConnectExecutor(Arc::new(executor)), enforce_http: true, - handle, + handle: None, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), keep_alive_timeout: None, - nodelay: false, local_address: None, - happy_eyeballs_timeout: Some(Duration::from_millis(300)), + nodelay: false, + resolver, reuse_address: false, } } -} - -impl HttpConnector { /// Option to enforce all `Uri`s have the `http` scheme. /// /// Enabled by default. @@ -121,6 +125,14 @@ impl HttpConnector { self.enforce_http = is_enforced; } + /// Set a handle to a `Reactor` to register connections to. + /// + /// If `None`, the implicit default reactor will be used. + #[inline] + pub fn set_reactor(&mut self, handle: Option) { + self.handle = handle; + } + /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. /// /// If `None`, the option will not be set. @@ -184,10 +196,14 @@ impl fmt::Debug for HttpConnector { } } -impl Connect for HttpConnector { +impl Connect for HttpConnector +where + R: Resolve + Clone + Send + Sync, + R::Future: Send, +{ type Transport = TcpStream; type Error = io::Error; - type Future = HttpConnecting; + type Future = HttpConnecting; fn connect(&self, dst: Destination) -> Self::Future { trace!( @@ -215,7 +231,7 @@ impl Connect for HttpConnector { }; HttpConnecting { - state: State::Lazy(self.executor.clone(), host.into(), self.local_address), + state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), handle: self.handle.clone(), happy_eyeballs_timeout: self.happy_eyeballs_timeout, keep_alive_timeout: self.keep_alive_timeout, @@ -234,7 +250,7 @@ impl HttpInfo { } #[inline] -fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { +fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { HttpConnecting { state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), handle: handle.clone(), @@ -270,7 +286,7 @@ impl StdError for InvalidUrl { } /// A Future representing work to connect to a URL. #[must_use = "futures do nothing unless polled"] -pub struct HttpConnecting { +pub struct HttpConnecting { state: State, handle: Option, happy_eyeballs_timeout: Option, @@ -295,22 +311,28 @@ impl Future for HttpConnecting { loop { let state; match self.state { - State::Lazy(ref resolver, ref host, local_addr) => { + State::Lazy(ref resolver, ref mut host, local_addr) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host) { state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, self.port, self.happy_eyeballs_timeout, self.reuse_address)); + local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); } else { - state = State::Resolving(resolver.resolve(host), local_addr); + let name = dns::Name::new(mem::replace(host, String::new())); + state = State::Resolving(resolver.resolve(name), local_addr); } }, State::Resolving(ref mut future, local_addr) => { match try!(future.poll()) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { + let port = self.port; + let addrs = addrs + .map(|addr| SocketAddr::new(addr, port)) + .collect(); + let addrs = dns::IpAddrs::new(addrs); state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, self.port, self.happy_eyeballs_timeout, self.reuse_address)); + local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); } }; }, @@ -338,7 +360,7 @@ impl Future for HttpConnecting { } } -impl fmt::Debug for HttpConnecting { +impl fmt::Debug for HttpConnecting { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.pad("HttpConnecting") } @@ -346,7 +368,6 @@ impl fmt::Debug for HttpConnecting { struct ConnectingTcp { local_addr: Option, - port: u16, preferred: ConnectingTcpRemote, fallback: Option, reuse_address: bool, @@ -356,7 +377,6 @@ impl ConnectingTcp { fn new( local_addr: Option, remote_addrs: dns::IpAddrs, - port: u16, fallback_timeout: Option, reuse_address: bool, ) -> ConnectingTcp { @@ -365,7 +385,6 @@ impl ConnectingTcp { if fallback_addrs.is_empty() { return ConnectingTcp { local_addr, - port, preferred: ConnectingTcpRemote::new(preferred_addrs), fallback: None, reuse_address, @@ -374,7 +393,6 @@ impl ConnectingTcp { ConnectingTcp { local_addr, - port, preferred: ConnectingTcpRemote::new(preferred_addrs), fallback: Some(ConnectingTcpFallback { delay: Delay::new(Instant::now() + fallback_timeout), @@ -385,7 +403,6 @@ impl ConnectingTcp { } else { ConnectingTcp { local_addr, - port, preferred: ConnectingTcpRemote::new(remote_addrs), fallback: None, reuse_address, @@ -530,17 +547,6 @@ impl ConnectingTcp { } } - -#[derive(Clone)] -struct HttpConnectExecutor(Arc + Send + Sync>); - -impl Executor> for HttpConnectExecutor { - fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { - self.0.execute(HttpConnectorBlockingTask { work: future }) - .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) - } -} - #[cfg(test)] mod tests { use std::io; diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index a67d6d1079..ec0aeac163 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -15,6 +15,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] mod dns; #[cfg(feature = "runtime")] mod http; +#[cfg(feature = "runtime")] pub use self::dns::{GaiResolver, Name, Resolve}; #[cfg(feature = "runtime")] pub use self::http::{HttpConnector, HttpInfo}; /// Connect to a destination, returning an IO transport. diff --git a/tests/client.rs b/tests/client.rs index fa48d89919..ef08929501 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -226,7 +226,7 @@ macro_rules! test { let addr = server.local_addr().expect("local_addr"); let mut rt = $runtime; - let connector = ::hyper::client::HttpConnector::new_with_handle(1, Handle::default()); + let connector = ::hyper::client::HttpConnector::new(1); let client = Client::builder() .set_host($set_host) .http1_title_case_headers($title_case_headers)