From be676402ff4d8a866ffa08026b79e1f7bd3a23bd Mon Sep 17 00:00:00 2001 From: Stepan Koltsov Date: Wed, 14 Sep 2016 22:15:01 +0300 Subject: [PATCH] Do not hang in poll if reactor is destroyed If channel is dropped, receiver may still return EOF, and if channel is alive, receiver produces an error. --- src/channel.rs | 2 +- src/net/tcp.rs | 4 ++-- src/net/udp.rs | 4 ++-- src/reactor/channel.rs | 36 +++++++++++++++++++++++++----- src/reactor/io_token.rs | 18 ++++++++++----- src/reactor/mod.rs | 43 +++++++++++++++++++++++++----------- src/reactor/poll_evented.rs | 38 ++++++++++++++++++++----------- src/reactor/timeout.rs | 5 +++-- src/reactor/timeout_token.rs | 8 +++++-- tests/channel.rs | 32 +++++++++++++++++++++++++++ tests/spawn.rs | 4 ++-- 11 files changed, 146 insertions(+), 48 deletions(-) create mode 100644 tests/channel.rs diff --git a/src/channel.rs b/src/channel.rs index 85c1d410..e9e86ba0 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -99,7 +99,7 @@ impl Stream for Receiver { match self.rx.get_ref().try_recv() { Ok(t) => Ok(Async::Ready(Some(t))), Err(TryRecvError::Empty) => { - self.rx.need_read(); + try!(self.rx.need_read()); Ok(Async::NotReady) } Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)), diff --git a/src/net/tcp.rs b/src/net/tcp.rs index c6c65983..15dca80b 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -108,7 +108,7 @@ impl TcpListener { match self.inner.io.get_ref().accept() { Ok(pair) => Ok(Async::Ready(Some(pair))), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.inner.io.need_read(); + try!(self.inner.io.need_read()); Ok(Async::NotReady) } Err(e) => Err(e) @@ -127,7 +127,7 @@ impl TcpListener { }); tx.complete(res); Ok(()) - }); + }).expect("failed to spawn"); rx.then(|r| r.expect("shouldn't be canceled")) }).boxed(), } diff --git a/src/net/udp.rs b/src/net/udp.rs index ceaf3615..bf950b25 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -79,7 +79,7 @@ impl UdpSocket { match self.io.get_ref().send_to(buf, target) { Ok(Some(n)) => Ok(n), Ok(None) => { - self.io.need_write(); + try!(self.io.need_write()); Err(mio::would_block()) } Err(e) => Err(e), @@ -95,7 +95,7 @@ impl UdpSocket { match self.io.get_ref().recv_from(buf) { Ok(Some(n)) => Ok(n), Ok(None) => { - self.io.need_read(); + try!(self.io.need_read()); Err(mio::would_block()) } Err(e) => Err(e), diff --git a/src/reactor/channel.rs b/src/reactor/channel.rs index e3ddd393..2d20543e 100644 --- a/src/reactor/channel.rs +++ b/src/reactor/channel.rs @@ -10,25 +10,35 @@ use std::cell::Cell; use std::io; use std::marker; use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use mio; use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl}; use mpsc_queue::{Queue, PopResult}; +struct Inner { + queue: Queue, + receiver_alive: AtomicBool, +} + pub struct Sender { ctl: SenderCtl, - inner: Arc>, + inner: Arc>, } pub struct Receiver { ctl: ReceiverCtl, - inner: Arc>, + inner: Arc>, _marker: marker::PhantomData>, // this type is not Sync } pub fn channel() -> (Sender, Receiver) { - let inner = Arc::new(Queue::new()); + let inner = Arc::new(Inner { + queue: Queue::new(), + receiver_alive: AtomicBool::new(true), + }); let (tx, rx) = ctl_pair(); let tx = Sender { @@ -45,7 +55,10 @@ pub fn channel() -> (Sender, Receiver) { impl Sender { pub fn send(&self, data: T) -> io::Result<()> { - self.inner.push(data); + if !self.inner.receiver_alive.load(Ordering::SeqCst) { + return Err(io::Error::new(io::ErrorKind::Other, "receiver has been closed")); + } + self.inner.queue.push(data); self.ctl.inc() } } @@ -57,7 +70,7 @@ impl Receiver { // // We, however, are the only thread with a `Receiver` because this // type is not `Sync`. and we never handed out another instance. - match unsafe { self.inner.pop() } { + match unsafe { self.inner.queue.pop() } { PopResult::Data(t) => { try!(self.ctl.dec()); Ok(Some(t)) @@ -85,6 +98,13 @@ impl Receiver { } } +// Close receiver, so further send operations would fail. +// This function is used internally in `Core` and is not exposed as public API. +pub fn close_receiver(receiver: &Receiver) { + receiver.inner.as_ref().receiver_alive.store(false, Ordering::SeqCst); +} + + // Just delegate everything to `self.ctl` impl mio::Evented for Receiver { fn register(&self, @@ -108,6 +128,12 @@ impl mio::Evented for Receiver { } } +impl Drop for Receiver { + fn drop(&mut self) { + close_receiver(self); + } +} + impl Clone for Sender { fn clone(&self) -> Sender { Sender { diff --git a/src/reactor/io_token.rs b/src/reactor/io_token.rs index 94782e5f..0334e916 100644 --- a/src/reactor/io_token.rs +++ b/src/reactor/io_token.rs @@ -71,6 +71,8 @@ impl IoToken { /// receive further notifications it will need to call `schedule_read` /// again. /// + /// This function returns an error if reactor is destroyed. + /// /// > **Note**: This method should generally not be used directly, but /// > rather the `ReadinessStream` type should be used instead. /// @@ -82,8 +84,8 @@ impl IoToken { /// /// This function will also panic if there is not a currently running future /// task. - pub fn schedule_read(&self, handle: &Remote) { - handle.send(Message::Schedule(self.token, task::park(), Direction::Read)); + pub fn schedule_read(&self, handle: &Remote) -> io::Result<()> { + handle.send(Message::Schedule(self.token, task::park(), Direction::Read)) } /// Schedule the current future task to receive a notification when the @@ -98,6 +100,8 @@ impl IoToken { /// receive further notifications it will need to call `schedule_write` /// again. /// + /// This function returns an error if reactor is destroyed. + /// /// > **Note**: This method should generally not be used directly, but /// > rather the `ReadinessStream` type should be used instead. /// @@ -109,8 +113,8 @@ impl IoToken { /// /// This function will also panic if there is not a currently running future /// task. - pub fn schedule_write(&self, handle: &Remote) { - handle.send(Message::Schedule(self.token, task::park(), Direction::Write)); + pub fn schedule_write(&self, handle: &Remote) -> io::Result<()> { + handle.send(Message::Schedule(self.token, task::park(), Direction::Write)) } /// Unregister all information associated with a token on an event loop, @@ -127,6 +131,8 @@ impl IoToken { /// ensure that the callbacks are **not** invoked, so pending scheduled /// callbacks cannot be relied upon to get called. /// + /// This function returns an error if reactor is destroyed. + /// /// > **Note**: This method should generally not be used directly, but /// > rather the `ReadinessStream` type should be used instead. /// @@ -135,7 +141,7 @@ impl IoToken { /// This function will panic if the event loop this handle is associated /// with has gone away, or if there is an error communicating with the event /// loop. - pub fn drop_source(&self, handle: &Remote) { - handle.send(Message::DropSource(self.token)); + pub fn drop_source(&self, handle: &Remote) -> io::Result<()> { + handle.send(Message::DropSource(self.token)) } } diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index a5090497..ca51869f 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -408,6 +408,27 @@ impl Core { } } +impl Drop for Core { + fn drop(&mut self) { + // Close the receiver, so all schedule operations will be rejected. + // Do it explicitly before unparking to avoid race condition. + channel::close_receiver(&self.rx); + + // Unpark all tasks. + // It has no effect for tasks in this event loop, + // however tasks in another executors get an error + // when they do `poll` right after wakeup. + for io in self.inner.borrow_mut().io_dispatch.iter_mut() { + if let Some(ref mut reader) = io.reader { + reader.unpark(); + } + if let Some(ref mut writer) = io.writer { + writer.unpark(); + } + } + } +} + impl Inner { fn add_source(&mut self, source: &mio::Evented) -> io::Result<(Arc, usize)> { @@ -498,7 +519,7 @@ impl Inner { } impl Remote { - fn send(&self, msg: Message) { + fn send(&self, msg: Message) -> io::Result<()> { self.with_loop(|lp| { match lp { Some(lp) => { @@ -506,18 +527,12 @@ impl Remote { // that our message is processed "in order" lp.consume_queue(); lp.notify(msg); + Ok(()) } None => { - match self.tx.send(msg) { - Ok(()) => {} - - // This should only happen when there was an error - // writing to the pipe to wake up the event loop, - // hopefully that never happens - Err(e) => { - panic!("error sending message to event loop: {}", e) - } - } + // May return an error if receiver is closed + // or if there was an error writing to the pipe. + self.tx.send(msg) } } }) @@ -548,7 +563,9 @@ impl Remote { /// /// Note that while the closure, `F`, requires the `Send` bound as it might /// cross threads, the future `R` does not. - pub fn spawn(&self, f: F) + /// + /// This function returns an error if reactor is destroyed. + pub fn spawn(&self, f: F) -> io::Result<()> where F: FnOnce(&Handle) -> R + Send + 'static, R: IntoFuture, R::Future: 'static, @@ -556,7 +573,7 @@ impl Remote { self.send(Message::Run(Box::new(|lp: &Core| { let f = f(&lp.handle()); lp.inner.borrow_mut().spawn(Box::new(f.into_future())); - }))); + }))) } } diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index 5dd89713..7c0a88d9 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -71,8 +71,11 @@ impl PollEvented { if self.readiness.load(Ordering::SeqCst) & 1 != 0 { Async::Ready(()) } else { - self.token.schedule_read(&self.handle); - Async::NotReady + match self.token.schedule_read(&self.handle) { + Ok(()) => Async::NotReady, + // next read will return error + Err(_) => Async::Ready(()), + } } } @@ -91,8 +94,12 @@ impl PollEvented { if self.readiness.load(Ordering::SeqCst) & 2 != 0 { Async::Ready(()) } else { - self.token.schedule_write(&self.handle); - Async::NotReady + // ignore error, will be handled in need_write + match self.token.schedule_write(&self.handle) { + Ok(()) => Async::NotReady, + // next read will return error + Err(_) => Async::Ready(()), + } } } @@ -111,7 +118,9 @@ impl PollEvented { /// Note that it is also only valid to call this method if `poll_read` /// previously indicated that the object is readable. That is, this function /// must always be paired with calls to `poll_read` previously. - pub fn need_read(&self) { + /// + /// This function returns an error if reactor is destroyed. + pub fn need_read(&self) -> io::Result<()> { self.readiness.fetch_and(!1, Ordering::SeqCst); self.token.schedule_read(&self.handle) } @@ -131,7 +140,9 @@ impl PollEvented { /// Note that it is also only valid to call this method if `poll_write` /// previously indicated that the object is writeable. That is, this function /// must always be paired with calls to `poll_write` previously. - pub fn need_write(&self) { + /// + /// This function returns an error if reactor is destroyed. + pub fn need_write(&self) -> io::Result<()> { self.readiness.fetch_and(!2, Ordering::SeqCst); self.token.schedule_write(&self.handle) } @@ -162,7 +173,7 @@ impl Read for PollEvented { } let r = self.get_mut().read(buf); if is_wouldblock(&r) { - self.need_read(); + try!(self.need_read()); } return r } @@ -175,7 +186,7 @@ impl Write for PollEvented { } let r = self.get_mut().write(buf); if is_wouldblock(&r) { - self.need_write(); + try!(self.need_write()); } return r } @@ -186,7 +197,7 @@ impl Write for PollEvented { } let r = self.get_mut().flush(); if is_wouldblock(&r) { - self.need_write(); + try!(self.need_write()); } return r } @@ -211,7 +222,7 @@ impl<'a, E> Read for &'a PollEvented } let r = self.get_ref().read(buf); if is_wouldblock(&r) { - self.need_read(); + try!(self.need_read()); } return r } @@ -226,7 +237,7 @@ impl<'a, E> Write for &'a PollEvented } let r = self.get_ref().write(buf); if is_wouldblock(&r) { - self.need_write(); + try!(self.need_write()); } return r } @@ -237,7 +248,7 @@ impl<'a, E> Write for &'a PollEvented } let r = self.get_ref().flush(); if is_wouldblock(&r) { - self.need_write(); + try!(self.need_write()); } return r } @@ -264,6 +275,7 @@ fn is_wouldblock(r: &io::Result) -> bool { impl Drop for PollEvented { fn drop(&mut self) { - self.token.drop_source(&self.handle); + // Ignore error + drop(self.token.drop_source(&self.handle)); } } diff --git a/src/reactor/timeout.rs b/src/reactor/timeout.rs index 7efbf54b..4fa31198 100644 --- a/src/reactor/timeout.rs +++ b/src/reactor/timeout.rs @@ -56,7 +56,7 @@ impl Future for Timeout { if *self.token.when() <= now { Ok(Async::Ready(())) } else { - self.token.update_timeout(&self.handle); + try!(self.token.update_timeout(&self.handle)); Ok(Async::NotReady) } } @@ -64,6 +64,7 @@ impl Future for Timeout { impl Drop for Timeout { fn drop(&mut self) { - self.token.cancel_timeout(&self.handle); + // Ignore error + drop(self.token.cancel_timeout(&self.handle)); } } diff --git a/src/reactor/timeout_token.rs b/src/reactor/timeout_token.rs index 98b38716..ca74bcd3 100644 --- a/src/reactor/timeout_token.rs +++ b/src/reactor/timeout_token.rs @@ -36,21 +36,25 @@ impl TimeoutToken { /// Updates a previously added timeout to notify a new task instead. /// + /// This function returns an error if reactor is destroyed. + /// /// # Panics /// /// This method will panic if the timeout specified was not created by this /// loop handle's `add_timeout` method. - pub fn update_timeout(&self, handle: &Remote) { + pub fn update_timeout(&self, handle: &Remote) -> io::Result<()> { handle.send(Message::UpdateTimeout(self.token, task::park())) } /// Cancel a previously added timeout. /// + /// This function returns an error if reactor is destroyed. + /// /// # Panics /// /// This method will panic if the timeout specified was not created by this /// loop handle's `add_timeout` method. - pub fn cancel_timeout(&self, handle: &Remote) { + pub fn cancel_timeout(&self, handle: &Remote) -> io::Result<()> { debug!("cancel timeout {}", self.token); handle.send(Message::CancelTimeout(self.token)) } diff --git a/tests/channel.rs b/tests/channel.rs new file mode 100644 index 00000000..d53d8d0a --- /dev/null +++ b/tests/channel.rs @@ -0,0 +1,32 @@ +extern crate tokio_core; +extern crate futures; + +use tokio_core::reactor::Core; +use tokio_core::channel::channel; + +use futures::stream::Stream; + + +#[test] +fn recv_after_core_drop() { + let core: Core = Core::new().unwrap(); + + let (_sender, receiver) = channel::(&core.handle()).unwrap(); + + drop(core); + + assert!(receiver.wait().next().unwrap().is_err()); +} + +#[test] +fn recv_after_core_and_sender_drop() { + let core: Core = Core::new().unwrap(); + + let (sender, receiver) = channel::(&core.handle()).unwrap(); + + drop(core); + drop(sender); + + // although core is dropped, receiver still properly returns EOF + assert!(receiver.wait().next().is_none()); +} diff --git a/tests/spawn.rs b/tests/spawn.rs index 1d39543d..ed88a9ef 100644 --- a/tests/spawn.rs +++ b/tests/spawn.rs @@ -21,7 +21,7 @@ fn simple() { tx2.complete(2); Ok(()) }) - }); + }).expect("failed to spawn"); assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2)); } @@ -41,7 +41,7 @@ fn spawn_in_poll() { tx2.complete(2); Ok(()) }) - }); + }).expect("failed to spawn"); Ok(()) }));