From bbe3fb69e481f3b8afb465103ede6a7017b08d4b Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Fri, 17 Dec 2021 18:50:51 +0000 Subject: [PATCH 01/18] wip: try initial solution The `UnownedTask` type will have an extra field called `is_mandatory` which will determine whether the tasks get executed at shutdown. Maybe the `RawTask` is a better place to put this field, but I don't know my way around these types yet. I have verified that this code change fixes the problem by running the code in the original issue a hundred thousand times. Without the fix, that code consistently misses a write in the first few hundred executions. With the change, I've never seen it miss the write. I think we might be able to use `loom` to test this. Will try to do so. --- examples/Cargo.toml | 4 ++++ examples/write_all_stuff.rs | 11 +++++++++++ tokio/src/blocking.rs | 5 ++++- tokio/src/fs/file.rs | 6 +++--- tokio/src/fs/mocks.rs | 15 +++++++++++++++ tokio/src/runtime/blocking/mod.rs | 5 ++++- tokio/src/runtime/blocking/pool.rs | 18 +++++++++++++++++- tokio/src/runtime/handle.rs | 28 ++++++++++++++++++++++++---- tokio/src/runtime/mod.rs | 5 ++++- tokio/src/runtime/task/mod.rs | 9 ++++++++- tokio/src/runtime/tests/mod.rs | 2 +- 11 files changed, 95 insertions(+), 13 deletions(-) create mode 100644 examples/write_all_stuff.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index d2aca69d84a..223cee26b22 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -91,3 +91,7 @@ path = "named-pipe-ready.rs" [[example]] name = "named-pipe-multi-client" path = "named-pipe-multi-client.rs" + +[[example]] +name = "write_all_stuff" +path = "write_all_stuff.rs" diff --git a/examples/write_all_stuff.rs b/examples/write_all_stuff.rs new file mode 100644 index 00000000000..19532c26924 --- /dev/null +++ b/examples/write_all_stuff.rs @@ -0,0 +1,11 @@ +use tokio::io::{self, AsyncWriteExt}; +use tokio::fs::File; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut file = File::create("foo.txt").await?; + + file.write_all(b"some bytes").await?; + //file.flush().await?; + Ok(()) +} diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs index f88b1db11cc..cd43342fa0e 100644 --- a/tokio/src/blocking.rs +++ b/tokio/src/blocking.rs @@ -1,5 +1,8 @@ cfg_rt! { - pub(crate) use crate::runtime::spawn_blocking; + pub(crate) use crate::runtime::{ + spawn_blocking, + spawn_mandatory_blocking + }; pub(crate) use crate::task::JoinHandle; } diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 61071cf6309..7a9fbfcab0d 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -20,13 +20,13 @@ use std::task::Poll; use std::task::Poll::*; #[cfg(test)] -use super::mocks::spawn_blocking; +use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; #[cfg(test)] use super::mocks::JoinHandle; #[cfg(test)] use super::mocks::MockFile as StdFile; #[cfg(not(test))] -use crate::blocking::spawn_blocking; +use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; #[cfg(not(test))] use crate::blocking::JoinHandle; #[cfg(not(test))] @@ -649,7 +649,7 @@ impl AsyncWrite for File { let n = buf.copy_from(src); let std = me.std.clone(); - inner.state = Busy(spawn_blocking(move || { + inner.state = Busy(spawn_mandatory_blocking(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) } else { diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index 68ef4f3a7a4..f78223e1e71 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -105,6 +105,21 @@ where JoinHandle { rx } } +pub(super) fn spawn_mandatory_blocking(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let task = Box::new(move || { + let _ = tx.send(f()); + }); + + QUEUE.with(|cell| cell.borrow_mut().push_back(task)); + + JoinHandle { rx } +} + impl Future for JoinHandle { type Output = Result; diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 670ec3a4b34..59c3528acaf 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,7 +4,10 @@ //! compilation. mod pool; -pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; +pub(crate) use pool::{ + spawn_blocking, + spawn_mandatory_blocking, + BlockingPool, Spawner}; mod schedule; mod shutdown; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index bb6c1ee6606..a305ffc59fe 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -84,6 +84,17 @@ where rt.spawn_blocking(func) } +/// Runs the provided function on an executor dedicated to blocking operations. +pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let rt = context::current(); + rt.spawn_mandatory_blocking(func) +} + + // ===== impl BlockingPool ===== impl BlockingPool { @@ -302,7 +313,12 @@ impl Inner { // Drain the queue while let Some(task) = shared.queue.pop_front() { drop(shared); - task.shutdown(); + + if task.is_mandatory() { + task.run(); + } else { + task.shutdown(); + } shared = self.shared.lock(); } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 3481a2552f3..71cf8cf205d 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -190,14 +190,34 @@ impl Handle { R: Send + 'static, { if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), None) + self.spawn_blocking_inner(Box::new(func), false, None) } else { - self.spawn_blocking_inner(func, None) + self.spawn_blocking_inner(func, false, None) } } + + #[track_caller] + pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + self.spawn_blocking_inner(Box::new(func), true, None) + } else { + self.spawn_blocking_inner(func, true, None) + } + } + + #[track_caller] - pub(crate) fn spawn_blocking_inner(&self, func: F, name: Option<&str>) -> JoinHandle + pub(crate) fn spawn_blocking_inner( + &self, + func: F, + is_mandatory: bool, + name: Option<&str> + ) -> JoinHandle where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -222,7 +242,7 @@ impl Handle { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule); + let (task, handle) = task::unowned(fut, NoopSchedule, is_mandatory); let _ = self.blocking_spawner.spawn(task, self); handle } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index c8d97e1b19a..dd02712ea31 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -199,7 +199,10 @@ cfg_rt! { mod blocking; use blocking::BlockingPool; - pub(crate) use blocking::spawn_blocking; + pub(crate) use blocking::{ + spawn_blocking, + spawn_mandatory_blocking + }; mod builder; pub use self::builder::Builder; diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 0592cca1a09..4e362f37c10 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -204,6 +204,7 @@ pub(crate) struct LocalNotified { /// This type holds two ref-counts. pub(crate) struct UnownedTask { raw: RawTask, + is_mandatory: bool, _p: PhantomData, } @@ -264,7 +265,8 @@ cfg_rt! { /// only when the task is not going to be stored in an `OwnedTasks` list. /// /// Currently only blocking tasks use this method. - pub(crate) fn unowned(task: T, scheduler: S) -> (UnownedTask, JoinHandle) + pub(crate) fn unowned(task: T, scheduler: S, + is_mandatory: bool) -> (UnownedTask, JoinHandle) where S: Schedule, T: Send + Future + 'static, @@ -276,6 +278,7 @@ cfg_rt! { // This is valid because an UnownedTask holds two ref-counts. let unowned = UnownedTask { raw: task.raw, + is_mandatory, _p: PhantomData, }; std::mem::forget(task); @@ -384,6 +387,10 @@ impl UnownedTask { pub(crate) fn shutdown(self) { self.into_task().shutdown() } + + pub(crate) fn is_mandatory(&self) -> bool { + self.is_mandatory + } } impl Drop for Task { diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index be36d6ffe4d..55987f618fd 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -23,7 +23,7 @@ mod unowned_wrapper { T: std::future::Future + Send + 'static, T::Output: Send + 'static, { - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, false); (task.into_notified(), handle) } } From 1ab9f3a027313b9620212a835beaf618cdbdf1f6 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sun, 12 Dec 2021 20:17:23 +0000 Subject: [PATCH 02/18] add comment to start discussion --- tokio/src/runtime/blocking/pool.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index a305ffc59fe..47e5bc74254 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -281,6 +281,10 @@ impl Inner { shared.num_idle += 1; while !shared.shutdown { + // NOTE: A task spawned by spawn_blocking if: + // 1. It is spawned (thus the `condvar` gets a `notify_one`) + // 2. `shutdown` is called and manages to acquire the `shared.lock` *before* the + // thread woken up here gets a hold of it. Then `shutdown` will be true. let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); shared = lock_result.0; From fc71d43236460bcb0bfbc06268b2415210624b81 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Mon, 27 Dec 2021 22:14:45 +0000 Subject: [PATCH 03/18] write a test that proves the problem is there This is just a prototype to communicate that I didn't get the loom test to work and to see if this testing approach would be accepted. It's not great, because the test implementation is very coupled to the blocking pool implementation (f.i, spawning a previous blocking task and awaiting on it before launching the offending task). However, it takes 150ms to run on my machine when it succeeds, and fails in the first few attempts when using `spawn_blocking` instead of `spawn_mandatory_blocking`, which is a good sign --- tokio/src/runtime/tests/queue.rs | 55 ++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 2de17b9f4c4..293da0d77e9 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -227,6 +227,61 @@ fn stress2() { } } +#[test] +fn mandatory_blocking_tasks_get_executed() { + use crate::runtime; + use std::sync::{Arc, atomic::AtomicBool}; + + // We need to execute the test a few times because the failure is + // non-deterministic. I tried to write a loom that would prove the + // bug is there (when using spawn_blocking) but failed. The test + // I wrote didn't fail, even when running >30 minutes. + for i in 1..1000 { + let rt = runtime::Builder::new_multi_thread() + .max_blocking_threads(1) + .worker_threads(1) + .build().unwrap(); + + let did_blocking_task_execute = Arc::new( + AtomicBool::new(false) + ); + + { + let did_blocking_task_execute = did_blocking_task_execute.clone(); + let _enter = rt.enter(); + + rt.block_on(async move { + // We need to have spawned a previous blocking task + // so that the thread in the blocking pool gets to + // the state where it's waiting either for a shutdown + // or another task. + runtime::spawn_blocking(move || { + + }).await.unwrap(); + + // Changing this spawn_mandatory_blocking to + // spawn_mandatory makes the test fail in a few + // iterations + runtime::spawn_mandatory_blocking(move || { + did_blocking_task_execute.store( + true, + std::sync::atomic::Ordering::Release + ); + }) + }); + drop(rt); + } + + assert!( + did_blocking_task_execute.load( + std::sync::atomic::Ordering::Acquire + ), + "Failed at iteration {:?}", i + ); + } +} + + struct Runtime; impl Schedule for Runtime { From fc51289d9a9d452e1df5f4dd1cdfa887f18ab297 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Wed, 29 Dec 2021 09:34:29 +0000 Subject: [PATCH 04/18] cleanup --- examples/Cargo.toml | 4 -- examples/write_all_stuff.rs | 11 ----- tokio/src/fs/file.rs | 8 ++-- tokio/src/runtime/blocking/mod.rs | 5 +-- tokio/src/runtime/blocking/pool.rs | 5 --- tokio/src/runtime/handle.rs | 4 +- tokio/src/runtime/tests/loom_blocking.rs | 20 +++++++++ tokio/src/runtime/tests/queue.rs | 55 ------------------------ 8 files changed, 26 insertions(+), 86 deletions(-) delete mode 100644 examples/write_all_stuff.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 223cee26b22..d2aca69d84a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -91,7 +91,3 @@ path = "named-pipe-ready.rs" [[example]] name = "named-pipe-multi-client" path = "named-pipe-multi-client.rs" - -[[example]] -name = "write_all_stuff" -path = "write_all_stuff.rs" diff --git a/examples/write_all_stuff.rs b/examples/write_all_stuff.rs deleted file mode 100644 index 19532c26924..00000000000 --- a/examples/write_all_stuff.rs +++ /dev/null @@ -1,11 +0,0 @@ -use tokio::io::{self, AsyncWriteExt}; -use tokio::fs::File; - -#[tokio::main] -async fn main() -> io::Result<()> { - let mut file = File::create("foo.txt").await?; - - file.write_all(b"some bytes").await?; - //file.flush().await?; - Ok(()) -} diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 7a9fbfcab0d..79db4a65a45 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -19,17 +19,17 @@ use std::task::Context; use std::task::Poll; use std::task::Poll::*; -#[cfg(test)] -use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; #[cfg(test)] use super::mocks::JoinHandle; #[cfg(test)] use super::mocks::MockFile as StdFile; -#[cfg(not(test))] -use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; +#[cfg(test)] +use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; #[cfg(not(test))] use crate::blocking::JoinHandle; #[cfg(not(test))] +use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; +#[cfg(not(test))] use std::fs::File as StdFile; /// A reference to an open file on the filesystem. diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 59c3528acaf..5b9d63c905a 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,10 +4,7 @@ //! compilation. mod pool; -pub(crate) use pool::{ - spawn_blocking, - spawn_mandatory_blocking, - BlockingPool, Spawner}; +pub(crate) use pool::{spawn_blocking, spawn_mandatory_blocking, BlockingPool, Spawner}; mod schedule; mod shutdown; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 47e5bc74254..7ae55033983 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -94,7 +94,6 @@ where rt.spawn_mandatory_blocking(func) } - // ===== impl BlockingPool ===== impl BlockingPool { @@ -281,10 +280,6 @@ impl Inner { shared.num_idle += 1; while !shared.shutdown { - // NOTE: A task spawned by spawn_blocking if: - // 1. It is spawned (thus the `condvar` gets a `notify_one`) - // 2. `shutdown` is called and manages to acquire the `shared.lock` *before* the - // thread woken up here gets a hold of it. Then `shutdown` will be true. let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); shared = lock_result.0; diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 71cf8cf205d..9748147f2b0 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -196,7 +196,6 @@ impl Handle { } } - #[track_caller] pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle where @@ -210,13 +209,12 @@ impl Handle { } } - #[track_caller] pub(crate) fn spawn_blocking_inner( &self, func: F, is_mandatory: bool, - name: Option<&str> + name: Option<&str>, ) -> JoinHandle where F: FnOnce() -> R + Send + 'static, diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 8fb54c5657e..248799faa3a 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -23,6 +23,26 @@ fn blocking_shutdown() { }); } +#[test] +fn spawn_blocking_should_always_run() { + use crate::runtime::tests::loom_oneshot; + loom::model(|| { + let rt = runtime::Builder::new_current_thread().build.unwrap(); + + let (tx, rx) = loom_oneshot::channel(); + let _enter = rt.enter(); + runtime::spawn_blocking(|| {}); + runtime::spawn_blocking(move || { + let _ = tx.send(()); + }); + + drop(rt); + + // This call will deadlock if `spawn_mandatory_blocking` doesn't run. + let () = rx.recv(); + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 293da0d77e9..2de17b9f4c4 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -227,61 +227,6 @@ fn stress2() { } } -#[test] -fn mandatory_blocking_tasks_get_executed() { - use crate::runtime; - use std::sync::{Arc, atomic::AtomicBool}; - - // We need to execute the test a few times because the failure is - // non-deterministic. I tried to write a loom that would prove the - // bug is there (when using spawn_blocking) but failed. The test - // I wrote didn't fail, even when running >30 minutes. - for i in 1..1000 { - let rt = runtime::Builder::new_multi_thread() - .max_blocking_threads(1) - .worker_threads(1) - .build().unwrap(); - - let did_blocking_task_execute = Arc::new( - AtomicBool::new(false) - ); - - { - let did_blocking_task_execute = did_blocking_task_execute.clone(); - let _enter = rt.enter(); - - rt.block_on(async move { - // We need to have spawned a previous blocking task - // so that the thread in the blocking pool gets to - // the state where it's waiting either for a shutdown - // or another task. - runtime::spawn_blocking(move || { - - }).await.unwrap(); - - // Changing this spawn_mandatory_blocking to - // spawn_mandatory makes the test fail in a few - // iterations - runtime::spawn_mandatory_blocking(move || { - did_blocking_task_execute.store( - true, - std::sync::atomic::Ordering::Release - ); - }) - }); - drop(rt); - } - - assert!( - did_blocking_task_execute.load( - std::sync::atomic::Ordering::Acquire - ), - "Failed at iteration {:?}", i - ); - } -} - - struct Runtime; impl Schedule for Runtime { From 6ff802c8a2a2034ca963fa131ac0bc35f467f5db Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Wed, 29 Dec 2021 18:44:19 +0000 Subject: [PATCH 05/18] clean up warnings --- tokio/src/blocking.rs | 19 +++++++++++++++---- tokio/src/runtime/blocking/mod.rs | 6 +++++- tokio/src/runtime/blocking/pool.rs | 16 +++++++++------- tokio/src/runtime/handle.rs | 20 +++++++++++--------- tokio/src/runtime/mod.rs | 9 +++++---- 5 files changed, 45 insertions(+), 25 deletions(-) diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs index cd43342fa0e..7825b4b73e9 100644 --- a/tokio/src/blocking.rs +++ b/tokio/src/blocking.rs @@ -1,8 +1,10 @@ cfg_rt! { - pub(crate) use crate::runtime::{ - spawn_blocking, - spawn_mandatory_blocking - }; + pub(crate) use crate::runtime::spawn_blocking; + + cfg_fs! { + pub(crate) use crate::runtime::spawn_mandatory_blocking; + } + pub(crate) use crate::task::JoinHandle; } @@ -19,7 +21,16 @@ cfg_not_rt! { { assert_send_sync::>>(); panic!("requires the `rt` Tokio feature flag") + } + cfg_fs! { + pub(crate) fn spawn_mandatory_blocking(_f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + panic!("requires the `rt` Tokio feature flag") + } } pub(crate) struct JoinHandle { diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 5b9d63c905a..54a69c4f6d7 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,7 +4,11 @@ //! compilation. mod pool; -pub(crate) use pool::{spawn_blocking, spawn_mandatory_blocking, BlockingPool, Spawner}; +pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; + +cfg_fs! { + pub(crate) use pool::spawn_mandatory_blocking; +} mod schedule; mod shutdown; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 7ae55033983..591274ad77a 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -84,14 +84,16 @@ where rt.spawn_blocking(func) } -/// Runs the provided function on an executor dedicated to blocking operations. -pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle -where - F: FnOnce() -> R + Send + 'static, +cfg_fs! { + /// Runs the provided function on an executor dedicated to blocking operations. + pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, R: Send + 'static, -{ - let rt = context::current(); - rt.spawn_mandatory_blocking(func) + { + let rt = context::current(); + rt.spawn_mandatory_blocking(func) + } } // ===== impl BlockingPool ===== diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 9748147f2b0..a8f4743a759 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -196,16 +196,18 @@ impl Handle { } } - #[track_caller] - pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle - where - F: FnOnce() -> R + Send + 'static, + cfg_fs! { + #[track_caller] + pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, R: Send + 'static, - { - if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), true, None) - } else { - self.spawn_blocking_inner(func, true, None) + { + if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + self.spawn_blocking_inner(Box::new(func), true, None) + } else { + self.spawn_blocking_inner(func, true, None) + } } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index dd02712ea31..f099632b65c 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -199,10 +199,11 @@ cfg_rt! { mod blocking; use blocking::BlockingPool; - pub(crate) use blocking::{ - spawn_blocking, - spawn_mandatory_blocking - }; + pub(crate) use blocking::spawn_blocking; + + cfg_fs! { + pub(crate) use blocking::spawn_mandatory_blocking; + } mod builder; pub use self::builder::Builder; From 14c3840bea9c9944f1b6772b2955fbe9b77b36aa Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Wed, 29 Dec 2021 18:52:55 +0000 Subject: [PATCH 06/18] fix test --- tokio/src/runtime/tests/loom_blocking.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 248799faa3a..e5dbccf6f69 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -24,15 +24,15 @@ fn blocking_shutdown() { } #[test] -fn spawn_blocking_should_always_run() { +fn spawn_mandatory_blocking_should_always_run() { use crate::runtime::tests::loom_oneshot; loom::model(|| { - let rt = runtime::Builder::new_current_thread().build.unwrap(); + let rt = runtime::Builder::new_current_thread().build().unwrap(); let (tx, rx) = loom_oneshot::channel(); let _enter = rt.enter(); runtime::spawn_blocking(|| {}); - runtime::spawn_blocking(move || { + runtime::spawn_mandatory_blocking(move || { let _ = tx.send(()); }); From 137bad02ef984e4bd746d9fe8fd898efe64e0ffd Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Wed, 29 Dec 2021 19:24:34 +0000 Subject: [PATCH 07/18] fix more things --- tokio/src/blocking.rs | 1 + tokio/src/runtime/blocking/pool.rs | 1 + tokio/src/runtime/tests/mod.rs | 2 +- tokio/src/task/builder.rs | 2 +- 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs index 7825b4b73e9..3057743dd35 100644 --- a/tokio/src/blocking.rs +++ b/tokio/src/blocking.rs @@ -2,6 +2,7 @@ cfg_rt! { pub(crate) use crate::runtime::spawn_blocking; cfg_fs! { + #[allow(unused_imports)] pub(crate) use crate::runtime::spawn_mandatory_blocking; } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 591274ad77a..71d69f67824 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -85,6 +85,7 @@ where } cfg_fs! { + #[allow(dead_code)] /// Runs the provided function on an executor dedicated to blocking operations. pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle where diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 55987f618fd..78bcca45442 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -13,7 +13,7 @@ mod unowned_wrapper { use tracing::Instrument; let span = tracing::trace_span!("test_span"); let task = task.instrument(span); - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, false); (task.into_notified(), handle) } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 0a7fe3c371a..01d61fe91f8 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -107,6 +107,6 @@ impl<'a> Builder<'a> { Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { - context::current().spawn_blocking_inner(function, self.name) + context::current().spawn_blocking_inner(function, false, self.name) } } From 7ae245de6bc852799d73475d5f2cf66fbd0f0b65 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Thu, 30 Dec 2021 09:38:52 +0000 Subject: [PATCH 08/18] fix some style issues --- tokio/src/blocking.rs | 2 +- tokio/src/runtime/blocking/pool.rs | 2 +- tokio/src/runtime/handle.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs index 3057743dd35..2d66b1e0556 100644 --- a/tokio/src/blocking.rs +++ b/tokio/src/blocking.rs @@ -28,7 +28,7 @@ cfg_not_rt! { pub(crate) fn spawn_mandatory_blocking(_f: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + R: Send + 'static, { panic!("requires the `rt` Tokio feature flag") } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 71d69f67824..01917d1f52f 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -90,7 +90,7 @@ cfg_fs! { pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + R: Send + 'static, { let rt = context::current(); rt.spawn_mandatory_blocking(func) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index a8f4743a759..e0e64719ee9 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -201,7 +201,7 @@ impl Handle { pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + R: Send + 'static, { if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { self.spawn_blocking_inner(Box::new(func), true, None) From 11b0be4e7b7a323645de4b916a9721e0ced2b672 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Thu, 30 Dec 2021 10:03:59 +0000 Subject: [PATCH 09/18] add `is_mandatory` into a new `BlockingTask` type The previous approach added it to the `UnownedTask` type. However, the type is also used in contexts where the concept of mandatory doesn't apply (unit tests). This new approach adds the information about "mandatory-ness" right where it makes sense: the blocking pool. --- tokio/src/runtime/blocking/mod.rs | 2 +- tokio/src/runtime/blocking/pool.rs | 21 +++++++++++++++------ tokio/src/runtime/handle.rs | 6 ++++-- tokio/src/runtime/task/mod.rs | 9 +-------- tokio/src/runtime/tests/mod.rs | 4 ++-- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 54a69c4f6d7..3c1aab7b3d8 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,7 +4,7 @@ //! compilation. mod pool; -pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; +pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner, Task}; cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 01917d1f52f..df8b02828f4 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -70,7 +70,16 @@ struct Shared { worker_thread_index: usize, } -type Task = task::UnownedTask; +pub(crate) struct Task { + task: task::UnownedTask, + is_mandatory: bool, +} + +impl Task { + pub(crate) fn new(task: task::UnownedTask, is_mandatory: bool) -> Task { + Task { task, is_mandatory } + } +} const KEEP_ALIVE: Duration = Duration::from_secs(10); @@ -190,7 +199,7 @@ impl Spawner { if shared.shutdown { // Shutdown the task - task.shutdown(); + task.task.shutdown(); // no need to even push this task; it would never get picked up return Err(()); @@ -274,7 +283,7 @@ impl Inner { // BUSY while let Some(task) = shared.queue.pop_front() { drop(shared); - task.run(); + task.task.run(); shared = self.shared.lock(); } @@ -316,10 +325,10 @@ impl Inner { while let Some(task) = shared.queue.pop_front() { drop(shared); - if task.is_mandatory() { - task.run(); + if task.is_mandatory { + task.task.run(); } else { - task.shutdown(); + task.task.shutdown(); } shared = self.shared.lock(); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index e0e64719ee9..2c09ff30a26 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -242,8 +242,10 @@ impl Handle { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, is_mandatory); - let _ = self.blocking_spawner.spawn(task, self); + let (task, handle) = task::unowned(fut, NoopSchedule); + let _ = self + .blocking_spawner + .spawn(blocking::Task::new(task, is_mandatory), self); handle } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 4e362f37c10..0592cca1a09 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -204,7 +204,6 @@ pub(crate) struct LocalNotified { /// This type holds two ref-counts. pub(crate) struct UnownedTask { raw: RawTask, - is_mandatory: bool, _p: PhantomData, } @@ -265,8 +264,7 @@ cfg_rt! { /// only when the task is not going to be stored in an `OwnedTasks` list. /// /// Currently only blocking tasks use this method. - pub(crate) fn unowned(task: T, scheduler: S, - is_mandatory: bool) -> (UnownedTask, JoinHandle) + pub(crate) fn unowned(task: T, scheduler: S) -> (UnownedTask, JoinHandle) where S: Schedule, T: Send + Future + 'static, @@ -278,7 +276,6 @@ cfg_rt! { // This is valid because an UnownedTask holds two ref-counts. let unowned = UnownedTask { raw: task.raw, - is_mandatory, _p: PhantomData, }; std::mem::forget(task); @@ -387,10 +384,6 @@ impl UnownedTask { pub(crate) fn shutdown(self) { self.into_task().shutdown() } - - pub(crate) fn is_mandatory(&self) -> bool { - self.is_mandatory - } } impl Drop for Task { diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 78bcca45442..be36d6ffe4d 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -13,7 +13,7 @@ mod unowned_wrapper { use tracing::Instrument; let span = tracing::trace_span!("test_span"); let task = task.instrument(span); - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, false); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule); (task.into_notified(), handle) } @@ -23,7 +23,7 @@ mod unowned_wrapper { T: std::future::Future + Send + 'static, T::Output: Send + 'static, { - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, false); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule); (task.into_notified(), handle) } } From 6555247bb463f60fb86270f5bed163cf4935c6b0 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Thu, 30 Dec 2021 10:56:39 +0000 Subject: [PATCH 10/18] introduce `Mandatory` enum and clean up The purpose of this enum is to be just like the `is_mandatory: bool`, only that it makes call-site easier to understand. I have also introduced some methods in the `pool::Task` struct to make the code a bit nicer --- tokio/src/runtime/blocking/mod.rs | 2 +- tokio/src/runtime/blocking/pool.rs | 32 +++++++++++++++++++++--------- tokio/src/runtime/handle.rs | 18 ++++++++++++----- tokio/src/runtime/mod.rs | 4 ++++ tokio/src/task/builder.rs | 3 ++- 5 files changed, 43 insertions(+), 16 deletions(-) diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 3c1aab7b3d8..15fe05c9ade 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,7 +4,7 @@ //! compilation. mod pool; -pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner, Task}; +pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task}; cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index df8b02828f4..f8fed5976cb 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -72,12 +72,30 @@ struct Shared { pub(crate) struct Task { task: task::UnownedTask, - is_mandatory: bool, + mandatory: Mandatory, +} + +#[derive(PartialEq, Eq)] +pub(crate) enum Mandatory { + #[cfg_attr(not(fs), allow(dead_code))] + Mandatory, + NonMandatory, } impl Task { - pub(crate) fn new(task: task::UnownedTask, is_mandatory: bool) -> Task { - Task { task, is_mandatory } + pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { + Task { task, mandatory } + } + + fn run(self) { + self.task.run(); + } + + fn shutdown_or_run_if_mandatory(self) { + match self.mandatory { + Mandatory::NonMandatory => self.task.shutdown(), + Mandatory::Mandatory => self.task.run(), + } } } @@ -283,7 +301,7 @@ impl Inner { // BUSY while let Some(task) = shared.queue.pop_front() { drop(shared); - task.task.run(); + task.run(); shared = self.shared.lock(); } @@ -325,11 +343,7 @@ impl Inner { while let Some(task) = shared.queue.pop_front() { drop(shared); - if task.is_mandatory { - task.task.run(); - } else { - task.task.shutdown(); - } + task.shutdown_or_run_if_mandatory(); shared = self.shared.lock(); } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 2c09ff30a26..ca991fa17b4 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -190,9 +190,9 @@ impl Handle { R: Send + 'static, { if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), false, None) + self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None) } else { - self.spawn_blocking_inner(func, false, None) + self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None) } } @@ -204,9 +204,17 @@ impl Handle { R: Send + 'static, { if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), true, None) + self.spawn_blocking_inner( + Box::new(func), + blocking::Mandatory::Mandatory, + None + ) } else { - self.spawn_blocking_inner(func, true, None) + self.spawn_blocking_inner( + func, + blocking::Mandatory::Mandatory, + None + ) } } } @@ -215,7 +223,7 @@ impl Handle { pub(crate) fn spawn_blocking_inner( &self, func: F, - is_mandatory: bool, + is_mandatory: blocking::Mandatory, name: Option<&str>, ) -> JoinHandle where diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index f099632b65c..b607d72f0dc 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -201,6 +201,10 @@ cfg_rt! { use blocking::BlockingPool; pub(crate) use blocking::spawn_blocking; + cfg_trace! { + pub(crate) use blocking::Mandatory; + } + cfg_fs! { pub(crate) use blocking::spawn_mandatory_blocking; } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 01d61fe91f8..ff6345029f9 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -107,6 +107,7 @@ impl<'a> Builder<'a> { Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { - context::current().spawn_blocking_inner(function, false, self.name) + use crate::runtime::Mandatory; + context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name) } } From 52e11f58c6f637e16aea42452548a74c0df401c8 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Thu, 30 Dec 2021 11:10:11 +0000 Subject: [PATCH 11/18] only allow dead_code in tests There are only two pieces of code that use `spawn_blocking_inner`: 1. `tokio::fs::File`, which uses a mock of `spawn_blocking_inner` in the tests. 2. The loom test that covers it. All other test build result in the function not being used, hence the dead_code --- tokio/src/runtime/blocking/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f8fed5976cb..a68a8460e22 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -112,7 +112,7 @@ where } cfg_fs! { - #[allow(dead_code)] + #[cfg_attr(test, allow(dead_code))] /// Runs the provided function on an executor dedicated to blocking operations. pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle where From ae607e3e365619f50e807127573d81f6b49e8921 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Thu, 30 Dec 2021 13:51:08 +0000 Subject: [PATCH 12/18] fix build attributes for spawn_mandatory_blocking When running `cargo test` on `/tokio`, you would expect that `rustc` only gets executed against `tokio` using the `--test` flag, which would make the `cfg_attr(test, ...)` conditional attribute apply. That's not true though. When running `cargo test` on `tokio`, the tokio crate gets built *twice*, once without `--test` and once with (this can be verified using strace). This is because, f.i, tokio specifies a dev-dependency on `tokio-test, which specifies back a dependency on `tokio`. So when running a regular test build, tokio will first get built without `--test`. We will not get dead code errors there because for that build, `tokio::fs::File` uses the new `spawn_mandatory_blocking`. For the next build, the one with `--test`, we will not get dead code errors because, even though `tokio::fs::File` uses a mock `spawn_mandatory_blocking`, the `cfg_attr(test` is working as expected. Things are different for loom builds. We will first get a build of tokio without `--test` but with the `--cfg loom` flag. The fact that `tokio::fs::File` uses `spawn_mandatory_blocking` won't save us here, because `tokio:fs` doesn't get compiled in loom builds The solution I can think of is extending the `cfg_attr(test` to `cfg_attr(any(test, loom)`, which is a bit unfortunate because `spawn_mandatory_blocking` *is* used in the loom tests, only not in the regular loom build of the crate, which is triggered by the cyclical dependency between tokio and tokio-test. --- tokio/src/runtime/blocking/pool.rs | 5 ++++- tokio/src/runtime/handle.rs | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index a68a8460e22..87c7def15ad 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -112,7 +112,10 @@ where } cfg_fs! { - #[cfg_attr(test, allow(dead_code))] + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] /// Runs the provided function on an executor dedicated to blocking operations. pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle where diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index ca991fa17b4..05e12b7ee46 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -198,6 +198,10 @@ impl Handle { cfg_fs! { #[track_caller] + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, From 0bb408593c2823e653bd02005c53d59adf3954d2 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Thu, 30 Dec 2021 16:31:29 +0000 Subject: [PATCH 13/18] update contract for `spawn[_mandatory]_blocking` --- tokio/src/runtime/blocking/pool.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 87c7def15ad..de66b9eec02 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -102,6 +102,8 @@ impl Task { const KEEP_ALIVE: Duration = Duration::from_secs(10); /// Runs the provided function on an executor dedicated to blocking operations. +/// Tasks will be scheduled as non-mandatory, meaning they may not get executed +/// in case of runtime shutdown. pub(crate) fn spawn_blocking(func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, @@ -117,6 +119,8 @@ cfg_fs! { test ), allow(dead_code))] /// Runs the provided function on an executor dedicated to blocking operations. + /// Tasks will be scheduled as mandatory, meaning they are guaranteed to run + /// unless a shutdown is already taking place. pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, From 34f7c7ddcee5d7292d04c0436f8ca271e8f0766e Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Tue, 11 Jan 2022 21:15:54 +0000 Subject: [PATCH 14/18] spawn_mandatory_blocking: fix race with shutdown On a previous iteration of this PR, there could be a race between calling `spawn_mandatory_blocking` and dropping the runtime. This could result in f.i, a call to `File::write` returning successfully but the actual write not getting scheduled. Now `spawn_mandatory_blocking` will keep track of whether the task was actually scheduled. If not, the `File::write` method will return an error, letting users know that the write did not, and will not happen. I have also added a loom test that checks that the return value of `spawn_mandatory_blocking` can be trusted, even when shutting down from a different thread. --- tokio/src/fs/file.rs | 10 ++++++-- tokio/src/fs/mocks.rs | 4 +-- tokio/src/runtime/blocking/pool.rs | 13 ++++++---- tokio/src/runtime/handle.rs | 29 ++++++++++++++-------- tokio/src/runtime/tests/loom_blocking.rs | 31 +++++++++++++++++++++++- tokio/src/task/builder.rs | 4 ++- 6 files changed, 70 insertions(+), 21 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 79db4a65a45..5d21f346976 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -649,7 +649,7 @@ impl AsyncWrite for File { let n = buf.copy_from(src); let std = me.std.clone(); - inner.state = Busy(spawn_mandatory_blocking(move || { + let blocking_task_join_handle = spawn_mandatory_blocking(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) } else { @@ -657,7 +657,13 @@ impl AsyncWrite for File { }; (Operation::Write(res), buf) - })); + }) + .ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "runtime was shutting down", + ))?; + + inner.state = Busy(blocking_task_join_handle); return Ready(Ok(n)); } diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index f78223e1e71..b1861726778 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -105,7 +105,7 @@ where JoinHandle { rx } } -pub(super) fn spawn_mandatory_blocking(f: F) -> JoinHandle +pub(super) fn spawn_mandatory_blocking(f: F) -> Option> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -117,7 +117,7 @@ where QUEUE.with(|cell| cell.borrow_mut().push_back(task)); - JoinHandle { rx } + Some(JoinHandle { rx }) } impl Future for JoinHandle { diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index de66b9eec02..f0d7d4e0fe9 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -118,10 +118,11 @@ cfg_fs! { all(loom, not(test)), // the function is covered by loom tests test ), allow(dead_code))] - /// Runs the provided function on an executor dedicated to blocking operations. - /// Tasks will be scheduled as mandatory, meaning they are guaranteed to run - /// unless a shutdown is already taking place. - pub(crate) fn spawn_mandatory_blocking(func: F) -> JoinHandle + /// Runs the provided function on an executor dedicated to blocking + /// operations. Tasks will be scheduled as mandatory, meaning they are + /// guaranteed to run unless a shutdown is already taking place. In case a + /// shutdown is already taking place, `None` will be returned. + pub(crate) fn spawn_mandatory_blocking(func: F) -> Option> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -223,7 +224,9 @@ impl Spawner { let mut shared = self.inner.shared.lock(); if shared.shutdown { - // Shutdown the task + // Shutdown the task: it's fine to shutdown this task (even if + // mandatory) because it was scheduled after the shutdown of the + // runtime began. task.task.shutdown(); // no need to even push this task; it would never get picked up diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 05e12b7ee46..4446f059743 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -189,11 +189,14 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None) - } else { - self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None) - } + let (join_handle, _was_spawned) = + if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None) + } else { + self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None) + }; + + join_handle } cfg_fs! { @@ -202,12 +205,12 @@ impl Handle { all(loom, not(test)), // the function is covered by loom tests test ), allow(dead_code))] - pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> JoinHandle + pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> Option> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { self.spawn_blocking_inner( Box::new(func), blocking::Mandatory::Mandatory, @@ -219,6 +222,12 @@ impl Handle { blocking::Mandatory::Mandatory, None ) + }; + + if was_spawned { + Some(join_handle) + } else { + None } } } @@ -229,7 +238,7 @@ impl Handle { func: F, is_mandatory: blocking::Mandatory, name: Option<&str>, - ) -> JoinHandle + ) -> (JoinHandle, bool) where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -255,10 +264,10 @@ impl Handle { let _ = name; let (task, handle) = task::unowned(fut, NoopSchedule); - let _ = self + let spawned = self .blocking_spawner .spawn(blocking::Task::new(task, is_mandatory), self); - handle + (handle, !spawned.is_err()) } /// Runs a future to completion on this `Handle`'s associated `Runtime`. diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index e5dbccf6f69..51f1519ed72 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -34,7 +34,8 @@ fn spawn_mandatory_blocking_should_always_run() { runtime::spawn_blocking(|| {}); runtime::spawn_mandatory_blocking(move || { let _ = tx.send(()); - }); + }) + .unwrap(); drop(rt); @@ -43,6 +44,34 @@ fn spawn_mandatory_blocking_should_always_run() { }); } +#[test] +fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread() { + use crate::runtime::tests::loom_oneshot; + loom::model(|| { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + let _enter = rt.enter(); + + // Drop the runtime in a different thread + { + loom::thread::spawn(move || { + drop(rt); + }); + } + + let (tx, rx) = loom_oneshot::channel(); + let handle = runtime::spawn_mandatory_blocking(move || { + let _ = tx.send(()); + }); + + // handle.is_some() means that `spawn_mandatory_blocking` + // promised us to run the blocking task + if handle.is_some() { + // This call will deadlock if `spawn_mandatory_blocking` doesn't run. + let () = rx.recv(); + } + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index ff6345029f9..5a128420ee2 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -108,6 +108,8 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name) + let (join_handle, _was_spawned) = + context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name); + join_handle } } From a3339926d1c84307c98dfc599c4f6db0785c3706 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Tue, 11 Jan 2022 22:23:22 +0000 Subject: [PATCH 15/18] fixup! spawn_mandatory_blocking: fix race with shutdown --- tokio/src/blocking.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/blocking.rs b/tokio/src/blocking.rs index 2d66b1e0556..f172399d5ef 100644 --- a/tokio/src/blocking.rs +++ b/tokio/src/blocking.rs @@ -25,7 +25,7 @@ cfg_not_rt! { } cfg_fs! { - pub(crate) fn spawn_mandatory_blocking(_f: F) -> JoinHandle + pub(crate) fn spawn_mandatory_blocking(_f: F) -> Option> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, From a31b367a50a457889f1d09ff348743606d2391aa Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Tue, 11 Jan 2022 22:30:27 +0000 Subject: [PATCH 16/18] fixup! fixup! spawn_mandatory_blocking: fix race with shutdown --- tokio/src/fs/file.rs | 7 +++---- tokio/src/runtime/handle.rs | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 5d21f346976..fb61293c658 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -658,10 +658,9 @@ impl AsyncWrite for File { (Operation::Write(res), buf) }) - .ok_or(std::io::Error::new( - std::io::ErrorKind::Other, - "runtime was shutting down", - ))?; + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::Other, "runtime was shutting down") + })?; inner.state = Busy(blocking_task_join_handle); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 4446f059743..9dbe6774dd0 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -267,7 +267,7 @@ impl Handle { let spawned = self .blocking_spawner .spawn(blocking::Task::new(task, is_mandatory), self); - (handle, !spawned.is_err()) + (handle, spawned.is_ok()) } /// Runs a future to completion on this `Handle`'s associated `Runtime`. From 75f487f24fe6d489fa5d7216061c178e5d5efbb2 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Mon, 24 Jan 2022 21:20:05 +0000 Subject: [PATCH 17/18] change return error value This way it is consistent with `asyncify` --- tokio/src/fs/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index fb61293c658..2c38e8059f5 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -659,7 +659,7 @@ impl AsyncWrite for File { (Operation::Write(res), buf) }) .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::Other, "runtime was shutting down") + io::Error::new(io::ErrorKind::Other, "background task failed") })?; inner.state = Busy(blocking_task_join_handle); From f96fa96ce28e3d4f2c31242a4ea50d37a349c52b Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Mon, 24 Jan 2022 22:22:23 +0000 Subject: [PATCH 18/18] fixup! spawn_mandatory_blocking: fix race with shutdown --- tokio/src/runtime/tests/loom_blocking.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 51f1519ed72..89de85e4362 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -49,7 +49,7 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread use crate::runtime::tests::loom_oneshot; loom::model(|| { let rt = runtime::Builder::new_current_thread().build().unwrap(); - let _enter = rt.enter(); + let handle = rt.handle().clone(); // Drop the runtime in a different thread { @@ -58,6 +58,7 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread }); } + let _enter = handle.enter(); let (tx, rx) = loom_oneshot::channel(); let handle = runtime::spawn_mandatory_blocking(move || { let _ = tx.send(());