From 9db5b12a64220c34cdd0e604f37f0609cc6e26f3 Mon Sep 17 00:00:00 2001 From: Robertoskr Date: Tue, 9 Aug 2022 21:23:07 +0200 Subject: [PATCH 1/6] reenable benches/end_to_end.rs --- benches/end_to_end.rs | 761 +++++++++++++++++++++--------------------- 1 file changed, 388 insertions(+), 373 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 41dbde2ab0..9c58d2dc92 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -1,385 +1,400 @@ #![feature(test)] -#![deny(warnings)] - extern crate test; -// TODO: Reimplement Opts::bench using hyper::server::conn and hyper::client::conn -// (instead of Server and HttpClient). - -// use std::net::SocketAddr; +use test::Bencher; -// use futures_util::future::join_all; - -// use hyper::client::HttpConnector; -// use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server}; +use hyper::body::HttpBody; +use hyper::client::conn::{Builder, SendRequest}; +use hyper::{server::conn::Http, service::service_fn}; +use hyper::{Body, Method, Request, Response}; +use std::convert::Infallible; +use std::net::SocketAddr; +use tokio::net::TcpListener; +use tokio::net::TcpStream; // // HTTP1 - -// #[bench] -// fn http1_consecutive_x1_empty(b: &mut test::Bencher) { -// opts().bench(b) -// } - -// #[bench] -// fn http1_consecutive_x1_req_10b(b: &mut test::Bencher) { -// opts() -// .method(Method::POST) -// .request_body(&[b's'; 10]) -// .bench(b) -// } - -// #[bench] -// fn http1_consecutive_x1_both_100kb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 100]; -// opts() -// .method(Method::POST) -// .request_body(body) -// .response_body(body) -// .bench(b) -// } - -// #[bench] -// fn http1_consecutive_x1_both_10mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 10]; -// opts() -// .method(Method::POST) -// .request_body(body) -// .response_body(body) -// .bench(b) -// } - -// #[bench] -// fn http1_parallel_x10_empty(b: &mut test::Bencher) { -// opts().parallel(10).bench(b) -// } - -// #[bench] -// fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 10]; -// opts() -// .parallel(10) -// .method(Method::POST) -// .request_body(body) -// .bench(b) -// } - -// #[bench] -// fn http1_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 10]; -// opts() -// .parallel(10) -// .method(Method::POST) -// .request_chunks(body, 100) -// .bench(b) -// } - -// #[bench] -// fn http1_parallel_x10_res_1mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 1]; -// opts().parallel(10).response_body(body).bench(b) -// } - -// #[bench] -// fn http1_parallel_x10_res_10mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 10]; -// opts().parallel(10).response_body(body).bench(b) -// } +#[bench] +fn http1_consecutive_x1_empty(b: &mut Bencher) { + opts().bench(b) +} + +#[bench] +fn http1_consecutive_x1_req_10b(b: &mut test::Bencher) { + opts() + .method(Method::POST) + .request_body(&[b's'; 10]) + .bench(b) +} + +#[bench] +fn http1_consecutive_x1_both_100kb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 100]; + opts() + .method(Method::POST) + .request_body(body) + .response_body(body) + .bench(b) +} + +#[bench] +fn http1_consecutive_x1_both_10mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 10]; + opts() + .method(Method::POST) + .request_body(body) + .response_body(body) + .bench(b) +} + +#[bench] +fn http1_parallel_x10_empty(b: &mut test::Bencher) { + opts().parallel(10).bench(b) +} + +#[bench] +fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 10]; + opts() + .parallel(10) + .method(Method::POST) + .request_body(body) + .bench(b) +} + +#[bench] +fn http1_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 10]; + opts() + .parallel(10) + .method(Method::POST) + .request_chunks(body, 100) + .bench(b) +} + +#[bench] +fn http1_parallel_x10_res_1mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 1]; + opts().parallel(10).response_body(body).bench(b) +} + +#[bench] +fn http1_parallel_x10_res_10mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 10]; + opts().parallel(10).response_body(body).bench(b) +} // // HTTP2 -// const HTTP2_MAX_WINDOW: u32 = std::u32::MAX >> 1; - -// #[bench] -// fn http2_consecutive_x1_empty(b: &mut test::Bencher) { -// opts().http2().bench(b) -// } - -// #[bench] -// fn http2_consecutive_x1_req_10b(b: &mut test::Bencher) { -// opts() -// .http2() -// .method(Method::POST) -// .request_body(&[b's'; 10]) -// .bench(b) -// } - -// #[bench] -// fn http2_consecutive_x1_req_100kb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 100]; -// opts() -// .http2() -// .method(Method::POST) -// .request_body(body) -// .bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_empty(b: &mut test::Bencher) { -// opts().http2().parallel(10).bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 10]; -// opts() -// .http2() -// .parallel(10) -// .method(Method::POST) -// .request_body(body) -// .http2_stream_window(HTTP2_MAX_WINDOW) -// .http2_conn_window(HTTP2_MAX_WINDOW) -// .bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 10]; -// opts() -// .http2() -// .parallel(10) -// .method(Method::POST) -// .request_chunks(body, 100) -// .bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_req_10kb_100_chunks_adaptive_window(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 10]; -// opts() -// .http2() -// .parallel(10) -// .method(Method::POST) -// .request_chunks(body, 100) -// .http2_adaptive_window() -// .bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_req_10kb_100_chunks_max_window(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 10]; -// opts() -// .http2() -// .parallel(10) -// .method(Method::POST) -// .request_chunks(body, 100) -// .http2_stream_window(HTTP2_MAX_WINDOW) -// .http2_conn_window(HTTP2_MAX_WINDOW) -// .bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_res_1mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 1]; -// opts() -// .http2() -// .parallel(10) -// .response_body(body) -// .http2_stream_window(HTTP2_MAX_WINDOW) -// .http2_conn_window(HTTP2_MAX_WINDOW) -// .bench(b) -// } - -// #[bench] -// fn http2_parallel_x10_res_10mb(b: &mut test::Bencher) { -// let body = &[b'x'; 1024 * 1024 * 10]; -// opts() -// .http2() -// .parallel(10) -// .response_body(body) -// .http2_stream_window(HTTP2_MAX_WINDOW) -// .http2_conn_window(HTTP2_MAX_WINDOW) -// .bench(b) -// } +const HTTP2_MAX_WINDOW: u32 = std::u32::MAX >> 1; + +#[bench] +fn http2_consecutive_x1_empty(b: &mut test::Bencher) { + opts().http2().bench(b) +} + +#[bench] +fn http2_consecutive_x1_req_10b(b: &mut test::Bencher) { + opts() + .http2() + .method(Method::POST) + .request_body(&[b's'; 10]) + .bench(b) +} + +#[bench] +fn http2_consecutive_x1_req_100kb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 100]; + opts() + .http2() + .method(Method::POST) + .request_body(body) + .bench(b) +} + +#[bench] +fn http2_parallel_x10_empty(b: &mut test::Bencher) { + opts().http2().parallel(10).bench(b) +} + +#[bench] +fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 10]; + opts() + .http2() + .parallel(10) + .method(Method::POST) + .request_body(body) + .http2_stream_window(HTTP2_MAX_WINDOW) + .http2_conn_window(HTTP2_MAX_WINDOW) + .bench(b) +} + +#[bench] +fn http2_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 10]; + opts() + .http2() + .parallel(10) + .method(Method::POST) + .request_chunks(body, 100) + .bench(b) +} + +#[bench] +fn http2_parallel_x10_req_10kb_100_chunks_adaptive_window(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 10]; + opts() + .http2() + .parallel(10) + .method(Method::POST) + .request_chunks(body, 100) + .http2_adaptive_window() + .bench(b) +} + +#[bench] +fn http2_parallel_x10_req_10kb_100_chunks_max_window(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 10]; + opts() + .http2() + .parallel(10) + .method(Method::POST) + .request_chunks(body, 100) + .http2_stream_window(HTTP2_MAX_WINDOW) + .http2_conn_window(HTTP2_MAX_WINDOW) + .bench(b) +} + +#[bench] +fn http2_parallel_x10_res_1mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 1]; + opts() + .http2() + .parallel(10) + .response_body(body) + .http2_stream_window(HTTP2_MAX_WINDOW) + .http2_conn_window(HTTP2_MAX_WINDOW) + .bench(b) +} + +#[bench] +fn http2_parallel_x10_res_10mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 10]; + opts() + .http2() + .parallel(10) + .response_body(body) + .http2_stream_window(HTTP2_MAX_WINDOW) + .http2_conn_window(HTTP2_MAX_WINDOW) + .bench(b) +} // // ==== Benchmark Options ===== -// struct Opts { -// http2: bool, -// http2_stream_window: Option, -// http2_conn_window: Option, -// http2_adaptive_window: bool, -// parallel_cnt: u32, -// request_method: Method, -// request_body: Option<&'static [u8]>, -// request_chunks: usize, -// response_body: &'static [u8], -// } - -// fn opts() -> Opts { -// Opts { -// http2: false, -// http2_stream_window: None, -// http2_conn_window: None, -// http2_adaptive_window: false, -// parallel_cnt: 1, -// request_method: Method::GET, -// request_body: None, -// request_chunks: 0, -// response_body: b"", -// } -// } - -// impl Opts { -// fn http2(mut self) -> Self { -// self.http2 = true; -// self -// } - -// fn http2_stream_window(mut self, sz: impl Into>) -> Self { -// assert!(!self.http2_adaptive_window); -// self.http2_stream_window = sz.into(); -// self -// } - -// fn http2_conn_window(mut self, sz: impl Into>) -> Self { -// assert!(!self.http2_adaptive_window); -// self.http2_conn_window = sz.into(); -// self -// } - -// fn http2_adaptive_window(mut self) -> Self { -// assert!(self.http2_stream_window.is_none()); -// assert!(self.http2_conn_window.is_none()); -// self.http2_adaptive_window = true; -// self -// } - -// fn method(mut self, m: Method) -> Self { -// self.request_method = m; -// self -// } - -// fn request_body(mut self, body: &'static [u8]) -> Self { -// self.request_body = Some(body); -// self -// } - -// fn request_chunks(mut self, chunk: &'static [u8], cnt: usize) -> Self { -// assert!(cnt > 0); -// self.request_body = Some(chunk); -// self.request_chunks = cnt; -// self -// } - -// fn response_body(mut self, body: &'static [u8]) -> Self { -// self.response_body = body; -// self -// } - -// fn parallel(mut self, cnt: u32) -> Self { -// assert!(cnt > 0, "parallel count must be larger than 0"); -// self.parallel_cnt = cnt; -// self -// } - -// fn bench(self, b: &mut test::Bencher) { -// use std::sync::Arc; -// let _ = pretty_env_logger::try_init(); -// // Create a runtime of current thread. -// let rt = Arc::new( -// tokio::runtime::Builder::new_current_thread() -// .enable_all() -// .build() -// .expect("rt build"), -// ); -// let exec = rt.clone(); - -// let req_len = self.request_body.map(|b| b.len()).unwrap_or(0) as u64; -// let req_len = if self.request_chunks > 0 { -// req_len * self.request_chunks as u64 -// } else { -// req_len -// }; -// let bytes_per_iter = (req_len + self.response_body.len() as u64) * self.parallel_cnt as u64; -// b.bytes = bytes_per_iter; - -// let addr = spawn_server(&rt, &self); - -// let connector = HttpConnector::new(); -// let client = hyper::Client::builder() -// .http2_only(self.http2) -// .http2_initial_stream_window_size(self.http2_stream_window) -// .http2_initial_connection_window_size(self.http2_conn_window) -// .http2_adaptive_window(self.http2_adaptive_window) -// .build::<_, Body>(connector); - -// let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); - -// let make_request = || { -// let chunk_cnt = self.request_chunks; -// let body = if chunk_cnt > 0 { -// let (mut tx, body) = Body::channel(); -// let chunk = self -// .request_body -// .expect("request_chunks means request_body"); -// exec.spawn(async move { -// for _ in 0..chunk_cnt { -// tx.send_data(chunk.into()).await.expect("send_data"); -// } -// }); -// body -// } else { -// self.request_body -// .map(Body::from) -// .unwrap_or_else(Body::empty) -// }; -// let mut req = Request::new(body); -// *req.method_mut() = self.request_method.clone(); -// *req.uri_mut() = url.clone(); -// req -// }; - -// let send_request = |req: Request| { -// let fut = client.request(req); -// async { -// let res = fut.await.expect("client wait"); -// let mut body = res.into_body(); -// while let Some(_chunk) = body.data().await {} -// } -// }; - -// if self.parallel_cnt == 1 { -// b.iter(|| { -// let req = make_request(); -// rt.block_on(send_request(req)); -// }); -// } else { -// b.iter(|| { -// let futs = (0..self.parallel_cnt).map(|_| { -// let req = make_request(); -// send_request(req) -// }); -// // Await all spawned futures becoming completed. -// rt.block_on(join_all(futs)); -// }); -// } -// } -// } - -// fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { -// use hyper::service::{make_service_fn, service_fn}; -// let addr = "127.0.0.1:0".parse().unwrap(); - -// let body = opts.response_body; -// let srv = rt.block_on(async move { -// Server::bind(&addr) -// .http2_only(opts.http2) -// .http2_initial_stream_window_size(opts.http2_stream_window) -// .http2_initial_connection_window_size(opts.http2_conn_window) -// .http2_adaptive_window(opts.http2_adaptive_window) -// .serve(make_service_fn(move |_| async move { -// Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { -// let mut req_body = req.into_body(); -// while let Some(_chunk) = req_body.data().await {} -// Ok::<_, hyper::Error>(Response::new(Body::from(body))) -// })) -// })) -// }); -// let addr = srv.local_addr(); -// rt.spawn(async { -// if let Err(err) = srv.await { -// panic!("server error: {}", err); -// } -// }); -// addr -// } +struct Opts { + http2: bool, + http2_stream_window: Option, + http2_conn_window: Option, + http2_adaptive_window: bool, + parallel_cnt: u32, + request_method: Method, + request_body: Option<&'static [u8]>, + request_chunks: usize, + response_body: &'static [u8], +} + +fn opts() -> Opts { + Opts { + http2: false, + http2_stream_window: None, + http2_conn_window: None, + http2_adaptive_window: false, + parallel_cnt: 1, + request_method: Method::GET, + request_body: None, + request_chunks: 0, + response_body: b"", + } +} + +impl Opts { + fn http2(mut self) -> Self { + self.http2 = true; + self + } + + fn http2_stream_window(mut self, sz: impl Into>) -> Self { + assert!(!self.http2_adaptive_window); + self.http2_stream_window = sz.into(); + self + } + + fn http2_conn_window(mut self, sz: impl Into>) -> Self { + assert!(!self.http2_adaptive_window); + self.http2_conn_window = sz.into(); + self + } + + fn http2_adaptive_window(mut self) -> Self { + assert!(self.http2_stream_window.is_none()); + assert!(self.http2_conn_window.is_none()); + self.http2_adaptive_window = true; + self + } + + fn method(mut self, m: Method) -> Self { + self.request_method = m; + self + } + + fn request_body(mut self, body: &'static [u8]) -> Self { + self.request_body = Some(body); + self + } + + fn request_chunks(mut self, chunk: &'static [u8], cnt: usize) -> Self { + assert!(cnt > 0); + self.request_body = Some(chunk); + self.request_chunks = cnt; + self + } + + fn response_body(mut self, body: &'static [u8]) -> Self { + self.response_body = body; + self + } + + fn parallel(mut self, cnt: u32) -> Self { + assert!(cnt > 0, "parallel count must be larger than 0"); + self.parallel_cnt = cnt; + self + } + + fn bench(self, b: &mut test::Bencher) { + use std::sync::Arc; + let _ = pretty_env_logger::try_init(); + // Create a runtime of current thread. + let rt = Arc::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("rt build"), + ); + let exec = rt.clone(); + + let req_len = self.request_body.map(|b| b.len()).unwrap_or(0) as u64 + * u64::max(1, self.request_chunks as u64); + let bytes_per_iter = (req_len + self.response_body.len() as u64) * self.parallel_cnt as u64; + b.bytes = bytes_per_iter; + + let addr = spawn_server(&rt, &self); + + let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); + + let mut request_sender = prepare_client(&rt, &self, &addr); + + let make_request = || { + let chunk_cnt = self.request_chunks; + let body = if chunk_cnt > 0 { + let (mut tx, body) = Body::channel(); + let chunk = self + .request_body + .expect("request_chunks means request_body"); + exec.spawn(async move { + for _ in 0..chunk_cnt { + tx.send_data(chunk.into()).await.expect("send_data"); + } + }); + body + } else { + self.request_body + .map(Body::from) + .unwrap_or_else(Body::empty) + }; + let mut req = Request::new(body); + *req.method_mut() = self.request_method.clone(); + *req.uri_mut() = url.clone(); + req + }; + + let mut send_request = |req: Request| { + let fut = request_sender.send_request(req); + async { + let res = fut.await.expect("client wait"); + let mut body = res.into_body(); + while let Some(_chunk) = body.data().await {} + } + }; + + if self.parallel_cnt == 1 { + b.iter(|| { + let req = make_request(); + rt.block_on(send_request(req)); + }); + } else { + for _ in 0..self.parallel_cnt { + b.iter(|| { + let req = make_request(); + rt.block_on(send_request(req)); + }); + } + } + } +} + +fn prepare_client( + rt: &tokio::runtime::Runtime, + opts: &Opts, + addr: &SocketAddr, +) -> SendRequest { + let mut builder = Builder::new(); + builder + .http2_only(opts.http2) + .http2_initial_stream_window_size(opts.http2_stream_window) + .http2_initial_connection_window_size(opts.http2_conn_window) + .http2_adaptive_window(opts.http2_adaptive_window); + + let (request_sender, connection) = rt.block_on(async move { + let target_stream = TcpStream::connect(addr).await.unwrap(); + + builder.handshake(target_stream).await.unwrap() + }); + rt.spawn(async move { + if let Err(e) = connection.await { + panic!("Error in connection: {}", e); + } + }); + request_sender +} + +fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let tcp_listener = rt.block_on(async move { TcpListener::bind(addr).await.unwrap() }); + let local_addr = tcp_listener.local_addr().unwrap(); + let mut http = Http::new(); + //configure http based on opts + http.http2_only(opts.http2) + .http2_initial_stream_window_size(opts.http2_stream_window) + .http2_initial_connection_window_size(opts.http2_conn_window) + .http2_adaptive_window(opts.http2_adaptive_window); + rt.spawn(async move { + loop { + let (tcp_stream, addr) = tcp_listener.accept().await.unwrap(); + println!("{:?}", addr); + if let Err(http_err) = http + .serve_connection(tcp_stream, service_fn(handle_request)) + .await + { + panic!("Error while serving HTTP connection: {}", http_err); + } + } + }); + + local_addr +} + +async fn handle_request(_req: Request) -> Result, Infallible> { + let mut req_body = _req.into_body(); + while let Some(_) = req_body.data().await {} + Ok::, Infallible>(Response::new(Body::from(req_body))) +} From 8c386877d98dbc5dc2e9db9a03a06d2341dc2a3b Mon Sep 17 00:00:00 2001 From: Robertoskr Date: Tue, 9 Aug 2022 21:58:56 +0200 Subject: [PATCH 2/6] wip --- src/client/conn/http2.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 5e63f51b35..409b2b4486 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -416,7 +416,7 @@ impl Builder { let opts = self.clone(); async move { - tracing::trace!("client handshake HTTP/1"); + tracing::trace!("client handshake HTTP/2"); let (tx, rx) = dispatch::channel(); let h2 = From edfb77ada3880106a98ba8e35da84b0866b91a3a Mon Sep 17 00:00:00 2001 From: Robertoskr Date: Tue, 9 Aug 2022 22:25:31 +0200 Subject: [PATCH 3/6] wip --- benches/end_to_end.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 9c58d2dc92..36f24fe27e 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -4,7 +4,7 @@ extern crate test; use test::Bencher; use hyper::body::HttpBody; -use hyper::client::conn::{Builder, SendRequest}; +use hyper::client::conn::{Builder, Connection, SendRequest}; use hyper::{server::conn::Http, service::service_fn}; use hyper::{Body, Method, Request, Response}; use std::convert::Infallible; From 3c8a3794393ea2eb83a77dbd5072aef578c7e93f Mon Sep 17 00:00:00 2001 From: Robertoskr Date: Sat, 13 Aug 2022 11:32:31 +0200 Subject: [PATCH 4/6] remove unused parameter --- benches/end_to_end.rs | 149 +++++++++++++++++++++++++++++------------- 1 file changed, 105 insertions(+), 44 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 36f24fe27e..86160f8075 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -3,8 +3,9 @@ extern crate test; use test::Bencher; +use futures_util::future::join_all; use hyper::body::HttpBody; -use hyper::client::conn::{Builder, Connection, SendRequest}; +use hyper::client::conn::{Builder, SendRequest}; use hyper::{server::conn::Http, service::service_fn}; use hyper::{Body, Method, Request, Response}; use std::convert::Infallible; @@ -280,7 +281,6 @@ impl Opts { .build() .expect("rt build"), ); - let exec = rt.clone(); let req_len = self.request_body.map(|b| b.len()).unwrap_or(0) as u64 * u64::max(1, self.request_chunks as u64); @@ -289,38 +289,57 @@ impl Opts { let addr = spawn_server(&rt, &self); - let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); - - let mut request_sender = prepare_client(&rt, &self, &addr); - - let make_request = || { - let chunk_cnt = self.request_chunks; - let body = if chunk_cnt > 0 { - let (mut tx, body) = Body::channel(); - let chunk = self - .request_body - .expect("request_chunks means request_body"); - exec.spawn(async move { - for _ in 0..chunk_cnt { - tx.send_data(chunk.into()).await.expect("send_data"); - } - }); - body - } else { - self.request_body - .map(Body::from) - .unwrap_or_else(Body::empty) - }; - let mut req = Request::new(body); - *req.method_mut() = self.request_method.clone(); - *req.uri_mut() = url.clone(); - req + match self.http2 { + true => self.bench_http2(b, &rt, &addr), + false => self.bench_http1(b, &rt, &addr), + }; + } + + // + // Benches http/1 requests + // + fn bench_http1(self, b: &mut test::Bencher, rt: &tokio::runtime::Runtime, addr: &SocketAddr) { + //Open n connections to the server, + let mut request_senders: Vec> = (0..self.parallel_cnt) + .map(|_| prepare_client(&rt, &self, &addr)) + .collect(); + + let mut send_request = |req: Request, idx: usize| { + let fut = request_senders[idx].send_request(req); + async { + let res = fut.await.expect("Client wait"); + let mut body = res.into_body(); + while let Some(_chunk) = body.data().await {} + } }; + if self.parallel_cnt == 1 { + b.iter(|| { + let req = make_request(&self, &rt, &addr); + rt.block_on(send_request(req, 0)); + }); + } else { + b.iter(|| { + //in each iter, we are going to send one request with each of the request senders + let futs = (0..self.parallel_cnt as usize).map(|idx| { + let req = make_request(&self, &rt, &addr); + send_request(req, idx) + }); + rt.block_on(join_all(futs)); + }); + } + } + + // + // Benches http/2 requests + // + fn bench_http2(&self, b: &mut test::Bencher, rt: &tokio::runtime::Runtime, addr: &SocketAddr) { + //open just one connection, and send all the requests via that connection + let mut request_sender = prepare_client(rt, &self, addr); let mut send_request = |req: Request| { let fut = request_sender.send_request(req); async { - let res = fut.await.expect("client wait"); + let res = fut.await.expect("Client wait"); let mut body = res.into_body(); while let Some(_chunk) = body.data().await {} } @@ -328,20 +347,51 @@ impl Opts { if self.parallel_cnt == 1 { b.iter(|| { - let req = make_request(); + let req = make_request(&self, &rt, &addr); rt.block_on(send_request(req)); }); } else { - for _ in 0..self.parallel_cnt { - b.iter(|| { - let req = make_request(); - rt.block_on(send_request(req)); - }); - } + let futs = (0..self.parallel_cnt).map(|_| { + let req = make_request(&self, &rt, &addr); + send_request(req) + }); + + rt.block_on(join_all(futs)); } } } +// +// Creates a request, for being sent via the request_sender +// +fn make_request(opts: &Opts, rt: &tokio::runtime::Runtime, addr: &SocketAddr) -> Request { + let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); + let chunk_cnt = opts.request_chunks; + let body = if chunk_cnt > 0 { + let (mut tx, body) = Body::channel(); + let chunk = opts + .request_body + .expect("request_chunks means request_body"); + rt.spawn(async move { + for _ in 0..chunk_cnt { + tx.send_data(chunk.into()).await.expect("send_data"); + } + }); + body + } else { + opts.request_body + .map(Body::from) + .unwrap_or_else(Body::empty) + }; + let mut req = Request::new(body); + *req.method_mut() = opts.request_method.clone(); + *req.uri_mut() = url.clone(); + req +} + +// +// Prepares a client (request_sender) for sending requests to the given addr +// fn prepare_client( rt: &tokio::runtime::Runtime, opts: &Opts, @@ -367,6 +417,9 @@ fn prepare_client( request_sender } +// +// Spawns a server in the background +// fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); let tcp_listener = rt.block_on(async move { TcpListener::bind(addr).await.unwrap() }); @@ -377,19 +430,27 @@ fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { .http2_initial_stream_window_size(opts.http2_stream_window) .http2_initial_connection_window_size(opts.http2_conn_window) .http2_adaptive_window(opts.http2_adaptive_window); + + let (http2, http2_stream_window, http2_conn_window, http2_adaptive_window) = ( + opts.http2, + opts.http2_stream_window, + opts.http2_conn_window, + opts.http2_adaptive_window, + ); rt.spawn(async move { loop { let (tcp_stream, addr) = tcp_listener.accept().await.unwrap(); - println!("{:?}", addr); - if let Err(http_err) = http - .serve_connection(tcp_stream, service_fn(handle_request)) - .await - { - panic!("Error while serving HTTP connection: {}", http_err); - } + eprintln!("New incoming connection: {:?}", addr); + tokio::task::spawn( + Http::new() + .http2_only(http2) + .http2_initial_stream_window_size(http2_stream_window) + .http2_initial_connection_window_size(http2_conn_window) + .http2_adaptive_window(http2_adaptive_window) + .serve_connection(tcp_stream, service_fn(handle_request)), + ); } }); - local_addr } From 0c2f3201dfec3374672798de15b3ff5976e778c5 Mon Sep 17 00:00:00 2001 From: Robertoskr Date: Sat, 13 Aug 2022 11:33:49 +0200 Subject: [PATCH 5/6] deny warnings --- benches/end_to_end.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 86160f8075..e36ef1fa44 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -1,4 +1,5 @@ #![feature(test)] +#![deny(warnings)] extern crate test; use test::Bencher; From cbd4be3b558116b186eb0eeac645fde1673b88f9 Mon Sep 17 00:00:00 2001 From: Robertoskr Date: Sun, 14 Aug 2022 18:04:53 +0200 Subject: [PATCH 6/6] use lock in http2 requests --- benches/end_to_end.rs | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index e36ef1fa44..dff477208f 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -11,8 +11,11 @@ use hyper::{server::conn::Http, service::service_fn}; use hyper::{Body, Method, Request, Response}; use std::convert::Infallible; use std::net::SocketAddr; +use std::sync::Arc; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::sync::Mutex; +//use tower::ServiceExt; // // HTTP1 #[bench] @@ -273,7 +276,6 @@ impl Opts { } fn bench(self, b: &mut test::Bencher) { - use std::sync::Arc; let _ = pretty_env_logger::try_init(); // Create a runtime of current thread. let rt = Arc::new( @@ -336,11 +338,17 @@ impl Opts { // fn bench_http2(&self, b: &mut test::Bencher, rt: &tokio::runtime::Runtime, addr: &SocketAddr) { //open just one connection, and send all the requests via that connection - let mut request_sender = prepare_client(rt, &self, addr); - let mut send_request = |req: Request| { - let fut = request_sender.send_request(req); - async { - let res = fut.await.expect("Client wait"); + let request_sender = Arc::new(Mutex::new(prepare_client(rt, &self, addr))); + let send_request = |req: Request| { + let mut _sender = Arc::clone(&request_sender); + async move { + let res = _sender + .lock() + .await + .send_request(req) + .await + .expect("Client wait"); + let mut body = res.into_body(); while let Some(_chunk) = body.data().await {} } @@ -352,12 +360,14 @@ impl Opts { rt.block_on(send_request(req)); }); } else { - let futs = (0..self.parallel_cnt).map(|_| { - let req = make_request(&self, &rt, &addr); - send_request(req) - }); + b.iter(|| { + let futs = (0..self.parallel_cnt).map(|_| { + let req = make_request(&self, &rt, &addr); + send_request(req) + }); - rt.block_on(join_all(futs)); + rt.block_on(join_all(futs)); + }); } } }