Skip to content

Commit

Permalink
Update halfjoin to new idioms (#436)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Nov 28, 2023
1 parent 5a23ad6 commit c3ca293
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
//! of logical compaction, which should not be done in a way that prevents
//! the correct determination of the total order comparison.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::ops::Mul;


use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
Expand Down Expand Up @@ -67,28 +70,30 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, V, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::Diff>,
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), Tr::Diff>
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + Borrow<Tr::Key>,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Diff: Monoid+ExchangeData,
Tr::Key: Eq,
Tr::Diff: Semigroup,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
Tr::Diff: std::ops::Mul<Tr::Diff, Output=Tr::Diff>,
S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static,
S: FnMut(&K, &V, &Tr::Val)->DOut+'static,
{
let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::Diff, diff2: &Tr::Diff| {
let output_func = move |k: &K, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -120,8 +125,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, V, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::Diff>,
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -131,18 +136,19 @@ pub fn half_join_internal_unsafe<G, V, Tr, FF, CF, DOut, ROut, Y, I, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + std::borrow::Borrow<Tr::Key>,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Diff: Monoid+ExchangeData,
Tr::Key: Eq,
Tr::Diff: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Monoid,
ROut: Semigroup,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::Diff, &Tr::Diff)-> I + 'static,
S: FnMut(&K, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -152,7 +158,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::Diff)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -210,8 +216,8 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
cursor.seek_key(&storage, key.borrow());
if cursor.get_key(&storage) == Some(key.borrow()) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand All @@ -221,15 +227,15 @@ where
consolidate(&mut output_buffer);
work += output_buffer.len();
for (time, diff2) in output_buffer.drain(..) {
for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) {
for dout in output_func(&key, val1, val2, initial, &time, &diff1, &diff2) {
session.give(dout);
}
}
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff1 = Tr::Diff::zero();
*diff1 = R::zero();
}
}

Expand Down

0 comments on commit c3ca293

Please sign in to comment.