Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ socket2 = { version = ">=0.5.9, <0.7", optional = true, features = ["all"] }
tracing = { version = "0.1", default-features = false, features = ["std"], optional = true }
tokio = { version = "1", optional = true, default-features = false }
tower-service = { version = "0.3", optional = true }
scopeguard = "1.2.0"

[dev-dependencies]
hyper = { version = "1.4.0", features = ["full"] }
Expand Down
110 changes: 93 additions & 17 deletions src/client/legacy/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::connect::HttpConnector;
use super::connect::{Alpn, Connect, Connected, Connection};
use super::pool::{self, Ver};

use crate::client::legacy::pool::{EventHandler, PoolEvent};
use crate::common::future::poll_fn;
use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};

Expand Down Expand Up @@ -62,7 +63,7 @@ pub struct Error {
}

#[derive(Debug)]
enum ErrorKind {
pub enum ErrorKind {
Canceled,
ChannelClosed,
Connect,
Expand All @@ -89,8 +90,9 @@ macro_rules! e {
};
}

// We might change this... :shrug:
type PoolKey = (http::uri::Scheme, http::uri::Authority);
/// PoolKey is a tuple of Scheme and Authority of Uri, used to
/// identify a connection pool for a specific destination.
pub type PoolKey = (http::uri::Scheme, http::uri::Authority);

enum TrySendError<B> {
Retryable {
Expand Down Expand Up @@ -247,7 +249,7 @@ where
let uri = req.uri().clone();

loop {
req = match self.try_send_request(req, pool_key.clone()).await {
req = match self.try_send_request(req, &pool_key).await {
Ok(resp) => return Ok(resp),
Err(TrySendError::Nope(err)) => return Err(err),
Err(TrySendError::Retryable {
Expand Down Expand Up @@ -275,7 +277,7 @@ where
async fn try_send_request(
&self,
mut req: Request<B>,
pool_key: PoolKey,
pool_key: &PoolKey,
) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
let mut pooled = self
.connection_for(pool_key)
Expand Down Expand Up @@ -368,10 +370,10 @@ where

async fn connection_for(
&self,
pool_key: PoolKey,
pool_key: &PoolKey,
) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
loop {
match self.one_connection_for(pool_key.clone()).await {
match self.one_connection_for(pool_key).await {
Ok(pooled) => return Ok(pooled),
Err(ClientConnectError::Normal(err)) => return Err(err),
Err(ClientConnectError::CheckoutIsClosed(reason)) => {
Expand All @@ -391,7 +393,7 @@ where

async fn one_connection_for(
&self,
pool_key: PoolKey,
pool_key: &PoolKey,
) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
// Return a single connection if pooling is not enabled
if !self.pool.is_enabled() {
Expand Down Expand Up @@ -484,7 +486,7 @@ where
#[cfg(any(feature = "http1", feature = "http2"))]
fn connect_to(
&self,
pool_key: PoolKey,
pool_key: &PoolKey,
) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
{
let executor = self.exec.clone();
Expand All @@ -496,7 +498,8 @@ where
let ver = self.config.ver;
let is_ver_h2 = ver == Ver::Http2;
let connector = self.connector.clone();
let dst = domain_as_uri(pool_key.clone());
let dst = domain_as_uri(pool_key);
let pool_key = pool_key.clone();
hyper_lazy(move || {
// Try to take a "connecting lock".
//
Expand All @@ -512,11 +515,49 @@ where
return Either::Right(future::err(canceled));
}
};

let on_event_error = pool.on_event.clone();
let pool_key_cloned = pool_key.clone();
Either::Left(
connector
.connect(super::connect::sealed::Internal, dst)
.map_err(move |err| {
let err_box: Box<dyn StdError + Send + Sync> = err.into();
let mut source_err: Option<&dyn StdError> = Some(err_box.as_ref());
let (mut io_error_kind, mut elapsed_error) = (None, false);

while let Some(current_err) = source_err {
use std::io;
if let Some(io_err) = current_err.downcast_ref::<io::Error>()
{
io_error_kind = Some(io_err.kind());
} else if current_err.is::<tokio::time::error::Elapsed>() {
elapsed_error = true;
}

source_err = current_err.source();
}

if let Some(ref handler) = on_event_error {
let is_timeout =
elapsed_error || matches!(io_error_kind, Some(std::io::ErrorKind::TimedOut));
if is_timeout {
handler.notify(PoolEvent::ConnectionTimeout, &[&pool_key]);
} else {
handler.notify(PoolEvent::ConnectionError, &[&pool_key]);
}
}
// If the connection failed, we need to notify the event connection error.

e!(Connect, err_box)
})
.map_err(|src| e!(Connect, src))
.and_then(move |io| {
// increment the total connection count for this pool key
if let Some(ref handler) = pool.on_event {
handler.notify(PoolEvent::NewConnection, &[&pool_key_cloned]);
}

let connected = io.connected();
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
Expand All @@ -542,6 +583,13 @@ where
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;

Either::Left(Box::pin(async move {
use scopeguard::{guard, ScopeGuard};
let guard = guard((), |_| {
// increment the destroy connection count for this pool key (if still armed)
if let Some(ref handler) = pool.on_event {
handler.notify(PoolEvent::ConnectionError, &[&pool_key_cloned]);
}
});
let tx = if is_h2 {
#[cfg(feature = "http2")] {
let (mut tx, conn) =
Expand Down Expand Up @@ -653,6 +701,9 @@ where
}
};

// “defuse” the guard...
ScopeGuard::into_inner(guard);

Ok(pool.pooled(
connecting,
PoolClient {
Expand Down Expand Up @@ -932,10 +983,10 @@ fn authority_form(uri: &mut Uri) {
}

fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
let uri_clone = uri.clone();
match (uri_clone.scheme(), uri_clone.authority()) {
match (uri.scheme(), uri.authority()) {
(Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
(None, Some(auth)) if is_http_connect => {
let auth = auth.clone();
let scheme = match auth.port_u16() {
Some(443) => {
set_scheme(uri, Scheme::HTTPS);
Expand All @@ -946,7 +997,7 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error
Scheme::HTTP
}
};
Ok((scheme, auth.clone()))
Ok((scheme, auth))
}
_ => {
debug!("Client requires absolute-form URIs, received: {:?}", uri);
Expand All @@ -955,10 +1006,10 @@ fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error
}
}

fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
fn domain_as_uri((scheme, auth): &PoolKey) -> Uri {
http::uri::Builder::new()
.scheme(scheme)
.authority(auth)
.scheme(scheme.clone())
.authority(auth.clone())
.path_and_query("/")
.build()
.expect("domain is valid Uri")
Expand Down Expand Up @@ -1021,6 +1072,7 @@ pub struct Builder {
h2_builder: hyper::client::conn::http2::Builder<Exec>,
pool_config: pool::Config,
pool_timer: Option<timer::Timer>,
event_handler: Option<EventHandler>,
}

impl Builder {
Expand All @@ -1046,6 +1098,7 @@ impl Builder {
max_idle_per_host: usize::MAX,
},
pool_timer: None,
event_handler: None,
}
}
/// Set an optional timeout for idle sockets being kept-alive.
Expand Down Expand Up @@ -1486,6 +1539,21 @@ impl Builder {
self
}

/// Sets the maximum number of HTTP2 concurrent streams.
///
/// See the documentation of [`h2::client::Builder::max_concurrent_streams`] for more
/// details.
///
/// The default value is determined by the `h2` crate.
///
/// [`h2::client::Builder::max_concurrent_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_streams
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_concurrent_streams(&mut self, max: u32) -> &mut Self {
self.h2_builder.max_concurrent_streams(max);
self
}

/// Sets the maximum number of HTTP2 concurrent locally reset streams.
///
/// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
Expand Down Expand Up @@ -1539,6 +1607,13 @@ impl Builder {
self
}

/// Set a handler to be used for pool event notifications.
///
pub fn pool_event_handler(&mut self, on_event: EventHandler) -> &mut Self {
self.event_handler = Some(on_event);
self
}

/// Set whether to retry requests that get disrupted before ever starting
/// to write.
///
Expand Down Expand Up @@ -1591,6 +1666,7 @@ impl Builder {
{
let exec = self.exec.clone();
let timer = self.pool_timer.clone();
let on_event = self.event_handler.clone();
Client {
config: self.client_config,
exec: exec.clone(),
Expand All @@ -1599,7 +1675,7 @@ impl Builder {
#[cfg(feature = "http2")]
h2_builder: self.h2_builder.clone(),
connector,
pool: pool::Pool::new(self.pool_config, exec, timer),
pool: pool::Pool::new(self.pool_config, exec, timer, on_event),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/legacy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(any(feature = "http1", feature = "http2"))]
mod client;
#[cfg(any(feature = "http1", feature = "http2"))]
pub use client::{Builder, Client, Error, ResponseFuture};
pub use client::{Builder, Client, Error, PoolKey, ResponseFuture};

pub mod connect;
#[doc(hidden)]
Expand Down
Loading