Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use select() instead of mio polling in Unix EventSource #711

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ crossterm_winapi = "0.9"
#
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio = { version = "0.8", features = ["os-poll"] }
signal-hook = { version = "0.3.13" }
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] }

#
# Dev dependencies (examples, ...)
Expand Down
238 changes: 122 additions & 116 deletions src/event/source/unix.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,66 @@
use std::{collections::VecDeque, io, time::Duration};
use std::os::unix::prelude::AsRawFd;
use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration};

use mio::{unix::SourceFd, Events, Interest, Poll, Token};
use signal_hook_mio::v0_8::Signals;
use signal_hook::low_level::pipe;

use crate::event::timeout::PollTimeout;
use crate::event::Event;
use crate::Result;

mod select;
use self::select::Selector;

#[cfg(feature = "event-stream")]
use super::super::sys::Waker;
use super::super::{
source::EventSource,
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,

use super::{
super::{
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,
},
InternalEvent,
},
timeout::PollTimeout,
Event, InternalEvent,
EventSource,
};

// Tokens to identify file descriptor
const TTY_TOKEN: Token = Token(0);
const SIGNAL_TOKEN: Token = Token(1);
/// Holds a prototypical Waker and a receiver we can wait on when doing select().
#[cfg(feature = "event-stream")]
const WAKE_TOKEN: Token = Token(2);
struct WakePipe {
receiver: UnixStream,
waker: Waker,
}

#[cfg(feature = "event-stream")]
impl WakePipe {
fn new() -> Result<Self> {
let (receiver, sender) = nonblocking_unix_pair()?;
Ok(WakePipe {
receiver,
waker: Waker::new(sender),
})
}
}

// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
// is enough.
const TTY_BUFFER_SIZE: usize = 1_024;

pub(crate) struct UnixInternalEventSource {
poll: Poll,
events: Events,
parser: Parser,
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty_fd: FileDesc,
signals: Signals,
tty: FileDesc,
winch_signal_receiver: UnixStream,
#[cfg(feature = "event-stream")]
waker: Waker,
wake_pipe: WakePipe,
}

fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> {
let (receiver, sender) = UnixStream::pair()?;
receiver.set_nonblocking(true)?;
sender.set_nonblocking(true)?;
Ok((receiver, sender))
}

impl UnixInternalEventSource {
Expand All @@ -45,128 +69,110 @@ impl UnixInternalEventSource {
}

pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
let poll = Poll::new()?;
let registry = poll.registry();

let tty_raw_fd = input_fd.raw_fd();
let mut tty_ev = SourceFd(&tty_raw_fd);
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;

let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?;
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;

#[cfg(feature = "event-stream")]
let waker = Waker::new(registry, WAKE_TOKEN)?;

Ok(UnixInternalEventSource {
poll,
events: Events::with_capacity(3),
parser: Parser::default(),
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty_fd: input_fd,
signals,
tty: input_fd,
winch_signal_receiver: {
let (receiver, sender) = nonblocking_unix_pair()?;
// Unregistering is unnecessary because EventSource is a singleton
pipe::register(libc::SIGWINCH, sender)?;
receiver
},
#[cfg(feature = "event-stream")]
waker,
wake_pipe: WakePipe::new()?,
})
}
}

impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
if let Some(event) = self.parser.next() {
return Ok(Some(event));
/// read_complete reads from a non-blocking file descriptor
/// until the buffer is full or it would block.
///
/// Similar to `std::io::Read::read_to_end`, except this function
/// only fills the given buffer and does not read beyond that.
fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result<usize> {
loop {
match fd.read(buf, buf.len()) {
Ok(x) => return Ok(x),
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => return Ok(0),
io::ErrorKind::Interrupted => continue,
_ => return Err(e),
},
}
}
}

impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
let timeout = PollTimeout::new(timeout);
let mut selector = Selector::default();

loop {
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
if e.kind() == io::ErrorKind::Interrupted {
continue;
} else {
return Err(e);
}
};

if self.events.is_empty() {
// No readiness events = timeout
return Ok(None);
while timeout.leftover().map_or(true, |t| !t.is_zero()) {
// check if there are buffered events from the last read
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
selector.add(&self.tty);
selector.add(&self.winch_signal_receiver);

for token in self.events.iter().map(|x| x.token()) {
match token {
TTY_TOKEN => {
loop {
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
Ok(read_count) => {
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
}
Err(e) => {
// No more data to read at the moment. We will receive another event
if e.kind() == io::ErrorKind::WouldBlock {
break;
}
// once more data is available to read.
else if e.kind() == io::ErrorKind::Interrupted {
continue;
}
}
};

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
}
#[cfg(feature = "event-stream")]
selector.add(&self.wake_pipe.receiver);

let _ = selector.select(timeout.leftover())?;
if selector.get(&self.tty).is_some() {
loop {
let read_count = read_complete(&self.tty, &mut self.tty_buffer)?;
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
SIGNAL_TOKEN => {
for signal in self.signals.pending() {
match signal {
signal_hook::consts::SIGWINCH => {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
_ => unreachable!("Synchronize signal registration & handling"),
};
}

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
#[cfg(feature = "event-stream")]
WAKE_TOKEN => {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));

if read_count == 0 {
break;
}
_ => unreachable!("Synchronize Evented handle registration & token handling"),
}
}

// Processing above can take some time, check if timeout expired
if timeout.elapsed() {
return Ok(None);
if selector.get(&self.winch_signal_receiver).is_some() {
let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false);
// drain the pipe
while read_complete(&fd, &mut [0; 1024])? != 0 {}
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
#[cfg(feature = "event-stream")]
if selector.get(&self.wake_pipe.receiver).is_some() {
let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false);
// drain the pipe
while read_complete(&fd, &mut [0; 1024])? != 0 {}

return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));
}
}
Ok(None)
}

#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.waker.clone()
self.wake_pipe.waker.clone()
}
}

Expand Down
Loading