Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tokio in tests. #83

Merged
merged 1 commit into from
Jun 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ static_assertions = "1"

[dev-dependencies]
anyhow = "1"
async-std = "1"
criterion = "0.3"
futures = "0.3.4"
quickcheck = "0.9"
tokio = { version = "0.2", features = ["tcp", "rt-threaded", "macros"] }
tokio-util = { version = "0.3", features = ["compat"] }

[[bench]]
name = "concurrent"
Expand Down
8 changes: 5 additions & 3 deletions benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use async_std::task;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{channel::mpsc, future, prelude::*, ready};
use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll}};
use tokio::{runtime::Runtime, task};
use yamux::{Config, Connection, Mode};

criterion_group!(benches, concurrent);
Expand Down Expand Up @@ -52,16 +52,18 @@ fn concurrent(c: &mut Criterion) {

c.bench_function_over_inputs("one by one", move |b, &&params| {
let data = data1.clone();
let mut rt = Runtime::new().unwrap();
b.iter(move || {
task::block_on(roundtrip(params.streams, params.messages, data.clone(), false))
rt.block_on(roundtrip(params.streams, params.messages, data.clone(), false))
})
},
params);

c.bench_function_over_inputs("all at once", move |b, &&params| {
let data = data2.clone();
let mut rt = Runtime::new().unwrap();
b.iter(move || {
task::block_on(roundtrip(params.streams, params.messages, data.clone(), true))
rt.block_on(roundtrip(params.streams, params.messages, data.clone(), true))
})
},
params);
Expand Down
2 changes: 1 addition & 1 deletion src/frame/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ mod tests {
#[test]
fn encode_decode_identity() {
fn property(f: Frame<()>) -> bool {
async_std::task::block_on(async move {
futures::executor::block_on(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use futures::executor here and Runtime::block_on in other places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correctness does not depend on the runtime choice so it does not really matter. As this is a small property test without concurrency, executing it on the same thread is more light-weight.

let id = crate::connection::Id::random();
let mut io = Io::new(id, futures::io::Cursor::new(Vec::new()), f.body.len());
if io.send(&f).await.is_err() {
Expand Down
32 changes: 18 additions & 14 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,35 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use async_std::{net::{TcpStream, TcpListener}, task};
use crate::{Config, Connection, ConnectionError, Mode, Control, connection::State};
use futures::{future, prelude::*};
use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult};
use rand::Rng;
use std::{fmt::Debug, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}};
use tokio::{net::{TcpStream, TcpListener}, runtime::Runtime, task};
use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt};

#[test]
fn prop_send_recv() {
fn prop(msgs: Vec<Msg>) -> TestResult {
if msgs.is_empty() {
return TestResult::discard()
}
task::block_on(async move {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let num_requests = msgs.len();
let iter = msgs.into_iter().map(|m| m.0);

let (listener, address) = bind().await.expect("bind");
let (mut listener, address) = bind().await.expect("bind");

let server = async {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
let connection = Connection::new(socket, Config::default(), Mode::Server);
repeat_echo(connection).await.expect("repeat_echo")
};

let client = async {
let socket = TcpStream::connect(address).await.expect("connect");
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, Config::default(), Mode::Client);
let control = connection.control();
task::spawn(crate::into_stream(connection).for_each(|_| future::ready(())));
Expand All @@ -55,19 +57,20 @@ fn prop_max_streams() {
let mut cfg = Config::default();
cfg.set_max_num_streams(max_streams);

task::block_on(async move {
let (listener, address) = bind().await.expect("bind");
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (mut listener, address) = bind().await.expect("bind");

let cfg_s = cfg.clone();
let server = async move {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
let connection = Connection::new(socket, cfg_s, Mode::Server);
repeat_echo(connection).await
};

task::spawn(server);

let socket = TcpStream::connect(address).await.expect("connect");
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, cfg, Mode::Client);
let mut control = connection.control();
task::spawn(crate::into_stream(connection).for_each(|_| future::ready(())));
Expand All @@ -89,12 +92,13 @@ fn prop_max_streams() {
fn prop_send_recv_half_closed() {
fn prop(msg: Msg) {
let msg_len = msg.0.len();
task::block_on(async move {
let (listener, address) = bind().await.expect("bind");
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (mut listener, address) = bind().await.expect("bind");

// Server should be able to write on a stream shutdown by the client.
let server = async {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
let mut connection = Connection::new(socket, Config::default(), Mode::Server);
let mut stream = connection.next_stream().await
.expect("S: next_stream")
Expand All @@ -108,7 +112,7 @@ fn prop_send_recv_half_closed() {

// Client should be able to read after shutting down the stream.
let client = async {
let socket = TcpStream::connect(address).await.expect("connect");
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, Config::default(), Mode::Client);
let mut control = connection.control();
task::spawn(crate::into_stream(connection).for_each(|_| future::ready(())));
Expand Down Expand Up @@ -150,7 +154,7 @@ async fn bind() -> io::Result<(TcpListener, SocketAddr)> {
}

/// For each incoming stream of `c` echo back to the sender.
async fn repeat_echo(c: Connection<TcpStream>) -> Result<(), ConnectionError> {
async fn repeat_echo(c: Connection<Compat<TcpStream>>) -> Result<(), ConnectionError> {
let c = crate::into_stream(c);
c.try_for_each_concurrent(None, |mut stream| async move {
{
Expand Down
15 changes: 8 additions & 7 deletions tests/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use async_std::{net::{TcpStream, TcpListener}, task};
use futures::{channel::mpsc, prelude::*};
use std::{net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc};
use tokio::{net::{TcpStream, TcpListener}, task};
use tokio_util::compat::Tokio02AsyncReadCompatExt;
use yamux::{Config, Connection, Mode};

async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc<Vec<u8>>) {
let listener = TcpListener::bind(&address).await.expect("bind");
let mut listener = TcpListener::bind(&address).await.expect("bind");
let address = listener.local_addr().expect("local address");

let server = async move {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
yamux::into_stream(Connection::new(socket, Config::default(), Mode::Server))
.try_for_each_concurrent(None, |mut stream| async move {
log::debug!("S: accepted new stream");
Expand All @@ -36,7 +37,7 @@ async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc<Vec<u8>>) {

task::spawn(server);

let socket = TcpStream::connect(&address).await.expect("connect");
let socket = TcpStream::connect(&address).await.expect("connect").compat();
let (tx, rx) = mpsc::unbounded();
let conn = Connection::new(socket, Config::default(), Mode::Client);
let mut ctrl = conn.control();
Expand Down Expand Up @@ -65,9 +66,9 @@ async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc<Vec<u8>>) {
assert_eq!(nstreams, n)
}

#[test]
fn concurrent_streams() {
#[tokio::test]
async fn concurrent_streams() {
let data = Arc::new(vec![0x42; 100 * 1024]);
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
task::block_on(roundtrip(addr, 1000, data))
roundtrip(addr, 1000, data).await
}