From 2cc46fe10747f12a80080cec19aa014e83f06383 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 30 May 2024 11:39:39 -0400 Subject: [PATCH] Generalize Collection to containers --- src/collection.rs | 239 ++++++++++++++++++++++----------------- src/operators/iterate.rs | 6 +- 2 files changed, 137 insertions(+), 108 deletions(-) diff --git a/src/collection.rs b/src/collection.rs index ddf3634cb..ace27c0f2 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -10,12 +10,14 @@ use std::hash::Hash; +use timely::Container; use timely::Data; use timely::progress::Timestamp; use timely::order::Product; use timely::dataflow::scopes::{Child, child::Iterative}; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::Scope; use timely::dataflow::operators::*; +use timely::dataflow::StreamCore; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; @@ -37,24 +39,142 @@ use crate::hashable::Hashable; /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and /// defaults to) `isize`, representing changes to the occurrence count of each record. #[derive(Clone)] -pub struct Collection { +pub struct Collection::Timestamp, R)>> { /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is /// not intended to be the idiomatic way to work with the collection. - pub inner: Stream + pub inner: timely::dataflow::StreamCore, + /// Phantom data for unreferenced `D` and `R` types. + phantom: std::marker::PhantomData<(D, R)>, } -impl Collection where G::Timestamp: Data { +impl Collection where G::Timestamp: Data { /// Creates a new Collection from a timely dataflow stream. /// /// This method seems to be rarely used, with the `as_collection` method on streams being a more /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait /// provides a `new_collection` method which will create a new collection for you without exposing /// the underlying timely stream at all. - pub fn new(stream: Stream) -> Collection { - Collection { inner: stream } + pub fn new(stream: StreamCore) -> Collection { + Collection { inner: stream, phantom: std::marker::PhantomData } + } +} +impl Collection where G::Timestamp: Data { + /// Creates a new collection accumulating the contents of the two collections. + /// + /// Despite the name, differential dataflow collections are unordered. This method is so named because the + /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the + /// two collections. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.concat(&evens) + /// .assert_eq(&data); + /// }); + /// ``` + pub fn concat(&self, other: &Self) -> Self { + self.inner + .concat(&other.inner) + .as_collection() + } + /// Creates a new collection accumulating the contents of the two collections. + /// + /// Despite the name, differential dataflow collections are unordered. This method is so named because the + /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the + /// two collections. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.concatenate(Some(evens)) + /// .assert_eq(&data); + /// }); + /// ``` + pub fn concatenate(&self, sources: I) -> Self + where + I: IntoIterator + { + self.inner + .concatenate(sources.into_iter().map(|x| x.inner)) + .as_collection() + } + // Brings a Collection into a nested region. + /// + /// This method is a specialization of `enter` to the case where the nested scope is a region. + /// It removes the need for an operator that adjusts the timestamp. + pub fn enter_region<'a>(&self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, D, R, C> { + self.inner + .enter(child) + .as_collection() + } + /// Applies a supplied function to each batch of updates. + /// + /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the + /// timely dataflow capability associated with the batch of updates. The observed batching depends + /// on how the system executes, and may vary run to run. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// scope.new_collection_from(1 .. 10).1 + /// .map_in_place(|x| *x *= 2) + /// .filter(|x| x % 2 == 1) + /// .inspect_container(|event| println!("event: {:?}", event)); + /// }); + /// ``` + pub fn inspect_container(&self, func: F) -> Self + where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static { + self.inner + .inspect_container(func) + .as_collection() + } + /// Attaches a timely dataflow probe to the output of a Collection. + /// + /// This probe is used to determine when the state of the Collection has stabilized and can + /// be read out. + pub fn probe(&self) -> probe::Handle { + self.inner + .probe() + } + /// Attaches a timely dataflow probe to the output of a Collection. + /// + /// This probe is used to determine when the state of the Collection has stabilized and all updates observed. + /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a + /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to + /// avoid swamping the system. + pub fn probe_with(&self, handle: &mut probe::Handle) -> Self { + Self::new(self.inner.probe_with(handle)) + } + /// The scope containing the underlying timely dataflow stream. + pub fn scope(&self) -> G { + self.inner.scope() } +} + +impl Collection where G::Timestamp: Data { /// Creates a new collection by applying the supplied function to each input element. /// /// # Examples @@ -146,63 +266,6 @@ impl Collection where G::Times .filter(move |(data, _, _)| logic(data)) .as_collection() } - /// Creates a new collection accumulating the contents of the two collections. - /// - /// Despite the name, differential dataflow collections are unordered. This method is so named because the - /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the - /// two collections. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.concat(&evens) - /// .assert_eq(&data); - /// }); - /// ``` - pub fn concat(&self, other: &Collection) -> Collection { - self.inner - .concat(&other.inner) - .as_collection() - } - /// Creates a new collection accumulating the contents of the two collections. - /// - /// Despite the name, differential dataflow collections are unordered. This method is so named because the - /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the - /// two collections. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.concatenate(Some(evens)) - /// .assert_eq(&data); - /// }); - /// ``` - pub fn concatenate(&self, sources: I) -> Collection - where - I: IntoIterator> - { - self.inner - .concatenate(sources.into_iter().map(|x| x.inner)) - .as_collection() - } /// Replaces each record with another, with a new difference type. /// /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) @@ -337,17 +400,6 @@ impl Collection where G::Times .as_collection() } - /// Brings a Collection into a nested region. - /// - /// This method is a specialization of `enter` to the case where the nested scope is a region. - /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'a>(&self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, D, R> - { - self.inner - .enter(child) - .as_collection() - } - /// Delays each difference by a supplied function. /// /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly @@ -418,25 +470,6 @@ impl Collection where G::Times .inspect_batch(move |time, data| func(time, data)) .as_collection() } - /// Attaches a timely dataflow probe to the output of a Collection. - /// - /// This probe is used to determine when the state of the Collection has stabilized and can - /// be read out. - pub fn probe(&self) -> probe::Handle { - self.inner - .probe() - } - /// Attaches a timely dataflow probe to the output of a Collection. - /// - /// This probe is used to determine when the state of the Collection has stabilized and all updates observed. - /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a - /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to - /// avoid swamping the system. - pub fn probe_with(&self, handle: &mut probe::Handle) -> Collection { - self.inner - .probe_with(handle) - .as_collection() - } /// Assert if the collection is ever non-empty. /// @@ -465,11 +498,6 @@ impl Collection where G::Times self.consolidate() .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); } - - /// The scope containing the underlying timely dataflow stream. - pub fn scope(&self) -> G { - self.inner.scope() - } } use timely::dataflow::scopes::ScopeParent; @@ -589,14 +617,14 @@ impl Collection where G::Timesta } /// Conversion to a differential dataflow Collection. -pub trait AsCollection { +pub trait AsCollection { /// Converts the type to a differential dataflow collection. - fn as_collection(&self) -> Collection; + fn as_collection(&self) -> Collection; } -impl AsCollection for Stream { - fn as_collection(&self) -> Collection { - Collection::new(self.clone()) +impl AsCollection for StreamCore { + fn as_collection(&self) -> Collection { + Collection::::new(self.clone()) } } @@ -621,12 +649,13 @@ impl AsCollection for Stream(scope: &mut G, iterator: I) -> Collection +pub fn concatenate(scope: &mut G, iterator: I) -> Collection where G: Scope, D: Data, R: Semigroup+'static, - I: IntoIterator>, + C: Container, + I: IntoIterator>, { scope .concatenate(iterator.into_iter().map(|x| x.inner)) diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs index 32c6d1cda..6e7f70e98 100644 --- a/src/operators/iterate.rs +++ b/src/operators/iterate.rs @@ -166,14 +166,14 @@ impl Variable where G::Timestamp: Lattic /// be used whenever the variable has an empty input. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::new(updates); + let collection = Collection::::new(updates); Variable { collection, feedback, source: None, step } } /// Creates a new `Variable` from a supplied `source` stream. pub fn new_from(source: Collection, step: ::Summary) -> Self { let (feedback, updates) = source.inner.scope().feedback(step.clone()); - let collection = Collection::new(updates).concat(&source); + let collection = Collection::::new(updates).concat(&source); Variable { collection, feedback, source: Some(source), step } } @@ -233,7 +233,7 @@ impl SemigroupVariable where G::Timest /// Creates a new initially empty `SemigroupVariable`. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::new(updates); + let collection = Collection::::new(updates); SemigroupVariable { collection, feedback, step } }