Skip to content

Commit bcee4d6

Browse files
committed
feat(client): adds HttpInfo to responses when HttpConnector is used
- Adds `client::connect::Connected::extra()`, which allows connectors to specify arbitrary custom information about a connected transport. If a connector provides this extra value, it will be set in the `Response` extensions. Closes #1402
1 parent e06dc52 commit bcee4d6

File tree

4 files changed

+141
-25
lines changed

4 files changed

+141
-25
lines changed

Diff for: examples/hello.rs

-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ static PHRASE: &'static [u8] = b"Hello World!";
1010

1111
fn main() {
1212
pretty_env_logger::init();
13-
1413
let addr = ([127, 0, 0, 1], 3000).into();
1514

1615
// new_service is run for each connection, creating a 'service'

Diff for: src/client/connect.rs

+115-16
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66
//! establishes connections over TCP.
77
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
88
use std::error::Error as StdError;
9+
use std::fmt;
910
use std::mem;
1011

1112
use bytes::{BufMut, BytesMut};
1213
use futures::Future;
13-
use http::{uri, Uri};
14+
use http::{uri, Response, Uri};
1415
use tokio_io::{AsyncRead, AsyncWrite};
1516

16-
#[cfg(feature = "runtime")] pub use self::http::HttpConnector;
17+
#[cfg(feature = "runtime")] pub use self::http::{HttpConnector, HttpInfo};
1718

1819
/// Connect to a destination, returning an IO transport.
1920
///
@@ -42,12 +43,16 @@ pub struct Destination {
4243
///
4344
/// This can be used to inform recipients about things like if ALPN
4445
/// was used, or if connected to an HTTP proxy.
45-
#[derive(Debug)]
46+
#[derive(Clone, Debug)]
4647
pub struct Connected {
4748
//alpn: Alpn,
4849
pub(super) is_proxied: bool,
50+
pub(super) extra: Option<Extra>,
4951
}
5052

53+
pub(super) struct Extra(Box<FnClone>);
54+
55+
5156
/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
5257
#[derive(Debug)]
5358
pub(super) enum Alpn {
@@ -234,8 +239,8 @@ impl Connected {
234239
/// Create new `Connected` type with empty metadata.
235240
pub fn new() -> Connected {
236241
Connected {
237-
//alpn: Alpn::Http1,
238242
is_proxied: false,
243+
extra: None,
239244
}
240245
}
241246

@@ -251,6 +256,15 @@ impl Connected {
251256
self
252257
}
253258

259+
/// Set extra connection information to be set in the extensions of every `Response`.
260+
pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
261+
self.extra = Some(Extra(Box::new(move |res: &mut Response<::Body>| {
262+
let e = extra.clone();
263+
res.extensions_mut().insert(e);
264+
})));
265+
self
266+
}
267+
254268
/*
255269
/// Set that the connected transport negotiated HTTP/2 as it's
256270
/// next protocol.
@@ -261,6 +275,45 @@ impl Connected {
261275
*/
262276
}
263277

278+
// ===== impl Extra =====
279+
280+
impl Extra {
281+
pub(super) fn set(&self, res: &mut Response<::Body>) {
282+
self.0.call(res);
283+
}
284+
}
285+
286+
impl Clone for Extra {
287+
fn clone(&self) -> Extra {
288+
Extra(self.0.clone_fn())
289+
}
290+
}
291+
292+
impl fmt::Debug for Extra {
293+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
294+
f.debug_struct("Extra")
295+
.finish()
296+
}
297+
}
298+
299+
trait FnClone: Send + Sync {
300+
fn clone_fn(&self) -> Box<FnClone>;
301+
fn call(&self, res: &mut Response<::Body>);
302+
}
303+
304+
impl<F> FnClone for F
305+
where
306+
F: Fn(&mut Response<::Body>) + Clone + Send + Sync + 'static
307+
{
308+
fn clone_fn(&self) -> Box<FnClone> {
309+
Box::new(self.clone())
310+
}
311+
312+
fn call(&self, res: &mut Response<::Body>) {
313+
(*self)(res)
314+
}
315+
}
316+
264317
#[cfg(test)]
265318
mod tests {
266319
use super::Destination;
@@ -436,14 +489,50 @@ mod http {
436489
/// A connector for the `http` scheme.
437490
///
438491
/// Performs DNS resolution in a thread pool, and then connects over TCP.
492+
///
493+
/// # Note
494+
///
495+
/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
496+
/// transport information such as the remote socket address used.
439497
#[derive(Clone)]
440498
pub struct HttpConnector {
441499
executor: HttpConnectExecutor,
442500
enforce_http: bool,
443501
handle: Option<Handle>,
444502
keep_alive_timeout: Option<Duration>,
445-
nodelay: bool,
446503
local_address: Option<IpAddr>,
504+
nodelay: bool,
505+
}
506+
507+
/// Extra information about the transport when an HttpConnector is used.
508+
///
509+
/// # Example
510+
///
511+
/// ```rust
512+
/// use hyper::client::{Client, connect::HttpInfo};
513+
/// use hyper::rt::Future;
514+
///
515+
/// let client = Client::new();
516+
///
517+
/// let fut = client.get("http://example.local".parse().unwrap())
518+
/// .inspect(|resp| {
519+
/// let info = resp
520+
/// .extensions()
521+
/// .get::<HttpInfo>()
522+
/// .expect("HttpConnector sets HttpInfo");
523+
///
524+
/// println!("remote addr = {}", info.remote_addr());
525+
/// });
526+
/// ```
527+
///
528+
/// # Note
529+
///
530+
/// If a different connector is used besides [`HttpConnector`](HttpConnector),
531+
/// this value will not exist in the extensions. Consult that specific
532+
/// connector to see what "extra" information it might provide to responses.
533+
#[derive(Clone, Debug)]
534+
pub struct HttpInfo {
535+
remote_addr: SocketAddr,
447536
}
448537

449538
impl HttpConnector {
@@ -600,6 +689,7 @@ mod http {
600689
}
601690
}
602691
}
692+
603693
/// A Future representing work to connect to a URL.
604694
#[must_use = "futures do nothing unless polled"]
605695
pub struct HttpConnecting {
@@ -640,16 +730,12 @@ mod http {
640730
}
641731
},
642732
State::Resolving(ref mut future, local_addr) => {
643-
match try!(future.poll()) {
644-
Async::NotReady => return Ok(Async::NotReady),
645-
Async::Ready(addrs) => {
646-
state = State::Connecting(ConnectingTcp {
647-
addrs: addrs,
648-
local_addr: local_addr,
649-
current: None,
650-
})
651-
}
652-
};
733+
let addrs = try_ready!(future.poll());
734+
state = State::Connecting(ConnectingTcp {
735+
addrs: addrs,
736+
local_addr: local_addr,
737+
current: None,
738+
});
653739
},
654740
State::Connecting(ref mut c) => {
655741
let sock = try_ready!(c.poll(&self.handle));
@@ -660,7 +746,13 @@ mod http {
660746

661747
sock.set_nodelay(self.nodelay)?;
662748

663-
return Ok(Async::Ready((sock, Connected::new())));
749+
let extra = HttpInfo {
750+
remote_addr: sock.peer_addr()?,
751+
};
752+
let connected = Connected::new()
753+
.extra(extra);
754+
755+
return Ok(Async::Ready((sock, connected)));
664756
},
665757
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
666758
}
@@ -710,6 +802,13 @@ mod http {
710802
}
711803
}
712804

805+
impl HttpInfo {
806+
/// Get the remote address of the transport used.
807+
pub fn remote_addr(&self) -> SocketAddr {
808+
self.remote_addr
809+
}
810+
}
811+
713812
// Make this Future unnameable outside of this crate.
714813
mod http_connector {
715814
use super::*;

Diff for: src/client/mod.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ use http::uri::Scheme;
9292
use body::{Body, Payload};
9393
use common::Exec;
9494
use common::lazy as hyper_lazy;
95-
use self::connect::{Connect, Destination};
95+
use self::connect::{Connect, Connected, Destination};
9696
use self::pool::{Pool, Poolable, Reservation};
9797

9898
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
@@ -304,7 +304,7 @@ where C: Connect + Sync + 'static,
304304
})
305305
.map(move |tx| {
306306
pool.pooled(connecting, PoolClient {
307-
is_proxied: connected.is_proxied,
307+
conn_info: connected,
308308
tx: match ver {
309309
Ver::Http1 => PoolTx::Http1(tx),
310310
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
@@ -387,7 +387,7 @@ where C: Connect + Sync + 'static,
387387
// CONNECT always sends origin-form, so check it first...
388388
if req.method() == &Method::CONNECT {
389389
authority_form(req.uri_mut());
390-
} else if pooled.is_proxied {
390+
} else if pooled.conn_info.is_proxied {
391391
absolute_form(req.uri_mut());
392392
} else {
393393
origin_form(req.uri_mut());
@@ -401,6 +401,14 @@ where C: Connect + Sync + 'static,
401401

402402
let fut = pooled.send_request_retryable(req);
403403

404+
let extra_info = pooled.conn_info.extra.clone();
405+
let fut = fut.map(move |mut res| {
406+
if let Some(extra) = extra_info {
407+
extra.set(&mut res);
408+
}
409+
res
410+
});
411+
404412
// As of [email protected], there is a race condition in the mpsc
405413
// channel, such that sending when the receiver is closing can
406414
// result in the message being stuck inside the queue. It won't
@@ -584,7 +592,7 @@ where
584592
}
585593

586594
struct PoolClient<B> {
587-
is_proxied: bool,
595+
conn_info: Connected,
588596
tx: PoolTx<B>,
589597
}
590598

@@ -644,17 +652,17 @@ where
644652
match self.tx {
645653
PoolTx::Http1(tx) => {
646654
Reservation::Unique(PoolClient {
647-
is_proxied: self.is_proxied,
655+
conn_info: self.conn_info,
648656
tx: PoolTx::Http1(tx),
649657
})
650658
},
651659
PoolTx::Http2(tx) => {
652660
let b = PoolClient {
653-
is_proxied: self.is_proxied,
661+
conn_info: self.conn_info.clone(),
654662
tx: PoolTx::Http2(tx.clone()),
655663
};
656664
let a = PoolClient {
657-
is_proxied: self.is_proxied,
665+
conn_info: self.conn_info,
658666
tx: PoolTx::Http2(tx),
659667
};
660668
Reservation::Shared(a, b)

Diff for: tests/client.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,17 @@ macro_rules! test {
274274

275275
let rx = rx.expect("thread panicked");
276276

277-
rt.block_on(res.join(rx).map(|r| r.0))
277+
rt.block_on(res.join(rx).map(|r| r.0)).map(move |mut resp| {
278+
// Always check that HttpConnector has set the "extra" info...
279+
let extra = resp
280+
.extensions_mut()
281+
.remove::<::hyper::client::connect::HttpInfo>()
282+
.expect("HttpConnector should set HttpInfo");
283+
284+
assert_eq!(extra.remote_addr(), addr, "HttpInfo should have server addr");
285+
286+
resp
287+
})
278288
});
279289
}
280290

0 commit comments

Comments
 (0)