diff --git a/examples/spines.rs b/examples/spines.rs index 81fa438b2..d148d610f 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -31,14 +31,14 @@ fn main() { match mode.as_str() { "new" => { - use differential_dataflow::trace::implementations::ord::ColKeySpine; + use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; let data = data.arrange::>(); let keys = keys.arrange::>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "old" => { - use differential_dataflow::trace::implementations::ord::OrdKeySpine; + use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine; let data = data.arrange::>(); let keys = keys.arrange::>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 1b3bad7df..3f1db85a2 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -45,13 +45,13 @@ pub(crate) mod merge_batcher_col; pub use self::merge_batcher::MergeBatcher as Batcher; -pub mod ord; +// pub mod ord; pub mod ord_neu; pub mod rhh; // Opinionated takes on default spines. -pub use self::ord::OrdValSpine as ValSpine; -pub use self::ord::OrdKeySpine as KeySpine; +pub use self::ord_neu::OrdValSpine as ValSpine; +pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{Borrow, ToOwned}; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 65f1d3155..1d80ef742 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -18,6 +18,7 @@ use trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred}; use self::val_batch::{OrdValBatch, OrdValBuilder}; +use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine< @@ -35,6 +36,22 @@ pub type ColValSpine = Spine< RcBuilder>>, >; +/// A trace implementation using a spine of ordered lists. +pub type OrdKeySpine = Spine< + Rc>>, + MergeBatcher, + RcBuilder>>, +>; +// /// A trace implementation for empty values using a spine of ordered lists. +// pub type OrdKeySpine = Spine>>>; + +/// A trace implementation backed by columnar storage. +pub type ColKeySpine = Spine< + Rc>>, + ColumnatedMergeBatcher, + RcBuilder>>, +>; + /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, @@ -248,8 +265,8 @@ mod val_batch { /// /// The method extracts the key in `source` at `cursor`, and merges it in to `self`. /// If the result does not wholly cancel, they key will be present in `self` with the - /// compacted values and updates. - /// + /// compacted values and updates. + /// /// The caller should be certain to update the cursor, as this method does not do this. fn copy_key(&mut self, source: &OrdValStorage, cursor: usize) { // Capture the initial number of values to determine if the merge was ultimately non-empty. @@ -277,7 +294,7 @@ mod val_batch { fn merge_key(&mut self, source1: &OrdValStorage, source2: &OrdValStorage) { use ::std::cmp::Ordering; match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) { - Ordering::Less => { + Ordering::Less => { self.copy_key(source1, self.key_cursor1); self.key_cursor1 += 1; }, @@ -612,6 +629,435 @@ mod val_batch { mod key_batch { - // Copy the above, once it works! + use std::borrow::Borrow; + use std::convert::TryInto; + use std::marker::PhantomData; + use timely::progress::{Antichain, frontier::AntichainRef}; + + use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; + use trace::implementations::{BatchContainer, OffsetList}; + + use super::{Layout, Update}; + + /// An immutable collection of update tuples, from a contiguous interval of logical times. + #[derive(Abomonation, Debug)] + pub struct OrdKeyStorage { + /// An ordered list of keys, corresponding to entries in `keys_offs`. + pub keys: L::KeyContainer, + /// Offsets used to provide indexes from keys to updates. + /// + /// This list has a special representation that any empty range indicates the singleton + /// element just before the range, as if the start were decremented by one. The empty + /// range is otherwise an invalid representation, and we borrow it to compactly encode + /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). + /// + /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. + pub keys_offs: OffsetList, + /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. + pub updates: L::UpdContainer, + } + + impl OrdKeyStorage { + /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. + fn updates_for_key(&self, index: usize) -> (usize, usize) { + let mut lower = self.keys_offs.index(index); + let upper = self.keys_offs.index(index+1); + // We use equal lower and upper to encode "singleton update; just before here". + // It should only apply when there is a prior element, so `lower` should be greater than zero. + if lower == upper { + assert!(lower > 0); + lower -= 1; + } + (lower, upper) + } + } + + /// An immutable collection of update tuples, from a contiguous interval of logical times. + /// + /// The `L` parameter captures how the updates should be laid out, and `C` determines which + /// merge batcher to select. + #[derive(Abomonation)] + pub struct OrdKeyBatch { + /// The updates themselves. + pub storage: OrdKeyStorage, + /// Description of the update times this layer represents. + pub description: Description<::Time>, + /// The number of updates reflected in the batch. + /// + /// We track this separately from `storage` because due to the singleton optimization, + /// we may have many more updates than `storage.updates.len()`. It should equal that + /// length, plus the number of singleton optimizations employed. + pub updates: usize, + } + + impl BatchReader for OrdKeyBatch { + type Key = ::Key; + type Val = (); + type Time = ::Time; + type Diff = ::Diff; + + type Cursor = OrdKeyCursor; + fn cursor(&self) -> Self::Cursor { + OrdKeyCursor { + key_cursor: 0, + val_stepped: false, + phantom: std::marker::PhantomData, + } + } + fn len(&self) -> usize { + // Normally this would be `self.updates.len()`, but we have a clever compact encoding. + // Perhaps we should count such exceptions to the side, to provide a correct accounting. + self.updates + } + fn description(&self) -> &Description<::Time> { &self.description } + } + + impl Batch for OrdKeyBatch { + type Merger = OrdKeyMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { + OrdKeyMerger::new(self, other, compaction_frontier) + } + } + + /// State for an in-progress merge. + pub struct OrdKeyMerger { + /// Key position to merge next in the first batch. + key_cursor1: usize, + /// Key position to merge next in the second batch. + key_cursor2: usize, + /// result that we are currently assembling. + result: OrdKeyStorage, + /// description + description: Description<::Time>, -} \ No newline at end of file + /// Local stash of updates, to use for consolidation. + /// + /// We could emulate a `ChangeBatch` here, with related compaction smarts. + /// A `ChangeBatch` itself needs an `i64` diff type, which we have not. + update_stash: Vec<(::Time, ::Diff)>, + /// Counts the number of singleton-optimized entries, that we may correctly count the updates. + singletons: usize, + } + + impl Merger> for OrdKeyMerger + where + OrdKeyBatch: Batch::Time> + { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + + assert!(batch1.upper() == batch2.lower()); + use lattice::Lattice; + let mut since = batch1.description().since().join(batch2.description().since()); + since = since.join(&compaction_frontier.to_owned()); + + let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since); + + let batch1 = &batch1.storage; + let batch2 = &batch2.storage; + + let mut storage = OrdKeyStorage { + keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), + keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), + }; + + storage.keys_offs.push(0.try_into().ok().unwrap()); + + OrdKeyMerger { + key_cursor1: 0, + key_cursor2: 0, + result: storage, + description, + update_stash: Vec::new(), + singletons: 0, + } + } + fn done(self) -> OrdKeyBatch { + OrdKeyBatch { + updates: self.result.updates.len() + self.singletons, + storage: self.result, + description: self.description, + } + } + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + + // An (incomplete) indication of the amount of work we've done so far. + let starting_updates = self.result.updates.len(); + let mut effort = 0isize; + + // While both mergees are still active, perform single-key merges. + while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { + self.merge_key(&source1.storage, &source2.storage); + // An (incomplete) accounting of the work we've done. + effort = (self.result.updates.len() - starting_updates) as isize; + } + + // Merging is complete, and only copying remains. + // Key-by-key copying allows effort interruption, and compaction. + while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { + self.copy_key(&source1.storage, self.key_cursor1); + self.key_cursor1 += 1; + effort = (self.result.updates.len() - starting_updates) as isize; + } + while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { + self.copy_key(&source2.storage, self.key_cursor2); + self.key_cursor2 += 1; + effort = (self.result.updates.len() - starting_updates) as isize; + } + + *fuel -= effort; + } + } + + // Helper methods in support of merging batches. + impl OrdKeyMerger { + /// Copy the next key in `source`. + /// + /// The method extracts the key in `source` at `cursor`, and merges it in to `self`. + /// If the result does not wholly cancel, they key will be present in `self` with the + /// compacted values and updates. + /// + /// The caller should be certain to update the cursor, as this method does not do this. + fn copy_key(&mut self, source: &OrdKeyStorage, cursor: usize) { + self.stash_updates_for_key(source, cursor); + if let Some(off) = self.consolidate_updates() { + self.result.keys_offs.push(off); + self.result.keys.copy(source.keys.index(cursor)); + } + } + /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. + /// + /// This method only merges a single key. It applies all compaction necessary, and may result in no output + /// if the updates cancel either directly or after compaction. + fn merge_key(&mut self, source1: &OrdKeyStorage, source2: &OrdKeyStorage) { + use ::std::cmp::Ordering; + match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) { + Ordering::Less => { + self.copy_key(source1, self.key_cursor1); + self.key_cursor1 += 1; + }, + Ordering::Equal => { + // Keys are equal; must merge all updates from both sources for this one key. + self.stash_updates_for_key(source1, self.key_cursor1); + self.stash_updates_for_key(source2, self.key_cursor2); + if let Some(off) = self.consolidate_updates() { + self.result.keys_offs.push(off); + self.result.keys.copy(source1.keys.index(self.key_cursor1)); + } + // Increment cursors in either case; the keys are merged. + self.key_cursor1 += 1; + self.key_cursor2 += 1; + }, + Ordering::Greater => { + self.copy_key(source2, self.key_cursor2); + self.key_cursor2 += 1; + }, + } + } + + /// Transfer updates for an indexed value in `source` into `self`, with compaction applied. + fn stash_updates_for_key(&mut self, source: &OrdKeyStorage, index: usize) { + let (lower, upper) = source.updates_for_key(index); + for i in lower .. upper { + // NB: Here is where we would need to look back if `lower == upper`. + let (time, diff) = &source.updates.index(i); + use lattice::Lattice; + let mut new_time = time.clone(); + new_time.advance_by(self.description.since().borrow()); + self.update_stash.push((new_time, diff.clone())); + } + } + + /// Consolidates `self.updates_stash` and produces the offset to record, if any. + fn consolidate_updates(&mut self) -> Option { + use consolidation; + consolidation::consolidate(&mut self.update_stash); + if !self.update_stash.is_empty() { + // If there is a single element, equal to a just-prior recorded update, + // we push nothing and report an unincremented offset to encode this case. + if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() { + // Just clear out update_stash, as we won't drain it here. + self.update_stash.clear(); + self.singletons += 1; + } + else { + // Conventional; move `update_stash` into `updates`. + for item in self.update_stash.drain(..) { + self.result.updates.push(item); + } + } + Some(self.result.updates.len().try_into().ok().unwrap()) + } else { + None + } + } + } + + /// A cursor for navigating a single layer. + pub struct OrdKeyCursor { + /// Absolute position of the current key. + key_cursor: usize, + /// If the value has been stepped for the key, there are no more values. + val_stepped: bool, + /// Phantom marker for Rust happiness. + phantom: PhantomData, + } + + impl Cursor for OrdKeyCursor { + type Key = ::Key; + type Val = (); + type Time = ::Time; + type Diff = ::Diff; + + type Storage = OrdKeyBatch; + + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } + fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + let (lower, upper) = storage.storage.updates_for_key(self.key_cursor); + for index in lower .. upper { + let (time, diff) = &storage.storage.updates.index(index); + logic(time, diff); + } + } + fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() } + fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped } + fn step_key(&mut self, storage: &Self::Storage){ + self.key_cursor += 1; + if self.key_valid(storage) { + self.rewind_vals(storage); + } + else { + self.key_cursor = storage.storage.keys.len(); + } + } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); + if self.key_valid(storage) { + self.rewind_vals(storage); + } + } + fn step_val(&mut self, _storage: &Self::Storage) { + self.val_stepped = true; + } + fn seek_val(&mut self, _storage: &Self::Storage, _val: &Self::Val) { } + fn rewind_keys(&mut self, storage: &Self::Storage) { + self.key_cursor = 0; + if self.key_valid(storage) { + self.rewind_vals(storage) + } + } + fn rewind_vals(&mut self, _storage: &Self::Storage) { + self.val_stepped = false; + } + } + + /// A builder for creating layers from unsorted update tuples. + pub struct OrdKeyBuilder { + result: OrdKeyStorage, + singleton: Option<(::Time, ::Diff)>, + /// Counts the number of singleton optimizations we performed. + /// + /// This number allows us to correctly gauge the total number of updates reflected in a batch, + /// even though `updates.len()` may be much shorter than this amount. + singletons: usize, + } + + impl OrdKeyBuilder { + /// Pushes a single update, which may set `self.singleton` rather than push. + /// + /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. + /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities + /// to encode a singleton update with an "absert" update: repeating the most recent offset. + /// This otherwise invalid state encodes "look back one element". + /// + /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the + /// previously pushed update exactly. In that case, we do not push the update into `updates`. + /// The update tuple is retained in `self.singleton` in case we see another update and need + /// to recover the singleton to push it into `updates` to join the second update. + fn push_update(&mut self, time: ::Time, diff: ::Diff) { + // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. + if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) { + assert!(self.singleton.is_none()); + self.singleton = Some((time, diff)); + } + else { + // If we have pushed a single element, we need to copy it out to meet this one. + if let Some(time_diff) = self.singleton.take() { + self.result.updates.push(time_diff); + } + self.result.updates.push((time, diff)); + } + } + } + + impl Builder for OrdKeyBuilder + where + OrdKeyBatch: Batch::Key, Val=(), Time=::Time, Diff=::Diff>, + ::KeyOwned: Borrow<::Key>, + { + type Item = ((::KeyOwned, ()), ::Time, ::Diff); + type Time = ::Time; + type Output = OrdKeyBatch; + + fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self { + // We don't introduce zero offsets as they will be introduced by the first `push` call. + Self { + result: OrdKeyStorage { + keys: L::KeyContainer::with_capacity(keys), + keys_offs: OffsetList::with_capacity(keys + 1), + updates: L::UpdContainer::with_capacity(upds), + }, + singleton: None, + singletons: 0, + } + } + + #[inline] + fn push(&mut self, ((key, ()), time, diff): Self::Item) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last() == Some(key.borrow()) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len().try_into().ok().unwrap()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.keys.push(key); + } + } + + #[inline] + fn copy(&mut self, ((key, ()), time, diff): &Self::Item) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last() == Some(key.borrow()) { + self.push_update(time.clone(), diff.clone()); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len().try_into().ok().unwrap()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time.clone(), diff.clone()); + self.result.keys.copy(key.borrow()); + } + } + + #[inline(never)] + fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + // Record the final offsets + self.result.keys_offs.push(self.result.updates.len().try_into().ok().unwrap()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + OrdKeyBatch { + updates: self.result.updates.len() + self.singletons, + storage: self.result, + description: Description::new(lower, upper, since), + } + } + } + +} diff --git a/src/trace/mod.rs b/src/trace/mod.rs index da3b7516a..d500539e5 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -10,7 +10,7 @@ pub mod cursor; pub mod description; pub mod implementations; -pub mod layers; +// pub mod layers; pub mod wrappers; use timely::communication::message::RefOrMut;