Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
- Migrated to 'tokio' crate.
- Updated `nix` to version 0.22.
- Minimmum supported Rust version updated to 1.46.0.
- Updated `tokio`to version 1.
- Updated `mio` to version 0.7.
- Updated `futures` to version 0.3.

## [0.5.3] - 2018-04-19

Expand Down
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@ homepage = "https://github.com/rust-embedded/rust-sysfs-gpio"
documentation = "https://docs.rs/sysfs_gpio/"
description = "Provides access to GPIOs using the Linux sysfs interface."
readme = "README.md"
edition = "2018"

[features]
mio-evented = ["mio"]
async-tokio = ["futures", "tokio", "mio-evented"]

[dependencies]
futures = { version = "0.1", optional = true }
futures = { version = "0.3", optional = true }
nix = "0.22"
mio = { version = "0.6", optional = true }
tokio = { version = "0.1", optional = true }
mio = { version = "0.7", optional = true, features = ["os-ext"]}
tokio = { version = "1", optional = true, features = ["net"] }

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

[[example]]
name = "tokio"
required-features = ["async-tokio"]
6 changes: 3 additions & 3 deletions examples/blinky.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ fn get_args() -> Option<Arguments> {
Err(_) => return None,
};
Some(Arguments {
pin: pin,
duration_ms: duration_ms,
period_ms: period_ms,
pin,
duration_ms,
period_ms,
})
}

Expand Down
67 changes: 24 additions & 43 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,44 @@
#[cfg(feature = "async-tokio")]
extern crate futures;
#[cfg(feature = "async-tokio")]
extern crate sysfs_gpio;
#[cfg(feature = "async-tokio")]
extern crate tokio;
// Copyright (c) 2020. The sysfs-gpio Authors.

#[cfg(feature = "async-tokio")]
use futures::future::join_all;
use futures::StreamExt;
use std::env;

#[cfg(feature = "async-tokio")]
use futures::{lazy, Future, Stream};

#[cfg(feature = "async-tokio")]
use sysfs_gpio::{Direction, Edge, Pin};

#[cfg(feature = "async-tokio")]
fn stream(pin_nums: Vec<u64>) -> sysfs_gpio::Result<()> {
async fn monitor_pin(pin: Pin) -> Result<(), sysfs_gpio::Error> {
pin.export()?;
pin.set_direction(Direction::In)?;
pin.set_edge(Edge::BothEdges)?;
let mut gpio_events = pin.get_value_stream()?;
while let Some(evt) = gpio_events.next().await {
let val = evt.unwrap();
println!("Pin {} changed value to {}", pin.get_pin_num(), val);
}
Ok(())
}

async fn stream(pin_nums: Vec<u64>) {
// NOTE: this currently runs forever and as such if
// the app is stopped (Ctrl-C), no cleanup will happen
// and the GPIO will be left exported. Not much
// can be done about this as Rust signal handling isn't
// really present at the moment. Revisit later.
let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect();
let task = lazy(move || {
for &(i, ref pin) in pins.iter() {
pin.export().unwrap();
pin.set_direction(Direction::In).unwrap();
pin.set_edge(Edge::BothEdges).unwrap();
tokio::spawn(
pin.get_value_stream()
.unwrap()
.for_each(move |val| {
println!("Pin {} changed value to {}", i, val);
Ok(())
})
.map_err(|_| ()),
);
}
Ok(())
});
tokio::run(task);

Ok(())
join_all(pin_nums.into_iter().map(|p| {
let pin = Pin::new(p);
tokio::task::spawn(monitor_pin(pin))
}))
.await;
}

#[cfg(feature = "async-tokio")]
fn main() {
#[tokio::main]
async fn main() {
let pins: Vec<u64> = env::args()
.skip(1)
.map(|a| a.parse().expect("Pins must be specified as integers"))
.collect();
if pins.is_empty() {
println!("Usage: ./tokio <pin> [pin ...]");
} else {
stream(pins).unwrap();
stream(pins).await;
}
}

#[cfg(not(feature = "async-tokio"))]
fn main() {
println!("This example requires the `tokio` feature to be enabled.");
}
2 changes: 0 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#[cfg(not(target_os = "wasi"))]
use nix;
use std::convert;
use std::fmt;
use std::io;
Expand Down
139 changes: 51 additions & 88 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,29 @@ extern crate nix;
#[cfg(feature = "async-tokio")]
extern crate tokio;

use std::fs;
use std::fs::File;
use std::io;
use std::io::prelude::*;
#[cfg(any(target_os = "linux", target_os = "android", feature = "async-tokio"))]
use std::io::SeekFrom;
#[cfg(not(target_os = "wasi"))]
use std::os::unix::prelude::*;
use std::path::Path;
use std::{fs, fs::File};

#[cfg(feature = "async-tokio")]
use futures::{Async, Poll, Stream};
use futures::{ready, Stream};
#[cfg(feature = "mio-evented")]
use mio::unix::EventedFd;
use mio::event::Source;
#[cfg(feature = "mio-evented")]
use mio::Evented;
use mio::unix::SourceFd;
#[cfg(any(target_os = "linux", target_os = "android"))]
use nix::sys::epoll::*;
#[cfg(not(target_os = "wasi"))]
use nix::unistd::close;
#[cfg(feature = "async-tokio")]
use tokio::reactor::{Handle, PollEvented};
use std::task::Poll;
#[cfg(feature = "async-tokio")]
use tokio::io::unix::AsyncFd;

pub use error::Error;

Expand Down Expand Up @@ -472,17 +473,6 @@ impl Pin {
AsyncPinPoller::new(self.pin_num)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `async-tokio` crate feature is enabled.
#[cfg(feature = "async-tokio")]
pub fn get_stream_with_handle(&self, handle: &Handle) -> Result<PinStream> {
PinStream::init_with_handle(*self, handle)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
Expand All @@ -494,22 +484,6 @@ impl Pin {
PinStream::init(*self)
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
/// `set_edge(Edge::BothEdges)` before using this.
///
/// Note that the values produced are the value of the pin as soon as we get to handling the
/// interrupt in userspace. Each time this stream produces a value, a change has occurred, but
/// it could end up producing the same value multiple times if the value has changed back
/// between when the interrupt occurred and when the value was read.
///
/// This method is only available when the `async-tokio` crate feature is enabled.
#[cfg(feature = "async-tokio")]
pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result<PinValueStream> {
Ok(PinValueStream(PinStream::init_with_handle(*self, handle)?))
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
Expand All @@ -536,9 +510,9 @@ fn extract_pin_fom_path_test() {
let tok3 = Pin::extract_pin_from_path(&"../../devices/soc0/gpiochip3/gpio/gpio124");
assert_eq!(124, tok3.unwrap());
let err1 = Pin::extract_pin_from_path(&"/sys/CLASS/gpio/gpio");
assert_eq!(true, err1.is_err());
assert!(err1.is_err());
let err2 = Pin::extract_pin_from_path(&"/sys/class/gpio/gpioSDS");
assert_eq!(true, err2.is_err());
assert!(err2.is_err());
}
#[cfg(not(target_os = "wasi"))]
#[derive(Debug)]
Expand Down Expand Up @@ -643,76 +617,70 @@ impl AsyncPinPoller {
}

#[cfg(feature = "mio-evented")]
impl Evented for AsyncPinPoller {
impl Source for AsyncPinPoller {
fn register(
&self,
poll: &mio::Poll,
&mut self,
poll: &mio::Registry,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
interest: mio::Interest,
) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts)
SourceFd(&self.as_raw_fd()).register(poll, token, interest)
}

fn reregister(
&self,
poll: &mio::Poll,
&mut self,
poll: &mio::Registry,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
interest: mio::Interest,
) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts)
SourceFd(&self.as_raw_fd()).reregister(poll, token, interest)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).deregister(poll)
fn deregister(&mut self, poll: &mio::Registry) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).deregister(poll)
}
}

#[cfg(feature = "async-tokio")]
pub struct PinStream {
evented: PollEvented<AsyncPinPoller>,
skipped_first_event: bool,
#[cfg(any(feature = "async-tokio", feature = "mio-evented"))]
impl AsRawFd for AsyncPinPoller {
fn as_raw_fd(&self) -> RawFd {
self.devfile.as_raw_fd()
}
}

#[cfg(feature = "async-tokio")]
impl PinStream {
pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result<Self> {
Ok(PinStream {
evented: PollEvented::new(pin.get_async_poller()?, handle)?,
skipped_first_event: false,
})
}
pub struct PinStream {
evented: AsyncFd<AsyncPinPoller>,
skipped_first_event: bool,
}

#[cfg(feature = "async-tokio")]
impl PinStream {
pub fn init(pin: Pin) -> Result<Self> {
Ok(PinStream {
evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?,
evented: AsyncFd::new(pin.get_async_poller()?)?,
skipped_first_event: false,
})
}
}

#[cfg(feature = "async-tokio")]
impl Stream for PinStream {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(match self.evented.poll_read() {
Async::Ready(()) => {
self.evented.need_read()?;
if self.skipped_first_event {
Async::Ready(Some(()))
} else {
self.skipped_first_event = true;
Async::NotReady
}
type Item = Result<()>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
let mut guard = ready!(self.evented.poll_read_ready(cx))?;
guard.clear_ready();
if self.skipped_first_event {
return Poll::Ready(Some(Ok(())));
} else {
self.skipped_first_event = true;
}
Async::NotReady => Async::NotReady,
})
}
}
}

Expand All @@ -729,18 +697,13 @@ impl PinValueStream {

#[cfg(feature = "async-tokio")]
impl Stream for PinValueStream {
type Item = u8;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.0.poll() {
Ok(Async::Ready(Some(()))) => {
let value = self.get_value()?;
Ok(Async::Ready(Some(value)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
type Item = Result<u8>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
ready!(std::pin::Pin::new(&mut self.0).poll_next(cx));
Poll::Ready(Some(Ok(self.get_value()?)))
}
}