Skip to content

Commit

Permalink
Introduce DiffGat<'a>
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed May 28, 2024
1 parent 07e1eb9 commit e214cb5
Show file tree
Hide file tree
Showing 20 changed files with 86 additions and 44 deletions.
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ where
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.clone()))
output_buffer.push((t.join(time), d.into_owned()))
}
});
consolidate(&mut output_buffer);
Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.less_equal(time) { count.plus_equals(d); }
if t.less_equal(time) { count.plus_equals(&d.into_owned()); }
});
if !count.is_zero() {
let (dout, rout) = output_func(prefix, diff, value, &count);
Expand Down
1 change: 1 addition & 0 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ where
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ where
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((datum.clone(), time.clone(), diff.clone()));
session.give((datum.clone(), time.clone(), diff.into_owned()));
});
}
cursor.step_val(batch);
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ where
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned());
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
Expand Down
8 changes: 4 additions & 4 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ where
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
count.as_mut().map(|c| c.plus_equals(&diff.into_owned()));
if count.is_none() { count = Some(diff.into_owned()); }
});
}

Expand All @@ -100,8 +100,8 @@ where
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
count.as_mut().map(|c| c.plus_equals(&diff.into_owned()));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8)));
Expand Down
3 changes: 2 additions & 1 deletion src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod threshold;

use crate::lattice::Lattice;
use crate::trace::Cursor;
use crate::trace::cursor::IntoOwned;

/// An accumulation of (value, time, diff) updates.
struct EditList<'a, C: Cursor> {
Expand All @@ -43,7 +44,7 @@ impl<'a, C: Cursor> EditList<'a, C> {
{
self.clear();
while cursor.val_valid(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned()));
self.seal(cursor.val(storage));
cursor.step_val(storage);
}
Expand Down
13 changes: 7 additions & 6 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::hashable::Hashable;
use crate::collection::AsCollection;
use crate::operators::arrange::{Arranged, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor, TraceReader};
use crate::trace::cursor::IntoOwned;

/// Extension trait for the `distinct` differential dataflow method.
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
Expand Down Expand Up @@ -133,8 +134,8 @@ where
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
count.as_mut().map(|c| c.plus_equals(&diff.into_owned()));
if count.is_none() { count = Some(diff.into_owned()); }
});
}

Expand All @@ -146,18 +147,18 @@ where
match &count {
Some(old) => {
let mut temp = old.clone();
temp.plus_equals(diff);
temp.plus_equals(&diff.into_owned());
thresh(key, &temp, Some(old))
},
None => { thresh(key, diff, None) },
None => { thresh(key, &diff.into_owned(), None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
count.plus_equals(diff);
count.plus_equals(&diff.into_owned());
}
else {
count = Some(diff.clone());
count = Some(diff.into_owned());
}

if let Some(difference) = difference {
Expand Down
3 changes: 2 additions & 1 deletion src/trace/cursor/cursor_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

type Storage = Vec<C::Storage>;

Expand All @@ -113,7 +114,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
}
#[inline]
fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
for &index in self.min_val.iter() {
self.cursors[index].map_times(&storage[index], |t,d| logic(t,d));
}
Expand Down
10 changes: 6 additions & 4 deletions src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ pub trait Cursor {
type Val<'a>: Copy + Clone + Ord;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Associated update.
type Diff: Semigroup + ?Sized;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;

/// Storage required by the cursor.
type Storage;
Expand Down Expand Up @@ -79,7 +81,7 @@ pub trait Cursor {

/// Applies `logic` to each pair of time and difference. Intended for mutation of the
/// closure's scope.
fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L);
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L);

/// Advances the cursor to the next key.
fn step_key(&mut self, storage: &Self::Storage);
Expand Down Expand Up @@ -109,7 +111,7 @@ pub trait Cursor {
while self.val_valid(storage) {
let mut kv_out = Vec::new();
self.map_times(storage, |ts, r| {
kv_out.push((ts.clone(), r.clone()));
kv_out.push((ts.clone(), r.into_owned()));
});
out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out));
self.step_val(storage);
Expand Down
16 changes: 10 additions & 6 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ mod val_batch {
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Cursor = OrdValCursor<L>;
fn cursor(&self) -> Self::Cursor {
Expand Down Expand Up @@ -462,17 +463,18 @@ mod val_batch {
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Storage = OrdValBatch<L>;

fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
fn map_times<L2: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index).into_owned();
let diff = storage.storage.diffs.index(index).into_owned();
logic(&time, &diff);
let diff = storage.storage.diffs.index(index);
logic(&time, diff);
}
}
fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
Expand Down Expand Up @@ -704,6 +706,7 @@ mod key_batch {
type Val<'a> = &'a ();
type Time = <L::Target as Update>::Time;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Cursor = OrdKeyCursor<L>;
fn cursor(&self) -> Self::Cursor {
Expand Down Expand Up @@ -930,17 +933,18 @@ mod key_batch {
type Val<'a> = &'a ();
type Time = <L::Target as Update>::Time;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Storage = OrdKeyBatch<L>;

fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L2) {
fn map_times<L2: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index).into_owned();
let diff = storage.storage.diffs.index(index).into_owned();
logic(&time, &diff);
let diff = storage.storage.diffs.index(index);
logic(&time, diff);
}
}
fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
Expand Down
8 changes: 5 additions & 3 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ mod val_batch {
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Cursor = RhhValCursor<L>;
fn cursor(&self) -> Self::Cursor {
Expand Down Expand Up @@ -631,19 +632,20 @@ mod val_batch {
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Storage = RhhValBatch<L>;

fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
storage.storage.keys.index(self.key_cursor)
}
fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
fn map_times<L2: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index).into_owned();
let diff = storage.storage.diffs.index(index).into_owned();
logic(&time, &diff);
let diff = storage.storage.diffs.index(index);
logic(&time, diff);
}
}
fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
Expand Down
1 change: 1 addition & 0 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ where
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

type Batch = B;
type Storage = Vec<B>;
Expand Down
18 changes: 13 additions & 5 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ pub trait TraceReader {
type Val<'a>: Copy + Clone;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Associated update.
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;

/// The type of an immutable collection of updates.
type Batch: for<'a> BatchReader<Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>+Clone+'static;
Expand Down Expand Up @@ -261,8 +263,10 @@ where
type Val<'a>: Copy + Clone;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Associated update.
type Diff: Semigroup;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;

/// The type used to enumerate the batch's contents.
type Cursor: for<'a> Cursor<Storage=Self, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>;
Expand Down Expand Up @@ -372,6 +376,7 @@ pub mod rc_blanket_impls {
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

/// The type used to enumerate the batch's contents.
type Cursor = RcBatchCursor<B::Cursor>;
Expand Down Expand Up @@ -405,6 +410,7 @@ pub mod rc_blanket_impls {
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

type Storage = Rc<C::Storage>;

Expand All @@ -415,7 +421,7 @@ pub mod rc_blanket_impls {
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline]
fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L) {
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
}

Expand Down Expand Up @@ -473,6 +479,7 @@ pub mod abomonated_blanket_impls {
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

/// The type used to enumerate the batch's contents.
type Cursor = AbomonatedBatchCursor<B::Cursor>;
Expand Down Expand Up @@ -506,6 +513,7 @@ pub mod abomonated_blanket_impls {
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

type Storage = Abomonated<C::Storage, Vec<u8>>;

Expand All @@ -516,7 +524,7 @@ pub mod abomonated_blanket_impls {
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline]
fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L) {
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
}

Expand Down
8 changes: 6 additions & 2 deletions src/trace/wrappers/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ where
type Val<'a> = Tr::Val<'a>;
type Time = TInner;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;

type Batch = BatchEnter<Tr::Batch, TInner>;
type Storage = Tr::Storage;
Expand Down Expand Up @@ -117,6 +118,7 @@ where
type Val<'a> = B::Val<'a>;
type Time = TInner;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

type Cursor = BatchCursorEnter<B::Cursor, TInner>;

Expand Down Expand Up @@ -169,6 +171,7 @@ where
type Val<'a> = C::Val<'a>;
type Time = TInner;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

type Storage = C::Storage;

Expand All @@ -179,7 +182,7 @@ where
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline]
fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
self.cursor.map_times(storage, |time, diff| {
logic(&TInner::to_inner(time.clone()), diff)
})
Expand Down Expand Up @@ -220,6 +223,7 @@ where
type Val<'a> = C::Val<'a>;
type Time = TInner;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

type Storage = BatchEnter<C::Storage, TInner>;

Expand All @@ -230,7 +234,7 @@ where
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }

#[inline]
fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
self.cursor.map_times(&storage.batch, |time, diff| {
logic(&TInner::to_inner(time.clone()), diff)
})
Expand Down
Loading

0 comments on commit e214cb5

Please sign in to comment.