Skip to content

Commit

Permalink
Generalize Collection to containers
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed May 30, 2024
1 parent c8f8917 commit 1622950
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 108 deletions.
239 changes: 134 additions & 105 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

use std::hash::Hash;

use timely::Container;
use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::scopes::{Child, child::Iterative};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::Scope;
use timely::dataflow::operators::*;
use timely::dataflow::StreamCore;

use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
Expand All @@ -37,24 +39,142 @@ use crate::hashable::Hashable;
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
#[derive(Clone)]
pub struct Collection<G: Scope, D, R: Semigroup = isize> {
pub struct Collection<G: Scope, D, R: Semigroup = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
/// The underlying timely dataflow stream.
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
pub inner: Stream<G, (D, G::Timestamp, R)>
pub inner: timely::dataflow::StreamCore<G, C>,
/// Phantom data for unreferenced `D` and `R` types.
phantom: std::marker::PhantomData<(D, R)>,
}

impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Timestamp: Data {
impl<G: Scope, D: Data, R: Semigroup+'static, C> Collection<G, D, R, C> where G::Timestamp: Data {
/// Creates a new Collection from a timely dataflow stream.
///
/// This method seems to be rarely used, with the `as_collection` method on streams being a more
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
Collection { inner: stream }
pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
Collection { inner: stream, phantom: std::marker::PhantomData }
}
}
impl<G: Scope, D: Data, R: Semigroup+'static, C: Container> Collection<G, D, R, C> where G::Timestamp: Data {
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// ```
pub fn concat(&self, other: &Self) -> Self {
self.inner
.concat(&other.inner)
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<I>(&self, sources: I) -> Self
where
I: IntoIterator<Item=Self>
{
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
// Brings a Collection into a nested region.
///
/// This method is a specialization of `enter` to the case where the nested scope is a region.
/// It removes the need for an operator that adjusts the timestamp.
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
self.inner
.enter(child)
.as_collection()
}
/// Applies a supplied function to each batch of updates.
///
/// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
/// timely dataflow capability associated with the batch of updates. The observed batching depends
/// on how the system executes, and may vary run to run.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
/// scope.new_collection_from(1 .. 10).1
/// .map_in_place(|x| *x *= 2)
/// .filter(|x| x % 2 == 1)
/// .inspect_container(|event| println!("event: {:?}", event));
/// });
/// ```
pub fn inspect_container<F>(&self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static {
self.inner
.inspect_container(func)
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Self {
Self::new(self.inner.probe_with(handle))
}
/// The scope containing the underlying timely dataflow stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection by applying the supplied function to each input element.
///
/// # Examples
Expand Down Expand Up @@ -146,63 +266,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.filter(move |(data, _, _)| logic(data))
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// ```
pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
self.inner
.concat(&other.inner)
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
where
I: IntoIterator<Item=Collection<G, D, R>>
{
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
Expand Down Expand Up @@ -337,17 +400,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.as_collection()
}

/// Brings a Collection into a nested region.
///
/// This method is a specialization of `enter` to the case where the nested scope is a region.
/// It removes the need for an operator that adjusts the timestamp.
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R>
{
self.inner
.enter(child)
.as_collection()
}

/// Delays each difference by a supplied function.
///
/// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
Expand Down Expand Up @@ -418,25 +470,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.inspect_batch(move |time, data| func(time, data))
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
self.inner
.probe_with(handle)
.as_collection()
}

/// Assert if the collection is ever non-empty.
///
Expand Down Expand Up @@ -465,11 +498,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}

/// The scope containing the underlying timely dataflow stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

use timely::dataflow::scopes::ScopeParent;
Expand Down Expand Up @@ -589,14 +617,14 @@ impl<G: Scope, D: Data, R: Abelian+'static> Collection<G, D, R> where G::Timesta
}

/// Conversion to a differential dataflow Collection.
pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
pub trait AsCollection<G: Scope, D: Data, R: Semigroup, C> {
/// Converts the type to a differential dataflow collection.
fn as_collection(&self) -> Collection<G, D, R>;
fn as_collection(&self) -> Collection<G, D, R, C>;
}

impl<G: Scope, D: Data, R: Semigroup+'static> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
fn as_collection(&self) -> Collection<G, D, R> {
Collection::new(self.clone())
impl<G: Scope, D: Data, R: Semigroup+'static, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
fn as_collection(&self) -> Collection<G, D, R, C> {
Collection::<G,D,R,C>::new(self.clone())
}
}

Expand All @@ -621,12 +649,13 @@ impl<G: Scope, D: Data, R: Semigroup+'static> AsCollection<G, D, R> for Stream<G
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
where
G: Scope,
D: Data,
R: Semigroup+'static,
I: IntoIterator<Item=Collection<G, D, R>>,
C: Container,
I: IntoIterator<Item=Collection<G, D, R, C>>,
{
scope
.concatenate(iterator.into_iter().map(|x| x.inner))
Expand Down
6 changes: 3 additions & 3 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,14 @@ impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattic
/// be used whenever the variable has an empty input.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::new(updates);
let collection = Collection::<G,D,R>::new(updates);
Variable { collection, feedback, source: None, step }
}

/// Creates a new `Variable` from a supplied `source` stream.
pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = source.inner.scope().feedback(step.clone());
let collection = Collection::new(updates).concat(&source);
let collection = Collection::<G,D,R>::new(updates).concat(&source);
Variable { collection, feedback, source: Some(source), step }
}

Expand Down Expand Up @@ -233,7 +233,7 @@ impl<G: Scope, D: Data, R: Semigroup> SemigroupVariable<G, D, R> where G::Timest
/// Creates a new initially empty `SemigroupVariable`.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::new(updates);
let collection = Collection::<G,D,R>::new(updates);
SemigroupVariable { collection, feedback, step }
}

Expand Down

0 comments on commit 1622950

Please sign in to comment.