Skip to content

Commit

Permalink
Arrangement GATs (#438)
Browse files Browse the repository at this point in the history
* compiles

* Non-working commit

* Update dogs^3

* clean-up
  • Loading branch information
frankmcsherry authored Dec 4, 2023
1 parent 9c5dcba commit 5bde556
Show file tree
Hide file tree
Showing 33 changed files with 976 additions and 755 deletions.
10 changes: 5 additions & 5 deletions dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ pub fn count<G, Tr, R, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Val=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr: TraceReader<ValOwned=(), Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &(P,usize,usize), k: &mut Tr::Key| { *k = key_selector(&p.0); },
move |(p,c,i), r, &(), s| {
move |p: &(P,usize,usize), k: &mut Tr::KeyOwned| { *k = key_selector(&p.0); },
move |(p,c,i), r, _, s| {
let s = *s as usize;
if *c < s { ((p.clone(), *c, *i), r.clone()) }
else { ((p.clone(), s, index), r.clone()) }
Expand Down
29 changes: 13 additions & 16 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
//! of logical compaction, which should not be done in a way that prevents
//! the correct determination of the total order comparison.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::ops::Mul;


use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
Expand Down Expand Up @@ -70,8 +68,8 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -80,20 +78,19 @@ pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + Borrow<Tr::Key>,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Eq,
Tr::Diff: Semigroup,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&K, &V, &Tr::Val)->DOut+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &K, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -125,8 +122,8 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
pub fn half_join_internal_unsafe<G, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
stream: &Collection<G, (Tr::KeyOwned, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
Expand All @@ -136,19 +133,18 @@ pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
where
G: Scope,
G::Timestamp: Lattice,
K: Ord + Hashable + ExchangeData + std::borrow::Borrow<Tr::Key>,
Tr::KeyOwned: Ord + Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Eq,
Tr::Diff: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&K, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -158,7 +154,7 @@ where
let mut stash = HashMap::new();
let mut buffer = Vec::new();

let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
let exchange = Exchange::new(move |update: &((Tr::KeyOwned, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

// Stash for (time, diff) accumulation.
let mut output_buffer = Vec::new();
Expand Down Expand Up @@ -216,8 +212,9 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, key.borrow());
if cursor.get_key(&storage) == Some(key.borrow()) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(key));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand Down
24 changes: 12 additions & 12 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ pub fn lookup_map<G, D, R, Tr, F, DOut, ROut, S>(
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: Tr::Key,
supplied_key1: Tr::Key,
supplied_key2: Tr::Key,
supplied_key0: Tr::KeyOwned,
supplied_key1: Tr::KeyOwned,
supplied_key2: Tr::KeyOwned,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Sized,
Tr::Val: Clone,
Tr::KeyOwned: Hashable,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
S: FnMut(&D, &R, &Tr::Val, &Tr::Diff)->(DOut, ROut)+'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -51,14 +50,14 @@ where

let mut buffer = Vec::new();

let mut key: Tr::Key = supplied_key0;
let mut key: Tr::KeyOwned = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});

let mut key1: Tr::Key = supplied_key1;
let mut key2: Tr::Key = supplied_key2;
let mut key1: Tr::KeyOwned = supplied_key1;
let mut key2: Tr::KeyOwned = supplied_key2;

prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

Expand Down Expand Up @@ -96,8 +95,9 @@ where
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
cursor.seek_key(&storage, &key1);
if cursor.get_key(&storage) == Some(&key1) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
Expand Down
23 changes: 11 additions & 12 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::MyTrait;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -18,22 +19,21 @@ pub fn propose<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::Diff>
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.clone()), diff.clone().multiply(sum)),
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -49,22 +49,21 @@ pub fn propose_distinct<G, Tr, F, P>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::Val), Tr::Diff>
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+Default,
Tr::Val: Clone,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::Key+Clone+'static,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::Key| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.clone()), diff.clone()),
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn validate<G, K, V, Tr, F, P>(
where
G: Scope,
G::Timestamp: Lattice,
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=(), Time=G::Timestamp>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand All @@ -32,7 +32,7 @@ where
extensions,
arrangement,
move |(pre,val),key| { *key = (key_selector(pre), val.clone()); },
|(pre,val),r,&(),_| ((pre.clone(), val.clone()), r.clone()),
|(pre,val),r,_,_| ((pre.clone(), val.clone()), r.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ fn main() {
fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::Key: Debug + Clone,
Tr::Val: Debug + Clone,
Tr::KeyOwned: Debug + Clone,
Tr::ValOwned: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
{
Expand Down
29 changes: 16 additions & 13 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
let mut probe = Handle::new();
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

use differential_dataflow::operators::{arrange::Arrange, JoinCore};
use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces};

let (data_input, data) = scope.new_collection::<String, isize>();
let (keys_input, keys) = scope.new_collection::<String, isize>();
Expand All @@ -44,28 +44,31 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
// "rhh" => {
// use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
// let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],[u8],_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],u8,_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k,&(),&()| Option::<()>::None)
join_traces(&keys, &data, |k,v1,v2,t,r1,r2| {
println!("{:?}", k.text);
Option::<((),isize,isize)>::None
})
.probe_with(&mut probe);
},
_ => {
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
5 changes: 3 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::hash::Hash;
use timely::dataflow::*;

use ::{Collection, ExchangeData};
use ::operators::*;
use ::lattice::Lattice;
use ::difference::{Abelian, Multiply};
use ::operators::arrange::arrangement::ArrangeByKey;
Expand Down Expand Up @@ -64,7 +63,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, Diff=R>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand All @@ -90,6 +89,8 @@ where

use timely::order::Product;

use operators::join::JoinCore;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

Expand Down
6 changes: 4 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
type Key = Tr::Key;
type Val = Tr::Val;
type Key<'a> = Tr::Key<'a>;
type KeyOwned = Tr::KeyOwned;
type Val<'a> = Tr::Val<'a>;
type ValOwned = Tr::ValOwned;
type Time = Tr::Time;
type Diff = Tr::Diff;

Expand Down
Loading

0 comments on commit 5bde556

Please sign in to comment.