Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 26 additions & 48 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,71 +13,49 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
rust: [nightly, beta, stable, 1.39.0]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v2

- name: Install latest ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
override: true

- name: Run basic cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --all-features

- name: Run cargo check
if: startsWith(matrix.rust, '1.39.0') == false
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests --all-features

- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
- run: cargo build --all --all-features --all-targets
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
uses: actions-rs/cargo@v1
with:
command: check
args: -Z features=dev_dep

- name: Run cargo test
if: startsWith(matrix.rust, '1.39.0') == false
uses: actions-rs/cargo@v1
with:
command: test
run: cargo check -Z features=dev_dep
- run: cargo test

# Copied from: https://github.com/rust-lang/stacker/pull/19/files
windows_gnu:
runs-on: windows-latest
strategy:
matrix:
rust_toolchain: [nightly]
rust_target:
rust: [nightly]
target:
- x86_64-pc-windows-gnu
steps:
- uses: actions/checkout@v1
- name: Install Rust nightly
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust_toolchain }}
target: ${{ matrix.rust_target }}
default: true
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
- run: rustup target add ${{ matrix.target }}
# https://github.com/rust-lang/rust/issues/49078
- name: Fix windows-gnu rust-mingw
run : |
for i in crt2.o dllcrt2.o libmingwex.a libmsvcrt.a ; do
cp -f "/C/ProgramData/Chocolatey/lib/mingw/tools/install/mingw64/x86_64-w64-mingw32/lib/$i" "`rustc --print sysroot`/lib/rustlib/x86_64-pc-windows-gnu/lib"
done
shell: bash
- uses: actions-rs/cargo@v1
with:
command: build
args: --target ${{ matrix.rust_target }} --all --benches --bins --examples --tests --all-features
- uses: actions-rs/cargo@v1
with:
command: test
args: --target ${{ matrix.rust_target }}
- run: cargo build --target ${{ matrix.target }} --all --all-features --all-targets
- run: cargo test --target ${{ matrix.target }}

msrv:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [1.46.0]
steps:
- uses: actions/checkout@v2
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build
10 changes: 3 additions & 7 deletions .github/workflows/cross.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@ jobs:
os: [ubuntu-latest, macos-latest]

steps:
- uses: actions/checkout@master

- name: Install nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
- uses: actions/checkout@v2
- name: Install Rust
run: rustup update stable

- name: Install cross
run: cargo install cross
Expand Down
8 changes: 2 additions & 6 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
components: clippy
- name: Install Rust
run: rustup update stable

- uses: actions-rs/clippy-check@v1
with:
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ mod driver;
mod reactor;

pub use driver::block_on;
pub use reactor::{Readable, Writable};

/// Use `Duration::MAX` once `duration_constants` are stabilized.
fn duration_max() -> Duration {
Expand Down Expand Up @@ -686,8 +687,8 @@ impl<T> Async<T> {
/// listener.readable().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.source.readable().await
pub fn readable(&self) -> Readable {
self.source.readable()
}

/// Waits until the I/O handle is writable.
Expand All @@ -708,8 +709,8 @@ impl<T> Async<T> {
/// stream.writable().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.source.writable().await
pub fn writable(&self) -> Writable {
self.source.writable()
}

/// Polls the I/O handle for readability.
Expand Down
180 changes: 121 additions & 59 deletions src/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use concurrent_queue::ConcurrentQueue;
use futures_lite::future;
use futures_lite::ready;
use once_cell::sync::Lazy;
use polling::{Event, Poller};
use slab::Slab;
Expand Down Expand Up @@ -441,78 +443,138 @@ impl Source {
}

/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
self.ready(READ).await?;
log::trace!("readable: fd={}", self.raw);
Ok(())
pub(crate) fn readable(self: &Arc<Self>) -> Readable {
Readable(self.ready(READ))
}

/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
self.ready(WRITE).await?;
log::trace!("writable: fd={}", self.raw);
Ok(())
pub(crate) fn writable(self: &Arc<Self>) -> Writable {
Writable(self.ready(WRITE))
}

/// Waits until the I/O source is readable or writable.
async fn ready(&self, dir: usize) -> io::Result<()> {
let mut ticks = None;
let mut index = None;
let mut _guard = None;

future::poll_fn(|cx| {
let mut state = self.state.lock().unwrap();

// Check if the reactor has delivered an event.
if let Some((a, b)) = ticks {
// If `state[dir].tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered an event.
if state[dir].tick != a && state[dir].tick != b {
return Poll::Ready(Ok(()));
}
fn ready(self: &Arc<Self>, dir: usize) -> Ready {
Ready {
source: self.clone(),
dir,
ticks: None,
index: None,
_guard: None,
}
}
}

/// Future for [`Async::readable`](crate::Async::readable).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable(Ready);

impl Future for Readable {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("readable: fd={}", self.0.source.raw);
Poll::Ready(Ok(()))
}
}

/// Future for [`Async::writable`](crate::Async::writable).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable(Ready);

impl Future for Writable {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("writable: fd={}", self.0.source.raw);
Poll::Ready(Ok(()))
}
}

#[derive(Debug)]
struct Ready {
source: Arc<Source>,
dir: usize,
ticks: Option<(usize, usize)>,
index: Option<usize>,
_guard: Option<RemoveOnDrop>,
}

impl Future for Ready {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
source,
dir,
ticks,
index,
_guard,
} = &mut *self;

let mut state = source.state.lock().unwrap();

// Check if the reactor has delivered an event.
if let Some((a, b)) = *ticks {
// If `state[dir].tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered an event.
if state[*dir].tick != a && state[*dir].tick != b {
return Poll::Ready(Ok(()));
}
}

let was_empty = state[dir].is_empty();

// Register the current task's waker.
let i = match index {
Some(i) => i,
None => {
let i = state[dir].wakers.insert(None);
_guard = Some(CallOnDrop(move || {
let mut state = self.state.lock().unwrap();
state[dir].wakers.remove(i);
}));
index = Some(i);
ticks = Some((Reactor::get().ticker(), state[dir].tick));
i
}
};
state[dir].wakers[i] = Some(cx.waker().clone());

// Update interest in this I/O handle.
if was_empty {
Reactor::get().poller.modify(
self.raw,
Event {
key: self.key,
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
let was_empty = state[*dir].is_empty();

// Register the current task's waker.
let i = match *index {
Some(i) => i,
None => {
let i = state[*dir].wakers.insert(None);
*_guard = Some(RemoveOnDrop {
source: source.clone(),
dir: *dir,
key: i,
});
*index = Some(i);
*ticks = Some((Reactor::get().ticker(), state[*dir].tick));
i
}
};
state[*dir].wakers[i] = Some(cx.waker().clone());

Poll::Pending
})
.await
// Update interest in this I/O handle.
if was_empty {
Reactor::get().poller.modify(
source.raw,
Event {
key: source.key,
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
}

Poll::Pending
}
}

/// Runs a closure when dropped.
struct CallOnDrop<F: Fn()>(F);
/// Remove waker when dropped.
#[derive(Debug)]
struct RemoveOnDrop {
source: Arc<Source>,
dir: usize,
key: usize,
}

impl<F: Fn()> Drop for CallOnDrop<F> {
impl Drop for RemoveOnDrop {
fn drop(&mut self) {
(self.0)();
let mut state = self.source.state.lock().unwrap();
let wakers = &mut state[self.dir].wakers;
if wakers.contains(self.key) {
wakers.remove(self.key);
}
}
}