Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,24 @@ pub trait Watcher: Clone {
}
}

fn filter<T>(mut self, filter: impl Fn(&T) -> bool + Send + Sync + 'static) -> Filter<Self, T>
where
T: Clone + Eq,
Self: Watcher<Value = T>,
{
let current = self.get();
let current = if filter(&current) {
Some(current)
} else {
None
};
Filter {
current,
filter: Arc::new(filter),
watcher: self,
}
}

/// Returns a watcher that updates every time this or the other watcher
/// updates, and yields both watcher's items together when that happens.
fn or<W: Watcher>(self, other: W) -> (Self, W) {
Expand Down Expand Up @@ -519,6 +537,57 @@ impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
}
}

/// Wraps a [`Watcher`] to allow observing a derived value.
///
/// See [`Watcher::map`].
#[derive(derive_more::Debug, Clone)]
pub struct Filter<W, T>
where
T: Clone + Eq,
W: Watcher<Value = T>,
{
#[debug("Arc<dyn Fn(&T) -> bool + 'static>")]
filter: Arc<dyn Fn(&T) -> bool + Send + Sync + 'static>,
watcher: W,
current: Option<T>,
}

impl<W, T> Watcher for Filter<W, T>
where
T: Clone + Eq,
W: Watcher<Value = T>,
{
type Value = Option<T>;

fn get(&mut self) -> Self::Value {
self.current.clone()
}

fn is_connected(&self) -> bool {
self.watcher.is_connected()
}

fn poll_updated(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::Value, Disconnected>> {
loop {
let value = ready!(self.watcher.poll_updated(cx)?);
let filtered = if (self.filter)(&value) {
Some(value)
} else {
None
};
if filtered != self.current {
self.current = filtered.clone();
return Poll::Ready(Ok(filtered));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behaves a bit different than I would have expected. Filter returns an Option instead of just TheOriginalValue, and will trigger even if the value goes from None to Some and Some to None. I would have expected it to return a Watcher instead.

it is a bit weird for filter to change the type of the stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative is to only have filter on watchables that contain Option<T> which is what I had first

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of our watchers have the value wrapped in an option, right?

I am not sure if this is a good API, but I definitely don't think having a fn filter that changes the stream type is intuitive. Maybe it just needs a different name?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you were to use this on our watchers in iroh of type Option, you would get an Option<Option>, and initialized would return immediately, possibly returning a None?

I think having this defined only on Optino is less confusing in comparison.

} else {
self.current = filtered;
}
}
}
}

/// Future returning the next item after the current one in a [`Watcher`].
///
/// See [`Watcher::updated`].
Expand Down Expand Up @@ -1005,4 +1074,50 @@ mod tests {
assert!(!a.has_watchers());
assert!(!b.has_watchers());
}

#[tokio::test]
async fn test_filter_basic() {
let a = Watchable::new(1u8);
let mut filtered = a.watch().filter(|x| *x > 2 && *x < 6);

assert_eq!(filtered.get(), None);

let handle = tokio::task::spawn(async move { filtered.stream().collect::<Vec<_>>().await });

for i in 2u8..10 {
a.set(i).unwrap();
tokio::task::yield_now().await;
}
drop(a);

let values = tokio::time::timeout(Duration::from_secs(5), handle)
.await
.unwrap()
.unwrap();

assert_eq!(values, vec![None, Some(3u8), Some(4), Some(5), None]);
}

#[tokio::test]
async fn test_filter_init() {
let a = Watchable::new(1u8);
let mut filtered = a.watch().filter(|x| *x > 2 && *x < 6);

assert_eq!(filtered.get(), None);

let handle = tokio::task::spawn(async move { filtered.initialized().await });

for i in 2u8..10 {
a.set(i).unwrap();
tokio::task::yield_now().await;
}
drop(a);

let value = tokio::time::timeout(Duration::from_secs(5), handle)
.await
.unwrap()
.unwrap();

assert_eq!(value, 3);
}
}
Loading