Skip to content

Commit

Permalink
Make Future implementation on Connection unconditional on executor be…
Browse files Browse the repository at this point in the history
…ing Send + Sync.
  • Loading branch information
mstyura committed Jun 5, 2024
1 parent 0eb1b6c commit 770c1ca
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 2 deletions.
170 changes: 169 additions & 1 deletion src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ where
B::Data: Send,
E: Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin,
E: Http2ClientConnExec<B, T> + Unpin
{
type Output = crate::Result<()>;

Expand Down Expand Up @@ -457,3 +457,171 @@ where
}
}
}

#[cfg(test)]
mod tests {

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_sync_executor_of_non_send_futures() {
#[derive(Clone)]
struct LocalTokioExecutor;

impl<F> crate::rt::Executor<F> for LocalTokioExecutor
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(LocalTokioExecutor, io).await.unwrap();

tokio::task::spawn_local(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn not_send_not_sync_executor_of_not_send_futures() {
#[derive(Clone)]
struct LocalTokioExecutor {
_x: std::marker::PhantomData<std::rc::Rc<()>>
}

impl<F> crate::rt::Executor<F> for LocalTokioExecutor
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(LocalTokioExecutor { _x: Default::default() }, io).await.unwrap();

tokio::task::spawn_local(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_not_sync_executor_of_not_send_futures() {
#[derive(Clone)]
struct LocalTokioExecutor {
_x: std::marker::PhantomData<std::cell::Cell<()>>
}

impl<F> crate::rt::Executor<F> for LocalTokioExecutor
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(LocalTokioExecutor { _x: Default::default() }, io).await.unwrap();

tokio::task::spawn_local(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_sync_executor_of_send_futures() {
#[derive(Clone)]
struct TokioExecutor;

impl<F> crate::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(TokioExecutor, io).await.unwrap();

tokio::task::spawn(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn not_send_not_sync_executor_of_send_futures() {
#[derive(Clone)]
struct TokioExecutor { // !Send, !Sync
_x: std::marker::PhantomData<std::rc::Rc<()>>
}

impl<F> crate::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(TokioExecutor { _x: Default::default() }, io).await.unwrap();

tokio::task::spawn_local(async move { // can't use spawn here because when executor is !Send
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_not_sync_executor_of_send_futures() {
#[derive(Clone)]
struct TokioExecutor { // !Sync
_x: std::marker::PhantomData<std::cell::Cell<()>>
}

impl<F> crate::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(TokioExecutor { _x: Default::default() }, io).await.unwrap();

tokio::task::spawn_local(async move { // can't use spawn here because when executor is !Send
conn.await.unwrap();
});
}
}
}
2 changes: 1 addition & 1 deletion src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ where
B: Body + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
T: Read + Write + Unpin,
{
type Output = crate::Result<Dispatched>;
Expand Down

0 comments on commit 770c1ca

Please sign in to comment.