Skip to content

Commit

Permalink
test(drain): add tests for futures drain utility
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Oct 26, 2018
1 parent 3af6aa8 commit bf188b2
Showing 1 changed file with 118 additions and 0 deletions.
118 changes: 118 additions & 0 deletions src/common/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,121 @@ where
}
}

#[cfg(test)]
mod tests {
use futures::{future, Async, Future, Poll};
use super::*;

struct TestMe {
draining: bool,
finished: bool,
poll_cnt: usize,
}

impl Future for TestMe {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_cnt += 1;
if self.finished {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}

#[test]
fn watch() {
future::lazy(|| {
let (tx, rx) = channel();
let fut = TestMe {
draining: false,
finished: false,
poll_cnt: 0,
};

let mut watch = rx.watch(fut, |fut| {
fut.draining = true;
});

assert_eq!(watch.future.poll_cnt, 0);

// First poll should poll the inner future
assert!(watch.poll().unwrap().is_not_ready());
assert_eq!(watch.future.poll_cnt, 1);

// Second poll should poll the inner future again
assert!(watch.poll().unwrap().is_not_ready());
assert_eq!(watch.future.poll_cnt, 2);

let mut draining = tx.drain();
// Drain signaled, but needs another poll to be noticed.
assert!(!watch.future.draining);
assert_eq!(watch.future.poll_cnt, 2);

// Now, poll after drain has been signaled.
assert!(watch.poll().unwrap().is_not_ready());
assert_eq!(watch.future.poll_cnt, 3);
assert!(watch.future.draining);

// Draining is not ready until watcher completes
assert!(draining.poll().unwrap().is_not_ready());

// Finishing up the watch future
watch.future.finished = true;
assert!(watch.poll().unwrap().is_ready());
assert_eq!(watch.future.poll_cnt, 4);
drop(watch);

assert!(draining.poll().unwrap().is_ready());

Ok::<_, ()>(())
}).wait().unwrap();
}

#[test]
fn watch_clones() {
future::lazy(|| {
let (tx, rx) = channel();

let fut1 = TestMe {
draining: false,
finished: false,
poll_cnt: 0,
};
let fut2 = TestMe {
draining: false,
finished: false,
poll_cnt: 0,
};

let watch1 = rx.clone().watch(fut1, |fut| {
fut.draining = true;
});
let watch2 = rx.watch(fut2, |fut| {
fut.draining = true;
});

let mut draining = tx.drain();

// Still 2 outstanding watchers
assert!(draining.poll().unwrap().is_not_ready());

// drop 1 for whatever reason
drop(watch1);

// Still not ready, 1 other watcher still pending
assert!(draining.poll().unwrap().is_not_ready());

drop(watch2);

// Now all watchers are gone, draining is complete
assert!(draining.poll().unwrap().is_ready());

Ok::<_, ()>(())
}).wait().unwrap();
}
}

0 comments on commit bf188b2

Please sign in to comment.