Skip to content

Commit

Permalink
feat(client): Make clients able to use non-Send executor (#3184)
Browse files Browse the repository at this point in the history
Closes #3017

BREAKING CHANGE: `client::conn::http2` types now use another generic for an `Executor`.
  Code that names `Connection` needs to include the additional generic parameter.
  • Loading branch information
Ruben2424 authored Jun 29, 2023
1 parent 9169a7c commit d977f20
Show file tree
Hide file tree
Showing 9 changed files with 652 additions and 213 deletions.
171 changes: 157 additions & 14 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
#![deny(warnings)]

use http_body_util::BodyExt;
use hyper::server::conn::http2;
use std::cell::Cell;
use std::net::SocketAddr;
use std::rc::Rc;
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;

use hyper::body::{Body as HttpBody, Bytes, Frame};
use hyper::service::service_fn;
use hyper::Request;
use hyper::{Error, Response};
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use tokio::net::TcpStream;

struct Body {
// Our Body type is !Send and !Sync:
Expand Down Expand Up @@ -40,28 +45,57 @@ impl HttpBody for Body {
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() {
pretty_env_logger::init();

// Configure a runtime that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, run())
let server = thread::spawn(move || {
// Configure a runtime for the server that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, server()).unwrap();
});

let client = thread::spawn(move || {
// Configure a runtime for the client that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local
.block_on(
&rt,
client("http://localhost:3000".parse::<hyper::Uri>().unwrap()),
)
.unwrap();
});

server.join().unwrap();
client.join().unwrap();
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
async fn server() -> Result<(), Box<dyn std::error::Error>> {
let mut stdout = io::stdout();

let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
// Using a !Send request counter is fine on 1 thread...
let counter = Rc::new(Cell::new(0));

let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);

stdout
.write_all(format!("Listening on http://{}", addr).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();

loop {
let (stream, _) = listener.accept().await?;

Expand All @@ -80,12 +114,121 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
.serve_connection(stream, service)
.await
{
println!("Error serving connection: {:?}", err);
let mut stdout = io::stdout();
stdout
.write_all(format!("Error serving connection: {:?}", err).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();
}
});
}
}

struct IOTypeNotSend {
_marker: PhantomData<*const ()>,
stream: TcpStream,
}

impl IOTypeNotSend {
fn new(stream: TcpStream) -> Self {
Self {
_marker: PhantomData,
stream,
}
}
}

impl AsyncWrite for IOTypeNotSend {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
}
}

impl AsyncRead for IOTypeNotSend {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}

async fn client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let stream = IOTypeNotSend::new(stream);

let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?;

tokio::task::spawn_local(async move {
if let Err(err) = conn.await {
let mut stdout = io::stdout();
stdout
.write_all(format!("Connection failed: {:?}", err).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();
}
});

let authority = url.authority().unwrap().clone();

// Make 4 requests
for _ in 0..4 {
let req = Request::builder()
.uri(url.clone())
.header(hyper::header::HOST, authority.as_str())
.body(Body::from("test".to_string()))?;

let mut res = sender.send_request(req).await?;

let mut stdout = io::stdout();
stdout
.write_all(format!("Response: {}\n", res.status()).as_bytes())
.await
.unwrap();
stdout
.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();

// Print the response body
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
stdout.write_all(&chunk).await.unwrap();
}
}
stdout.write_all(b"\n-----------------\n").await.unwrap();
stdout.flush().await.unwrap();
}
Ok(())
}

// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor.
//
// Since the Server needs to spawn some background tasks, we needed
Expand Down
77 changes: 43 additions & 34 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! HTTP/2 client connections
use std::error::Error as StdError;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
Expand All @@ -12,12 +12,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::common::{task, Future, Pin, Poll};
use crate::proto;
use crate::rt::{Executor, Timer};
use crate::rt::bounds::ExecutorClient;
use crate::rt::Timer;

/// The sender side of an established connection.
pub struct SendRequest<B> {
Expand All @@ -37,20 +35,22 @@ impl<B> Clone for SendRequest<B> {
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
pub struct Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Send + 'static,
T: AsyncRead + AsyncWrite + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
{
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
}

/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub struct Builder<Ex> {
pub(super) exec: Ex,
pub(super) timer: Time,
h2_builder: proto::h2::client::Config,
}
Expand All @@ -59,13 +59,16 @@ pub struct Builder {
///
/// This is a shortcut for `Builder::new().handshake(io)`.
/// See [`client::conn`](crate::client::conn) for more.
pub async fn handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
pub async fn handshake<E, T, B>(
exec: E,
io: T,
) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)>
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin + Clone,
{
Builder::new(exec).handshake(io).await
}
Expand Down Expand Up @@ -188,12 +191,13 @@ impl<B> fmt::Debug for SendRequest<B> {

// ===== impl Connection

impl<T, B> Connection<T, B>
impl<T, B, E> Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin,
{
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
///
Expand All @@ -209,22 +213,26 @@ where
}
}

impl<T, B> fmt::Debug for Connection<T, B>
impl<T, B, E> fmt::Debug for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
}
}

impl<T, B> Future for Connection<T, B>
impl<T, B, E> Future for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin,
{
type Output = crate::Result<()>;

Expand All @@ -239,22 +247,22 @@ where

// ===== impl Builder

impl Builder {
impl<Ex> Builder<Ex>
where
Ex: Clone,
{
/// Creates a new connection builder.
#[inline]
pub fn new<E>(exec: E) -> Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
pub fn new(exec: Ex) -> Builder<Ex> {
Builder {
exec: Exec::new(exec),
exec,
timer: Time::Empty,
h2_builder: Default::default(),
}
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex>
where
M: Timer + Send + Sync + 'static,
{
Expand Down Expand Up @@ -388,12 +396,13 @@ impl Builder {
pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<Box<dyn Error + Send + Sync>>,
Ex: ExecutorClient<B, T> + Unpin,
{
let opts = self.clone();

Expand Down
Loading

0 comments on commit d977f20

Please sign in to comment.