Skip to content

Commit

Permalink
Introduce TimeGat<'a>
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed May 28, 2024
1 parent e214cb5 commit 0678b80
Show file tree
Hide file tree
Showing 21 changed files with 98 additions and 50 deletions.
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ where
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
output_buffer.push((t.join(time), d.into_owned()))
if comparison(&t.into_owned(), initial) {
output_buffer.push((t.into_owned().join(time), d.into_owned()))
}
});
consolidate(&mut output_buffer);
Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.less_equal(time) { count.plus_equals(&d.into_owned()); }
if t.into_owned().less_equal(time) { count.plus_equals(&d.into_owned()); }
});
if !count.is_zero() {
let (dout, rout) = output_func(prefix, diff, value, &count);
Expand Down
1 change: 1 addition & 0 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ where
type Key<'a> = Tr::Key<'a>;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type TimeGat<'a> = Tr::TimeGat<'a>;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;

Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
-> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
where
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &Tr::Time)->TInner+Clone+'static,
P: FnMut(&TInner)->Tr::Time+Clone+'static,
{
let logic1 = logic.clone();
Expand Down Expand Up @@ -217,7 +217,7 @@ where
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((datum.clone(), time.clone(), diff.into_owned()));
session.give((datum.clone(), time.into_owned(), diff.into_owned()));
});
}
cursor.step_val(batch);
Expand Down
4 changes: 2 additions & 2 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ where

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8)));
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(&diff.into_owned()));
if count.is_none() { count = Some(diff.into_owned()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8)));
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
}
}
});
Expand Down
6 changes: 4 additions & 2 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,10 @@ where
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
Ordering::Equal => {

thinker.history1.edits.load(trace, trace_storage, |time| time.join(meet));
thinker.history2.edits.load(batch, batch_storage, |time| time.clone());
use crate::trace::cursor::IntoOwned;

thinker.history1.edits.load(trace, trace_storage, |time| time.into_owned().join(meet));
thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned());

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2| {
Expand Down
6 changes: 3 additions & 3 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<'a, C: Cursor> EditList<'a, C> {
/// Loads the contents of a cursor.
fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where
L: Fn(&C::Time)->C::Time,
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.clear();
while cursor.val_valid(storage) {
Expand Down Expand Up @@ -103,7 +103,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
}
fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
where
L: Fn(&C::Time)->C::Time,
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.edits.load(cursor, storage, logic);
}
Expand All @@ -119,7 +119,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
logic: L
) -> HistoryReplay<'storage, 'history, C>
where
L: Fn(&C::Time)->C::Time,
L: Fn(C::TimeGat<'_>)->C::Time,
{
self.clear();
cursor.seek_key(storage, key);
Expand Down
10 changes: 5 additions & 5 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ mod history_replay {
// loaded times by performing the lattice `join` with this value.

// Load the batch contents.
let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.clone());
let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.into_owned());

// We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
// can be used to advance other historical times, which may consolidate their representation. As
Expand Down Expand Up @@ -791,16 +791,16 @@ mod history_replay {

// Load the input and output histories.
let mut input_replay = if let Some(meet) = meet.as_ref() {
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.join(meet))
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned().join(meet))
}
else {
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.clone())
self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned())
};
let mut output_replay = if let Some(meet) = meet.as_ref() {
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.join(meet))
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned().join(meet))
}
else {
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.clone())
self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned())
};

self.synth_times.clear();
Expand Down
2 changes: 1 addition & 1 deletion src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where

if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.clone(), difference));
session.give((key.clone(), time.into_owned(), difference));
}
}
});
Expand Down
3 changes: 2 additions & 1 deletion src/trace/cursor/cursor_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

Expand All @@ -114,7 +115,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
self.cursors[self.min_val[0]].val(&storage[self.min_val[0]])
}
#[inline]
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Vec<C::Storage>, mut logic: L) {
for &index in self.min_val.iter() {
self.cursors[index].map_times(&storage[index], |t,d| logic(t,d));
}
Expand Down
6 changes: 4 additions & 2 deletions src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub trait Cursor {
type Val<'a>: Copy + Clone + Ord;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Borrowed form of timestamp.
type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
Expand Down Expand Up @@ -81,7 +83,7 @@ pub trait Cursor {

/// Applies `logic` to each pair of time and difference. Intended for mutation of the
/// closure's scope.
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L);
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L);

/// Advances the cursor to the next key.
fn step_key(&mut self, storage: &Self::Storage);
Expand Down Expand Up @@ -111,7 +113,7 @@ pub trait Cursor {
while self.val_valid(storage) {
let mut kv_out = Vec::new();
self.map_times(storage, |ts, r| {
kv_out.push((ts.clone(), r.into_owned()));
kv_out.push((ts.into_owned(), r.into_owned()));
});
out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out));
self.step_val(storage);
Expand Down
16 changes: 10 additions & 6 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ mod val_batch {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

Expand Down Expand Up @@ -462,19 +463,20 @@ mod val_batch {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Storage = OrdValBatch<L>;

fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index).into_owned();
let time = storage.storage.times.index(index);
let diff = storage.storage.diffs.index(index);
logic(&time, diff);
logic(time, diff);
}
}
fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
Expand Down Expand Up @@ -705,6 +707,7 @@ mod key_batch {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = &'a ();
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

Expand Down Expand Up @@ -932,19 +935,20 @@ mod key_batch {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = &'a ();
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

type Storage = OrdKeyBatch<L>;

fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
fn map_times<L2: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index).into_owned();
let time = storage.storage.times.index(index);
let diff = storage.storage.diffs.index(index);
logic(&time, diff);
logic(time, diff);
}
}
fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
Expand Down
8 changes: 5 additions & 3 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ mod val_batch {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

Expand Down Expand Up @@ -631,6 +632,7 @@ mod val_batch {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

Expand All @@ -640,12 +642,12 @@ mod val_batch {
storage.storage.keys.index(self.key_cursor)
}
fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index).into_owned();
let time = storage.storage.times.index(index);
let diff = storage.storage.diffs.index(index);
logic(&time, diff);
logic(time, diff);
}
}
fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
Expand Down
1 change: 1 addition & 0 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ where
type Key<'a> = B::Key<'a>;
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type TimeGat<'a> = B::TimeGat<'a>;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

Expand Down
12 changes: 10 additions & 2 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub trait TraceReader {
type Val<'a>: Copy + Clone;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Borrowed form of timestamp.
type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
Expand Down Expand Up @@ -263,6 +265,8 @@ where
type Val<'a>: Copy + Clone;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Borrowed form of timestamp.
type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
Expand Down Expand Up @@ -375,6 +379,7 @@ pub mod rc_blanket_impls {
type Key<'a> = B::Key<'a>;
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type TimeGat<'a> = B::TimeGat<'a>;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

Expand Down Expand Up @@ -409,6 +414,7 @@ pub mod rc_blanket_impls {
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

Expand All @@ -421,7 +427,7 @@ pub mod rc_blanket_impls {
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline]
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
}

Expand Down Expand Up @@ -478,6 +484,7 @@ pub mod abomonated_blanket_impls {
type Key<'a> = B::Key<'a>;
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type TimeGat<'a> = B::TimeGat<'a>;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;

Expand Down Expand Up @@ -512,6 +519,7 @@ pub mod abomonated_blanket_impls {
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;

Expand All @@ -524,7 +532,7 @@ pub mod abomonated_blanket_impls {
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline]
fn map_times<L: FnMut(&Self::Time, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
}

Expand Down
Loading

0 comments on commit 0678b80

Please sign in to comment.