Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganize arrangement methods #442

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::{Threshold, JoinCore};
use differential_dataflow::operators::Threshold;

type Node = usize;
type Edge = (Node, Node);
Expand Down
13 changes: 6 additions & 7 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
let mut probe = Handle::new();
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces};
use differential_dataflow::operators::{arrange::Arrange};

let (data_input, data) = scope.new_collection::<String, isize>();
let (keys_input, keys) = scope.new_collection::<String, isize>();
Expand Down Expand Up @@ -54,18 +54,17 @@ fn main() {
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

join_traces(&keys, &data, |k,v1,v2,t,r1,r2| {
keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Option::<((),isize,isize)>::None
})
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ where

use timely::order::Product;

use operators::join::JoinCore;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

Expand Down
85 changes: 85 additions & 0 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,91 @@ where
}
}


use difference::Multiply;
// Direct join implementations.
impl<G: Scope, Tr> Arranged<G, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone + 'static,
Tr::Diff: Semigroup,
{
/// A direct implementation of the `JoinCore::join_core` method.
pub fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<Tr::Diff as Multiply<Tr2::Diff>>::Output>
where
Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>,Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
Tr::Diff: Multiply<Tr2::Diff>,
<Tr::Diff as Multiply<Tr2::Diff>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(Tr::Key<'_>,Tr::Val<'_>,Tr2::Val<'_>)->I+'static
{
let result = move |k: Tr::Key<'_>, v1: Tr::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &Tr::Diff, r2: &Tr2::Diff| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
}
/// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
pub fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>, Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>,Tr2::Val<'_>,&G::Timestamp,&Tr::Diff,&Tr2::Diff)->I+'static,
{
use operators::join::join_traces;
join_traces(self, other, result)
}
}

// Direct reduce implementations.
use difference::Abelian;
impl<G: Scope, T1> Arranged<G, T1>
where
G::Timestamp: Lattice+Ord,
T1: TraceReader<Time=G::Timestamp>+Clone+'static,
T1::Diff: Semigroup,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
crate::consolidation::consolidate(change);
})
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
use operators::reduce::reduce_trace;
reduce_trace(self, name, logic)
}
}


impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
G::Timestamp: Lattice+Ord,
Expand Down
41 changes: 0 additions & 41 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,47 +349,6 @@ where
{
self.arrange_by_key().join_core_internal_unsafe(stream2, result)
}

}

impl<G, K, V, T1> JoinCore<G, K, V, T1::Diff> for Arranged<G,T1>
where
G: Scope,
G::Timestamp: Lattice+Ord,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V, Time=G::Timestamp>+Clone+'static,
K: Ord+'static,
V: Ord+'static,
T1::Diff: Semigroup,
{
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<Tr2::Diff>>::Output>
where
Tr2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
T1::Diff: Multiply<Tr2::Diff>,
<T1::Diff as Multiply<Tr2::Diff>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(T1::Key<'_>,T1::Val<'_>,Tr2::Val<'_>)->I+'static
{
let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&K, &V,Tr2::Val<'_>,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static,
{
join_traces(self, other, result)
}
}

/// An equijoin of two traces, sharing a common key type.
Expand Down
30 changes: 6 additions & 24 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine};
use trace::TraceReader;

/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> : ReduceCore<G, K, V, R> where G::Timestamp: Lattice+Ord {
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
/// Applies a reduction function on records grouped by key.
///
/// Input data must be structured as `(key, val)` pairs.
Expand Down Expand Up @@ -327,28 +327,10 @@ where
}
}

impl<G: Scope, K, V, T1, R: Semigroup> ReduceCore<G, K, V, R> for Arranged<G, T1>
where
K: ToOwned + Ord + ?Sized,
K::Owned: Data,
V: ToOwned + Ord + ?Sized,
G::Timestamp: Lattice+Ord,
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwned = <K as ToOwned>::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static,
{
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
reduce_trace(self, name, logic)
}
}

fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
pub fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
G: Scope,
G::Timestamp: Lattice+Ord,
Expand All @@ -359,7 +341,7 @@ where
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
{
let mut result_trace = None;

Expand Down