Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use super::conn::Connection;
use super::conn::Protocol;
use super::conn::Transport;
use super::pool::PoolableConnection;
use super::ConnectionPoolLayer;
use crate::service::RequestExecutor;
use crate::service::{Http1ChecksLayer, Http2ChecksLayer, SetHostHeaderLayer};

use super::ClientService;
use crate::client::conn::connection::ConnectionError;
#[cfg(feature = "tls")]
use crate::client::default_tls_config;
Expand Down Expand Up @@ -484,12 +486,14 @@ where
http::header::USER_AGENT,
user_agent,
))
.service(ClientService {
transport,
protocol: self.protocol.build(),
pool: self.pool.map(super::pool::Pool::new),
_body: std::marker::PhantomData,
});
.layer(
ConnectionPoolLayer::new(transport, self.protocol.build())
.with_optional_pool(self.pool.clone()),
)
.layer(SetHostHeaderLayer::new())
.layer(Http2ChecksLayer::new())
.layer(Http1ChecksLayer::new())
.service(RequestExecutor::new());

SharedService::new(service)
}
Expand Down
8 changes: 4 additions & 4 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//!
//! 1. The high-level [`Client`] API, which is the most user-friendly and abstracts away most of the details.
//! It is "batteries-included", and supports features like redirects, retries and timeouts.
//! 2. The [`Service`][ClientService] API, which is a lower-level API that allows for more control over the request and response.
//! 2. The [`Service`][ConnectionPoolService] API, which is a lower-level API that allows for more control over the request and response.
//! It presents a `tower::Service` that can be used to send requests and receive responses, and can be wrapped
//! by middleware compatible with the tower ecosystem.
//! 3. The [connection][self::conn] API, which is the lowest-level API that allows for direct control over the
Expand All @@ -21,14 +21,14 @@ use tower::ServiceExt;

use self::conn::protocol::auto;
use self::conn::transport::tcp::TcpTransportConfig;
pub use self::service::ClientService;
pub use self::pool::service::ConnectionPoolLayer;
pub use self::pool::service::ConnectionPoolService;
use crate::service::SharedService;

mod builder;
pub mod conn;
mod error;
pub mod pool;
mod service;

pub use self::error::Error;
pub use self::pool::Config as PoolConfig;
Expand Down Expand Up @@ -82,7 +82,7 @@ impl ClientRef {

/// A high-level async HTTP client.
///
/// This client is built on top of the [`Service`][ClientService] API and provides a more user-friendly interface,
/// This client is built on top of the [`Service`][ConnectionPoolService] API and provides a more user-friendly interface,
/// including support for retries, redirects and timeouts.
///
/// # Example
Expand Down
130 changes: 95 additions & 35 deletions src/client/pool/checkout.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt;
use std::future::poll_fn;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
Expand All @@ -9,7 +10,6 @@ use std::task::Context;
use std::task::Poll;

use futures_util::future::BoxFuture;
use futures_util::FutureExt as _;
use pin_project::pin_project;
use pin_project::pinned_drop;
use thiserror::Error;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl<C: PoolableConnection, T: PoolableTransport, E> fmt::Debug
}

#[pin_project(PinnedDrop)]
pub(crate) struct Checkout<C: PoolableConnection, T: PoolableTransport, E> {
pub(crate) struct Checkout<C: PoolableConnection, T: PoolableTransport, E: 'static> {
key: Key,
pool: WeakOpt<Mutex<PoolInner<C>>>,
#[pin]
Expand All @@ -157,7 +157,7 @@ pub(crate) struct Checkout<C: PoolableConnection, T: PoolableTransport, E> {
id: CheckoutId,
}

impl<C: PoolableConnection, T: PoolableTransport, E> fmt::Debug for Checkout<C, T, E> {
impl<C: PoolableConnection, T: PoolableTransport, E: 'static> fmt::Debug for Checkout<C, T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Checkout")
.field("key", &self.key)
Expand All @@ -168,7 +168,7 @@ impl<C: PoolableConnection, T: PoolableTransport, E> fmt::Debug for Checkout<C,
}
}

impl<C: PoolableConnection, T: PoolableTransport, E> Checkout<C, T, E> {
impl<C: PoolableConnection, T: PoolableTransport, E: 'static> Checkout<C, T, E> {
pub(crate) fn detached(key: Key, connector: Connector<C, T, E>) -> Self {
Self {
key,
Expand Down Expand Up @@ -262,6 +262,7 @@ where
let transport: T = match this.inner {
InnerCheckoutConnecting::Waiting => {
// We're waiting on a connection to be ready.
// If that were still happening, we would bail out above.
return Poll::Ready(Err(Error::Unavailable));
}
InnerCheckoutConnecting::Connected => {
Expand All @@ -274,11 +275,11 @@ where
return Poll::Ready(Ok(self.as_mut().connected(connection)));
}
InnerCheckoutConnecting::Connecting(Connector { transport, .. }) => {
ready!(transport.poll_unpin(cx)).map_err(Error::Connecting)?
ready!(transport.as_mut().poll(cx)).map_err(Error::Connecting)?
}
InnerCheckoutConnecting::Handshaking(handshake) => {
let connection =
ready!(handshake.poll_unpin(cx)).map_err(Error::Handshaking)?;
ready!(handshake.as_mut().poll(cx)).map_err(Error::Handshaking)?;
return Poll::Ready(Ok(self.as_mut().connected(connection)));
}
};
Expand Down Expand Up @@ -307,7 +308,7 @@ where
}
}

impl<C: PoolableConnection, T: PoolableTransport, E> Checkout<C, T, E> {
impl<C: PoolableConnection, T: PoolableTransport, E: 'static> Checkout<C, T, E> {
/// Checks the waiter to see if a new connection is ready and can be passed along.
///
/// If there is no waiter, this function returns `Poll::Ready(Ok(None))`. If there is
Expand All @@ -320,45 +321,104 @@ impl<C: PoolableConnection, T: PoolableTransport, E> Checkout<C, T, E> {
}

/// Called to register a new connection with the pool.
pub(crate) fn connected(self: Pin<&mut Self>, mut connection: C) -> Pooled<C> {
if let Some(pool) = self.pool.upgrade() {
if let Ok(mut inner) = pool.lock() {
if let Some(reused) = connection.reuse() {
inner.push(self.key.clone(), reused, self.pool.clone());
return Pooled {
connection: Some(connection),
is_reused: true,
key: self.key.clone(),
pool: WeakOpt::none(),
};
} else {
return Pooled {
connection: Some(connection),
is_reused: false,
key: self.key.clone(),
pool: WeakOpt::downgrade(&pool),
};
}
pub(crate) fn connected(self: Pin<&mut Self>, connection: C) -> Pooled<C> {
register_connected(&self.pool, &self.key, connection)
}
}

fn register_connected<C>(
poolref: &WeakOpt<Mutex<PoolInner<C>>>,
key: &Key,
mut connection: C,
) -> Pooled<C>
where
C: PoolableConnection,
{
if let Some(pool) = poolref.upgrade() {
if let Ok(mut inner) = pool.lock() {
if let Some(reused) = connection.reuse() {
inner.push(key.clone(), reused, poolref.clone());
return Pooled {
connection: Some(connection),
is_reused: true,
key: key.clone(),
pool: WeakOpt::none(),
};
} else {
return Pooled {
connection: Some(connection),
is_reused: false,
key: key.clone(),
pool: WeakOpt::downgrade(&pool),
};
}
}
}

// No pool or lock was available, so we can't add the connection to the pool.
Pooled {
connection: Some(connection),
is_reused: false,
key: self.key.clone(),
pool: WeakOpt::none(),
}
// No pool or lock was available, so we can't add the connection to the pool.
Pooled {
connection: Some(connection),
is_reused: false,
key: key.clone(),
pool: WeakOpt::none(),
}
}

#[pinned_drop]
impl<C: PoolableConnection, T: PoolableTransport, E> PinnedDrop for Checkout<C, T, E> {
fn drop(self: Pin<&mut Self>) {
impl<C: PoolableConnection, T: PoolableTransport, E> PinnedDrop for Checkout<C, T, E>
where
E: 'static,
{
fn drop(mut self: Pin<&mut Self>) {
if let Some(pool) = self.pool.upgrade() {
if let Ok(mut inner) = pool.lock() {
inner.cancel_connection(&self.key);
}

let state = std::mem::replace(&mut self.inner, InnerCheckoutConnecting::Connected);

match state {
InnerCheckoutConnecting::Connecting(mut connector) => {
let pool = self.pool.clone();
let key = self.key.clone();
tokio::spawn(async move {
let io: T = match poll_fn(|cx| connector.transport.as_mut().poll(cx)).await
{
Ok(io) => io,
Err(_) => {
tracing::error!(%key, "error connecting background transport");
return;
}
};

let connection = match (connector.handshake)(io).await {
Ok(conn) => conn,
Err(_) => {
tracing::error!(%key, "error handshaking background connection");
return;
}
};

register_connected(&pool, &key, connection);
});
}
InnerCheckoutConnecting::Handshaking(handshake) => {
let pool = self.pool.clone();
let key = self.key.clone();
tokio::spawn(async move {
let connection = match handshake.await {
Ok(conn) => conn,
Err(_) => {
tracing::error!(key=%key, "error handshaking connection");
return;
}
};

register_connected(&pool, &key, connection);
});
}
_ => {}
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/client/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tracing::trace;
mod checkout;
mod idle;
pub(super) mod key;
pub(super) mod service;
mod weakopt;

pub(crate) use self::checkout::Checkout;
Expand Down Expand Up @@ -273,6 +274,10 @@ pub trait PoolableConnection: Unpin + Send + Sized + 'static {
fn reuse(&mut self) -> Option<Self>;
}

/// Wrapper type for a connection which is managed by a pool.
///
/// This type is used outside of the Pool to ensure that dropped
/// connections are returned to the pool.
pub(crate) struct Pooled<C: PoolableConnection> {
connection: Option<C>,
is_reused: bool,
Expand Down
Loading