From d375c609d82d5c7c02fac4899fa051fb9c1f4a43 Mon Sep 17 00:00:00 2001 From: KR-bluejay Date: Wed, 15 Oct 2025 13:31:35 +0000 Subject: [PATCH 1/5] sync: treat try_recv after close as disconnected (#7631) --- tokio/src/sync/mpsc/chan.rs | 9 ++++++++- tokio/tests/sync_mpsc.rs | 9 +++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 0bf760aa811..8fa23020d33 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -439,7 +439,14 @@ impl Rx { return Ok(value); } TryPopResult::Closed => return Err(TryRecvError::Disconnected), - TryPopResult::Empty => return Err(TryRecvError::Empty), + TryPopResult::Empty => { + // If close() was called, an empty queue should report Disconnected. + return if rx_fields.rx_closed { + Err(TryRecvError::Disconnected) + } else { + Err(TryRecvError::Empty) + }; + } TryPopResult::Busy => {} // fall through } }; diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 577e9c35faa..3ebac739015 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -966,6 +966,15 @@ fn try_recv_unbounded() { } } +#[test] +fn try_recv_after_receiver_close() { + let (_tx, mut rx) = mpsc::channel::<()>(5); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + rx.close(); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + #[test] fn try_recv_close_while_empty_bounded() { let (tx, mut rx) = mpsc::channel::<()>(5); From 0762d01a29d15d22cb18157b3c0cba4bc9b4c255 Mon Sep 17 00:00:00 2001 From: KR-bluejay Date: Wed, 15 Oct 2025 23:20:06 +0900 Subject: [PATCH 2/5] Update tokio/src/sync/mpsc/chan.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/mpsc/chan.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 8fa23020d33..6d21911834d 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -439,14 +439,9 @@ impl Rx { return Ok(value); } TryPopResult::Closed => return Err(TryRecvError::Disconnected), - TryPopResult::Empty => { - // If close() was called, an empty queue should report Disconnected. - return if rx_fields.rx_closed { - Err(TryRecvError::Disconnected) - } else { - Err(TryRecvError::Empty) - }; - } + // If close() was called, an empty queue should report Disconnected. + TryPopResult::Empty if rx_fields.rx_closed => Err(TryRecvError::Disconnected), + TryPopResult::Empty => Err(TryRecvError::Empty), TryPopResult::Busy => {} // fall through } }; From fa5b8bf0b8686fa3e1afbcf68dd9cb1dce19e875 Mon Sep 17 00:00:00 2001 From: KR-bluejay Date: Wed, 15 Oct 2025 15:02:06 +0000 Subject: [PATCH 3/5] sync: fix try_recv close match arm --- tokio/src/sync/mpsc/chan.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 6d21911834d..20e2c90d4e6 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -440,8 +440,10 @@ impl Rx { } TryPopResult::Closed => return Err(TryRecvError::Disconnected), // If close() was called, an empty queue should report Disconnected. - TryPopResult::Empty if rx_fields.rx_closed => Err(TryRecvError::Disconnected), - TryPopResult::Empty => Err(TryRecvError::Empty), + TryPopResult::Empty if rx_fields.rx_closed => { + return Err(TryRecvError::Disconnected) + } + TryPopResult::Empty => return Err(TryRecvError::Empty), TryPopResult::Busy => {} // fall through } }; From e4347422b990294fbfe7e3e562121094c7eec776 Mon Sep 17 00:00:00 2001 From: KR-bluejay Date: Wed, 15 Oct 2025 16:00:17 +0000 Subject: [PATCH 4/5] sync: update list TryPopResult docs for receiver close --- tokio/src/sync/mpsc/list.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index 90d9b828c8e..e9b3b8419b7 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -35,8 +35,13 @@ pub(crate) enum TryPopResult { /// Successfully popped a value. Ok(T), /// The channel is empty. + /// + /// Returned after `Rx::close()` when no values remain, + /// even if senders still exist. Empty, /// The channel is empty and closed. + /// + /// Returned when the send half is closed (all senders dropped). Closed, /// The channel is not empty, but the first value is being written. Busy, From 8212c428d09f125e2d2c42583bee374beb5dc14d Mon Sep 17 00:00:00 2001 From: KR-bluejay Date: Sat, 18 Oct 2025 16:05:31 +0900 Subject: [PATCH 5/5] sync: clarify TryPopResult empty doc Co-authored-by: Alice Ryhl --- tokio/src/sync/mpsc/list.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index e9b3b8419b7..118bac85633 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -36,8 +36,9 @@ pub(crate) enum TryPopResult { Ok(T), /// The channel is empty. /// - /// Returned after `Rx::close()` when no values remain, - /// even if senders still exist. + /// Note that `list.rs` only tracks the close state set by senders. If the + /// channel is closed by `Rx::close()`, then `TryPopResult::Empty` is still + /// returned, and the close state needs to be handled by `chan.rs`. Empty, /// The channel is empty and closed. ///