From 008680f03b712d6013814db6a16d0818af76d692 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 6 Dec 2023 05:13:27 -0500 Subject: [PATCH] Reorganize arrangement methods (#442) * Make inherent join methods * Make inherent reduce methods --- examples/graspan.rs | 2 +- examples/spines.rs | 13 ++--- src/algorithms/graphs/propagate.rs | 2 - src/operators/arrange/arrangement.rs | 85 ++++++++++++++++++++++++++++ src/operators/join.rs | 41 -------------- src/operators/reduce.rs | 30 ++-------- 6 files changed, 98 insertions(+), 75 deletions(-) diff --git a/examples/graspan.rs b/examples/graspan.rs index f49c48897..c72a8c15a 100644 --- a/examples/graspan.rs +++ b/examples/graspan.rs @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::input::{Input, InputSession}; use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::{Threshold, JoinCore}; +use differential_dataflow::operators::Threshold; type Node = usize; type Edge = (Node, Node); diff --git a/examples/spines.rs b/examples/spines.rs index d8993180c..57b127c2b 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -24,7 +24,7 @@ fn main() { let mut probe = Handle::new(); let (mut data_input, mut keys_input) = worker.dataflow(|scope| { - use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces}; + use differential_dataflow::operators::{arrange::Arrange}; let (data_input, data) = scope.new_collection::(); let (keys_input, keys) = scope.new_collection::(); @@ -54,18 +54,17 @@ fn main() { "slc" => { use differential_dataflow::trace::implementations::ord_neu::PreferredSpine; - use differential_dataflow::operators::reduce::ReduceCore; let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::>(); - // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::>() + .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::>(); - // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::>() + .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); - join_traces(&keys, &data, |k,v1,v2,t,r1,r2| { + keys.join_core(&data, |k,_v1,_v2| { println!("{:?}", k.text); Option::<((),isize,isize)>::None }) diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index 42f9b58b8..a4087e062 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -89,8 +89,6 @@ where use timely::order::Product; - use operators::join::JoinCore; - let edges = edges.enter(scope); let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize)); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 4d5c461bc..d152f7088 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -410,6 +410,91 @@ where } } + +use difference::Multiply; +// Direct join implementations. +impl Arranged +where + G::Timestamp: Lattice+Ord, + Tr: TraceReader + Clone + 'static, + Tr::Diff: Semigroup, +{ + /// A direct implementation of the `JoinCore::join_core` method. + pub fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> + where + Tr2: for<'a> TraceReader=Tr::Key<'a>,Time=G::Timestamp>+Clone+'static, + Tr2::Diff: Semigroup, + Tr::Diff: Multiply, + >::Output: Semigroup, + I: IntoIterator, + I::Item: Data, + L: FnMut(Tr::Key<'_>,Tr::Val<'_>,Tr2::Val<'_>)->I+'static + { + let result = move |k: Tr::Key<'_>, v1: Tr::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &Tr::Diff, r2: &Tr2::Diff| { + let t = t.clone(); + let r = (r1.clone()).multiply(r2); + result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) + }; + self.join_core_internal_unsafe(other, result) + } + /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method. + pub fn join_core_internal_unsafe (&self, other: &Arranged, result: L) -> Collection + where + Tr2: for<'a> TraceReader=Tr::Key<'a>, Time=G::Timestamp>+Clone+'static, + Tr2::Diff: Semigroup, + D: Data, + ROut: Semigroup, + I: IntoIterator, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>,Tr2::Val<'_>,&G::Timestamp,&Tr::Diff,&Tr2::Diff)->I+'static, + { + use operators::join::join_traces; + join_traces(self, other, result) + } +} + +// Direct reduce implementations. +use difference::Abelian; +impl Arranged +where + G::Timestamp: Lattice+Ord, + T1: TraceReader+Clone+'static, + T1::Diff: Semigroup, +{ + /// A direct implementation of `ReduceCore::reduce_abelian`. + pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + where + T2: for<'a> Trace= T1::Key<'a>, Time=G::Timestamp>+'static, + T2::ValOwned: Data, + T2::Diff: Abelian, + T2::Batch: Batch, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, + { + self.reduce_core::<_,T2>(name, move |key, input, output, change| { + if !input.is_empty() { + logic(key, input, change); + } + change.extend(output.drain(..).map(|(x,d)| (x, d.negate()))); + crate::consolidation::consolidate(change); + }) + } + + /// A direct implementation of `ReduceCore::reduce_core`. + pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + where + T2: for<'a> Trace=T1::Key<'a>, Time=G::Timestamp>+'static, + T2::ValOwned: Data, + T2::Diff: Semigroup, + T2::Batch: Batch, + T2::Builder: Builder, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + { + use operators::reduce::reduce_trace; + reduce_trace(self, name, logic) + } +} + + impl<'a, G: Scope, Tr> Arranged, Tr> where G::Timestamp: Lattice+Ord, diff --git a/src/operators/join.rs b/src/operators/join.rs index c9a787277..073a49f57 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -349,47 +349,6 @@ where { self.arrange_by_key().join_core_internal_unsafe(stream2, result) } - -} - -impl JoinCore for Arranged - where - G: Scope, - G::Timestamp: Lattice+Ord, - T1: for<'a> TraceReader = &'a K, Val<'a> = &'a V, Time=G::Timestamp>+Clone+'static, - K: Ord+'static, - V: Ord+'static, - T1::Diff: Semigroup, -{ - fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> - where - Tr2: for<'a> TraceReader=T1::Key<'a>,Time=G::Timestamp>+Clone+'static, - Tr2::Diff: Semigroup, - T1::Diff: Multiply, - >::Output: Semigroup, - I: IntoIterator, - I::Item: Data, - L: FnMut(T1::Key<'_>,T1::Val<'_>,Tr2::Val<'_>)->I+'static - { - let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| { - let t = t.clone(); - let r = (r1.clone()).multiply(r2); - result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) - }; - self.join_core_internal_unsafe(other, result) - } - - fn join_core_internal_unsafe (&self, other: &Arranged, result: L) -> Collection - where - Tr2: for<'a> TraceReader=T1::Key<'a>, Time=G::Timestamp>+Clone+'static, - Tr2::Diff: Semigroup, - D: Data, - ROut: Semigroup, - I: IntoIterator, - L: FnMut(&K, &V,Tr2::Val<'_>,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static, - { - join_traces(self, other, result) - } } /// An equijoin of two traces, sharing a common key type. diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 4c2d609f6..b408357b2 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine}; use trace::TraceReader; /// Extension trait for the `reduce` differential dataflow method. -pub trait Reduce : ReduceCore where G::Timestamp: Lattice+Ord { +pub trait Reduce where G::Timestamp: Lattice+Ord { /// Applies a reduction function on records grouped by key. /// /// Input data must be structured as `(key, val)` pairs. @@ -327,28 +327,10 @@ where } } -impl ReduceCore for Arranged -where - K: ToOwned + Ord + ?Sized, - K::Owned: Data, - V: ToOwned + Ord + ?Sized, - G::Timestamp: Lattice+Ord, - T1: for<'a> TraceReader=&'a K, KeyOwned = ::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static, -{ - fn reduce_core(&self, name: &str, logic: L) -> Arranged> - where - T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, - T2::ValOwned: Data, - T2::Diff: Semigroup, - T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, - { - reduce_trace(self, name, logic) - } -} - -fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +/// A key-wise reduction of values in an input trace. +/// +/// This method exists to provide reduce functionality without opinions about qualifying trace types. +pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, G::Timestamp: Lattice+Ord, @@ -359,7 +341,7 @@ where T2::Diff: Semigroup, T2::Batch: Batch, T2::Builder: Builder, - L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, { let mut result_trace = None;