Implement half-join operator #320

Merged 2 commits (Mar 26, 2021)
85 changes: 85 additions & 0 deletions dogsdogsdogs/examples/delta_query2.rs
extern crate timely;
extern crate graph_map;
extern crate differential_dataflow;

extern crate dogsdogsdogs;

use timely::dataflow::Scope;
use timely::order::Product;
use timely::dataflow::operators::probe::Handle;
use timely::dataflow::operators::UnorderedInput;
use timely::dataflow::operators::Map;
use differential_dataflow::AsCollection;

fn main() {

timely::execute_from_args(std::env::args().skip(2), move |worker| {

let mut probe = Handle::new();

let (mut i1, mut i2, c1, c2) = worker.dataflow::<usize,_,_>(|scope| {

// Nested scope as `Product<usize, usize>` doesn't refine `()`, because .. coherence.
scope.scoped("InnerScope", |inner| {

use timely::dataflow::operators::unordered_input::UnorderedHandle;

let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();

let edges1 = data1.as_collection();
let edges2 = data2.as_collection();

// Graph oriented both ways, indexed by key.
use differential_dataflow::operators::arrange::ArrangeByKey;
let forward1 = edges1.arrange_by_key();
let forward2 = edges2.arrange_by_key();

// Grab the stream of changes. Stash the initial time as payload.
let changes1 = edges1.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection();
let changes2 = edges2.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection();

use dogsdogsdogs::operators::half_join;

// pick a frontier that will not mislead TOTAL ORDER comparisons.
let closure = |time: &Product<usize, usize>| Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1));

let path1 =
half_join(
&changes1,
forward2,
closure.clone(),
|t1,t2| t1.lt(t2), // This one ignores concurrent updates.
|key, val1, val2| (key.clone(), (val1.clone(), val2.clone())),
);

let path2 =
half_join(
&changes2,
forward1,
closure.clone(),
|t1,t2| t1.le(t2), // This one can "see" concurrent updates.
|key, val1, val2| (key.clone(), (val2.clone(), val1.clone())),
);

// Delay updates to the payload time stashed alongside the data.
// This should be at least the ignored update time.
path1.concat(&path2)
.inner.map(|(((k,v),t),_,r)| ((k,v),t,r)).as_collection()
.inspect(|x| println!("{:?}", x))
.probe_with(&mut probe);

(input1, input2, capability1, capability2)
})
});

i1
.session(c1.clone())
.give(((5, 6), Product::new(0, 13), 1));

i2
.session(c2.clone())
.give(((5, 7), Product::new(11, 0), 1));

}).unwrap();
}
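Not part of the diff: a stdlib-only sketch of the exactly-once rule that the `lt`/`le` pair of comparison closures in the example above encodes. Under a total order on times, `t2 < t1` and `t1 <= t2` are complements, so each pair of updates across the two inputs is claimed by exactly one of the two half-join paths. The `path1_fires`/`path2_fires` closures are hypothetical stand-ins for the comparisons passed to `half_join`.

```rust
// Sketch only: why one half-join compares with `lt` and the other with `le`.
// With totally ordered times the two predicates partition all pairs, so every
// joined pair of updates is produced by exactly one dataflow path.
fn main() {
    // comparison(t_arr, t_init): path1 matches an arrangement time strictly
    // before the delta's initial time; path2 uses less-or-equal, roles swapped.
    let path1_fires = |t_arr: usize, t_init: usize| t_arr < t_init;
    let path2_fires = |t_arr: usize, t_init: usize| t_arr <= t_init;

    for t1 in 0..5 {
        for t2 in 0..5 {
            // Updates at times t1 (input 1) and t2 (input 2): exactly one of
            // the two paths should produce the joined pair, never both or none.
            let by_path1 = path1_fires(t2, t1); // t2 in arrangement, t1 delta
            let by_path2 = path2_fires(t1, t2); // t1 in arrangement, t2 delta
            assert!(by_path1 ^ by_path2);
        }
    }
    println!("every (t1, t2) pair claimed by exactly one path");
}
```

With partially ordered timestamps the same predicates can both fail on incomparable times, which is why the operator docs below insist on a total-order refinement for the comparison.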
182 changes: 182 additions & 0 deletions dogsdogsdogs/src/operators/half_join.rs
//! Dataflow operator for delta joins over partially ordered timestamps.
//!
//! Given multiple streams of updates `(data, time, diff)` that are each
//! defined over the same partially ordered `time`, we want to form the
//! full cross-join of all relations (we will *later* apply some filters
//! and instead equijoin on keys).
//!
//! The "correct" output is the outer join of these triples, where
//! 1. The `data` entries are just tuple'd up together,
//! 2. The `time` entries are subjected to the lattice `join` operator,
//! 3. The `diff` entries are multiplied.
//!
//! One way to produce the correct output is to form independent dataflow
//! fragments for each input stream, such that each intended output is then
//! produced by exactly one of these input streams.
//!
//! There are several incorrect ways one might do this, but here is one way
//! that I hope is not incorrect:
//!
//! Each input stream of updates is joined with each other input collection,
//! where each input update is matched against each other input update that
//! has a `time` that is less-than the input update's `time`, *UNDER A TOTAL
//! ORDER ON `time`*. The outputs are the `(data, time, diff)` entries that
//! follow the rules above, except that we additionally preserve the input's
//! initial `time` as well, for use in subsequent joins with the other input
//! collections.
//!
//! There are some caveats about ties, and we should treat each `time` for
//! each input as occurring at distinct times, one after the other (so that
//! ties are resolved by the index of the input). There is also the matter
//! of logical compaction, which should not be done in a way that prevents
//! the correct determination of the total order comparison.

use std::collections::HashMap;

use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Monoid, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};

/// A binary equijoin that responds to updates on only its first input.
///
/// This operator responds to inputs of the form
///
/// ```ignore
/// ((key, val1, time1), initial_time, diff1)
/// ```
///
/// where `initial_time` is less than or equal to `time1`, and produces as output
///
/// ```ignore
/// ((key, (val1, val2), lub(time1, time2)), initial_time, diff1 * diff2)
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
/// This last constraint is important to ensure that we correctly produce
/// all pairs of output updates across multiple `half_join` operators.
///
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the dataflow, the updates will be `delay`d to the times
/// specified in the payloads.
pub fn half_join<G, V, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), Tr::R>
where
G: Scope,
G::Timestamp: Lattice,
V: ExchangeData,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
Tr::R: std::ops::Mul<Tr::R, Output=Tr::R>,
S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
let mut arrangement_trace = Some(arrangement.trace);
let arrangement_stream = arrangement.stream;

let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();

stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,_| move |input1, input2, output| {

// drain the first input, stashing requests.
input1.for_each(|capability, data| {
data.swap(&mut buffer);
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(buffer.drain(..))
});

// Drain input batches; although we do not observe them, we want access to the input
// to observe the frontier and to drive scheduling.
input2.for_each(|_, _| { });

if let Some(ref mut trace) = arrangement_trace {

for (capability, proposals) in stash.iter_mut() {

// defer requests at incomplete times.
// TODO: Verify this is correct for TOTAL ORDER.
if !input2.frontier.less_equal(capability.time()) {

let mut session = output.session(capability);

// Sort requests by key for in-order cursor traversal.
consolidate_updates(proposals);

let (mut cursor, storage) = trace.cursor();

for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
}
});
consolidate(&mut output_buffer);
for (time, count) in output_buffer.drain(..) {
let dout = output_func(key, val1, val2);
session.give(((dout, time), initial.clone(), count * diff.clone()));
}
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff = Tr::R::zero();
}
}

proposals.retain(|ptd| !ptd.2.is_zero());
}
}
}

// drop fully processed capabilities.
stash.retain(|_,proposals| !proposals.is_empty());

// The logical merging frontier depends on both input1 and stash.
let mut frontier = timely::progress::frontier::Antichain::new();
for time in input1.frontier().frontier().iter() {
frontier.insert(frontier_func(time));
}
for key in stash.keys() {
frontier.insert(frontier_func(key.time()));
}
arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

if input1.frontier().is_empty() && stash.is_empty() {
arrangement_trace = None;
}

}).as_collection()
}
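Not part of the diff: the `consolidate(&mut output_buffer)` call above merges the accumulated `(time, diff)` pairs before emission, so each distinct `lub` time is emitted once with its net multiplicity. A stdlib-only sketch of that behavior, with `consolidate_sketch` as a hypothetical stand-in for `differential_dataflow::consolidation::consolidate`:

```rust
// Sketch of logical consolidation for (time, diff) pairs, as used when the
// half join accumulates one (lub_time, diff) entry per matched arrangement
// update: sort so equal times are adjacent, sum their diffs, drop zeros.
fn consolidate_sketch(updates: &mut Vec<((usize, usize), isize)>) {
    updates.sort(); // group equal times together
    let mut merged: Vec<((usize, usize), isize)> = Vec::new();
    for (time, diff) in updates.drain(..) {
        match merged.last_mut() {
            // Accumulate the diff when the time repeats.
            Some((t, d)) if *t == time => *d += diff,
            _ => merged.push((time, diff)),
        }
    }
    // Updates with zero net diff produce no output.
    merged.retain(|&(_, d)| d != 0);
    *updates = merged;
}

fn main() {
    // Two matches at lub time (1, 3), plus an addition and retraction at (0, 2).
    let mut buffer = vec![((1, 3), 1), ((0, 2), 1), ((1, 3), 1), ((0, 2), -1)];
    consolidate_sketch(&mut buffer);
    assert_eq!(buffer, vec![((1, 3), 2)]);
    println!("{:?}", buffer);
}
```

This is why the operator can multiply a single consolidated `count` against `diff` per output record rather than emitting one record per arrangement update.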
2 changes: 2 additions & 0 deletions dogsdogsdogs/src/operators/mod.rs
pub mod half_join;
pub mod lookup_map;

pub mod count;
pub mod propose;
pub mod validate;

pub use self::half_join::half_join;
pub use self::lookup_map::lookup_map;
pub use self::count::count;
pub use self::propose::{propose, propose_distinct};