From 955004c5b65c27a5b40db6175132a5ae830a1383 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 26 Jan 2018 15:44:49 -0800 Subject: [PATCH] feat(client): redesign the `Connect` trait The original `Connect` trait had some limitations: - There was no way to provide more details to the connector about how to connect, other than the `Uri`. - There was no way for the connector to return any extra information about the connected transport. - The `Error` was forced to be an `std::io::Error`. - The transport and future had `'static` requirements. As hyper gains HTTP/2 support, some of these things needed to be changed. We want to allow the user to configure whether they hope to us ALPN to start an HTTP/2 connection, and the connector needs to be able to return back to hyper if it did so. The new `Connect2` trait is meant to solve this. - The `connect` method now receives a `Destination` type, instead of a `Uri`. This allows us to include additional data about how to connect. - The `Future` returned from `connect` now must be a `Connected`, which wraps the transport, and includes possibly extra data about what happened when connecting. The `Connect` trait is deprecated, with the hopes of `Connect2` taking it's place in the next breaking release. For backwards compatibility, any type that implements `Connect` now will automaticall implement `Connect2`, ignoring any of the extra data from `Destination` and `Connected`. --- src/client/compat.rs | 8 +- src/client/connect.rs | 212 +++++++++++++++++++++++++++++++++--------- src/client/mod.rs | 29 ++++-- 3 files changed, 196 insertions(+), 53 deletions(-) diff --git a/src/client/compat.rs b/src/client/compat.rs index 26399e82aa..64ce1a2aef 100644 --- a/src/client/compat.rs +++ b/src/client/compat.rs @@ -1,10 +1,12 @@ //! Wrappers to build compatibility with the `http` crate. +use std::io; + use futures::{Future, Poll, Stream}; use http; use tokio_service::Service; -use client::{Connect, Client, FutureResponse}; +use client::{Connect2, Client, FutureResponse}; use error::Error; use proto::Body; @@ -19,7 +21,9 @@ pub(super) fn client(client: Client) -> CompatClient { } impl Service for CompatClient -where C: Connect, +where C: Connect2, + C::Transport: 'static, + C::Future: 'static, B: Stream + 'static, B::Item: AsRef<[u8]>, { diff --git a/src/client/connect.rs b/src/client/connect.rs index 808c6b2115..c240083418 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -1,9 +1,9 @@ +//! Contains the `Connect2` trait, and supporting types. use std::error::Error as StdError; use std::fmt; use std::io; use std::mem; use std::sync::Arc; -//use std::net::SocketAddr; use futures::{Future, Poll, Async}; use futures::future::{Executor, ExecuteError}; @@ -16,31 +16,74 @@ use tokio_service::Service; use Uri; use super::dns; +use self::http_connector::HttpConnectorBlockingTask; + +/// Connect to a destination, returning an IO transport. +pub trait Connect2 { + /// The connected IO Stream. + type Transport: AsyncRead + AsyncWrite; + /// An error occured when trying to connect. + type Error; + /// A Future that will resolve to the connected Transport. + type Future: Future, Error=Self::Error>; + /// Connect to a destination. + fn connect(&self, dst: Destination) -> Self::Future; +} -/// A connector creates an Io to a remote address.. -/// -/// This trait is not implemented directly, and only exists to make -/// the intent clearer. A connector should implement `Service` with -/// `Request=Uri` and `Response: Io` instead. -pub trait Connect: Service + 'static { - /// The connected Io Stream. - type Output: AsyncRead + AsyncWrite + 'static; - /// A Future that will resolve to the connected Stream. - type Future: Future + 'static; - /// Connect to a remote address. - fn connect(&self, Uri) -> ::Future; +/// A set of properties to describe where and how to try to connect. +#[derive(Debug)] +pub struct Destination { + pub(super) alpn: Alpn, + pub(super) uri: Uri, } -impl Connect for T -where T: Service + 'static, - T::Response: AsyncRead + AsyncWrite, - T::Future: Future, -{ - type Output = T::Response; - type Future = T::Future; +/// Extra information about the connected transport. +#[derive(Debug)] +pub struct Connected { + alpn: Alpn, + pub(super) transport: T, +} - fn connect(&self, url: Uri) -> ::Future { - self.call(url) +#[derive(Debug)] +pub(super) enum Alpn { + Http1, + H2, +} + +impl Destination { + /// Get a reference to the requested `Uri`. + pub fn uri(&self) -> &Uri { + &self.uri + } + + /// Returns whether this connection must negotiate HTTP/2 via ALPN. + pub fn h2(&self) -> bool { + match self.alpn { + Alpn::Http1 => false, + Alpn::H2 => true, + } + } +} + +impl Connected { + /// Create new `Connected` type with empty metadata. + pub fn new(transport: T) -> Connected { + Connected { + alpn: Alpn::Http1, + transport: transport, + } + } + + /// Convert into the underlyinng Transport. + pub fn into_transport(self) -> T { + self.transport + } + + /// Set that the connected transport negotiated HTTP/2 as it's + /// next protocol. + pub fn h2(&mut self) -> &mut Connected { + self.alpn = Alpn::H2; + self } } @@ -96,6 +139,8 @@ impl fmt::Debug for HttpConnector { } } +// deprecated, will be gone in 0.12 +#[doc(hidden)] impl Service for HttpConnector { type Request = Uri; type Response = TcpStream; @@ -258,23 +303,27 @@ impl ConnectingTcp { } } -/// Blocking task to be executed on a thread pool. -pub struct HttpConnectorBlockingTask { - work: oneshot::Execute -} +// Make this Future unnameable outside of this crate. +mod http_connector { + use super::*; + // Blocking task to be executed on a thread pool. + pub struct HttpConnectorBlockingTask { + pub(super) work: oneshot::Execute + } -impl fmt::Debug for HttpConnectorBlockingTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("HttpConnectorBlockingTask") + impl fmt::Debug for HttpConnectorBlockingTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("HttpConnectorBlockingTask") + } } -} -impl Future for HttpConnectorBlockingTask { - type Item = (); - type Error = (); + impl Future for HttpConnectorBlockingTask { + type Item = (); + type Error = (); - fn poll(&mut self) -> Poll<(), ()> { - self.work.poll() + fn poll(&mut self) -> Poll<(), ()> { + self.work.poll() + } } } @@ -288,20 +337,97 @@ impl Executor> for HttpConnectExecutor { } } -/* -impl HttpsConnector { - /// Create a new connector using the provided SSL implementation. - pub fn new(s: S) -> HttpsConnector { - HttpsConnector { - http: HttpConnector::default(), - ssl: s, +#[doc(hidden)] +#[deprecated(since="0.11.16", note="Use the Connect2 trait, which will become Connect in 0.12")] +pub trait Connect: Service + 'static { + /// The connected Io Stream. + type Output: AsyncRead + AsyncWrite + 'static; + /// A Future that will resolve to the connected Stream. + type Future: Future + 'static; + /// Connect to a remote address. + fn connect(&self, Uri) -> ::Future; +} + +#[doc(hidden)] +#[allow(deprecated)] +impl Connect for T +where T: Service + 'static, + T::Response: AsyncRead + AsyncWrite, + T::Future: Future, +{ + type Output = T::Response; + type Future = T::Future; + + fn connect(&self, url: Uri) -> ::Future { + self.call(url) + } +} + +#[doc(hidden)] +#[allow(deprecated)] +impl Connect2 for T +where + T: Connect, +{ + type Transport = ::Output; + type Error = io::Error; + type Future = ConnectToConnect2Future<::Future>; + + fn connect(&self, dst: Destination) -> ::Future { + ConnectToConnect2Future { + inner: ::connect(self, dst.uri), } } } -*/ + +#[doc(hidden)] +#[deprecated(since="0.11.16")] +#[allow(missing_debug_implementations)] +pub struct ConnectToConnect2Future { + inner: F, +} + +#[allow(deprecated)] +impl Future for ConnectToConnect2Future +where + F: Future, +{ + type Item = Connected; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll() + .map(|async| async.map(Connected::new)) + } +} + +// even though deprecated, we need to make sure the HttpConnector still +// implements Connect (and Service apparently...) + +#[allow(deprecated)] +fn _assert_http_connector() { + fn assert_connect() + where + T: Connect2< + Transport=TcpStream, + Error=io::Error, + Future=ConnectToConnect2Future + >, + T: Connect, + T: Service< + Request=Uri, + Response=TcpStream, + Future=HttpConnecting, + Error=io::Error + >, + {} + + assert_connect::(); +} #[cfg(test)] mod tests { + #![allow(deprecated)] use std::io; use tokio::reactor::Core; use super::{Connect, HttpConnector}; diff --git a/src/client/mod.rs b/src/client/mod.rs index ececc3e985..0b201e3b53 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -24,11 +24,14 @@ use version::HttpVersion; pub use proto::response::Response; pub use proto::request::Request; -pub use self::connect::{HttpConnector, Connect}; +pub use self::connect::{Connect2, HttpConnector}; +#[allow(deprecated)] +pub use self::connect::Connect; use self::background::{bg, Background}; +use self::connect::Destination; -mod connect; +pub mod connect; mod dns; mod pool; #[cfg(feature = "compat")] @@ -99,7 +102,9 @@ impl Client { } impl Client -where C: Connect, +where C: Connect2, + C::Transport: 'static, + C::Future: 'static, B: Stream + 'static, B::Item: AsRef<[u8]>, { @@ -149,7 +154,9 @@ impl Future for FutureResponse { } impl Service for Client -where C: Connect, +where C: Connect2, + C::Transport: 'static, + C::Future: 'static, B: Stream + 'static, B::Item: AsRef<[u8]>, { @@ -195,15 +202,19 @@ where C: Connect, let executor = self.executor.clone(); let pool = self.pool.clone(); let pool_key = Rc::new(domain.to_string()); - self.connector.connect(url) - .and_then(move |io| { + let dst = Destination { + uri: url, + alpn: self::connect::Alpn::Http1, + }; + self.connector.connect(dst) + .and_then(move |connected| { let (tx, rx) = mpsc::channel(0); let tx = HyperClient { tx: RefCell::new(tx), should_close: true, }; let pooled = pool.pooled(pool_key, tx); - let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); + let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(connected.transport, pooled.clone()); let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?; Ok(pooled) @@ -384,7 +395,9 @@ impl Config { } impl Config -where C: Connect, +where C: Connect2, + C::Transport: 'static, + C::Future: 'static, B: Stream, B::Item: AsRef<[u8]>, {