Skip to content

Commit f9f65b7

Browse files
authored
feat(rt): replace IO traits with hyper::rt ones (hyperium#3230)
This replaces the usage of `tokio::io::{AsyncRead, AsyncWrite}` in hyper's public API with new traits in the `hyper::rt` module. Closes hyperium#3110 BREAKING CHANGE: Any IO transport type provided must not implement `hyper::rt::{Read, Write}` instead of `tokio::io` traits. You can grab a helper type from `hyper-util` to wrap Tokio types, or implement the traits yourself, if it's a custom type.
1 parent f4b5130 commit f9f65b7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1015
-292
lines changed

benches/end_to_end.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
extern crate test;
55
mod support;
66

7-
// TODO: Reimplement Opts::bench using hyper::server::conn and hyper::client::conn
8-
// (instead of Server and HttpClient).
7+
// TODO: Reimplement parallel for HTTP/1
98

109
use std::convert::Infallible;
1110
use std::net::SocketAddr;
@@ -315,7 +314,8 @@ impl Opts {
315314

316315
let mut client = rt.block_on(async {
317316
if self.http2 {
318-
let io = tokio::net::TcpStream::connect(&addr).await.unwrap();
317+
let tcp = tokio::net::TcpStream::connect(&addr).await.unwrap();
318+
let io = support::TokioIo::new(tcp);
319319
let (tx, conn) = hyper::client::conn::http2::Builder::new(support::TokioExecutor)
320320
.initial_stream_window_size(self.http2_stream_window)
321321
.initial_connection_window_size(self.http2_conn_window)
@@ -328,7 +328,8 @@ impl Opts {
328328
} else if self.parallel_cnt > 1 {
329329
todo!("http/1 parallel >1");
330330
} else {
331-
let io = tokio::net::TcpStream::connect(&addr).await.unwrap();
331+
let tcp = tokio::net::TcpStream::connect(&addr).await.unwrap();
332+
let io = support::TokioIo::new(tcp);
332333
let (tx, conn) = hyper::client::conn::http1::Builder::new()
333334
.handshake(io)
334335
.await
@@ -414,14 +415,15 @@ fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
414415
let opts = opts.clone();
415416
rt.spawn(async move {
416417
while let Ok((sock, _)) = listener.accept().await {
418+
let io = support::TokioIo::new(sock);
417419
if opts.http2 {
418420
tokio::spawn(
419421
hyper::server::conn::http2::Builder::new(support::TokioExecutor)
420422
.initial_stream_window_size(opts.http2_stream_window)
421423
.initial_connection_window_size(opts.http2_conn_window)
422424
.adaptive_window(opts.http2_adaptive_window)
423425
.serve_connection(
424-
sock,
426+
io,
425427
service_fn(move |req: Request<hyper::body::Incoming>| async move {
426428
let mut req_body = req.into_body();
427429
while let Some(_chunk) = req_body.frame().await {}
@@ -433,7 +435,7 @@ fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
433435
);
434436
} else {
435437
tokio::spawn(hyper::server::conn::http1::Builder::new().serve_connection(
436-
sock,
438+
io,
437439
service_fn(move |req: Request<hyper::body::Incoming>| async move {
438440
let mut req_body = req.into_body();
439441
while let Some(_chunk) = req_body.frame().await {}

benches/pipeline.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
extern crate test;
55

6+
mod support;
7+
68
use std::convert::Infallible;
79
use std::io::{Read, Write};
810
use std::net::{SocketAddr, TcpStream};
@@ -40,11 +42,12 @@ fn hello_world_16(b: &mut test::Bencher) {
4042
rt.spawn(async move {
4143
loop {
4244
let (stream, _addr) = listener.accept().await.expect("accept");
45+
let io = support::TokioIo::new(stream);
4346

4447
http1::Builder::new()
4548
.pipeline_flush(true)
4649
.serve_connection(
47-
stream,
50+
io,
4851
service_fn(|_| async {
4952
Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(
5053
"Hello, World!",

benches/server.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
extern crate test;
55

6+
mod support;
7+
68
use std::io::{Read, Write};
79
use std::net::{SocketAddr, TcpListener, TcpStream};
810
use std::sync::mpsc;
@@ -38,10 +40,11 @@ macro_rules! bench_server {
3840
rt.spawn(async move {
3941
loop {
4042
let (stream, _) = listener.accept().await.expect("accept");
43+
let io = support::TokioIo::new(stream);
4144

4245
http1::Builder::new()
4346
.serve_connection(
44-
stream,
47+
io,
4548
service_fn(|_| async {
4649
Ok::<_, hyper::Error>(
4750
Response::builder()

benches/support/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
mod tokiort;
2-
pub use tokiort::{TokioExecutor, TokioTimer};
2+
pub use tokiort::{TokioExecutor, TokioIo, TokioTimer};

benches/support/tokiort.rs

+146
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,149 @@ impl TokioSleep {
8888
self.project().inner.as_mut().reset(deadline.into());
8989
}
9090
}
91+
92+
pin_project! {
93+
#[derive(Debug)]
94+
pub struct TokioIo<T> {
95+
#[pin]
96+
inner: T,
97+
}
98+
}
99+
100+
impl<T> TokioIo<T> {
101+
pub fn new(inner: T) -> Self {
102+
Self { inner }
103+
}
104+
105+
pub fn inner(self) -> T {
106+
self.inner
107+
}
108+
}
109+
110+
impl<T> hyper::rt::Read for TokioIo<T>
111+
where
112+
T: tokio::io::AsyncRead,
113+
{
114+
fn poll_read(
115+
self: Pin<&mut Self>,
116+
cx: &mut Context<'_>,
117+
mut buf: hyper::rt::ReadBufCursor<'_>,
118+
) -> Poll<Result<(), std::io::Error>> {
119+
let n = unsafe {
120+
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
121+
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
122+
Poll::Ready(Ok(())) => tbuf.filled().len(),
123+
other => return other,
124+
}
125+
};
126+
127+
unsafe {
128+
buf.advance(n);
129+
}
130+
Poll::Ready(Ok(()))
131+
}
132+
}
133+
134+
impl<T> hyper::rt::Write for TokioIo<T>
135+
where
136+
T: tokio::io::AsyncWrite,
137+
{
138+
fn poll_write(
139+
self: Pin<&mut Self>,
140+
cx: &mut Context<'_>,
141+
buf: &[u8],
142+
) -> Poll<Result<usize, std::io::Error>> {
143+
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
144+
}
145+
146+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
147+
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
148+
}
149+
150+
fn poll_shutdown(
151+
self: Pin<&mut Self>,
152+
cx: &mut Context<'_>,
153+
) -> Poll<Result<(), std::io::Error>> {
154+
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
155+
}
156+
157+
fn is_write_vectored(&self) -> bool {
158+
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
159+
}
160+
161+
fn poll_write_vectored(
162+
self: Pin<&mut Self>,
163+
cx: &mut Context<'_>,
164+
bufs: &[std::io::IoSlice<'_>],
165+
) -> Poll<Result<usize, std::io::Error>> {
166+
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
167+
}
168+
}
169+
170+
impl<T> tokio::io::AsyncRead for TokioIo<T>
171+
where
172+
T: hyper::rt::Read,
173+
{
174+
fn poll_read(
175+
self: Pin<&mut Self>,
176+
cx: &mut Context<'_>,
177+
tbuf: &mut tokio::io::ReadBuf<'_>,
178+
) -> Poll<Result<(), std::io::Error>> {
179+
//let init = tbuf.initialized().len();
180+
let filled = tbuf.filled().len();
181+
let sub_filled = unsafe {
182+
let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
183+
184+
match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
185+
Poll::Ready(Ok(())) => buf.filled().len(),
186+
other => return other,
187+
}
188+
};
189+
190+
let n_filled = filled + sub_filled;
191+
// At least sub_filled bytes had to have been initialized.
192+
let n_init = sub_filled;
193+
unsafe {
194+
tbuf.assume_init(n_init);
195+
tbuf.set_filled(n_filled);
196+
}
197+
198+
Poll::Ready(Ok(()))
199+
}
200+
}
201+
202+
impl<T> tokio::io::AsyncWrite for TokioIo<T>
203+
where
204+
T: hyper::rt::Write,
205+
{
206+
fn poll_write(
207+
self: Pin<&mut Self>,
208+
cx: &mut Context<'_>,
209+
buf: &[u8],
210+
) -> Poll<Result<usize, std::io::Error>> {
211+
hyper::rt::Write::poll_write(self.project().inner, cx, buf)
212+
}
213+
214+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
215+
hyper::rt::Write::poll_flush(self.project().inner, cx)
216+
}
217+
218+
fn poll_shutdown(
219+
self: Pin<&mut Self>,
220+
cx: &mut Context<'_>,
221+
) -> Poll<Result<(), std::io::Error>> {
222+
hyper::rt::Write::poll_shutdown(self.project().inner, cx)
223+
}
224+
225+
fn is_write_vectored(&self) -> bool {
226+
hyper::rt::Write::is_write_vectored(&self.inner)
227+
}
228+
229+
fn poll_write_vectored(
230+
self: Pin<&mut Self>,
231+
cx: &mut Context<'_>,
232+
bufs: &[std::io::IoSlice<'_>],
233+
) -> Poll<Result<usize, std::io::Error>> {
234+
hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
235+
}
236+
}

examples/client.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ use hyper::Request;
88
use tokio::io::{self, AsyncWriteExt as _};
99
use tokio::net::TcpStream;
1010

11+
#[path = "../benches/support/mod.rs"]
12+
mod support;
13+
use support::TokioIo;
14+
1115
// A simple type alias so as to DRY.
1216
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1317

@@ -40,8 +44,9 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
4044
let port = url.port_u16().unwrap_or(80);
4145
let addr = format!("{}:{}", host, port);
4246
let stream = TcpStream::connect(addr).await?;
47+
let io = TokioIo::new(stream);
4348

44-
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
49+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
4550
tokio::task::spawn(async move {
4651
if let Err(err) = conn.await {
4752
println!("Connection failed: {:?}", err);

examples/client_json.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use hyper::{body::Buf, Request};
77
use serde::Deserialize;
88
use tokio::net::TcpStream;
99

10+
#[path = "../benches/support/mod.rs"]
11+
mod support;
12+
use support::TokioIo;
13+
1014
// A simple type alias so as to DRY.
1115
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1216

@@ -29,8 +33,9 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
2933
let addr = format!("{}:{}", host, port);
3034

3135
let stream = TcpStream::connect(addr).await?;
36+
let io = TokioIo::new(stream);
3237

33-
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
38+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
3439
tokio::task::spawn(async move {
3540
if let Err(err) = conn.await {
3641
println!("Connection failed: {:?}", err);

examples/echo.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use hyper::service::service_fn;
1010
use hyper::{body::Body, Method, Request, Response, StatusCode};
1111
use tokio::net::TcpListener;
1212

13+
#[path = "../benches/support/mod.rs"]
14+
mod support;
15+
use support::TokioIo;
16+
1317
/// This is our service handler. It receives a Request, routes on its
1418
/// path, and returns a Future of a Response.
1519
async fn echo(
@@ -92,10 +96,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
9296
println!("Listening on http://{}", addr);
9397
loop {
9498
let (stream, _) = listener.accept().await?;
99+
let io = TokioIo::new(stream);
95100

96101
tokio::task::spawn(async move {
97102
if let Err(err) = http1::Builder::new()
98-
.serve_connection(stream, service_fn(echo))
103+
.serve_connection(io, service_fn(echo))
99104
.await
100105
{
101106
println!("Error serving connection: {:?}", err);

examples/gateway.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ use hyper::{server::conn::http1, service::service_fn};
44
use std::net::SocketAddr;
55
use tokio::net::{TcpListener, TcpStream};
66

7+
#[path = "../benches/support/mod.rs"]
8+
mod support;
9+
use support::TokioIo;
10+
711
#[tokio::main]
812
async fn main() -> Result<(), Box<dyn std::error::Error>> {
913
pretty_env_logger::init();
@@ -20,6 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2024

2125
loop {
2226
let (stream, _) = listener.accept().await?;
27+
let io = TokioIo::new(stream);
2328

2429
// This is the `Service` that will handle the connection.
2530
// `service_fn` is a helper to convert a function that
@@ -42,9 +47,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4247

4348
async move {
4449
let client_stream = TcpStream::connect(addr).await.unwrap();
50+
let io = TokioIo::new(client_stream);
4551

46-
let (mut sender, conn) =
47-
hyper::client::conn::http1::handshake(client_stream).await?;
52+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
4853
tokio::task::spawn(async move {
4954
if let Err(err) = conn.await {
5055
println!("Connection failed: {:?}", err);
@@ -56,10 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5661
});
5762

5863
tokio::task::spawn(async move {
59-
if let Err(err) = http1::Builder::new()
60-
.serve_connection(stream, service)
61-
.await
62-
{
64+
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
6365
println!("Failed to serve the connection: {:?}", err);
6466
}
6567
});

examples/hello.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use hyper::service::service_fn;
1010
use hyper::{Request, Response};
1111
use tokio::net::TcpListener;
1212

13+
#[path = "../benches/support/mod.rs"]
14+
mod support;
15+
use support::TokioIo;
16+
1317
// An async function that consumes a request, does nothing with it and returns a
1418
// response.
1519
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
@@ -35,7 +39,10 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3539
// has work to do. In this case, a connection arrives on the port we are listening on and
3640
// the task is woken up, at which point the task is then put back on a thread, and is
3741
// driven forward by the runtime, eventually yielding a TCP stream.
38-
let (stream, _) = listener.accept().await?;
42+
let (tcp, _) = listener.accept().await?;
43+
// Use an adapter to access something implementing `tokio::io` traits as if they implement
44+
// `hyper::rt` IO traits.
45+
let io = TokioIo::new(tcp);
3946

4047
// Spin up a new task in Tokio so we can continue to listen for new TCP connection on the
4148
// current task without waiting for the processing of the HTTP1 connection we just received
@@ -44,7 +51,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4451
// Handle the connection from the client using HTTP1 and pass any
4552
// HTTP requests received on that connection to the `hello` function
4653
if let Err(err) = http1::Builder::new()
47-
.serve_connection(stream, service_fn(hello))
54+
.serve_connection(io, service_fn(hello))
4855
.await
4956
{
5057
println!("Error serving connection: {:?}", err);

0 commit comments

Comments
 (0)