From 4d7a2266b88b2c5c92231bcd2bd75d5842198add Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 4 Dec 2019 15:17:49 -0800 Subject: [PATCH] feat(client): change connectors to return an `impl Connection` Instead of returning a tuple `(impl AsyncRead + AsyncWrite, Connected)`, this adds a new trait, `hyper::client::connect::Connection`, which allows querying the connection type for a `Connected`. BREAKING CHANGE: Connectors no longer return a tuple of `(T, Connected)`, but a single `T: Connection`. --- examples/client.rs | 3 +-- src/client/connect/http.rs | 32 ++++++++++++++++------------ src/client/connect/mod.rs | 20 +++++++++++------- src/client/mod.rs | 5 +++-- tests/client.rs | 43 +++++++++++++++++++++++++++----------- tests/server.rs | 2 +- 6 files changed, 68 insertions(+), 37 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index 2057bc74d1..f488bc8916 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -3,8 +3,7 @@ use std::env; use std::io::{self, Write}; -use hyper::Client; -use futures_util::StreamExt; +use hyper::{Client, body::HttpBody as _}; // A simple type alias so as to DRY. type Result = std::result::Result>; diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 9ab38cd7cf..4daeeca3c9 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -17,7 +17,7 @@ use tokio::net::TcpStream; use tokio::time::Delay; use super::dns::{self, resolve, GaiResolver, Resolve}; -use super::{Connected}; +use super::{Connected, Connection}; //#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver; @@ -234,7 +234,7 @@ where R: Resolve + Clone + Send + Sync + 'static, R::Future: Send, { - type Response = (TcpStream, Connected); + type Response = TcpStream; type Error = ConnectError; type Future = HttpConnecting; @@ -259,7 +259,7 @@ where async fn call_async( &mut self, dst: Uri, - ) -> Result<(TcpStream, Connected), ConnectError> { + ) -> Result { trace!( "Http::connect; scheme={:?}, host={:?}, port={:?}", dst.scheme(), @@ -340,14 +340,20 @@ where sock.set_nodelay(config.nodelay) .map_err(ConnectError::m("tcp set_nodelay error"))?; - let extra = HttpInfo { - remote_addr: sock - .peer_addr() - .map_err(ConnectError::m("tcp peer_addr error"))?, - }; - let connected = Connected::new().extra(extra); + Ok(sock) + } +} - Ok((sock, connected)) +impl Connection for TcpStream { + fn connected(&self) -> Connected { + let connected = Connected::new(); + if let Ok(remote_addr) = self.peer_addr() { + connected.extra(HttpInfo { + remote_addr, + }) + } else { + connected + } } } @@ -372,7 +378,7 @@ pub struct HttpConnecting { _marker: PhantomData, } -type ConnectResult = Result<(TcpStream, Connected), ConnectError>; +type ConnectResult = Result; type BoxConnecting = Pin + Send>>; impl Future for HttpConnecting { @@ -644,12 +650,12 @@ mod tests { use ::http::Uri; use super::super::sealed::Connect; - use super::{Connected, HttpConnector}; + use super::HttpConnector; async fn connect( connector: C, dst: Uri, - ) -> Result<(C::Transport, Connected), C::Error> + ) -> Result where C: Connect, { diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index b6af79235e..0d53395285 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -13,6 +13,12 @@ use ::http::{Response}; #[cfg(feature = "tcp")] mod http; #[cfg(feature = "tcp")] pub use self::http::{HttpConnector, HttpInfo}; +/// Describes a type returned by a connector. +pub trait Connection { + /// Return metadata describing the connection. + fn connected(&self) -> Connected; +} + /// Extra information about the connected transport. /// /// This can be used to inform recipients about things like if ALPN @@ -167,7 +173,7 @@ pub(super) mod sealed { use tokio::io::{AsyncRead, AsyncWrite}; use crate::common::{Future, Unpin}; - use super::{Connected}; + use super::{Connection}; /// Connect to a destination, returning an IO transport. /// @@ -183,21 +189,21 @@ pub(super) mod sealed { // fit the `Connect` bounds because of the blanket impl for `Service`. pub trait Connect: Sealed + Sized { /// The connected IO Stream. - type Transport: AsyncRead + AsyncWrite; + type Transport: AsyncRead + AsyncWrite + Connection; /// An error occured when trying to connect. type Error: Into>; /// A Future that will resolve to the connected Transport. - type Future: Future>; + type Future: Future>; #[doc(hidden)] fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future; } impl Connect for S where - S: tower_service::Service + Send, + S: tower_service::Service + Send, S::Error: Into>, S::Future: Unpin + Send, - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { type Transport = T; type Error = S::Error; @@ -209,10 +215,10 @@ pub(super) mod sealed { impl Sealed for S where - S: tower_service::Service + Send, + S: tower_service::Service + Send, S::Error: Into>, S::Future: Unpin + Send, - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, {} pub trait Sealed {} diff --git a/src/client/mod.rs b/src/client/mod.rs index c980d1b449..6d0c4b0098 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -71,7 +71,7 @@ use http::uri::Scheme; use crate::body::{Body, Payload}; use crate::common::{lazy as hyper_lazy, BoxSendFuture, Executor, Lazy, Future, Pin, Poll, task}; -use self::connect::{Alpn, sealed::Connect, Connected}; +use self::connect::{Alpn, sealed::Connect, Connected, Connection}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; #[cfg(feature = "tcp")] pub use self::connect::HttpConnector; @@ -478,7 +478,8 @@ where C: Connect + Clone + Send + Sync + 'static, }; Either::Left(connector.connect(connect::sealed::Internal, dst) .map_err(crate::Error::new_connect) - .and_then(move |(io, connected)| { + .and_then(move |io| { + let connected = io.connected(); // If ALPN is h2 and we aren't http2_only already, // then we need to convert our pool checkout into // a single HTTP2 one. diff --git a/tests/client.rs b/tests/client.rs index 567728ac3f..03c0da2e10 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -945,7 +945,7 @@ mod dispatch_impl { use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; - use hyper::client::connect::{Connected, HttpConnector}; + use hyper::client::connect::{Connected, Connection, HttpConnector}; use hyper::Client; #[test] @@ -1740,7 +1740,7 @@ mod dispatch_impl { } impl hyper::service::Service for DebugConnector { - type Response = (DebugStream, Connected); + type Response = DebugStream; type Error = >::Error; type Future = Pin @@ -1756,30 +1756,37 @@ mod dispatch_impl { let closes = self.closes.clone(); let is_proxy = self.is_proxy; let is_alpn_h2 = self.alpn_h2; - Box::pin(self.http.call(dst).map_ok(move |(s, mut c)| { - if is_alpn_h2 { - c = c.negotiated_h2(); + Box::pin(self.http.call(dst).map_ok(move |tcp| { + DebugStream { + tcp, + on_drop: closes, + is_alpn_h2, + is_proxy, } - (DebugStream(s, closes), c.proxy(is_proxy)) })) } } - struct DebugStream(TcpStream, mpsc::Sender<()>); + struct DebugStream { + tcp: TcpStream, + on_drop: mpsc::Sender<()>, + is_alpn_h2: bool, + is_proxy: bool, + } impl Drop for DebugStream { fn drop(&mut self) { - let _ = self.1.try_send(()); + let _ = self.on_drop.try_send(()); } } impl AsyncWrite for DebugStream { fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) + Pin::new(&mut self.tcp).poll_shutdown(cx) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) + Pin::new(&mut self.tcp).poll_flush(cx) } fn poll_write( @@ -1787,7 +1794,7 @@ mod dispatch_impl { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) + Pin::new(&mut self.tcp).poll_write(cx, buf) } } @@ -1797,7 +1804,19 @@ mod dispatch_impl { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) + Pin::new(&mut self.tcp).poll_read(cx, buf) + } + } + + impl Connection for DebugStream { + fn connected(&self) -> Connected { + let connected = self.tcp.connected().proxy(self.is_proxy); + + if self.is_alpn_h2 { + connected.negotiated_h2() + } else { + connected + } } } } diff --git a/tests/server.rs b/tests/server.rs index b595895cf0..c43f4e1ab3 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1779,7 +1779,7 @@ impl tower_service::Service> for TestService { let replies = self.reply.clone(); Box::pin(async move { - while let Some(chunk) = req.body_mut().next().await { + while let Some(chunk) = hyper::body::HttpBody::next(req.body_mut()).await { match chunk { Ok(chunk) => { tx.send(Msg::Chunk(chunk.to_vec())).unwrap();