From 5a23ad64d37cb4e7a7a4ce335fdf67dc8d9f06c0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 27 Nov 2023 22:53:12 -0500 Subject: [PATCH] Cursor repivoting (#435) * Make Storage an associated type, again * Reorient reduce::perkeycompute to depend on cursors * Pivot history replay to be based on cursors * Rename Cursor::R associated type to Diff * Encourage idiomatic cursor use * Update dogs3 --- dogsdogsdogs/src/operators/count.rs | 2 +- dogsdogsdogs/src/operators/half_join.rs | 20 +-- dogsdogsdogs/src/operators/lookup_map.rs | 6 +- dogsdogsdogs/src/operators/propose.rs | 12 +- dogsdogsdogs/src/operators/validate.rs | 6 +- examples/cursors.rs | 2 +- src/algorithms/graphs/bfs.rs | 2 +- src/algorithms/graphs/bijkstra.rs | 2 +- src/algorithms/graphs/propagate.rs | 2 +- src/operators/arrange/agent.rs | 2 +- src/operators/arrange/arrangement.rs | 28 ++-- src/operators/arrange/upsert.rs | 4 +- src/operators/consolidate.rs | 2 +- src/operators/count.rs | 12 +- src/operators/join.rs | 130 +++++++++------- src/operators/mod.rs | 106 +++++++------ src/operators/reduce.rs | 179 ++++++++++++---------- src/operators/threshold.rs | 11 +- src/trace/cursor/cursor_list.rs | 42 ++--- src/trace/cursor/cursor_pair.rs | 32 ++-- src/trace/cursor/mod.rs | 37 +++-- src/trace/implementations/ord.rs | 24 +-- src/trace/implementations/ord_neu.rs | 12 +- src/trace/implementations/rhh.rs | 12 +- src/trace/implementations/spine_fueled.rs | 14 +- src/trace/mod.rs | 30 ++-- src/trace/wrappers/enter.rs | 44 +++--- src/trace/wrappers/enter_at.rs | 44 +++--- src/trace/wrappers/filter.rs | 44 +++--- src/trace/wrappers/freeze.rs | 48 +++--- src/trace/wrappers/frontier.rs | 44 +++--- src/trace/wrappers/rc.rs | 2 +- 32 files changed, 512 insertions(+), 445 deletions(-) diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index d86fb5089..df26ddee7 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -21,7 +21,7 @@ pub fn count( where G: Scope, G::Timestamp: Lattice, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index a2920c163..b713d07d1 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -68,12 +68,12 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. pub fn half_join( - stream: &Collection, + stream: &Collection, arrangement: Arranged, frontier_func: FF, comparison: CF, mut output_func: S, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, @@ -81,14 +81,14 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::R: Monoid+ExchangeData, + Tr::Diff: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - Tr::R: std::ops::Mul, + Tr::Diff: std::ops::Mul, S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static, { - let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::R, diff2: &Tr::R| { + let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::Diff, diff2: &Tr::Diff| { let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); Some((dout, initial.clone(), diff)) @@ -121,7 +121,7 @@ where /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. pub fn half_join_internal_unsafe( - stream: &Collection, + stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, @@ -135,14 +135,14 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::R: Monoid+ExchangeData, + Tr::Diff: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, ROut: Monoid, Y: Fn(std::time::Instant, usize) -> bool + 'static, I: IntoIterator, - S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static, + S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::Diff, &Tr::Diff)-> I + 'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -152,7 +152,7 @@ where let mut stash = HashMap::new(); let mut buffer = Vec::new(); - let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::R)| (update.0).0.hashed().into()); + let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::Diff)| (update.0).0.hashed().into()); // Stash for (time, diff) accumulation. let mut output_buffer = Vec::new(); @@ -229,7 +229,7 @@ where } cursor.rewind_vals(&storage); } - *diff1 = Tr::R::zero(); + *diff1 = Tr::Diff::zero(); } } diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index a35472479..9228d2d4d 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -32,13 +32,13 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Sized, Tr::Val: Clone, - Tr::R: Monoid+ExchangeData, + Tr::Diff: Monoid+ExchangeData, F: FnMut(&D, &mut Tr::Key)+Clone+'static, D: ExchangeData, R: ExchangeData+Monoid, DOut: Clone+'static, ROut: Monoid, - S: FnMut(&D, &R, &Tr::Val, &Tr::R)->(DOut, ROut)+'static, + S: FnMut(&D, &R, &Tr::Val, &Tr::Diff)->(DOut, ROut)+'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -99,7 +99,7 @@ where cursor.seek_key(&storage, &key1); if cursor.get_key(&storage) == Some(&key1) { while let Some(value) = cursor.get_val(&storage) { - let mut count = Tr::R::zero(); + let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { if t.less_equal(time) { count.plus_equals(d); } }); diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 2a5e6747a..6d6ed95f3 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -15,17 +15,17 @@ use differential_dataflow::trace::TraceReader; /// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case /// of delta queries. pub fn propose( - prefixes: &Collection, + prefixes: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::R: Monoid+Multiply+ExchangeData, + Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, { @@ -46,17 +46,17 @@ where /// prefixes by the number of matches in `arrangement`. This can be useful to /// avoid the need to prepare an arrangement of distinct extensions. pub fn propose_distinct( - prefixes: &Collection, + prefixes: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::R: Monoid+Multiply+ExchangeData, + Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, { diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 41efa8df2..b41ea8687 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -14,17 +14,17 @@ use differential_dataflow::trace::TraceReader; /// key with `key_selector` and then proposes all pair af the prefix /// and values associated with the key in `arrangement`. pub fn validate( - extensions: &Collection, + extensions: &Collection, arrangement: Arranged, key_selector: F, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, - Tr::R: Monoid+Multiply+ExchangeData, + Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, P: ExchangeData, { diff --git a/examples/cursors.rs b/examples/cursors.rs index cd916530c..068a69a42 100644 --- a/examples/cursors.rs +++ b/examples/cursors.rs @@ -138,7 +138,7 @@ where Tr::Key: Debug + Clone, Tr::Val: Debug + Clone, Tr::Time: Debug + Clone, - Tr::R: Debug + Clone, + Tr::Diff: Debug + Clone, { let (mut cursor, storage) = trace.cursor(); for ((k, v), diffs) in cursor.to_vec(&storage).iter() { diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs index 6774f535f..8f42cb47d 100644 --- a/src/algorithms/graphs/bfs.rs +++ b/src/algorithms/graphs/bfs.rs @@ -29,7 +29,7 @@ where G: Scope, G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs index eb803f160..f8a4662be 100644 --- a/src/algorithms/graphs/bijkstra.rs +++ b/src/algorithms/graphs/bijkstra.rs @@ -45,7 +45,7 @@ where G: Scope, G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, { forward .stream diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index b0404c8ee..32947d4c7 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -64,7 +64,7 @@ where R: Multiply, R: From, L: ExchangeData, - Tr: TraceReader+Clone+'static, + Tr: TraceReader+Clone+'static, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. However, in the interest of a simplified diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index c477adaa3..03ea5af1b 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -50,7 +50,7 @@ where type Key = Tr::Key; type Val = Tr::Val; type Time = Tr::Time; - type R = Tr::R; + type Diff = Tr::Diff; type Batch = Tr::Batch; type Storage = Tr::Storage; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 7373dc188..42bf28f87 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -91,7 +91,7 @@ where where Tr::Key: 'static, Tr::Val: 'static, - Tr::R: 'static, + Tr::Diff: 'static, G::Timestamp: Clone+'static, TInner: Refines+Lattice+Timestamp+Clone+'static, { @@ -110,7 +110,7 @@ where where Tr::Key: 'static, Tr::Val: 'static, - Tr::R: 'static, + Tr::Diff: 'static, G::Timestamp: Clone+'static, { Arranged { @@ -129,7 +129,7 @@ where where Tr::Key: 'static, Tr::Val: 'static, - Tr::R: 'static, + Tr::Diff: 'static, G::Timestamp: Clone+'static, TInner: Refines+Lattice+Timestamp+Clone+'static, F: FnMut(&Tr::Key, &Tr::Val, &G::Timestamp)->TInner+Clone+'static, @@ -179,7 +179,7 @@ where where Tr::Key: 'static, Tr::Val: 'static, - Tr::R: 'static, + Tr::Diff: 'static, G::Timestamp: Clone+'static, F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static, { @@ -195,9 +195,9 @@ where /// The underlying `Stream>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(&self, mut logic: L) -> Collection + pub fn as_collection(&self, mut logic: L) -> Collection where - Tr::R: Semigroup, + Tr::Diff: Semigroup, L: FnMut(&Tr::Key, &Tr::Val) -> D+'static, { self.flat_map_ref(move |key, val| Some(logic(key,val))) @@ -207,9 +207,9 @@ where /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction. - pub fn flat_map_ref(&self, logic: L) -> Collection + pub fn flat_map_ref(&self, logic: L) -> Collection where - Tr::R: Semigroup, + Tr::Diff: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, @@ -224,9 +224,9 @@ where /// /// This method exists for streams of batches without the corresponding arrangement. /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: &Stream, mut logic: L) -> Collection + pub fn flat_map_batches(stream: &Stream, mut logic: L) -> Collection where - Tr::R: Semigroup, + Tr::Diff: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, @@ -258,12 +258,12 @@ where /// /// This method consumes a stream of (key, time) queries and reports the corresponding stream of /// (key, value, time, diff) accumulations in the `self` trace. - pub fn lookup(&self, queries: &Stream) -> Stream + pub fn lookup(&self, queries: &Stream) -> Stream where G::Timestamp: Data+Lattice+Ord+TotalOrder, Tr::Key: ExchangeData+Hashable, Tr::Val: ExchangeData, - Tr::R: ExchangeData+Semigroup, + Tr::Diff: ExchangeData+Semigroup, Tr: 'static, { // while the arrangement is already correctly distributed, the query stream may not be. @@ -280,8 +280,8 @@ where let mut active = Vec::new(); let mut retain = Vec::new(); - let mut working: Vec<(G::Timestamp, Tr::Val, Tr::R)> = Vec::new(); - let mut working2: Vec<(Tr::Val, Tr::R)> = Vec::new(); + let mut working: Vec<(G::Timestamp, Tr::Val, Tr::Diff)> = Vec::new(); + let mut working2: Vec<(Tr::Val, Tr::Diff)> = Vec::new(); move |input1, input2, output| { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index a374dbc10..b5b4618bc 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -144,9 +144,9 @@ where G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData, Tr::Key: ExchangeData+Hashable+std::hash::Hash, Tr::Val: ExchangeData, - Tr: Trace+TraceReader+'static, + Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder, { let mut reader: Option> = None; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 73e29a80f..7345a3ef9 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -56,7 +56,7 @@ where /// As `consolidate` but with the ability to name the operator and specify the trace type. pub fn consolidate_named(&self, name: &str) -> Self where - Tr: crate::trace::Trace+crate::trace::TraceReader+'static, + Tr: crate::trace::Trace+crate::trace::TraceReader+'static, Tr::Batch: crate::trace::Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, diff --git a/src/operators/count.rs b/src/operators/count.rs index 0fbb9bb15..5d8a6dbeb 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -55,14 +55,14 @@ where G::Timestamp: TotalOrder+Lattice+Ord { } } -impl CountTotal for Arranged +impl CountTotal for Arranged where G::Timestamp: TotalOrder+Lattice+Ord, T1: TraceReader+Clone+'static, T1::Key: ExchangeData, - T1::R: ExchangeData+Semigroup, + T1::Diff: ExchangeData+Semigroup, { - fn count_total_core>(&self) -> Collection { + fn count_total_core>(&self) -> Collection { let mut trace = self.trace.clone(); let mut buffer = Vec::new(); @@ -82,10 +82,8 @@ where let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap(); upper_limit.clone_from(batch.upper()); - while batch_cursor.key_valid(&batch) { - - let key = batch_cursor.key(&batch); - let mut count: Option = None; + while let Some(key) = batch_cursor.get_key(&batch) { + let mut count: Option = None; trace_cursor.seek_key(&trace_storage, key); if trace_cursor.get_key(&trace_storage) == Some(key) { diff --git a/src/operators/join.rs b/src/operators/join.rs index 48e508ab6..ae397a5fa 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -181,29 +181,29 @@ where } } -impl Join for Arranged +impl Join for Arranged where G: Scope, G::Timestamp: Lattice+Ord, Tr: TraceReader+Clone+'static, Tr::Key: Data+Hashable, Tr::Val: Data, - Tr::R: Semigroup, + Tr::Diff: Semigroup, { - fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> - where Tr::Key: ExchangeData, Tr::R: Multiply, >::Output: Semigroup, L: FnMut(&Tr::Key, &Tr::Val, &V2)->D+'static { + fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + where Tr::Key: ExchangeData, Tr::Diff: Multiply, >::Output: Semigroup, L: FnMut(&Tr::Key, &Tr::Val, &V2)->D+'static { let arranged2 = other.arrange_by_key(); self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } - fn semijoin(&self, other: &Collection) -> Collection>::Output> - where Tr::Key: ExchangeData, Tr::R: Multiply, >::Output: Semigroup { + fn semijoin(&self, other: &Collection) -> Collection>::Output> + where Tr::Key: ExchangeData, Tr::Diff: Multiply, >::Output: Semigroup { let arranged2 = other.arrange_by_self(); self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) } - fn antijoin(&self, other: &Collection) -> Collection - where Tr::Key: ExchangeData, Tr::R: Multiply, Tr::R: Abelian { + fn antijoin(&self, other: &Collection) -> Collection + where Tr::Key: ExchangeData, Tr::Diff: Multiply, Tr::Diff: Abelian { self.as_collection(|k,v| (k.clone(), v.clone())) .concat(&self.semijoin(other).negate()) } @@ -251,13 +251,13 @@ pub trait JoinCore (&self, stream2: &Arranged, result: L) -> Collection>::Output> + fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+'static, - Tr2::R: Semigroup, - R: Multiply, - >::Output: Semigroup, + Tr2::Diff: Semigroup, + R: Multiply, + >::Output: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(&K,&V,&Tr2::Val)->I+'static, @@ -305,11 +305,11 @@ pub trait JoinCore+Clone+'static, Tr2::Val: Ord+'static, - Tr2::R: Semigroup, + Tr2::Diff: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static, + L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::Diff)->I+'static, ; } @@ -322,13 +322,13 @@ where R: ExchangeData+Semigroup, G::Timestamp: Lattice+Ord, { - fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> + fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+'static, - Tr2::R: Semigroup, - R: Multiply, - >::Output: Semigroup, + Tr2::Diff: Semigroup, + R: Multiply, + >::Output: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(&K,&V,&Tr2::Val)->I+'static, @@ -341,39 +341,39 @@ where where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+'static, - Tr2::R: Semigroup, + Tr2::Diff: Semigroup, R: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static, + L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::Diff)->I+'static, { self.arrange_by_key().join_core_internal_unsafe(stream2, result) } } -impl JoinCore for Arranged +impl JoinCore for Arranged where G: Scope, G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, T1::Key: Ord+'static, T1::Val: Ord+'static, - T1::R: Semigroup, + T1::Diff: Semigroup, { - fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> + fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where Tr2::Val: Ord+'static, Tr2: TraceReader+Clone+'static, - Tr2::R: Semigroup, - T1::R: Multiply, - >::Output: Semigroup, + Tr2::Diff: Semigroup, + T1::Diff: Multiply, + >::Output: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static { - let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::R, r2: &Tr2::R| { + let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| { let t = t.clone(); let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) @@ -385,11 +385,11 @@ impl JoinCore for Arranged where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+'static, - Tr2::R: Semigroup, + Tr2::Diff: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::R,&Tr2::R)->I+'static, + L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static, { join_traces(self, other, result) } @@ -410,14 +410,14 @@ where T1: TraceReader+Clone+'static, T1::Key: Ord, T1::Val: Ord, - T1::R: Semigroup, + T1::Diff: Semigroup, T2: TraceReader+Clone+'static, T2::Val: Ord, - T2::R: Semigroup, + T2::Diff: Semigroup, D: Data, R: Semigroup, I: IntoIterator, - L: FnMut(&T1::Key,&T1::Val,&T2::Val,&G::Timestamp,&T1::R,&T2::R)->I+'static, + L: FnMut(&T1::Key,&T1::Val,&T2::Val,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -660,41 +660,41 @@ where /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. -struct Deferred +struct Deferred where T: Timestamp+Lattice+Ord, R: Semigroup, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, C1::Val: Ord, C2::Val: Ord, - C1::R: Semigroup, - C2::R: Semigroup, + C1::Diff: Semigroup, + C2::Diff: Semigroup, D: Ord+Clone+Data, { trace: C1, - trace_storage: S1, + trace_storage: C1::Storage, batch: C2, - batch_storage: S2, + batch_storage: C2::Storage, capability: Capability, done: bool, temp: Vec<((D, T), R)>, } -impl Deferred +impl Deferred where C1::Key: Ord+Eq, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, C1::Val: Ord, C2::Val: Ord, - C1::R: Semigroup, - C2::R: Semigroup, + C1::Diff: Semigroup, + C2::Diff: Semigroup, T: Timestamp+Lattice+Ord, R: Semigroup, D: Clone+Data, { - fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability) -> Self { + fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { Deferred { trace, trace_storage, @@ -713,7 +713,7 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) - where I: IntoIterator, L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I { + where I: IntoIterator, L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::Diff, &C2::Diff)->I { let meet = self.capability.time(); @@ -777,12 +777,30 @@ where } } -struct JoinThinker<'a, V1: Ord+'a + ?Sized, V2: Ord+'a + ?Sized, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> { - pub history1: ValueHistory<'a, V1, T, R1>, - pub history2: ValueHistory<'a, V2, T, R2>, +struct JoinThinker<'a, C1, C2> +where + C1: Cursor, + C2: Cursor