From c87b743138c739a14a41718d6bfd05805f4d01d2 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Jun 2025 15:27:39 -0400 Subject: [PATCH 1/5] feat(pool): add a Singleton pool type --- Cargo.toml | 3 + src/client/mod.rs | 3 + src/client/pool/mod.rs | 3 + src/client/pool/singleton.rs | 333 +++++++++++++++++++++++++++++++++++ 4 files changed, 342 insertions(+) create mode 100644 src/client/pool/mod.rs create mode 100644 src/client/pool/singleton.rs diff --git a/Cargo.toml b/Cargo.toml index c81763ee..b7646cab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util", "signal"] } tokio-test = "0.4" +tower-test = "0.4" pretty_env_logger = "0.5" [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] @@ -60,6 +61,7 @@ default = [] full = [ "client", "client-legacy", + "client-pool", "client-proxy", "client-proxy-system", "server", @@ -74,6 +76,7 @@ full = [ client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"] client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] +client-pool = [] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] diff --git a/src/client/mod.rs b/src/client/mod.rs index 0d896030..268cadf0 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -4,5 +4,8 @@ #[cfg(feature = "client-legacy")] pub mod legacy; +#[cfg(feature = "client-pool")] +pub mod pool; + #[cfg(feature = "client-proxy")] pub mod proxy; diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs new file mode 100644 index 00000000..c0e9bfec --- /dev/null +++ b/src/client/pool/mod.rs @@ -0,0 +1,3 @@ +//! Composable pool services + +pub mod singleton; diff --git a/src/client/pool/singleton.rs b/src/client/pool/singleton.rs new file mode 100644 index 00000000..50518fd4 --- /dev/null +++ b/src/client/pool/singleton.rs @@ -0,0 +1,333 @@ +//! Singleton pools +//! +//! The singleton pool combines a MakeService that should only produce a single +//! active connection. It can bundle all concurrent calls to it, so that only +//! one connection is made. All calls to the singleton will return a clone of +//! the inner service once established. +//! +//! This fits the HTTP/2 case well. + +use std::sync::{Arc, Mutex}; +use std::task::{self, Poll}; + +use tokio::sync::oneshot; +use tower_service::Service; + +use self::internal::{DitchGuard, SingletonError, SingletonFuture}; + +type BoxError = Box; + +/// A singleton pool over an inner service. +/// +/// The singleton wraps an inner service maker, bundling all calls to ensure +/// only one service is created. Once made, it returns clones of the made +/// service. +#[derive(Debug)] +pub struct Singleton +where + M: Service, +{ + mk_svc: M, + state: Arc>>, +} + +#[derive(Debug)] +enum State { + Empty, + Making(Vec>), + Made(S), +} + +impl Singleton +where + M: Service, + M::Response: Clone, +{ + /// Create a new singleton pool over an inner make service. + pub fn new(mk_svc: M) -> Self { + Singleton { + mk_svc, + state: Arc::new(Mutex::new(State::Empty)), + } + } + + // pub fn reset? + // pub fn retain? +} + +impl Service for Singleton +where + M: Service, + M::Response: Clone, + M::Error: Into, +{ + type Response = M::Response; + type Error = SingletonError; + type Future = SingletonFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + if let State::Empty = *self.state.lock().unwrap() { + return self + .mk_svc + .poll_ready(cx) + .map_err(|e| SingletonError(e.into())); + } + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: Target) -> Self::Future { + let mut locked = self.state.lock().unwrap(); + match *locked { + State::Empty => { + let fut = self.mk_svc.call(dst); + *locked = State::Making(Vec::new()); + SingletonFuture::Driving { + future: fut, + singleton: DitchGuard(Arc::downgrade(&self.state)), + } + } + State::Making(ref mut waiters) => { + let (tx, rx) = oneshot::channel(); + waiters.push(tx); + SingletonFuture::Waiting { rx } + } + State::Made(ref svc) => SingletonFuture::Made { + svc: Some(svc.clone()), + }, + } + } +} + +impl Clone for Singleton +where + M: Service + Clone, +{ + fn clone(&self) -> Self { + Self { + mk_svc: self.mk_svc.clone(), + state: self.state.clone(), + } + } +} + +// Holds some "pub" items that otherwise shouldn't be public. +mod internal { + use std::future::Future; + use std::pin::Pin; + use std::sync::{Mutex, Weak}; + use std::task::{self, Poll}; + + use futures_core::ready; + use pin_project_lite::pin_project; + use tokio::sync::oneshot; + + use super::{BoxError, State}; + + pin_project! { + #[project = SingletonFutureProj] + pub enum SingletonFuture { + Driving { + #[pin] + future: F, + singleton: DitchGuard, + }, + Waiting { + rx: oneshot::Receiver, + }, + Made { + svc: Option, + }, + } + } + + // XXX: pub because of the enum SingletonFuture + pub struct DitchGuard(pub(super) Weak>>); + + impl Future for SingletonFuture + where + F: Future>, + E: Into, + S: Clone, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match self.project() { + SingletonFutureProj::Driving { future, singleton } => { + match ready!(future.poll(cx)) { + Ok(svc) => { + if let Some(state) = singleton.0.upgrade() { + let mut locked = state.lock().unwrap(); + singleton.0 = Weak::new(); + match std::mem::replace(&mut *locked, State::Made(svc.clone())) { + State::Making(waiters) => { + for tx in waiters { + let _ = tx.send(svc.clone()); + } + } + State::Empty | State::Made(_) => { + // shouldn't happen! + } + } + } + Poll::Ready(Ok(svc)) + } + Err(e) => { + if let Some(state) = singleton.0.upgrade() { + let mut locked = state.lock().unwrap(); + singleton.0 = Weak::new(); + *locked = State::Empty; + } + Poll::Ready(Err(SingletonError(e.into()))) + } + } + } + SingletonFutureProj::Waiting { rx } => match ready!(Pin::new(rx).poll(cx)) { + Ok(svc) => Poll::Ready(Ok(svc)), + Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))), + }, + SingletonFutureProj::Made { svc } => Poll::Ready(Ok(svc.take().unwrap())), + } + } + } + + impl Drop for DitchGuard { + fn drop(&mut self) { + if let Some(state) = self.0.upgrade() { + if let Ok(mut locked) = state.lock() { + *locked = State::Empty; + } + } + } + } + + // An opaque error type. By not exposing the type, nor being specifically + // Box, we can _change_ the type once we no longer need the Canceled + // error type. This will be possible with the refactor to baton passing. + #[derive(Debug)] + pub struct SingletonError(pub(super) BoxError); + + impl std::fmt::Display for SingletonError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("singleton connection error") + } + } + + impl std::error::Error for SingletonError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&*self.0) + } + } + + #[derive(Debug)] + struct Canceled; + + impl std::fmt::Display for Canceled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("singleton connection canceled") + } + } + + impl std::error::Error for Canceled {} +} + +#[cfg(test)] +mod tests { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + use tower_service::Service; + + use super::Singleton; + + #[tokio::test] + async fn first_call_drives_subsequent_wait() { + let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); + + let mut singleton = Singleton::new(mock_svc); + + handle.allow(1); + crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) + .await + .unwrap(); + // First call: should go into Driving + let fut1 = singleton.call(()); + // Second call: should go into Waiting + let fut2 = singleton.call(()); + + // Expect exactly one request to the inner service + let ((), send_response) = handle.next_request().await.unwrap(); + send_response.send_response("svc"); + + // Both futures should resolve to the same value + assert_eq!(fut1.await.unwrap(), "svc"); + assert_eq!(fut2.await.unwrap(), "svc"); + } + + #[tokio::test] + async fn made_state_returns_immediately() { + let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); + let mut singleton = Singleton::new(mock_svc); + + handle.allow(1); + crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) + .await + .unwrap(); + // Drive first call to completion + let fut1 = singleton.call(()); + let ((), send_response) = handle.next_request().await.unwrap(); + send_response.send_response("svc"); + assert_eq!(fut1.await.unwrap(), "svc"); + + // Second call should not hit inner service + let res = singleton.call(()).await.unwrap(); + assert_eq!(res, "svc"); + } + + #[tokio::test] + async fn cancel_waiter_does_not_affect_others() { + let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); + let mut singleton = Singleton::new(mock_svc); + + crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) + .await + .unwrap(); + let fut1 = singleton.call(()); + let fut2 = singleton.call(()); + drop(fut2); // cancel one waiter + + let ((), send_response) = handle.next_request().await.unwrap(); + send_response.send_response("svc"); + + assert_eq!(fut1.await.unwrap(), "svc"); + } + + // TODO: this should be able to be improved with a cooperative baton refactor + #[tokio::test] + async fn cancel_driver_cancels_all() { + let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); + let mut singleton = Singleton::new(mock_svc); + + crate::common::future::poll_fn(|cx| singleton.poll_ready(cx)) + .await + .unwrap(); + let mut fut1 = singleton.call(()); + let fut2 = singleton.call(()); + + // poll driver just once, and then drop + crate::common::future::poll_fn(move |cx| { + let _ = Pin::new(&mut fut1).poll(cx); + Poll::Ready(()) + }) + .await; + + let ((), send_response) = handle.next_request().await.unwrap(); + send_response.send_response("svc"); + + assert_eq!( + fut2.await.unwrap_err().0.to_string(), + "singleton connection canceled" + ); + } +} From 6d7eef95086f86ab769114233a4e473989cb76ec Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Jun 2025 15:27:39 -0400 Subject: [PATCH 2/5] feat(pool): add a Cache pooling service --- .github/workflows/CI.yml | 2 +- Cargo.toml | 2 +- src/client/pool/cache.rs | 457 +++++++++++++++++++++++++++++++++++++++ src/client/pool/mod.rs | 1 + 4 files changed, 460 insertions(+), 2 deletions(-) create mode 100644 src/client/pool/cache.rs diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index e52508b5..46a03bc0 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -119,4 +119,4 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@nightly - - run: cargo rustdoc -- --cfg docsrs -D rustdoc::broken-intra-doc-links + - run: cargo rustdoc --features full -- --cfg docsrs -D rustdoc::broken-intra-doc-links diff --git a/Cargo.toml b/Cargo.toml index b7646cab..130e48f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,7 @@ full = [ client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"] client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] -client-pool = [] +client-pool = ["dep:futures-util"] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] diff --git a/src/client/pool/cache.rs b/src/client/pool/cache.rs new file mode 100644 index 00000000..1353c74d --- /dev/null +++ b/src/client/pool/cache.rs @@ -0,0 +1,457 @@ +//! A cache of services +//! +//! The cache is a single list of cached services, bundled with a `MakeService`. +//! Calling the cache returns either an existing service, or makes a new one. +//! The returned `impl Service` can be used to send requests, and when dropped, +//! it will try to be returned back to the cache. + +pub use self::internal::builder; + +#[cfg(docsrs)] +pub use self::internal::Builder; +#[cfg(docsrs)] +pub use self::internal::Cache; +#[cfg(docsrs)] +pub use self::internal::Cached; + +// For now, nothing else in this module is nameable. We can always make things +// more public, but we can't change type shapes (generics) once things are +// public. +mod internal { + use std::fmt; + use std::future::Future; + use std::pin::Pin; + use std::sync::{Arc, Mutex, Weak}; + use std::task::{self, Poll}; + + use futures_core::ready; + use futures_util::future; + use tokio::sync::oneshot; + use tower_service::Service; + + use super::events; + + /// Start a builder to construct a `Cache` pool. + pub fn builder() -> Builder { + Builder { + events: events::Ignore, + } + } + + /// A cache pool of services from the inner make service. + /// + /// Created with [`builder()`]. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + #[derive(Debug)] + pub struct Cache + where + M: Service, + { + connector: M, + shared: Arc>>, + events: Ev, + } + + /// A builder to configure a `Cache`. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + #[derive(Debug)] + pub struct Builder { + events: Ev, + } + + /// A cached service returned from a [`Cache`]. + /// + /// Implements `Service` by delegating to the inner service. Once dropped, + /// tries to reinsert into the `Cache`. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + pub struct Cached { + inner: Option, + shared: Weak>>, + // todo: on_idle + } + + pub enum CacheFuture + where + M: Service, + { + Racing { + shared: Arc>>, + select: future::Select, M::Future>, + events: Ev, + }, + Connecting { + // TODO: could be Weak even here... + shared: Arc>>, + future: M::Future, + }, + Cached { + svc: Option>, + }, + } + + // shouldn't be pub + #[derive(Debug)] + pub struct Shared { + services: Vec, + waiters: Vec>, + } + + // impl Builder + + impl Builder { + /// Provide a `Future` executor to be used by the `Cache`. + /// + /// The executor is used handle some optional background tasks that + /// can improve the behavior of the cache, such as reducing connection + /// thrashing when a race is won. If not configured with an executor, + /// the default behavior is to ignore any of these optional background + /// tasks. + /// + /// The executor should implmenent [`hyper::rt::Executor`]. + /// + /// # Example + /// + /// ```rust + /// # #[cfg(feature = "tokio")] + /// # fn run() { + /// let _builder = hyper_util::client::pool::cache::builder() + /// .executor(hyper_util::rt::TokioExecutor::new()); + /// # } + /// ``` + pub fn executor(self, exec: E) -> Builder> { + Builder { + events: events::WithExecutor(exec), + } + } + + /// Build a `Cache` pool around the `connector`. + pub fn build(self, connector: M) -> Cache + where + M: Service, + { + Cache { + connector, + events: self.events, + shared: Arc::new(Mutex::new(Shared { + services: Vec::new(), + waiters: Vec::new(), + })), + } + } + } + + // impl Cache + + impl Service for Cache + where + M: Service, + M::Future: Unpin, + M::Response: Unpin, + Ev: events::Events> + Clone + Unpin, + { + type Response = Cached; + type Error = M::Error; + type Future = CacheFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + if !self.shared.lock().unwrap().services.is_empty() { + Poll::Ready(Ok(())) + } else { + self.connector.poll_ready(cx) + } + } + + fn call(&mut self, target: Dst) -> Self::Future { + // 1. If already cached, easy! + let waiter = { + let mut locked = self.shared.lock().unwrap(); + if let Some(found) = locked.take() { + return CacheFuture::Cached { + svc: Some(Cached::new(found, Arc::downgrade(&self.shared))), + }; + } + + let (tx, rx) = oneshot::channel(); + locked.waiters.push(tx); + rx + }; + + // 2. Otherwise, we start a new connect, and also listen for + // any newly idle. + CacheFuture::Racing { + shared: self.shared.clone(), + select: future::select(waiter, self.connector.call(target)), + events: self.events.clone(), + } + } + } + + impl Clone for Cache + where + M: Service + Clone, + Ev: Clone, + { + fn clone(&self) -> Self { + Self { + connector: self.connector.clone(), + events: self.events.clone(), + shared: self.shared.clone(), + } + } + } + + impl Future for CacheFuture + where + M: Service, + M::Future: Unpin, + M::Response: Unpin, + Ev: events::Events> + Unpin, + { + type Output = Result, M::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + loop { + match &mut *self.as_mut() { + CacheFuture::Racing { + shared, + select, + events, + } => { + match ready!(Pin::new(select).poll(cx)) { + future::Either::Left((Err(_pool_closed), connecting)) => { + // pool was dropped, so we'll never get it from a waiter, + // but if this future still exists, then the user still + // wants a connection. just wait for the connecting + *self = CacheFuture::Connecting { + shared: shared.clone(), + future: connecting, + }; + } + future::Either::Left((Ok(pool_got), connecting)) => { + events.on_race_lost(BackgroundConnect { + future: connecting, + shared: Arc::downgrade(&shared), + }); + return Poll::Ready(Ok(Cached::new( + pool_got, + Arc::downgrade(&shared), + ))); + } + future::Either::Right((connected, _waiter)) => { + let inner = connected?; + return Poll::Ready(Ok(Cached::new( + inner, + Arc::downgrade(&shared), + ))); + } + } + } + CacheFuture::Connecting { shared, future } => { + let inner = ready!(Pin::new(future).poll(cx))?; + return Poll::Ready(Ok(Cached::new(inner, Arc::downgrade(&shared)))); + } + CacheFuture::Cached { svc } => { + return Poll::Ready(Ok(svc.take().unwrap())); + } + } + } + } + } + + // impl Cached + + impl Cached { + fn new(inner: S, shared: Weak>>) -> Self { + Cached { + inner: Some(inner), + shared, + } + } + } + + impl Service for Cached + where + S: Service, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.inner.as_mut().unwrap().poll_ready(cx) + } + + fn call(&mut self, req: Req) -> Self::Future { + self.inner.as_mut().unwrap().call(req) + } + } + + impl Drop for Cached { + fn drop(&mut self) { + if let Some(value) = self.inner.take() { + if let Some(shared) = self.shared.upgrade() { + if let Ok(mut shared) = shared.lock() { + shared.put(value); + } + } + } + } + } + + impl fmt::Debug for Cached { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Cached") + .field(self.inner.as_ref().unwrap()) + .finish() + } + } + + // impl Shared + + impl Shared { + fn put(&mut self, val: V) { + let mut val = Some(val); + while let Some(tx) = self.waiters.pop() { + if !tx.is_closed() { + match tx.send(val.take().unwrap()) { + Ok(()) => break, + Err(v) => { + val = Some(v); + } + } + } + } + + if let Some(val) = val { + self.services.push(val); + } + } + + fn take(&mut self) -> Option { + // TODO: take in a loop + self.services.pop() + } + } + + pub struct BackgroundConnect { + future: CF, + shared: Weak>>, + } + + impl Future for BackgroundConnect + where + CF: Future> + Unpin, + { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match ready!(Pin::new(&mut self.future).poll(cx)) { + Ok(svc) => { + if let Some(shared) = self.shared.upgrade() { + if let Ok(mut locked) = shared.lock() { + locked.put(svc); + } + } + Poll::Ready(()) + } + Err(_e) => Poll::Ready(()), + } + } + } +} + +mod events { + #[derive(Clone, Debug)] + #[non_exhaustive] + pub struct Ignore; + + #[derive(Clone, Debug)] + pub struct WithExecutor(pub(super) E); + + pub trait Events { + fn on_race_lost(&self, fut: CF); + } + + impl Events for Ignore { + fn on_race_lost(&self, _fut: CF) {} + } + + impl Events for WithExecutor + where + E: hyper::rt::Executor, + { + fn on_race_lost(&self, fut: CF) { + self.0.execute(fut); + } + } +} + +#[cfg(test)] +mod tests { + use futures_util::future; + use tower_service::Service; + use tower_test::assert_request_eq; + + #[tokio::test] + async fn test_makes_svc_when_empty() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + handle.allow(1); + + crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + + let f = cache.call(1); + + future::join(f, async move { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + } + + #[tokio::test] + async fn test_reuses_after_idle() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + + // only 1 connection should ever be made + handle.allow(1); + + crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let f = cache.call(1); + let cached = future::join(f, async { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + drop(cached); + + crate::common::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let f = cache.call(1); + let cached = f.await.expect("call"); + drop(cached); + } +} diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs index c0e9bfec..a17acd37 100644 --- a/src/client/pool/mod.rs +++ b/src/client/pool/mod.rs @@ -1,3 +1,4 @@ //! Composable pool services +pub mod cache; pub mod singleton; From 11159bb913bac8004609855195febee28819cdd2 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Jun 2025 15:27:39 -0400 Subject: [PATCH 3/5] feat(pool): add a Negotiate pooling service --- Cargo.toml | 3 +- src/client/pool/mod.rs | 1 + src/client/pool/negotiate.rs | 574 +++++++++++++++++++++++++++++++++++ 3 files changed, 577 insertions(+), 1 deletion(-) create mode 100644 src/client/pool/negotiate.rs diff --git a/Cargo.toml b/Cargo.toml index 130e48f9..f793258c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ pin-project-lite = "0.2.4" socket2 = { version = ">=0.5.9, <0.7", optional = true, features = ["all"] } tracing = { version = "0.1", default-features = false, features = ["std"], optional = true } tokio = { version = "1", optional = true, default-features = false } +tower-layer = { version = "0.3", optional = true } tower-service = { version = "0.3", optional = true } [dev-dependencies] @@ -76,7 +77,7 @@ full = [ client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"] client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] -client-pool = ["dep:futures-util"] +client-pool = ["dep:futures-util", "dep:tower-layer"] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs index a17acd37..8b2c3bff 100644 --- a/src/client/pool/mod.rs +++ b/src/client/pool/mod.rs @@ -1,4 +1,5 @@ //! Composable pool services pub mod cache; +pub mod negotiate; pub mod singleton; diff --git a/src/client/pool/negotiate.rs b/src/client/pool/negotiate.rs new file mode 100644 index 00000000..8e283186 --- /dev/null +++ b/src/client/pool/negotiate.rs @@ -0,0 +1,574 @@ +//! Negotiate a pool of services +//! +//! The negotiate pool allows for a service that can decide between two service +//! types based on an intermediate return value. It differs from typical +//! routing since it doesn't depend on the request, but the response. +//! +//! The original use case is support ALPN upgrades to HTTP/2, with a fallback +//! to HTTP/1. +//! +//! # Example +//! +//! ```rust,ignore +//! # async fn run() -> Result<(), Box> { +//! # struct Conn; +//! # impl Conn { fn negotiated_protocol(&self) -> &[u8] { b"h2" } } +//! # let some_tls_connector = tower::service::service_fn(|_| async move { +//! # Ok::<_, std::convert::Infallible>(Conn) +//! # }); +//! # let http1_layer = tower::layer::layer_fn(|s| s); +//! # let http2_layer = tower::layer::layer_fn(|s| s); +//! let mut pool = hyper_util::client::pool::negotiate::builder() +//! .connect(some_tls_connector) +//! .inspect(|c| c.negotiated_protocol() == b"h2") +//! .fallback(http1_layer) +//! .upgrade(http2_layer) +//! .build(); +//! +//! // connect +//! let mut svc = pool.call(http::Uri::from_static("https://hyper.rs")).await?; +//! svc.ready().await; +//! +//! // http1 or http2 is now set up +//! # let some_http_req = http::Request::new(()); +//! let resp = svc.call(some_http_req).await?; +//! # Ok(()) +//! # } +//! ``` + +pub use self::internal::builder; + +#[cfg(docsrs)] +pub use self::internal::Builder; +#[cfg(docsrs)] +pub use self::internal::Negotiate; +#[cfg(docsrs)] +pub use self::internal::Negotiated; + +mod internal { + use std::future::Future; + use std::pin::Pin; + use std::sync::{Arc, Mutex}; + use std::task::{self, Poll}; + + use futures_core::ready; + use pin_project_lite::pin_project; + use tower_layer::Layer; + use tower_service::Service; + + type BoxError = Box; + + /// A negotiating pool over an inner make service. + /// + /// Created with [`builder()`]. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + pub struct Negotiate { + left: L, + right: R, + } + + /// A negotiated service returned by [`Negotiate`]. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + #[derive(Clone, Debug)] + pub enum Negotiated { + #[doc(hidden)] + Fallback(L), + #[doc(hidden)] + Upgraded(R), + } + + pin_project! { + pub struct Negotiating + where + L: Service, + R: Service<()>, + { + #[pin] + state: State, + left: L, + right: R, + } + } + + pin_project! { + #[project = StateProj] + enum State { + Eager { + #[pin] + future: FR, + dst: Option, + }, + Fallback { + #[pin] + future: FL, + }, + Upgrade { + #[pin] + future: FR, + } + } + } + + pin_project! { + #[project = NegotiatedProj] + pub enum NegotiatedFuture { + Fallback { + #[pin] + future: L + }, + Upgraded { + #[pin] + future: R + }, + } + } + + /// A builder to configure a `Negotiate`. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + #[derive(Debug)] + pub struct Builder { + connect: C, + inspect: I, + fallback: L, + upgrade: R, + } + + #[derive(Debug)] + pub struct WantsConnect; + #[derive(Debug)] + pub struct WantsInspect; + #[derive(Debug)] + pub struct WantsFallback; + #[derive(Debug)] + pub struct WantsUpgrade; + + /// Start a builder to construct a `Negotiate` pool. + pub fn builder() -> Builder { + Builder { + connect: WantsConnect, + inspect: WantsInspect, + fallback: WantsFallback, + upgrade: WantsUpgrade, + } + } + + impl Builder { + /// Provide the initial connector. + pub fn connect(self, connect: CC) -> Builder { + Builder { + connect, + inspect: self.inspect, + fallback: self.fallback, + upgrade: self.upgrade, + } + } + + /// Provide the inspector that determines the result of the negotiation. + pub fn inspect(self, inspect: II) -> Builder { + Builder { + connect: self.connect, + inspect, + fallback: self.fallback, + upgrade: self.upgrade, + } + } + + /// Provide the layer to fallback to if negotiation fails. + pub fn fallback(self, fallback: LL) -> Builder { + Builder { + connect: self.connect, + inspect: self.inspect, + fallback, + upgrade: self.upgrade, + } + } + + /// Provide the layer to upgrade to if negotiation succeeds. + pub fn upgrade(self, upgrade: RR) -> Builder { + Builder { + connect: self.connect, + inspect: self.inspect, + fallback: self.fallback, + upgrade, + } + } + + /// Build the `Negotiate` pool. + pub fn build(self) -> Negotiate + where + C: Service, + C::Error: Into, + L: Layer>, + L::Service: Service + Clone, + >::Error: Into, + R: Layer>, + R::Service: Service<()> + Clone, + >::Error: Into, + I: Fn(&C::Response) -> bool + Clone, + { + let Builder { + connect, + inspect, + fallback, + upgrade, + } = self; + + let slot = Arc::new(Mutex::new(None)); + let wrapped = Inspector { + svc: connect, + inspect, + slot: slot.clone(), + }; + let left = fallback.layer(wrapped); + + let right = upgrade.layer(Inspected { slot }); + + Negotiate { left, right } + } + } + + impl Service for Negotiate + where + L: Service + Clone, + L::Error: Into, + R: Service<()> + Clone, + R::Error: Into, + { + type Response = Negotiated; + type Error = BoxError; + type Future = Negotiating; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.left.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, dst: Target) -> Self::Future { + let left = self.left.clone(); + Negotiating { + state: State::Eager { + future: self.right.call(()), + dst: Some(dst), + }, + // place clone, take original that we already polled-ready. + left: std::mem::replace(&mut self.left, left), + right: self.right.clone(), + } + } + } + + impl Future for Negotiating + where + L: Service, + L::Error: Into, + R: Service<()>, + R::Error: Into, + { + type Output = Result, BoxError>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // States: + // - `Eager`: try the "right" path first; on `UseOther` sentinel, fall back to left. + // - `Fallback`: try the left path; on `UseOther` sentinel, upgrade back to right. + // - `Upgrade`: retry the right path after a fallback. + // If all fail, give up. + let mut me = self.project(); + loop { + match me.state.as_mut().project() { + StateProj::Eager { future, dst } => match ready!(future.poll(cx)) { + Ok(out) => return Poll::Ready(Ok(Negotiated::Upgraded(out))), + Err(err) => { + let err = err.into(); + if err.is::() { + let dst = dst.take().unwrap(); + let f = me.left.call(dst); + me.state.set(State::Fallback { future: f }); + continue; + } else { + return Poll::Ready(Err(err)); + } + } + }, + StateProj::Fallback { future } => match ready!(future.poll(cx)) { + Ok(out) => return Poll::Ready(Ok(Negotiated::Fallback(out))), + Err(err) => { + let err = err.into(); + if err.is::() { + let f = me.right.call(()); + me.state.set(State::Upgrade { future: f }); + continue; + } else { + return Poll::Ready(Err(err)); + } + } + }, + StateProj::Upgrade { future } => match ready!(future.poll(cx)) { + Ok(out) => return Poll::Ready(Ok(Negotiated::Upgraded(out))), + Err(err) => return Poll::Ready(Err(err.into())), + }, + } + } + } + } + + #[cfg(test)] + impl Negotiated { + // Could be useful? + pub(super) fn is_fallback(&self) -> bool { + matches!(self, Negotiated::Fallback(_)) + } + + pub(super) fn is_upgraded(&self) -> bool { + matches!(self, Negotiated::Upgraded(_)) + } + } + + impl Service for Negotiated + where + L: Service, + R: Service, + { + type Response = Res; + type Error = E; + type Future = NegotiatedFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + match self { + Negotiated::Fallback(ref mut s) => s.poll_ready(cx), + Negotiated::Upgraded(ref mut s) => s.poll_ready(cx), + } + } + + fn call(&mut self, req: Req) -> Self::Future { + match self { + Negotiated::Fallback(ref mut s) => NegotiatedFuture::Fallback { + future: s.call(req), + }, + Negotiated::Upgraded(ref mut s) => NegotiatedFuture::Upgraded { + future: s.call(req), + }, + } + } + } + + impl Future for NegotiatedFuture + where + L: Future, + R: Future, + { + type Output = Out; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match self.project() { + NegotiatedProj::Fallback { future } => future.poll(cx), + NegotiatedProj::Upgraded { future } => future.poll(cx), + } + } + } + + // ===== internal ===== + + pub struct Inspector { + svc: M, + inspect: I, + slot: Arc>>, + } + + pin_project! { + pub struct InspectFuture { + #[pin] + future: F, + inspect: I, + slot: Arc>>, + } + } + + impl Clone for Inspector { + fn clone(&self) -> Self { + Self { + svc: self.svc.clone(), + inspect: self.inspect.clone(), + slot: self.slot.clone(), + } + } + } + + impl Service for Inspector + where + M: Service, + M::Error: Into, + I: Clone + Fn(&S) -> bool, + { + type Response = M::Response; + type Error = BoxError; + type Future = InspectFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.svc.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, dst: Target) -> Self::Future { + InspectFuture { + future: self.svc.call(dst), + inspect: self.inspect.clone(), + slot: self.slot.clone(), + } + } + } + + impl Future for InspectFuture + where + F: Future>, + E: Into, + I: Fn(&S) -> bool, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let me = self.project(); + let s = ready!(me.future.poll(cx)).map_err(Into::into)?; + Poll::Ready(if (me.inspect)(&s) { + *me.slot.lock().unwrap() = Some(s); + Err(UseOther.into()) + } else { + Ok(s) + }) + } + } + + pub struct Inspected { + slot: Arc>>, + } + + impl Service for Inspected { + type Response = S; + type Error = BoxError; + type Future = std::future::Ready>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + if self.slot.lock().unwrap().is_some() { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(UseOther.into())) + } + } + + fn call(&mut self, _dst: Target) -> Self::Future { + let s = self + .slot + .lock() + .unwrap() + .take() + .ok_or_else(|| UseOther.into()); + std::future::ready(s) + } + } + + impl Clone for Inspected { + fn clone(&self) -> Inspected { + Inspected { + slot: self.slot.clone(), + } + } + } + + #[derive(Debug)] + struct UseOther; + + impl std::fmt::Display for UseOther { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("sentinel error; using other") + } + } + + impl std::error::Error for UseOther {} +} + +#[cfg(test)] +mod tests { + use futures_util::future; + use tower_service::Service; + use tower_test::assert_request_eq; + + #[tokio::test] + async fn not_negotiated_falls_back_to_left() { + let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); + + let mut negotiate = super::builder() + .connect(mock_svc) + .inspect(|_: &&str| false) + .fallback(layer_fn(|s| s)) + .upgrade(layer_fn(|s| s)) + .build(); + + crate::common::future::poll_fn(|cx| negotiate.poll_ready(cx)) + .await + .unwrap(); + + let fut = negotiate.call(()); + let nsvc = future::join(fut, async move { + assert_request_eq!(handle, ()).send_response("one"); + }) + .await + .0 + .expect("call"); + assert!(nsvc.is_fallback()); + } + + #[tokio::test] + async fn negotiated_uses_right() { + let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>(); + + let mut negotiate = super::builder() + .connect(mock_svc) + .inspect(|_: &&str| true) + .fallback(layer_fn(|s| s)) + .upgrade(layer_fn(|s| s)) + .build(); + + crate::common::future::poll_fn(|cx| negotiate.poll_ready(cx)) + .await + .unwrap(); + + let fut = negotiate.call(()); + let nsvc = future::join(fut, async move { + assert_request_eq!(handle, ()).send_response("one"); + }) + .await + .0 + .expect("call"); + + assert!(nsvc.is_upgraded()); + } + + fn layer_fn(f: F) -> LayerFn { + LayerFn(f) + } + + #[derive(Clone)] + struct LayerFn(F); + + impl tower_layer::Layer for LayerFn + where + F: Fn(S) -> Out, + { + type Service = Out; + fn layer(&self, inner: S) -> Self::Service { + (self.0)(inner) + } + } +} From d562e10cbb218d1847c9b5a5c9111c9e3f90c318 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Jun 2025 15:27:39 -0400 Subject: [PATCH 4/5] feat(pool): add a Map pool service type --- src/client/pool/map.rs | 213 +++++++++++++++++++++++++++++++++++++++++ src/client/pool/mod.rs | 1 + 2 files changed, 214 insertions(+) create mode 100644 src/client/pool/map.rs diff --git a/src/client/pool/map.rs b/src/client/pool/map.rs new file mode 100644 index 00000000..895d746b --- /dev/null +++ b/src/client/pool/map.rs @@ -0,0 +1,213 @@ +//! Map pool utilities +//! +//! The map isn't a typical `Service`, but rather stand-alone type that can map +//! requests to a key and service factory. This is because the service is more +//! of a router, and cannot determine which inner service to check for +//! backpressure since it's not know until the request is made. +//! +//! The map implementation allows customization of extracting a key, and how to +//! construct a MakeService for that key. +//! +//! # Example +//! +//! ```rust,ignore +//! # async fn run() { +//! # use hyper_util::client::pool; +//! # let req = http::Request::new(()); +//! # let some_http1_connector = || { +//! # tower::service::service_fn(|_req| async { Ok::<_, &'static str>(()) }) +//! # }; +//! let mut map = pool::map::Map::builder() +//! .keys(|uri| (uri.scheme().clone(), uri.authority().clone())) +//! .values(|_| { +//! some_http1_connector() +//! }) +//! .build(); +//! +//! let resp = map.service(req.uri()).call(req).await; +//! # } +//! ``` + +use std::collections::HashMap; + +// expose the documentation +#[cfg(docsrs)] +pub use self::builder::Builder; + +/// A map caching `MakeService`s per key. +/// +/// Create one with the [`Map::builder()`]. +pub struct Map +where + T: target::Target, +{ + map: HashMap, + targeter: T, +} + +// impl Map + +impl Map { + /// Create a [`Builder`] to configure a new `Map`. + pub fn builder() -> builder::Builder + { + builder::Builder::new() + } +} + +impl Map +where + T: target::Target, +{ + fn new(targeter: T) -> Self { + Map { + map: HashMap::new(), + targeter, + } + } +} + +impl Map +where + T: target::Target, + T::Key: Eq + std::hash::Hash, +{ + /// Get a service after extracting the key from `req`. + pub fn service(&mut self, req: &Req) -> &mut T::Service { + let key = self.targeter.key(req); + self.map + .entry(key) + .or_insert_with(|| self.targeter.service(req)) + } +} + +// sealed and unnameable for now +mod target { + pub trait Target { + type Key; + type Service; + + fn key(&self, dst: &Dst) -> Self::Key; + fn service(&self, dst: &Dst) -> Self::Service; + } +} + +// sealed and unnameable for now +mod builder { + use std::marker::PhantomData; + + /// A builder to configure a `Map`. + /// + /// # Unnameable + /// + /// This type is normally unnameable, forbidding naming of the type within + /// code. The type is exposed in the documentation to show which methods + /// can be publicly called. + pub struct Builder { + _dst: PhantomData, + keys: K, + svcs: S, + } + + pub struct WantsKeyer; + pub struct WantsServiceMaker; + + pub enum StartHere {} + + pub struct Built { + keys: K, + svcs: S, + } + + impl Builder { + pub(super) fn new() -> Self { + Builder { + _dst: PhantomData, + keys: WantsKeyer, + svcs: WantsServiceMaker, + } + } + } + + impl Builder { + /// Provide a closure that extracts a pool key for the destination. + pub fn keys(self, keyer: K) -> Builder + where + K: Fn(&Dst) -> KK, + { + Builder { + _dst: PhantomData, + keys: keyer, + svcs: self.svcs, + } + } + } + + impl Builder { + /// Provide a closure to create a new `MakeService` for the destination. + pub fn values(self, svcs: S) -> Builder + where + S: Fn(&Dst) -> SS, + { + Builder { + _dst: PhantomData, + keys: self.keys, + svcs, + } + } + } + + impl Builder + where + Built: super::target::Target, + as super::target::Target>::Key: Eq + std::hash::Hash, + { + /// Build the `Map` pool. + pub fn build(self) -> super::Map, Dst> { + super::Map::new(Built { + keys: self.keys, + svcs: self.svcs, + }) + } + } + + impl super::target::Target for StartHere { + type Key = StartHere; + type Service = StartHere; + + fn key(&self, _: &StartHere) -> Self::Key { + match *self {} + } + + fn service(&self, _: &StartHere) -> Self::Service { + match *self {} + } + } + + impl super::target::Target for Built + where + K: Fn(&Dst) -> KK, + S: Fn(&Dst) -> SS, + KK: Eq + std::hash::Hash, + { + type Key = KK; + type Service = SS; + + fn key(&self, dst: &Dst) -> Self::Key { + (self.keys)(dst) + } + + fn service(&self, dst: &Dst) -> Self::Service { + (self.svcs)(dst) + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn smoke() { + let mut pool = super::Map::builder().keys(|_| "a").values(|_| "b").build(); + pool.service(&"hello"); + } +} diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs index 8b2c3bff..9f1a8fce 100644 --- a/src/client/pool/mod.rs +++ b/src/client/pool/mod.rs @@ -1,5 +1,6 @@ //! Composable pool services pub mod cache; +pub mod map; pub mod negotiate; pub mod singleton; From 7b668a657215423eac27ab148b068ab86a1dece1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Jun 2025 15:27:39 -0400 Subject: [PATCH 5/5] pools wip --- Cargo.toml | 9 +- examples/client.rs | 172 ++++++++++++++++++++++++++++++++------ examples/client_legacy.rs | 37 ++++++++ src/client/pool/expire.rs | 3 + src/lib.rs | 6 +- 5 files changed, 198 insertions(+), 29 deletions(-) create mode 100644 examples/client_legacy.rs create mode 100644 src/client/pool/expire.rs diff --git a/Cargo.toml b/Cargo.toml index f793258c..b35d20b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util", "signal"] } tokio-test = "0.4" +tower = { version = "0.5", features = ["util"] } tower-test = "0.4" pretty_env_logger = "0.5" @@ -56,7 +57,7 @@ system-configuration = { version = "0.6.1", optional = true } windows-registry = { version = "0.5", optional = true } [features] -default = [] +default = ["client", "client-pool"] # Shorthand to enable everything full = [ @@ -77,7 +78,7 @@ full = [ client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"] client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] -client-pool = ["dep:futures-util", "dep:tower-layer"] +client-pool = ["tokio/sync", "dep:futures-util", "dep:tower-layer"] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] @@ -99,6 +100,10 @@ __internal_happy_eyeballs_tests = [] [[example]] name = "client" +required-features = ["client-legacy", "client-pool", "http1", "tokio"] + +[[example]] +name = "client_legacy" required-features = ["client-legacy", "http1", "tokio"] [[example]] diff --git a/examples/client.rs b/examples/client.rs index 04defac0..d40b357b 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,37 +1,157 @@ -use std::env; +use tower::ServiceExt; +use tower_service::Service; -use http_body_util::Empty; -use hyper::Request; -use hyper_util::client::legacy::{connect::HttpConnector, Client}; +use hyper_util::client::pool; #[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - let url = match env::args().nth(1) { - Some(url) => url, - None => { - eprintln!("Usage: client "); - return Ok(()); - } - }; - - // HTTPS requires picking a TLS implementation, so give a better - // warning if the user tries to request an 'https' URL. - let url = url.parse::()?; - if url.scheme_str() != Some("http") { - eprintln!("This example only works with 'http' URLs."); - return Ok(()); +async fn main() -> Result<(), Box> { + send_nego().await +} + +async fn send_h1() -> Result<(), Box> { + let tcp = hyper_util::client::legacy::connect::HttpConnector::new(); + + let http1 = tcp.and_then(|conn| { + Box::pin(async move { + let (mut tx, c) = hyper::client::conn::http1::handshake::< + _, + http_body_util::Empty, + >(conn) + .await?; + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + }) + }); + + let mut p = pool::Cache::new(http1).build(); + + let mut c = p.call(http::Uri::from_static("http://hyper.rs")).await?; + eprintln!("{:?}", c); + + let req = http::Request::builder() + .header("host", "hyper.rs") + .body(http_body_util::Empty::new()) + .unwrap(); + + c.ready().await?; + let resp = c.call(req).await?; + eprintln!("{:?}", resp); + + Ok(()) +} + +async fn send_h2() -> Result<(), Box> { + let tcp = hyper_util::client::legacy::connect::HttpConnector::new(); + + let http2 = tcp.and_then(|conn| { + Box::pin(async move { + let (mut tx, c) = hyper::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty, + >(hyper_util::rt::TokioExecutor::new(), conn) + .await?; + println!("connected"); + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + }) + }); + + let mut p = pool::Singleton::new(http2); + + for _ in 0..5 { + let mut c = p + .call(http::Uri::from_static("http://localhost:3000")) + .await?; + eprintln!("{:?}", c); + + let req = http::Request::builder() + .header("host", "hyper.rs") + .body(http_body_util::Empty::new()) + .unwrap(); + + c.ready().await?; + let resp = c.call(req).await?; + eprintln!("{:?}", resp); } - let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new()); + Ok(()) +} - let req = Request::builder() - .uri(url) - .body(Empty::::new())?; +async fn send_nego() -> Result<(), Box> { + let tcp = hyper_util::client::legacy::connect::HttpConnector::new(); - let resp = client.request(req).await?; + let http1 = tower::layer::layer_fn(|tcp| { + tower::service_fn(move |dst| { + let inner = tcp.call(dst); + async move { + let conn = inner.await?; + let (mut tx, c) = hyper::client::conn::http1::handshake::< + _, + http_body_util::Empty, + >(conn) + .await?; + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + } + }) + }); - eprintln!("{:?} {:?}", resp.version(), resp.status()); - eprintln!("{:#?}", resp.headers()); + let http2 = tower::layer::layer_fn(|tcp| { + tower::service_fn(move |dst| { + let inner = tcp.call(dst); + async move { + let conn = inner.await?; + let (mut tx, c) = hyper::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty, + >(hyper_util::rt::TokioExecutor::new(), conn) + .await?; + println!("connected"); + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + } + }) + }); + + let mut svc = pool::negotiate(tcp, |_| false, http1, http2); + + for _ in 0..5 { + let mut c = svc + .call(http::Uri::from_static("http://localhost:3000")) + .await?; + eprintln!("{:?}", c); + + let req = http::Request::builder() + .header("host", "hyper.rs") + .body(http_body_util::Empty::new()) + .unwrap(); + + c.ready().await?; + let resp = c.call(req).await?; + eprintln!("{:?}", resp); + } Ok(()) } diff --git a/examples/client_legacy.rs b/examples/client_legacy.rs new file mode 100644 index 00000000..04defac0 --- /dev/null +++ b/examples/client_legacy.rs @@ -0,0 +1,37 @@ +use std::env; + +use http_body_util::Empty; +use hyper::Request; +use hyper_util::client::legacy::{connect::HttpConnector, Client}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + let url = match env::args().nth(1) { + Some(url) => url, + None => { + eprintln!("Usage: client "); + return Ok(()); + } + }; + + // HTTPS requires picking a TLS implementation, so give a better + // warning if the user tries to request an 'https' URL. + let url = url.parse::()?; + if url.scheme_str() != Some("http") { + eprintln!("This example only works with 'http' URLs."); + return Ok(()); + } + + let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new()); + + let req = Request::builder() + .uri(url) + .body(Empty::::new())?; + + let resp = client.request(req).await?; + + eprintln!("{:?} {:?}", resp.version(), resp.status()); + eprintln!("{:#?}", resp.headers()); + + Ok(()) +} diff --git a/src/client/pool/expire.rs b/src/client/pool/expire.rs new file mode 100644 index 00000000..281c7253 --- /dev/null +++ b/src/client/pool/expire.rs @@ -0,0 +1,3 @@ +pub struct Expire { + +} diff --git a/src/lib.rs b/src/lib.rs index ac8f89b1..7a9c3c5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,11 @@ mod common; pub mod rt; #[cfg(feature = "server")] pub mod server; -#[cfg(any(feature = "service", feature = "client-legacy"))] +#[cfg(any( + feature = "service", + feature = "client-legacy", + feature = "client-pool", +))] pub mod service; mod error;