diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index 7034a9df0..f1f34690c 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -1,12 +1,15 @@ //! Filters a stream by a predicate. +use timely_container::columnation::{Columnation, TimelyStack}; use crate::Data; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Stream, Scope, StreamCore}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. -pub trait Filter { +pub trait Filter { + /// The data type we operate on. + type Data<'a>; /// Returns a new instance of `self` containing only records satisfying `predicate`. /// /// # Examples @@ -19,10 +22,13 @@ pub trait Filter { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn filterbool+'static>(&self, predicate: P) -> Self; + fn filter(&self, predicate: P) -> Self + where + for<'a> P: FnMut(Self::Data<'a>)->bool; } -impl Filter for Stream { +impl Filter for Stream { + type Data<'a> = &'a D; fn filterbool+'static>(&self, mut predicate: P) -> Stream { let mut vector = Vec::new(); self.unary(Pipeline, "Filter", move |_,_| move |input, output| { @@ -36,3 +42,22 @@ impl Filter for Stream { }) } } + +impl Filter for StreamCore> { + type Data<'a> = &'a D; + fn filterbool+'static>(&self, mut predicate: P) -> StreamCore> { + let mut vector = Default::default(); + let mut filtered = TimelyStack::default(); + self.unary(Pipeline, "Filter", move |_,_| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut vector); + for item in vector.iter().filter(|x| predicate(x)) { + filtered.copy(item); + } + if !filtered.is_empty() { + output.session(&time).give_container(&mut filtered); + } + }); + }) + } +}