diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs index 8df81a580f7b8..e7283e5c5b733 100644 --- a/library/std/src/sync/mpmc/mod.rs +++ b/library/std/src/sync/mpmc/mod.rs @@ -1164,6 +1164,15 @@ impl Receiver { pub fn try_iter(&self) -> TryIter<'_, T> { TryIter { rx: self } } + + /// Checks if all senders have disconnected. + pub(crate) fn is_disconnected(&self) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.is_disconnected(), + ReceiverFlavor::List(chan) => chan.is_disconnected(), + ReceiverFlavor::Zero(chan) => chan.is_disconnected(), + } + } } impl Receiver { diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs index f1ecf80fcb9f6..1376c5e489fd8 100644 --- a/library/std/src/sync/mpmc/zero.rs +++ b/library/std/src/sync/mpmc/zero.rs @@ -280,6 +280,11 @@ impl Channel { }) } + /// Returns `true` when the channel is disconnected. + pub(crate) fn is_disconnected(&self) -> bool { + self.inner.lock().unwrap().is_disconnected + } + /// Disconnects the channel and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. diff --git a/library/std/src/sync/oneshot.rs b/library/std/src/sync/oneshot.rs index b2c9ba34c0ff6..0053cd271e6be 100644 --- a/library/std/src/sync/oneshot.rs +++ b/library/std/src/sync/oneshot.rs @@ -212,6 +212,26 @@ impl Receiver { // Fallible methods. + /// Returns `Ok(true)` if the sender has sent a value over the channel. + /// + /// Returns `Ok(false)` if no value has been sent, but the corresponding [`Sender`] still exists + /// (hasn't been dropped yet). + /// + /// Once this method has returned `Ok(true)`, then any of the `recv` methods are guaranteed to + /// return the value successfully without blocking. + /// + /// Returns a [`RecvError`] if the corresponding [`Sender`] has disconnected before a value was + /// sent. + pub fn is_ready(&self) -> Result { + if !self.inner.is_empty() { + Ok(true) + } else if self.inner.is_disconnected() { + Err(RecvError) + } else { + Ok(false) + } + } + /// Attempts to return a pending value on this receiver without blocking. /// /// # Examples diff --git a/library/std/tests/sync/oneshot.rs b/library/std/tests/sync/oneshot.rs index 6eaacfbc6497d..d986cd363f05f 100644 --- a/library/std/tests/sync/oneshot.rs +++ b/library/std/tests/sync/oneshot.rs @@ -11,6 +11,7 @@ fn send_before_try_recv() { let (sender, receiver) = oneshot::channel(); assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.is_ready(), Ok(true)); match receiver.try_recv() { Ok(19) => {} @@ -42,6 +43,7 @@ fn sender_drop() { let (sender, receiver) = oneshot::channel::(); mem::drop(sender); + assert_eq!(receiver.is_ready(), Err(RecvError)); match receiver.recv() { Err(RecvError) => {} @@ -53,6 +55,7 @@ fn sender_drop() { let (sender, receiver) = oneshot::channel::(); mem::drop(sender); + assert_eq!(receiver.is_ready(), Err(RecvError)); match receiver.try_recv() { Err(TryRecvError::Disconnected) => {} @@ -63,6 +66,7 @@ fn sender_drop() { let (sender, receiver) = oneshot::channel::(); mem::drop(sender); + assert_eq!(receiver.is_ready(), Err(RecvError)); match receiver.recv_timeout(Duration::from_secs(1)) { Err(RecvTimeoutError::Disconnected) => {} @@ -76,6 +80,7 @@ fn send_never_deadline() { let (sender, receiver) = oneshot::channel::(); mem::drop(sender); + assert_eq!(receiver.is_ready(), Err(RecvError)); match receiver.recv_deadline(Instant::now()) { Err(RecvTimeoutError::Disconnected) => {} @@ -88,6 +93,7 @@ fn send_before_recv_timeout() { let (sender, receiver) = oneshot::channel(); assert!(sender.send(22i128).is_ok()); + assert_eq!(receiver.is_ready(), Ok(true)); let timeout = Duration::from_secs(1); match receiver.recv_timeout(timeout) { @@ -185,6 +191,8 @@ fn drop_sender_then_recv() { #[test] fn try_recv_empty() { let (sender, receiver) = oneshot::channel::(); + assert_eq!(receiver.is_ready(), Ok(false)); + match receiver.try_recv() { Err(TryRecvError::Empty(_)) => {} _ => panic!("expected empty error"), @@ -192,6 +200,29 @@ fn try_recv_empty() { mem::drop(sender); } +#[test] +fn not_ready_then_ready() { + let (sender, receiver) = oneshot::channel(); + assert_eq!(receiver.is_ready(), Ok(false)); + assert_eq!(receiver.is_ready(), Ok(false)); // Check stability. + + sender.send(42u128).unwrap(); + assert_eq!(receiver.is_ready(), Ok(true)); + assert_eq!(receiver.is_ready(), Ok(true)); // Check stability. + + assert_eq!(receiver.recv(), Ok(42)); +} + +#[test] +fn not_ready_then_disconnected() { + let (sender, receiver) = oneshot::channel::(); + assert_eq!(receiver.is_ready(), Ok(false)); + + mem::drop(sender); + assert_eq!(receiver.is_ready(), Err(RecvError)); + assert_eq!(receiver.is_ready(), Err(RecvError)); // Check stability. +} + #[test] fn try_recv_then_drop_receiver() { let (sender, receiver) = oneshot::channel::(); @@ -275,6 +306,8 @@ fn non_send_type_can_be_used_on_same_thread() { let (sender, receiver) = oneshot::channel(); sender.send(NotSend(ptr::null_mut())).unwrap(); + assert_eq!(receiver.is_ready(), Ok(true)); + let reply = receiver.try_recv().unwrap(); assert_eq!(reply, NotSend(ptr::null_mut())); } @@ -314,6 +347,7 @@ fn message_in_channel_dropped_on_receiver_drop() { sender.send(message).unwrap(); assert_eq!(counter.count(), 0); + assert_eq!(receiver.is_ready(), Ok(true)); mem::drop(receiver); assert_eq!(counter.count(), 1);