Skip to content

Commit 70782c5

Browse files
committed
feat(client): add Resolve, used by HttpConnector
This introduces a `Resolve` trait to describe asynchronous DNS resolution. The `HttpConnector` can be configured with a resolver, allowing a user to still use all the functionality of the `HttpConnector`, while customizing the DNS resolution. To prevent a breaking change, the `HttpConnector` has its `Resolve` generic set by default to `GaiResolver`. This is same as the existing resolver, which uses `getaddrinfo` inside a thread pool. Closes #1517
1 parent 8bfe3c2 commit 70782c5

File tree

4 files changed

+253
-100
lines changed

4 files changed

+253
-100
lines changed

Diff for: src/client/connect/dns.rs

+179-17
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,183 @@
1-
use std::io;
1+
use std::{fmt, io, vec};
22
use std::net::{
3-
Ipv4Addr, Ipv6Addr,
3+
IpAddr, Ipv4Addr, Ipv6Addr,
44
SocketAddr, ToSocketAddrs,
55
SocketAddrV4, SocketAddrV6,
66
};
7-
use std::vec;
7+
use std::sync::Arc;
88

9-
use ::futures::{Async, Future, Poll};
9+
use futures::{Async, Future, Poll};
10+
use futures::future::{Executor, ExecuteError};
11+
use futures::sync::oneshot;
12+
use futures_cpupool::{Builder as CpuPoolBuilder};
1013

11-
pub struct Work {
14+
use self::sealed::GaiTask;
15+
16+
/// Resolve a hostname to a set of IP addresses.
17+
pub trait Resolve {
18+
/// The set of IP addresses to try to connect to.
19+
type Addrs: Iterator<Item=IpAddr>;
20+
/// A Future of the resolved set of addresses.
21+
type Future: Future<Item=Self::Addrs, Error=io::Error>;
22+
/// Resolve a hostname.
23+
fn resolve(&self, name: Name) -> Self::Future;
24+
}
25+
26+
/// A domain name to resolve into IP addresses.
27+
pub struct Name {
28+
host: String,
29+
}
30+
31+
/// A resolver using blocking `getaddrinfo` calls in a threadpool.
32+
#[derive(Clone)]
33+
pub struct GaiResolver {
34+
executor: GaiExecutor,
35+
}
36+
37+
pub struct GaiAddrs {
38+
inner: IpAddrs,
39+
}
40+
41+
pub struct GaiFuture {
42+
rx: oneshot::SpawnHandle<IpAddrs, io::Error>,
43+
}
44+
45+
impl Name {
46+
pub(super) fn new(host: String) -> Name {
47+
Name {
48+
host,
49+
}
50+
}
51+
52+
/// View the hostname as a string slice.
53+
pub fn as_str(&self) -> &str {
54+
&self.host
55+
}
56+
}
57+
58+
impl fmt::Debug for Name {
59+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60+
fmt::Debug::fmt(&self.host, f)
61+
}
62+
}
63+
64+
impl GaiResolver {
65+
/// Construct a new `GaiResolver`.
66+
///
67+
/// Takes number of DNS worker threads.
68+
pub fn new(threads: usize) -> Self {
69+
let pool = CpuPoolBuilder::new()
70+
.name_prefix("hyper-dns")
71+
.pool_size(threads)
72+
.create();
73+
GaiResolver::new_with_executor(pool)
74+
}
75+
76+
/// Construct a new `GaiResolver` with a shared thread pool executor.
77+
///
78+
/// Takes an executor to run blocking `getaddrinfo` tasks on.
79+
pub fn new_with_executor<E: 'static>(executor: E) -> Self
80+
where
81+
E: Executor<GaiTask> + Send + Sync,
82+
{
83+
GaiResolver {
84+
executor: GaiExecutor(Arc::new(executor)),
85+
}
86+
}
87+
}
88+
89+
impl Resolve for GaiResolver {
90+
type Addrs = GaiAddrs;
91+
type Future = GaiFuture;
92+
93+
fn resolve(&self, name: Name) -> Self::Future {
94+
let blocking = GaiBlocking::new(name.host);
95+
let rx = oneshot::spawn(blocking, &self.executor);
96+
GaiFuture {
97+
rx,
98+
}
99+
}
100+
}
101+
102+
impl fmt::Debug for GaiResolver {
103+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
104+
f.pad("GaiResolver")
105+
}
106+
}
107+
108+
impl Future for GaiFuture {
109+
type Item = GaiAddrs;
110+
type Error = io::Error;
111+
112+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
113+
let addrs = try_ready!(self.rx.poll());
114+
Ok(Async::Ready(GaiAddrs {
115+
inner: addrs,
116+
}))
117+
}
118+
}
119+
120+
impl fmt::Debug for GaiFuture {
121+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122+
f.pad("GaiFuture")
123+
}
124+
}
125+
126+
impl Iterator for GaiAddrs {
127+
type Item = IpAddr;
128+
129+
fn next(&mut self) -> Option<Self::Item> {
130+
self.inner.next().map(|sa| sa.ip())
131+
}
132+
}
133+
134+
impl fmt::Debug for GaiAddrs {
135+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
136+
f.pad("GaiAddrs")
137+
}
138+
}
139+
140+
#[derive(Clone)]
141+
struct GaiExecutor(Arc<Executor<GaiTask> + Send + Sync>);
142+
143+
impl Executor<oneshot::Execute<GaiBlocking>> for GaiExecutor {
144+
fn execute(&self, future: oneshot::Execute<GaiBlocking>) -> Result<(), ExecuteError<oneshot::Execute<GaiBlocking>>> {
145+
self.0.execute(GaiTask { work: future })
146+
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
147+
}
148+
}
149+
150+
pub(super) struct GaiBlocking {
12151
host: String,
13-
port: u16
14152
}
15153

16-
impl Work {
17-
pub fn new(host: String, port: u16) -> Work {
18-
Work { host: host, port: port }
154+
impl GaiBlocking {
155+
pub(super) fn new(host: String) -> GaiBlocking {
156+
GaiBlocking { host }
19157
}
20158
}
21159

22-
impl Future for Work {
160+
impl Future for GaiBlocking {
23161
type Item = IpAddrs;
24162
type Error = io::Error;
25163

26164
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
27-
debug!("resolving host={:?}, port={:?}", self.host, self.port);
28-
(&*self.host, self.port).to_socket_addrs()
165+
debug!("resolving host={:?}", self.host);
166+
(&*self.host, 0).to_socket_addrs()
29167
.map(|i| Async::Ready(IpAddrs { iter: i }))
30168
}
31169
}
32170

33-
pub struct IpAddrs {
171+
pub(super) struct IpAddrs {
34172
iter: vec::IntoIter<SocketAddr>,
35173
}
36174

37175
impl IpAddrs {
38-
pub fn new(addrs: Vec<SocketAddr>) -> Self {
176+
pub(super) fn new(addrs: Vec<SocketAddr>) -> Self {
39177
IpAddrs { iter: addrs.into_iter() }
40178
}
41179

42-
pub fn try_parse(host: &str, port: u16) -> Option<IpAddrs> {
180+
pub(super) fn try_parse(host: &str, port: u16) -> Option<IpAddrs> {
43181
if let Ok(addr) = host.parse::<Ipv4Addr>() {
44182
let addr = SocketAddrV4::new(addr, port);
45183
return Some(IpAddrs { iter: vec![SocketAddr::V4(addr)].into_iter() })
@@ -51,7 +189,7 @@ impl IpAddrs {
51189
None
52190
}
53191

54-
pub fn split_by_preference(self) -> (IpAddrs, IpAddrs) {
192+
pub(super) fn split_by_preference(self) -> (IpAddrs, IpAddrs) {
55193
let preferring_v6 = self.iter
56194
.as_slice()
57195
.first()
@@ -64,7 +202,7 @@ impl IpAddrs {
64202
(IpAddrs::new(preferred), IpAddrs::new(fallback))
65203
}
66204

67-
pub fn is_empty(&self) -> bool {
205+
pub(super) fn is_empty(&self) -> bool {
68206
self.iter.as_slice().is_empty()
69207
}
70208
}
@@ -77,6 +215,30 @@ impl Iterator for IpAddrs {
77215
}
78216
}
79217

218+
// Make this Future unnameable outside of this crate.
219+
pub(super) mod sealed {
220+
use super::*;
221+
// Blocking task to be executed on a thread pool.
222+
pub struct GaiTask {
223+
pub(super) work: oneshot::Execute<GaiBlocking>
224+
}
225+
226+
impl fmt::Debug for GaiTask {
227+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
228+
f.pad("GaiTask")
229+
}
230+
}
231+
232+
impl Future for GaiTask {
233+
type Item = ();
234+
type Error = ();
235+
236+
fn poll(&mut self) -> Poll<(), ()> {
237+
self.work.poll()
238+
}
239+
}
240+
}
241+
80242
#[cfg(test)]
81243
mod tests {
82244
use std::net::{Ipv4Addr, Ipv6Addr};

0 commit comments

Comments
 (0)