From 7fb3b9bc2900e3405202d9052ba973a3014ec143 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 8 Apr 2024 11:49:51 -0700 Subject: [PATCH] tests: Replace async-channel with flume This commit replaces async-channel in some tests with flume. It appears that async-channel doesn't work under MIRI but flume does, so we can work around this for now by replacing it with flume. cc https://github.com/smol-rs/async-channel/issues/85 Signed-off-by: John Nunley --- Cargo.toml | 2 +- src/barrier.rs | 2 ++ tests/barrier.rs | 12 ++++++------ tests/mutex.rs | 6 +++--- tests/rwlock.rs | 37 +++++++++++++++++++------------------ 5 files changed, 31 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 88a24be..801d78b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,8 +24,8 @@ default = ["std"] std = ["event-listener/std", "event-listener-strategy/std"] [dev-dependencies] -async-channel = "2.2.0" fastrand = "2.0.0" +flume = "0.11.0" futures-lite = "2.0.0" waker-fn = "1.1.0" diff --git a/src/barrier.rs b/src/barrier.rs index 8d06fc1..cbdc810 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -123,6 +123,8 @@ impl Barrier { /// println!("after wait"); /// }); /// } + /// # // Wait for threads to stop. + /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] pub fn wait_blocking(&self) -> BarrierWaitResult { diff --git a/tests/barrier.rs b/tests/barrier.rs index b79209a..8561505 100644 --- a/tests/barrier.rs +++ b/tests/barrier.rs @@ -12,7 +12,7 @@ fn smoke() { let barrier = Arc::new(Barrier::new(N)); for _ in 0..10 { - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); for _ in 0..N - 1 { let c = barrier.clone(); @@ -21,7 +21,7 @@ fn smoke() { thread::spawn(move || { future::block_on(async move { let res = c.wait().await; - tx.send(res.is_leader()).await.unwrap(); + tx.send_async(res.is_leader()).await.unwrap(); }) }); } @@ -35,7 +35,7 @@ fn smoke() { // Now, the barrier is cleared and we should get data. for _ in 0..N - 1 { - if rx.recv().await.unwrap() { + if rx.recv_async().await.unwrap() { assert!(!leader_found); leader_found = true; } @@ -55,7 +55,7 @@ fn smoke_blocking() { let barrier = Arc::new(Barrier::new(N)); for _ in 0..10 { - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); for _ in 0..N - 1 { let c = barrier.clone(); @@ -63,7 +63,7 @@ fn smoke_blocking() { thread::spawn(move || { let res = c.wait_blocking(); - tx.send_blocking(res.is_leader()).unwrap(); + tx.send(res.is_leader()).unwrap(); }); } @@ -76,7 +76,7 @@ fn smoke_blocking() { // Now, the barrier is cleared and we should get data. for _ in 0..N - 1 { - if rx.recv().await.unwrap() { + if rx.recv_async().await.unwrap() { assert!(!leader_found); leader_found = true; } diff --git a/tests/mutex.rs b/tests/mutex.rs index 01d4924..289e9fe 100644 --- a/tests/mutex.rs +++ b/tests/mutex.rs @@ -63,7 +63,7 @@ fn get_mut() { #[test] fn contention() { future::block_on(async { - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); let tx = Arc::new(tx); let mutex = Arc::new(Mutex::new(0i32)); @@ -77,14 +77,14 @@ fn contention() { future::block_on(async move { let mut lock = mutex.lock().await; *lock += 1; - tx.send(()).await.unwrap(); + tx.send_async(()).await.unwrap(); drop(lock); }) }); } for _ in 0..num_tasks { - rx.recv().await.unwrap(); + rx.recv_async().await.unwrap(); } let lock = mutex.lock().await; diff --git a/tests/rwlock.rs b/tests/rwlock.rs index cd6fb92..95e4528 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -25,13 +25,13 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); #[cfg(not(target_family = "wasm"))] fn spawn(f: impl Future + Send + 'static) -> future::Boxed { - let (s, r) = async_channel::bounded(1); + let (s, r) = flume::bounded(1); thread::spawn(move || { future::block_on(async { - let _ = s.send(f.await).await; + let _ = s.send_async(f.await).await; }) }); - async move { r.recv().await.unwrap() }.boxed() + async move { r.recv_async().await.unwrap() }.boxed() } #[test] @@ -119,13 +119,14 @@ fn get_mut() { } // Miri bug; this works when async is replaced with blocking -#[cfg(not(any(target_family = "wasm", miri)))] +#[cfg(not(target_family = "wasm"))] #[test] +#[cfg_attr(miri, ignore)] fn contention() { const N: u32 = 10; const M: usize = if cfg!(miri) { 100 } else { 1000 }; - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); let tx = Arc::new(tx); let rw = Arc::new(RwLock::new(())); @@ -142,25 +143,25 @@ fn contention() { drop(rw.read().await); } } - tx.send(()).await.unwrap(); + tx.send_async(()).await.unwrap(); }); } future::block_on(async move { for _ in 0..N { - rx.recv().await.unwrap(); + rx.recv_async().await.unwrap(); } }); } -// Miri bug; this works when async is replaced with blocking -#[cfg(not(any(target_family = "wasm", miri)))] +#[cfg(not(target_family = "wasm"))] #[test] +#[cfg_attr(miri, ignore)] fn contention_arc() { const N: u32 = 10; const M: usize = if cfg!(miri) { 100 } else { 1000 }; - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); let tx = Arc::new(tx); let rw = Arc::new(RwLock::new(())); @@ -177,13 +178,13 @@ fn contention_arc() { drop(rw.read_arc().await); } } - tx.send(()).await.unwrap(); + tx.send_async(()).await.unwrap(); }); } future::block_on(async move { for _ in 0..N { - rx.recv().await.unwrap(); + rx.recv_async().await.unwrap(); } }); } @@ -192,7 +193,7 @@ fn contention_arc() { #[test] fn writer_and_readers() { let lock = Arc::new(RwLock::new(0i32)); - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); // Spawn a writer task. let _spawned = spawn({ @@ -205,7 +206,7 @@ fn writer_and_readers() { future::yield_now().await; *lock = tmp + 1; } - tx.send(()).await.unwrap(); + tx.send_async(()).await.unwrap(); } }); @@ -228,7 +229,7 @@ fn writer_and_readers() { } // Wait for writer to finish. - rx.recv().await.unwrap(); + rx.recv_async().await.unwrap(); let lock = lock.read().await; assert_eq!(*lock, 1000); }); @@ -238,7 +239,7 @@ fn writer_and_readers() { #[test] fn writer_and_readers_arc() { let lock = Arc::new(RwLock::new(0i32)); - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = flume::unbounded(); // Spawn a writer task. let _spawned = spawn({ @@ -251,7 +252,7 @@ fn writer_and_readers_arc() { future::yield_now().await; *lock = tmp + 1; } - tx.send(()).await.unwrap(); + tx.send_async(()).await.unwrap(); } }); @@ -274,7 +275,7 @@ fn writer_and_readers_arc() { } // Wait for writer to finish. - rx.recv().await.unwrap(); + rx.recv_async().await.unwrap(); let lock = lock.read_arc().await; assert_eq!(*lock, 1000); });