From 96246cec0a980a00a911bc3636e04f0fe2d76d17 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Fri, 12 Jun 2020 13:59:20 +0200 Subject: [PATCH] Use tokio in tests. --- Cargo.toml | 3 ++- benches/concurrent.rs | 8 +++++--- src/frame/io.rs | 2 +- src/tests.rs | 32 ++++++++++++++++++-------------- tests/concurrent.rs | 15 ++++++++------- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5fa387b2..3514ca29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,10 +20,11 @@ static_assertions = "1" [dev-dependencies] anyhow = "1" -async-std = "1" criterion = "0.3" futures = "0.3.4" quickcheck = "0.9" +tokio = { version = "0.2", features = ["tcp", "rt-threaded", "macros"] } +tokio-util = { version = "0.3", features = ["compat"] } [[bench]] name = "concurrent" diff --git a/benches/concurrent.rs b/benches/concurrent.rs index 829f2252..108b5d92 100644 --- a/benches/concurrent.rs +++ b/benches/concurrent.rs @@ -8,10 +8,10 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. -use async_std::task; use criterion::{criterion_group, criterion_main, Criterion}; use futures::{channel::mpsc, future, prelude::*, ready}; use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll}}; +use tokio::{runtime::Runtime, task}; use yamux::{Config, Connection, Mode}; criterion_group!(benches, concurrent); @@ -52,16 +52,18 @@ fn concurrent(c: &mut Criterion) { c.bench_function_over_inputs("one by one", move |b, &¶ms| { let data = data1.clone(); + let mut rt = Runtime::new().unwrap(); b.iter(move || { - task::block_on(roundtrip(params.streams, params.messages, data.clone(), false)) + rt.block_on(roundtrip(params.streams, params.messages, data.clone(), false)) }) }, params); c.bench_function_over_inputs("all at once", move |b, &¶ms| { let data = data2.clone(); + let mut rt = Runtime::new().unwrap(); b.iter(move || { - task::block_on(roundtrip(params.streams, params.messages, data.clone(), true)) + rt.block_on(roundtrip(params.streams, params.messages, data.clone(), true)) }) }, params); diff --git a/src/frame/io.rs b/src/frame/io.rs index c03f072c..a8d24c81 100644 --- a/src/frame/io.rs +++ b/src/frame/io.rs @@ -232,7 +232,7 @@ mod tests { #[test] fn encode_decode_identity() { fn property(f: Frame<()>) -> bool { - async_std::task::block_on(async move { + futures::executor::block_on(async move { let id = crate::connection::Id::random(); let mut io = Io::new(id, futures::io::Cursor::new(Vec::new()), f.body.len()); if io.send(&f).await.is_err() { diff --git a/src/tests.rs b/src/tests.rs index b636d81e..e985358b 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -8,12 +8,13 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. -use async_std::{net::{TcpStream, TcpListener}, task}; use crate::{Config, Connection, ConnectionError, Mode, Control, connection::State}; use futures::{future, prelude::*}; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; use rand::Rng; use std::{fmt::Debug, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}}; +use tokio::{net::{TcpStream, TcpListener}, runtime::Runtime, task}; +use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt}; #[test] fn prop_send_recv() { @@ -21,20 +22,21 @@ fn prop_send_recv() { if msgs.is_empty() { return TestResult::discard() } - task::block_on(async move { + let mut rt = Runtime::new().unwrap(); + rt.block_on(async move { let num_requests = msgs.len(); let iter = msgs.into_iter().map(|m| m.0); - let (listener, address) = bind().await.expect("bind"); + let (mut listener, address) = bind().await.expect("bind"); let server = async { - let socket = listener.accept().await.expect("accept").0; + let socket = listener.accept().await.expect("accept").0.compat(); let connection = Connection::new(socket, Config::default(), Mode::Server); repeat_echo(connection).await.expect("repeat_echo") }; let client = async { - let socket = TcpStream::connect(address).await.expect("connect"); + let socket = TcpStream::connect(address).await.expect("connect").compat(); let connection = Connection::new(socket, Config::default(), Mode::Client); let control = connection.control(); task::spawn(crate::into_stream(connection).for_each(|_| future::ready(()))); @@ -55,19 +57,20 @@ fn prop_max_streams() { let mut cfg = Config::default(); cfg.set_max_num_streams(max_streams); - task::block_on(async move { - let (listener, address) = bind().await.expect("bind"); + let mut rt = Runtime::new().unwrap(); + rt.block_on(async move { + let (mut listener, address) = bind().await.expect("bind"); let cfg_s = cfg.clone(); let server = async move { - let socket = listener.accept().await.expect("accept").0; + let socket = listener.accept().await.expect("accept").0.compat(); let connection = Connection::new(socket, cfg_s, Mode::Server); repeat_echo(connection).await }; task::spawn(server); - let socket = TcpStream::connect(address).await.expect("connect"); + let socket = TcpStream::connect(address).await.expect("connect").compat(); let connection = Connection::new(socket, cfg, Mode::Client); let mut control = connection.control(); task::spawn(crate::into_stream(connection).for_each(|_| future::ready(()))); @@ -89,12 +92,13 @@ fn prop_max_streams() { fn prop_send_recv_half_closed() { fn prop(msg: Msg) { let msg_len = msg.0.len(); - task::block_on(async move { - let (listener, address) = bind().await.expect("bind"); + let mut rt = Runtime::new().unwrap(); + rt.block_on(async move { + let (mut listener, address) = bind().await.expect("bind"); // Server should be able to write on a stream shutdown by the client. let server = async { - let socket = listener.accept().await.expect("accept").0; + let socket = listener.accept().await.expect("accept").0.compat(); let mut connection = Connection::new(socket, Config::default(), Mode::Server); let mut stream = connection.next_stream().await .expect("S: next_stream") @@ -108,7 +112,7 @@ fn prop_send_recv_half_closed() { // Client should be able to read after shutting down the stream. let client = async { - let socket = TcpStream::connect(address).await.expect("connect"); + let socket = TcpStream::connect(address).await.expect("connect").compat(); let connection = Connection::new(socket, Config::default(), Mode::Client); let mut control = connection.control(); task::spawn(crate::into_stream(connection).for_each(|_| future::ready(()))); @@ -150,7 +154,7 @@ async fn bind() -> io::Result<(TcpListener, SocketAddr)> { } /// For each incoming stream of `c` echo back to the sender. -async fn repeat_echo(c: Connection) -> Result<(), ConnectionError> { +async fn repeat_echo(c: Connection>) -> Result<(), ConnectionError> { let c = crate::into_stream(c); c.try_for_each_concurrent(None, |mut stream| async move { { diff --git a/tests/concurrent.rs b/tests/concurrent.rs index b37f7fbe..ba2f1f06 100644 --- a/tests/concurrent.rs +++ b/tests/concurrent.rs @@ -8,17 +8,18 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. -use async_std::{net::{TcpStream, TcpListener}, task}; use futures::{channel::mpsc, prelude::*}; use std::{net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc}; +use tokio::{net::{TcpStream, TcpListener}, task}; +use tokio_util::compat::Tokio02AsyncReadCompatExt; use yamux::{Config, Connection, Mode}; async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc>) { - let listener = TcpListener::bind(&address).await.expect("bind"); + let mut listener = TcpListener::bind(&address).await.expect("bind"); let address = listener.local_addr().expect("local address"); let server = async move { - let socket = listener.accept().await.expect("accept").0; + let socket = listener.accept().await.expect("accept").0.compat(); yamux::into_stream(Connection::new(socket, Config::default(), Mode::Server)) .try_for_each_concurrent(None, |mut stream| async move { log::debug!("S: accepted new stream"); @@ -36,7 +37,7 @@ async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc>) { task::spawn(server); - let socket = TcpStream::connect(&address).await.expect("connect"); + let socket = TcpStream::connect(&address).await.expect("connect").compat(); let (tx, rx) = mpsc::unbounded(); let conn = Connection::new(socket, Config::default(), Mode::Client); let mut ctrl = conn.control(); @@ -65,9 +66,9 @@ async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc>) { assert_eq!(nstreams, n) } -#[test] -fn concurrent_streams() { +#[tokio::test] +async fn concurrent_streams() { let data = Arc::new(vec![0x42; 100 * 1024]); let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)); - task::block_on(roundtrip(addr, 1000, data)) + roundtrip(addr, 1000, data).await }