diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 2179bae3261..c0469f3ffe4 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -133,6 +133,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" } futures = { version = "0.3.0", features = ["async-await"] } mockall = "0.11.1" async-stream = "0.3" +futures-concurrency = "7.6.3" [target.'cfg(not(target_family = "wasm"))'.dev-dependencies] socket2 = "0.5.5" diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index d1e59d34736..10a71aa7b10 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -398,6 +398,153 @@ macro_rules! doc { /// } /// } /// ``` + /// # Alternatives from the Ecosystem + /// + /// The `select!` macro is a powerful tool for managing multiple asynchronous + /// branches, enabling tasks to run concurrently within the same thread. However, + /// its use can introduce challenges, particularly around cancellation safety, which + /// can lead to subtle and hard-to-debug errors. For many use cases, ecosystem + /// alternatives may be preferable as they mitigate these concerns by offering + /// clearer syntax, more predictable control flow, and reducing the need to manually + /// handle issues like fuse semantics or cancellation safety. + /// + /// ## Merging Streams + /// + /// For cases where `loop { select! { ... } }` is used to poll multiple tasks, + /// stream merging offers a concise alternative, inherently handle cancellation-safe + /// processing, removing the risk of data loss. Libraries such as [`tokio_stream`], + /// [`futures::stream`] and [`futures_concurrency`] provide tools for merging + /// streams and handling their outputs sequentially. + /// + /// [`tokio_stream`]: https://docs.rs/tokio-stream/latest/tokio_stream/ + /// [`futures::stream`]: https://docs.rs/futures/latest/futures/stream/ + /// [`futures_concurrency`]: https://docs.rs/futures-concurrency/latest/futures_concurrency/ + /// + /// ### Example with `select!` + /// + /// ``` + /// struct File; + /// struct Channel; + /// struct Socket; + /// + /// impl Socket { + /// async fn read_packet(&mut self) -> Vec { + /// vec![] + /// } + /// } + /// + /// async fn read_send(_file: &mut File, _channel: &mut Channel) { + /// // do work that is not cancel safe + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// // open our IO types + /// let mut file = File; + /// let mut channel = Channel; + /// let mut socket = Socket; + /// + /// loop { + /// tokio::select! { + /// _ = read_send(&mut file, &mut channel) => { /* ... */ }, + /// _data = socket.read_packet() => { /* ... */ } + /// _ = futures::future::ready(()) => break + /// } + /// } + /// } + /// + /// ``` + /// + /// ### Moving to `merge` + /// + /// By using merge, you can unify multiple asynchronous tasks into a single stream, + /// eliminating the need to manage tasks manually and reducing the risk of + /// unintended behavior like data loss. + /// + /// ``` + /// use std::pin::pin; + /// + /// use futures::stream::unfold; + /// use tokio_stream::StreamExt; + /// + /// struct File; + /// struct Channel; + /// struct Socket; + /// + /// impl Socket { + /// async fn read_packet(&mut self) -> Vec { + /// vec![] + /// } + /// } + /// + /// async fn read_send(_file: &mut File, _channel: &mut Channel) { + /// // do work that is not cancel safe + /// } + /// + /// enum Message { + /// Stop, + /// Sent, + /// Data(Vec), + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// // open our IO types + /// let file = File; + /// let channel = Channel; + /// let socket = Socket; + /// + /// let a = unfold((file, channel), |(mut file, mut channel)| async { + /// read_send(&mut file, &mut channel).await; + /// Some((Message::Sent, (file, channel))) + /// }); + /// let b = unfold(socket, |mut socket| async { + /// let data = socket.read_packet().await; + /// Some((Message::Data(data), socket)) + /// }); + /// let c = tokio_stream::iter([Message::Stop]); + /// + /// let mut s = pin!(a.merge(b).merge(c)); + /// while let Some(msg) = s.next().await { + /// match msg { + /// Message::Data(_data) => { /* ... */ } + /// Message::Sent => continue, + /// Message::Stop => break, + /// } + /// } + /// } + /// ``` + /// + /// ## Racing Futures + /// + /// If you need to wait for the first completion among several asynchronous tasks, + /// ecosystem utilities such as + /// [`futures`](https://docs.rs/futures/latest/futures/), + /// [`futures-lite`](https://docs.rs/futures-lite/latest/futures_lite/) or + /// [`futures-concurrency`](https://docs.rs/futures-concurrency/latest/futures_concurrency/) + /// provide streamlined syntax for racing futures: + /// + /// - [`futures_concurrency::future::Race`](https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Race.html) + /// - [`futures::select`](https://docs.rs/futures/latest/futures/macro.select.html) + /// - [`futures::stream::select_all`](https://docs.rs/futures/latest/futures/stream/select_all/index.html) (for streams) + /// - [`futures_lite::future::or`](https://docs.rs/futures-lite/latest/futures_lite/future/fn.or.html) + /// - [`futures_lite::future::race`](https://docs.rs/futures-lite/latest/futures_lite/future/fn.race.html) + /// + /// ``` + /// use futures_concurrency::future::Race; + /// + /// #[tokio::main] + /// async fn main() { + /// let task_a = async { Ok("ok") }; + /// let task_b = async { Err("error") }; + /// let result = (task_a, task_b).race().await; + /// + /// match result { + /// Ok(output) => println!("First task completed with: {output}"), + /// Err(err) => eprintln!("Error occurred: {err}"), + /// } + /// } + /// ``` #[macro_export] #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] $select