diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 0bf760aa811..20e2c90d4e6 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -439,6 +439,10 @@ impl Rx { return Ok(value); } TryPopResult::Closed => return Err(TryRecvError::Disconnected), + // If close() was called, an empty queue should report Disconnected. + TryPopResult::Empty if rx_fields.rx_closed => { + return Err(TryRecvError::Disconnected) + } TryPopResult::Empty => return Err(TryRecvError::Empty), TryPopResult::Busy => {} // fall through } diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index 90d9b828c8e..118bac85633 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -35,8 +35,14 @@ pub(crate) enum TryPopResult { /// Successfully popped a value. Ok(T), /// The channel is empty. + /// + /// Note that `list.rs` only tracks the close state set by senders. If the + /// channel is closed by `Rx::close()`, then `TryPopResult::Empty` is still + /// returned, and the close state needs to be handled by `chan.rs`. Empty, /// The channel is empty and closed. + /// + /// Returned when the send half is closed (all senders dropped). Closed, /// The channel is not empty, but the first value is being written. Busy, diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 577e9c35faa..3ebac739015 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -966,6 +966,15 @@ fn try_recv_unbounded() { } } +#[test] +fn try_recv_after_receiver_close() { + let (_tx, mut rx) = mpsc::channel::<()>(5); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + rx.close(); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + #[test] fn try_recv_close_while_empty_bounded() { let (tx, mut rx) = mpsc::channel::<()>(5);