From 56c3cd560bc10671d3d8b638f3f17a304f920c6b Mon Sep 17 00:00:00 2001 From: Yury Yarashevich <yura.yaroshevich@gmail.com> Date: Mon, 1 Jul 2024 15:18:46 +0200 Subject: [PATCH] feat(client): remove `Send +Sync` bounds requirement of `http2::Connection` executor (#3682) --- src/client/conn/http2.rs | 218 ++++++++++++++++++++++++++++++++++++++- src/proto/h2/client.rs | 2 +- 2 files changed, 218 insertions(+), 2 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index c7938e303b..94c56e6e49 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -230,7 +230,7 @@ where B::Data: Send, E: Unpin, B::Error: Into<Box<dyn Error + Send + Sync>>, - E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin, + E: Http2ClientConnExec<B, T> + Unpin, { type Output = crate::Result<()>; @@ -457,3 +457,219 @@ where } } } + +#[cfg(test)] +mod tests { + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_sync_executor_of_non_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor; + + impl<F> crate::rt::Executor<F> for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty<bytes::Bytes>, + >(LocalTokioExecutor, io) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn not_send_not_sync_executor_of_not_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor { + _x: std::marker::PhantomData<std::rc::Rc<()>>, + } + + impl<F> crate::rt::Executor<F> for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( + LocalTokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_not_sync_executor_of_not_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor { + _x: std::marker::PhantomData<std::cell::Cell<()>>, + } + + impl<F> crate::rt::Executor<F> for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( + LocalTokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor; + + impl<F> crate::rt::Executor<F> for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty<bytes::Bytes>, + >(TokioExecutor, io) + .await + .unwrap(); + + tokio::task::spawn(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn not_send_not_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor { + // !Send, !Sync + _x: std::marker::PhantomData<std::rc::Rc<()>>, + } + + impl<F> crate::rt::Executor<F> for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( + TokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + // can't use spawn here because when executor is !Send + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_not_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor { + // !Sync + _x: std::marker::PhantomData<std::cell::Cell<()>>, + } + + impl<F> crate::rt::Executor<F> for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( + TokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + // can't use spawn here because when executor is !Send + conn.await.unwrap(); + }); + } + } +} diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 6675142118..53f36a7f85 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -611,7 +611,7 @@ where B: Body + 'static + Unpin, B::Data: Send, B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin, + E: Http2ClientConnExec<B, T> + Unpin, T: Read + Write + Unpin, { type Output = crate::Result<Dispatched>;