Skip to content

Commit

Permalink
feat(client): redesign the Connect trait
Browse files Browse the repository at this point in the history
The original `Connect` trait had some limitations:

- There was no way to provide more details to the connector about how to
  connect, other than the `Uri`.
- There was no way for the connector to return any extra information
  about the connected transport.
- The `Error` was forced to be an `std::io::Error`.
- The transport and future had `'static` requirements.

As hyper gains HTTP/2 support, some of these things needed to be
changed. We want to allow the user to configure whether they hope to
us ALPN to start an HTTP/2 connection, and the connector needs to be
able to return back to hyper if it did so.

The new `Connect2` trait is meant to solve this.

- The `connect` method now receives a `Destination` type, instead of a
  `Uri`. This allows us to include additional data about how to connect.
- The `Future` returned from `connect` now must be a `Connected`, which
  wraps the transport, and includes possibly extra data about what
  happened when connecting.

The `Connect` trait is deprecated, with the hopes of `Connect2` taking
it's place in the next breaking release. For backwards compatibility,
any type that implements `Connect` now will automaticall implement
`Connect2`, ignoring any of the extra data from `Destination` and
`Connected`.
  • Loading branch information
seanmonstar committed Jan 27, 2018
1 parent c33b9d4 commit 955004c
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 53 deletions.
8 changes: 6 additions & 2 deletions src/client/compat.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Wrappers to build compatibility with the `http` crate.
use std::io;

use futures::{Future, Poll, Stream};
use http;
use tokio_service::Service;

use client::{Connect, Client, FutureResponse};
use client::{Connect2, Client, FutureResponse};
use error::Error;
use proto::Body;

Expand All @@ -19,7 +21,9 @@ pub(super) fn client<C, B>(client: Client<C, B>) -> CompatClient<C, B> {
}

impl<C, B> Service for CompatClient<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand Down
212 changes: 169 additions & 43 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Contains the `Connect2` trait, and supporting types.
use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::sync::Arc;
//use std::net::SocketAddr;

use futures::{Future, Poll, Async};
use futures::future::{Executor, ExecuteError};
Expand All @@ -16,31 +16,74 @@ use tokio_service::Service;
use Uri;

use super::dns;
use self::http_connector::HttpConnectorBlockingTask;

/// Connect to a destination, returning an IO transport.
pub trait Connect2 {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite;
/// An error occured when trying to connect.
type Error;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=Connected<Self::Transport>, Error=Self::Error>;
/// Connect to a destination.
fn connect(&self, dst: Destination) -> Self::Future;
}

/// A connector creates an Io to a remote address..
///
/// This trait is not implemented directly, and only exists to make
/// the intent clearer. A connector should implement `Service` with
/// `Request=Uri` and `Response: Io` instead.
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
fn connect(&self, Uri) -> <Self as Connect>::Future;
/// A set of properties to describe where and how to try to connect.
#[derive(Debug)]
pub struct Destination {
pub(super) alpn: Alpn,
pub(super) uri: Uri,
}

impl<T> Connect for T
where T: Service<Request=Uri, Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
type Future = T::Future;
/// Extra information about the connected transport.
#[derive(Debug)]
pub struct Connected<T> {
alpn: Alpn,
pub(super) transport: T,
}

fn connect(&self, url: Uri) -> <Self as Connect>::Future {
self.call(url)
#[derive(Debug)]
pub(super) enum Alpn {
Http1,
H2,
}

impl Destination {
/// Get a reference to the requested `Uri`.
pub fn uri(&self) -> &Uri {
&self.uri
}

/// Returns whether this connection must negotiate HTTP/2 via ALPN.
pub fn h2(&self) -> bool {
match self.alpn {
Alpn::Http1 => false,
Alpn::H2 => true,
}
}
}

impl<T> Connected<T> {
/// Create new `Connected` type with empty metadata.
pub fn new(transport: T) -> Connected<T> {
Connected {
alpn: Alpn::Http1,
transport: transport,
}
}

/// Convert into the underlyinng Transport.
pub fn into_transport(self) -> T {
self.transport
}

/// Set that the connected transport negotiated HTTP/2 as it's
/// next protocol.
pub fn h2(&mut self) -> &mut Connected<T> {
self.alpn = Alpn::H2;
self
}
}

Expand Down Expand Up @@ -96,6 +139,8 @@ impl fmt::Debug for HttpConnector {
}
}

// deprecated, will be gone in 0.12
#[doc(hidden)]
impl Service for HttpConnector {
type Request = Uri;
type Response = TcpStream;
Expand Down Expand Up @@ -258,23 +303,27 @@ impl ConnectingTcp {
}
}

/// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
work: oneshot::Execute<dns::Work>
}
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}

impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}
}

impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}

Expand All @@ -288,20 +337,97 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
}
}

/*
impl<S: SslClient> HttpsConnector<S> {
/// Create a new connector using the provided SSL implementation.
pub fn new(s: S) -> HttpsConnector<S> {
HttpsConnector {
http: HttpConnector::default(),
ssl: s,
#[doc(hidden)]
#[deprecated(since="0.11.16", note="Use the Connect2 trait, which will become Connect in 0.12")]
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
fn connect(&self, Uri) -> <Self as Connect>::Future;
}

#[doc(hidden)]
#[allow(deprecated)]
impl<T> Connect for T
where T: Service<Request=Uri, Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
type Future = T::Future;

fn connect(&self, url: Uri) -> <Self as Connect>::Future {
self.call(url)
}
}

#[doc(hidden)]
#[allow(deprecated)]
impl<T> Connect2 for T
where
T: Connect,
{
type Transport = <T as Connect>::Output;
type Error = io::Error;
type Future = ConnectToConnect2Future<<T as Connect>::Future>;

fn connect(&self, dst: Destination) -> <Self as Connect2>::Future {
ConnectToConnect2Future {
inner: <Self as Connect>::connect(self, dst.uri),
}
}
}
*/

#[doc(hidden)]
#[deprecated(since="0.11.16")]
#[allow(missing_debug_implementations)]
pub struct ConnectToConnect2Future<F> {
inner: F,
}

#[allow(deprecated)]
impl<F> Future for ConnectToConnect2Future<F>
where
F: Future,
{
type Item = Connected<F::Item>;
type Error = F::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
.map(|async| async.map(Connected::new))
}
}

// even though deprecated, we need to make sure the HttpConnector still
// implements Connect (and Service apparently...)

#[allow(deprecated)]
fn _assert_http_connector() {
fn assert_connect<T>()
where
T: Connect2<
Transport=TcpStream,
Error=io::Error,
Future=ConnectToConnect2Future<HttpConnecting>
>,
T: Connect<Output=TcpStream, Future=HttpConnecting>,
T: Service<
Request=Uri,
Response=TcpStream,
Future=HttpConnecting,
Error=io::Error
>,
{}

assert_connect::<HttpConnector>();
}

#[cfg(test)]
mod tests {
#![allow(deprecated)]
use std::io;
use tokio::reactor::Core;
use super::{Connect, HttpConnector};
Expand Down
29 changes: 21 additions & 8 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ use version::HttpVersion;

pub use proto::response::Response;
pub use proto::request::Request;
pub use self::connect::{HttpConnector, Connect};
pub use self::connect::{Connect2, HttpConnector};
#[allow(deprecated)]
pub use self::connect::Connect;

use self::background::{bg, Background};
use self::connect::Destination;

mod connect;
pub mod connect;
mod dns;
mod pool;
#[cfg(feature = "compat")]
Expand Down Expand Up @@ -99,7 +102,9 @@ impl<C, B> Client<C, B> {
}

impl<C, B> Client<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand Down Expand Up @@ -149,7 +154,9 @@ impl Future for FutureResponse {
}

impl<C, B> Service for Client<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand Down Expand Up @@ -195,15 +202,19 @@ where C: Connect,
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.and_then(move |io| {
let dst = Destination {
uri: url,
alpn: self::connect::Alpn::Http1,
};
self.connector.connect(dst)
.and_then(move |connected| {
let (tx, rx) = mpsc::channel(0);
let tx = HyperClient {
tx: RefCell::new(tx),
should_close: true,
};
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(connected.transport, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pooled)
Expand Down Expand Up @@ -384,7 +395,9 @@ impl<C, B> Config<C, B> {
}

impl<C, B> Config<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
Expand Down

0 comments on commit 955004c

Please sign in to comment.