diff --git a/CHANGELOG.md b/CHANGELOG.md index c6782f50d..0c6803fee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ - Migrated to 'tokio' crate. - Updated `nix` to version 0.22. - Minimmum supported Rust version updated to 1.46.0. +- Updated `tokio`to version 1. +- Updated `mio` to version 0.7. +- Updated `futures` to version 0.3. ## [0.5.3] - 2018-04-19 diff --git a/Cargo.toml b/Cargo.toml index 3622735eb..43e3094d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,13 +11,21 @@ homepage = "https://github.com/rust-embedded/rust-sysfs-gpio" documentation = "https://docs.rs/sysfs_gpio/" description = "Provides access to GPIOs using the Linux sysfs interface." readme = "README.md" +edition = "2018" [features] mio-evented = ["mio"] async-tokio = ["futures", "tokio", "mio-evented"] [dependencies] -futures = { version = "0.1", optional = true } +futures = { version = "0.3", optional = true } nix = "0.22" -mio = { version = "0.6", optional = true } -tokio = { version = "0.1", optional = true } +mio = { version = "0.7", optional = true, features = ["os-ext"]} +tokio = { version = "1", optional = true, features = ["net"] } + +[dev-dependencies] +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } + +[[example]] +name = "tokio" +required-features = ["async-tokio"] diff --git a/examples/blinky.rs b/examples/blinky.rs index 156273a81..4e917febc 100644 --- a/examples/blinky.rs +++ b/examples/blinky.rs @@ -58,9 +58,9 @@ fn get_args() -> Option { Err(_) => return None, }; Some(Arguments { - pin: pin, - duration_ms: duration_ms, - period_ms: period_ms, + pin, + duration_ms, + period_ms, }) } diff --git a/examples/tokio.rs b/examples/tokio.rs index a0810b3ce..94d80cee2 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -1,51 +1,37 @@ -#[cfg(feature = "async-tokio")] -extern crate futures; -#[cfg(feature = "async-tokio")] -extern crate sysfs_gpio; -#[cfg(feature = "async-tokio")] -extern crate tokio; +// Copyright (c) 2020. The sysfs-gpio Authors. -#[cfg(feature = "async-tokio")] +use futures::future::join_all; +use futures::StreamExt; use std::env; - -#[cfg(feature = "async-tokio")] -use futures::{lazy, Future, Stream}; - -#[cfg(feature = "async-tokio")] use sysfs_gpio::{Direction, Edge, Pin}; -#[cfg(feature = "async-tokio")] -fn stream(pin_nums: Vec) -> sysfs_gpio::Result<()> { +async fn monitor_pin(pin: Pin) -> Result<(), sysfs_gpio::Error> { + pin.export()?; + pin.set_direction(Direction::In)?; + pin.set_edge(Edge::BothEdges)?; + let mut gpio_events = pin.get_value_stream()?; + while let Some(evt) = gpio_events.next().await { + let val = evt.unwrap(); + println!("Pin {} changed value to {}", pin.get_pin_num(), val); + } + Ok(()) +} + +async fn stream(pin_nums: Vec) { // NOTE: this currently runs forever and as such if // the app is stopped (Ctrl-C), no cleanup will happen // and the GPIO will be left exported. Not much // can be done about this as Rust signal handling isn't // really present at the moment. Revisit later. - let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect(); - let task = lazy(move || { - for &(i, ref pin) in pins.iter() { - pin.export().unwrap(); - pin.set_direction(Direction::In).unwrap(); - pin.set_edge(Edge::BothEdges).unwrap(); - tokio::spawn( - pin.get_value_stream() - .unwrap() - .for_each(move |val| { - println!("Pin {} changed value to {}", i, val); - Ok(()) - }) - .map_err(|_| ()), - ); - } - Ok(()) - }); - tokio::run(task); - - Ok(()) + join_all(pin_nums.into_iter().map(|p| { + let pin = Pin::new(p); + tokio::task::spawn(monitor_pin(pin)) + })) + .await; } -#[cfg(feature = "async-tokio")] -fn main() { +#[tokio::main] +async fn main() { let pins: Vec = env::args() .skip(1) .map(|a| a.parse().expect("Pins must be specified as integers")) @@ -53,11 +39,6 @@ fn main() { if pins.is_empty() { println!("Usage: ./tokio [pin ...]"); } else { - stream(pins).unwrap(); + stream(pins).await; } } - -#[cfg(not(feature = "async-tokio"))] -fn main() { - println!("This example requires the `tokio` feature to be enabled."); -} diff --git a/src/error.rs b/src/error.rs index 5b9a46b31..a2b7db441 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,3 @@ -#[cfg(not(target_os = "wasi"))] -use nix; use std::convert; use std::fmt; use std::io; diff --git a/src/lib.rs b/src/lib.rs index 044c406fa..46a68f853 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,8 +53,6 @@ extern crate nix; #[cfg(feature = "async-tokio")] extern crate tokio; -use std::fs; -use std::fs::File; use std::io; use std::io::prelude::*; #[cfg(any(target_os = "linux", target_os = "android", feature = "async-tokio"))] @@ -62,19 +60,22 @@ use std::io::SeekFrom; #[cfg(not(target_os = "wasi"))] use std::os::unix::prelude::*; use std::path::Path; +use std::{fs, fs::File}; #[cfg(feature = "async-tokio")] -use futures::{Async, Poll, Stream}; +use futures::{ready, Stream}; #[cfg(feature = "mio-evented")] -use mio::unix::EventedFd; +use mio::event::Source; #[cfg(feature = "mio-evented")] -use mio::Evented; +use mio::unix::SourceFd; #[cfg(any(target_os = "linux", target_os = "android"))] use nix::sys::epoll::*; #[cfg(not(target_os = "wasi"))] use nix::unistd::close; #[cfg(feature = "async-tokio")] -use tokio::reactor::{Handle, PollEvented}; +use std::task::Poll; +#[cfg(feature = "async-tokio")] +use tokio::io::unix::AsyncFd; pub use error::Error; @@ -472,17 +473,6 @@ impl Pin { AsyncPinPoller::new(self.pin_num) } - /// Get a Stream of pin interrupts for this pin - /// - /// The PinStream object can be used with the `tokio` crate. You should probably call - /// `set_edge()` before using this. - /// - /// This method is only available when the `async-tokio` crate feature is enabled. - #[cfg(feature = "async-tokio")] - pub fn get_stream_with_handle(&self, handle: &Handle) -> Result { - PinStream::init_with_handle(*self, handle) - } - /// Get a Stream of pin interrupts for this pin /// /// The PinStream object can be used with the `tokio` crate. You should probably call @@ -494,22 +484,6 @@ impl Pin { PinStream::init(*self) } - /// Get a Stream of pin values for this pin - /// - /// The PinStream object can be used with the `tokio` crate. You should probably call - /// `set_edge(Edge::BothEdges)` before using this. - /// - /// Note that the values produced are the value of the pin as soon as we get to handling the - /// interrupt in userspace. Each time this stream produces a value, a change has occurred, but - /// it could end up producing the same value multiple times if the value has changed back - /// between when the interrupt occurred and when the value was read. - /// - /// This method is only available when the `async-tokio` crate feature is enabled. - #[cfg(feature = "async-tokio")] - pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result { - Ok(PinValueStream(PinStream::init_with_handle(*self, handle)?)) - } - /// Get a Stream of pin values for this pin /// /// The PinStream object can be used with the `tokio` crate. You should probably call @@ -536,9 +510,9 @@ fn extract_pin_fom_path_test() { let tok3 = Pin::extract_pin_from_path(&"../../devices/soc0/gpiochip3/gpio/gpio124"); assert_eq!(124, tok3.unwrap()); let err1 = Pin::extract_pin_from_path(&"/sys/CLASS/gpio/gpio"); - assert_eq!(true, err1.is_err()); + assert!(err1.is_err()); let err2 = Pin::extract_pin_from_path(&"/sys/class/gpio/gpioSDS"); - assert_eq!(true, err2.is_err()); + assert!(err2.is_err()); } #[cfg(not(target_os = "wasi"))] #[derive(Debug)] @@ -643,53 +617,48 @@ impl AsyncPinPoller { } #[cfg(feature = "mio-evented")] -impl Evented for AsyncPinPoller { +impl Source for AsyncPinPoller { fn register( - &self, - poll: &mio::Poll, + &mut self, + poll: &mio::Registry, token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, + interest: mio::Interest, ) -> io::Result<()> { - EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts) + SourceFd(&self.as_raw_fd()).register(poll, token, interest) } fn reregister( - &self, - poll: &mio::Poll, + &mut self, + poll: &mio::Registry, token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, + interest: mio::Interest, ) -> io::Result<()> { - EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts) + SourceFd(&self.as_raw_fd()).reregister(poll, token, interest) } - fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { - EventedFd(&self.devfile.as_raw_fd()).deregister(poll) + fn deregister(&mut self, poll: &mio::Registry) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).deregister(poll) } } -#[cfg(feature = "async-tokio")] -pub struct PinStream { - evented: PollEvented, - skipped_first_event: bool, +#[cfg(any(feature = "async-tokio", feature = "mio-evented"))] +impl AsRawFd for AsyncPinPoller { + fn as_raw_fd(&self) -> RawFd { + self.devfile.as_raw_fd() + } } #[cfg(feature = "async-tokio")] -impl PinStream { - pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result { - Ok(PinStream { - evented: PollEvented::new(pin.get_async_poller()?, handle)?, - skipped_first_event: false, - }) - } +pub struct PinStream { + evented: AsyncFd, + skipped_first_event: bool, } #[cfg(feature = "async-tokio")] impl PinStream { pub fn init(pin: Pin) -> Result { Ok(PinStream { - evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?, + evented: AsyncFd::new(pin.get_async_poller()?)?, skipped_first_event: false, }) } @@ -697,22 +666,21 @@ impl PinStream { #[cfg(feature = "async-tokio")] impl Stream for PinStream { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - Ok(match self.evented.poll_read() { - Async::Ready(()) => { - self.evented.need_read()?; - if self.skipped_first_event { - Async::Ready(Some(())) - } else { - self.skipped_first_event = true; - Async::NotReady - } + type Item = Result<()>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + let mut guard = ready!(self.evented.poll_read_ready(cx))?; + guard.clear_ready(); + if self.skipped_first_event { + return Poll::Ready(Some(Ok(()))); + } else { + self.skipped_first_event = true; } - Async::NotReady => Async::NotReady, - }) + } } } @@ -729,18 +697,13 @@ impl PinValueStream { #[cfg(feature = "async-tokio")] impl Stream for PinValueStream { - type Item = u8; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.0.poll() { - Ok(Async::Ready(Some(()))) => { - let value = self.get_value()?; - Ok(Async::Ready(Some(value))) - } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Err(e), - } + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + ready!(std::pin::Pin::new(&mut self.0).poll_next(cx)); + Poll::Ready(Some(Ok(self.get_value()?))) } }