Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize Collection to containers #506

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 134 additions & 105 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

use std::hash::Hash;

use timely::Container;
use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::scopes::{Child, child::Iterative};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::Scope;
use timely::dataflow::operators::*;
use timely::dataflow::StreamCore;

use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
Expand All @@ -37,24 +39,142 @@ use crate::hashable::Hashable;
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
#[derive(Clone)]
pub struct Collection<G: Scope, D, R: Semigroup = isize> {
pub struct Collection<G: Scope, D, R: Semigroup = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
/// The underlying timely dataflow stream.
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
pub inner: Stream<G, (D, G::Timestamp, R)>
pub inner: timely::dataflow::StreamCore<G, C>,
/// Phantom data for unreferenced `D` and `R` types.
phantom: std::marker::PhantomData<(D, R)>,
}

impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Timestamp: Data {
impl<G: Scope, D: Data, R: Semigroup+'static, C> Collection<G, D, R, C> where G::Timestamp: Data {
/// Creates a new Collection from a timely dataflow stream.
///
/// This method seems to be rarely used, with the `as_collection` method on streams being a more
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
Collection { inner: stream }
pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
Collection { inner: stream, phantom: std::marker::PhantomData }
}
}
impl<G: Scope, D: Data, R: Semigroup+'static, C: Container> Collection<G, D, R, C> where G::Timestamp: Data {
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// ```
pub fn concat(&self, other: &Self) -> Self {
    // Adding two collections is implemented by concatenating their update streams.
    let merged = self.inner.concat(&other.inner);
    merged.as_collection()
}
/// Creates a new collection accumulating the contents of this collection and all supplied collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the streams of updates, but it corresponds to the addition of
/// all of the collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<I>(&self, sources: I) -> Self
where
    I: IntoIterator<Item=Self>
{
    // Unwrap each supplied collection down to its underlying stream before merging.
    let streams = sources.into_iter().map(|collection| collection.inner);
    self.inner.concatenate(streams).as_collection()
}
/// Brings a Collection into a nested region.
///
/// This method is a specialization of `enter` to the case where the nested scope is a region.
/// It removes the need for an operator that adjusts the timestamp.
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
self.inner
.enter(child)
.as_collection()
}
/// Applies a supplied function to each batch of updates.
///
/// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
/// timely dataflow capability associated with the batch of updates. The observed batching depends
/// on how the system executes, and may vary run to run.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
/// scope.new_collection_from(1 .. 10).1
/// .map_in_place(|x| *x *= 2)
/// .filter(|x| x % 2 == 1)
/// .inspect_container(|event| println!("event: {:?}", event));
/// });
/// ```
pub fn inspect_container<F>(&self, func: F) -> Self
where
    F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
{
    // Hand the callback to the underlying stream, then re-wrap the result as a collection.
    let observed = self.inner.inspect_container(func);
    observed.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
    // The probe is attached to the underlying stream; only its handle is returned.
    let stream = &self.inner;
    stream.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Self {
    // Attach the caller's handle to the underlying stream and wrap the resulting stream again.
    let probed = self.inner.probe_with(handle);
    Self::new(probed)
}
/// The scope containing the underlying timely dataflow stream.
///
/// Returns whatever `scope()` reports for the `inner` stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection by applying the supplied function to each input element.
///
/// # Examples
Expand Down Expand Up @@ -146,63 +266,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.filter(move |(data, _, _)| logic(data))
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// ```
pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
self.inner
.concat(&other.inner)
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
where
I: IntoIterator<Item=Collection<G, D, R>>
{
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
Expand Down Expand Up @@ -337,17 +400,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.as_collection()
}

/// Brings a Collection into a nested region.
///
/// This method is a specialization of `enter` to the case where the nested scope is a region.
/// It removes the need for an operator that adjusts the timestamp.
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R>
{
self.inner
.enter(child)
.as_collection()
}

/// Delays each difference by a supplied function.
///
/// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
Expand Down Expand Up @@ -418,25 +470,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
.inspect_batch(move |time, data| func(time, data))
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
self.inner
.probe_with(handle)
.as_collection()
}

/// Assert if the collection is ever non-empty.
///
Expand Down Expand Up @@ -465,11 +498,6 @@ impl<G: Scope, D: Data, R: Semigroup+'static> Collection<G, D, R> where G::Times
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}

/// The scope containing the underlying timely dataflow stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

use timely::dataflow::scopes::ScopeParent;
Expand Down Expand Up @@ -589,14 +617,14 @@ impl<G: Scope, D: Data, R: Abelian+'static> Collection<G, D, R> where G::Timesta
}

/// Conversion to a differential dataflow Collection.
pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
pub trait AsCollection<G: Scope, D: Data, R: Semigroup, C> {
/// Converts the type to a differential dataflow collection.
fn as_collection(&self) -> Collection<G, D, R>;
fn as_collection(&self) -> Collection<G, D, R, C>;
}

impl<G: Scope, D: Data, R: Semigroup+'static> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
fn as_collection(&self) -> Collection<G, D, R> {
Collection::new(self.clone())
impl<G: Scope, D: Data, R: Semigroup+'static, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
fn as_collection(&self) -> Collection<G, D, R, C> {
Collection::<G,D,R,C>::new(self.clone())
}
}

Expand All @@ -621,12 +649,13 @@ impl<G: Scope, D: Data, R: Semigroup+'static> AsCollection<G, D, R> for Stream<G
/// .assert_eq(&data);
/// });
/// ```
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
where
G: Scope,
D: Data,
R: Semigroup+'static,
I: IntoIterator<Item=Collection<G, D, R>>,
C: Container,
I: IntoIterator<Item=Collection<G, D, R, C>>,
{
scope
.concatenate(iterator.into_iter().map(|x| x.inner))
Expand Down
6 changes: 3 additions & 3 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,14 @@ impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattic
/// be used whenever the variable has an empty input.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::new(updates);
let collection = Collection::<G,D,R>::new(updates);
Variable { collection, feedback, source: None, step }
}

/// Creates a new `Variable` from a supplied `source` stream.
pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = source.inner.scope().feedback(step.clone());
let collection = Collection::new(updates).concat(&source);
let collection = Collection::<G,D,R>::new(updates).concat(&source);
Variable { collection, feedback, source: Some(source), step }
}

Expand Down Expand Up @@ -233,7 +233,7 @@ impl<G: Scope, D: Data, R: Semigroup> SemigroupVariable<G, D, R> where G::Timest
/// Creates a new initially empty `SemigroupVariable`.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::new(updates);
let collection = Collection::<G,D,R>::new(updates);
SemigroupVariable { collection, feedback, step }
}

Expand Down