Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GAT time and diff #487

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn count<G, Tr, R, F, P>(
) -> Collection<G, (P, usize, usize), R>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<Diff=isize>+Clone+'static,
Tr: TraceReader<DiffOwned=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
12 changes: 6 additions & 6 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ pub fn half_join<G, V, R, Tr, FF, CF, DOut, S>(
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
) -> Collection<G, (DOut, G::Timestamp), <R as Mul<Tr::DiffOwned>>::Output>
where
G: Scope<Timestamp = Tr::Time>,
Tr::KeyOwned: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
R: Mul<Tr::DiffOwned>,
<R as Mul<Tr::DiffOwned>>::Output: Semigroup,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let output_func = move |k: &Tr::KeyOwned, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::DiffOwned| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
Expand Down Expand Up @@ -140,7 +140,7 @@ where
ROut: Semigroup,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::DiffOwned)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand Down Expand Up @@ -214,7 +214,7 @@ where
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
output_buffer.push((t.join(time), d.into_owned()))
}
});
consolidate(&mut output_buffer);
Expand Down
13 changes: 10 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable,
Tr::Diff: Monoid+ExchangeData,
Tr::DiffOwned: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::DiffOwned)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand Down Expand Up @@ -72,6 +72,7 @@ where
input2.for_each(|_, _| { });

if let Some(ref mut trace) = propose_trace {
let mut owned_diff = None;

for (capability, prefixes) in stash.iter_mut() {

Expand All @@ -97,8 +98,14 @@ where
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
let mut count = Tr::DiffOwned::zero();
cursor.map_times(&storage, |t, d| {
let d = if let Some(owned_diff) = &mut owned_diff {
d.clone_onto(owned_diff);
&*owned_diff
} else {
owned_diff.insert(d.into_owned())
};
if t.less_equal(time) { count.plus_equals(d); }
});
if !count.is_zero() {
Expand Down
12 changes: 6 additions & 6 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ use differential_dataflow::trace::TraceReader;
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P, V, VF>(
prefixes: &Collection<G, P, Tr::Diff>,
prefixes: &Collection<G, P, Tr::DiffOwned>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
) -> Collection<G, (P, V), Tr::DiffOwned>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Tr::DiffOwned: Monoid+Multiply<Output = Tr::DiffOwned>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
Expand All @@ -46,16 +46,16 @@ where
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P, V, VF>(
prefixes: &Collection<G, P, Tr::Diff>,
prefixes: &Collection<G, P, Tr::DiffOwned>,
arrangement: Arranged<G, Tr>,
key_selector: F,
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
) -> Collection<G, (P, V), Tr::DiffOwned>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Tr::DiffOwned: Monoid+Multiply<Output = Tr::DiffOwned>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ use differential_dataflow::trace::TraceReader;
/// key with `key_selector` and then proposes all pairs of the prefix
/// and values associated with the key in `arrangement`.
pub fn validate<G, K, V, Tr, F, P>(
extensions: &Collection<G, (P, V), Tr::Diff>,
extensions: &Collection<G, (P, V), Tr::DiffOwned>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, V), Tr::Diff>
) -> Collection<G, (P, V), Tr::DiffOwned>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<KeyOwned=(K,V)>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Tr::DiffOwned: Monoid+Multiply<Output = Tr::DiffOwned>+ExchangeData,
F: Fn(&P)->K+Clone+'static,
P: ExchangeData,
{
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub fn bfs_arranged<G, N, Tr>(edges: &Arranged<G, Tr>, roots: &Collection<G, N>)
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, DiffOwned=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn bidijkstra_arranged<G, N, Tr>(
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, DiffOwned=isize>+Clone+'static,
{
forward
.stream
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, DiffOwned=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
3 changes: 2 additions & 1 deletion src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ where
type KeyOwned = Tr::KeyOwned;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type Diff = Tr::Diff;
type Diff<'a> = Tr::Diff<'a>;
type DiffOwned = Tr::DiffOwned;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
Expand Down
31 changes: 16 additions & 15 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
use crate::trace::cursor::MyTrait;
use crate::trace::implementations::{KeySpine, ValSpine};

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
Expand Down Expand Up @@ -172,7 +173,7 @@ where
/// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
/// supplied as arguments to an operator using the same key-value structure.
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::DiffOwned>
where
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
{
Expand All @@ -183,7 +184,7 @@ where
///
/// The supplied logic may produce an iterator over output values, allowing either
/// filtering or flat mapping as part of the extraction.
pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::DiffOwned>
where
I: IntoIterator,
I::Item: Data,
Expand All @@ -199,7 +200,7 @@ where
///
/// This method exists for streams of batches without the corresponding arrangement.
/// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::DiffOwned>
where
I: IntoIterator,
I::Item: Data,
Expand All @@ -215,7 +216,7 @@ where
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((datum.clone(), time.clone(), diff.clone()));
session.give((datum.clone(), time.clone(), diff.into_owned()))
});
}
cursor.step_val(batch);
Expand All @@ -238,16 +239,16 @@ where
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of the `JoinCore::join_core` method.
pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::DiffOwned as Multiply<T2::DiffOwned>>::Output>
where
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
T1::Diff: Multiply<T2::Diff>,
<T1::Diff as Multiply<T2::Diff>>::Output: Semigroup,
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
T1::DiffOwned: Multiply<T2::DiffOwned>,
<T1::DiffOwned as Multiply<T2::DiffOwned>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
{
let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::DiffOwned, r2: &T2::DiffOwned| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
Expand All @@ -261,7 +262,7 @@ where
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::DiffOwned,&T2::DiffOwned)->I+'static,
{
use crate::operators::join::join_traces;
join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
Expand Down Expand Up @@ -290,10 +291,10 @@ where
T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=T1::Time>+'static,
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::DiffOwned: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::DiffOwned)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::DiffOwned)], &mut Vec<(V, T2::DiffOwned)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
if !input.is_empty() {
Expand All @@ -311,8 +312,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::DiffOwned)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::DiffOwned)], &mut Vec<(V, T2::DiffOwned)>, &mut Vec<(V, T2::DiffOwned)>)+'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,_,_,V,_,_>(self, name, from, logic)
Expand Down
10 changes: 7 additions & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,14 @@ pub fn arrange_from_upsert<G, V, F, Tr>(
) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr: Trace+TraceReader<Diff=isize>+'static,
// If we ever change `isize` to something else, update the `Diff->DiffOwned` logic!
Tr: Trace+TraceReader<DiffOwned=isize>+'static,
Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash,
V: ExchangeData,
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::DiffOwned)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -255,7 +256,10 @@ where
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
trace_cursor.map_times(&trace_storage, |_time, diff| {
// Note that `DiffOwned = isize`, so this `into_owned` conversion should be cheap.
count += diff.into_owned()
});
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
Expand Down
2 changes: 1 addition & 1 deletion src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Tr>(&self, name: &str) -> Self
where
Tr: crate::trace::Trace<KeyOwned = D,Time=G::Timestamp,Diff=R>+'static,
Tr: crate::trace::Trace<KeyOwned = D, Time=G::Timestamp, DiffOwned=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
{
Expand Down
21 changes: 17 additions & 4 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ where G::Timestamp: TotalOrder+Lattice+Ord {
}
}

impl<G, T1> CountTotal<G, T1::KeyOwned, T1::Diff> for Arranged<G, T1>
impl<G, T1> CountTotal<G, T1::KeyOwned, T1::DiffOwned> for Arranged<G, T1>
where
G: Scope<Timestamp=T1::Time>,
T1: for<'a> TraceReader<Val<'a>=&'a ()>+Clone+'static,
T1::KeyOwned: ExchangeData,
T1::Time: TotalOrder,
T1::Diff: ExchangeData,
T1::DiffOwned: ExchangeData,
{
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::KeyOwned, T1::Diff), R2> {
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::KeyOwned, T1::DiffOwned), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();
Expand All @@ -69,6 +69,7 @@ where
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

move |input, output| {
let mut owned_diff = None;

use crate::trace::cursor::MyTrait;
input.for_each(|capability, batches| {
Expand All @@ -80,17 +81,29 @@ where
upper_limit.clone_from(batch.upper());

while let Some(key) = batch_cursor.get_key(&batch) {
let mut count: Option<T1::Diff> = None;
let mut count: Option<T1::DiffOwned> = None;

trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
let diff = if let Some(owned_diff) = &mut owned_diff {
diff.clone_onto(owned_diff);
&*owned_diff
} else {
owned_diff.insert(diff.into_owned())
};
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
});
}

batch_cursor.map_times(&batch, |time, diff| {
let diff = if let Some(owned_diff) = &mut owned_diff {
diff.clone_onto(owned_diff);
&*owned_diff
} else {
owned_diff.insert(diff.into_owned())
};

if let Some(count) = count.as_ref() {
if !count.is_zero() {
Expand Down
Loading