From 0daf37ca1355010c05809f7804abba4b37a702ef Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 21 May 2024 22:05:00 -0400 Subject: [PATCH] Generic chunker, plus flatcontainer support Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 275 ++++++++++++++- src/operators/arrange/arrangement.rs | 34 +- src/trace/implementations/chunker.rs | 325 ++++++++++++++++++ src/trace/implementations/merge_batcher.rs | 123 ++----- .../implementations/merge_batcher_col.rs | 87 +---- .../implementations/merge_batcher_flat.rs | 325 ++++++++++++++++++ src/trace/implementations/mod.rs | 69 +++- src/trace/implementations/ord_neu.rs | 42 ++- src/trace/implementations/rhh.rs | 5 +- 9 files changed, 1046 insertions(+), 239 deletions(-) create mode 100644 src/trace/implementations/chunker.rs create mode 100644 src/trace/implementations/merge_batcher_flat.rs diff --git a/src/consolidation.rs b/src/consolidation.rs index 1409da072..100159a47 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -10,10 +10,15 @@ //! you need specific behavior, it may be best to defensively copy, paste, and maintain the //! specific behavior you require. +use std::cmp::Ordering; use std::collections::VecDeque; +use timely::Container; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::container::flatcontainer::{FlatStack, Push, Region}; +use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::Data; -use crate::difference::Semigroup; +use crate::difference::{IsZero, Semigroup}; +use crate::trace::cursor::IntoOwned; /// Sorts and consolidates `vec`. /// @@ -210,6 +215,216 @@ where } } +/// Behavior to sort containers. +pub trait ContainerSorter { + /// Sort `container`, possibly replacing the contents by a different object. + fn sort(&mut self, target: &mut C); +} + +/// A generic container sorter for containers where the item implements [`ConsolidateLayout`]. +#[derive(Default)] +pub struct ExternalContainerSorter { + /// Storage to permute item. + permutation: Vec>, + /// Empty container to write results at. + empty: C, +} + +impl ContainerSorter for ExternalContainerSorter +where + for<'a> C: ConsolidateLayout + PushInto>, +{ + fn sort(&mut self, container: &mut C) { + // SAFETY: `Permutation` is empty, types are equal but have a different lifetime + let mut permutation: Vec> = unsafe { std::mem::transmute::>, Vec>>(std::mem::take(&mut self.permutation)) }; + + permutation.extend(container.drain()); + // permutation.sort_by_key(|item| C::key(item)); + permutation.sort_by(|a, b| C::cmp(a, b)); + + for item in permutation.drain(..) { + self.empty.push(item); + } + + // SAFETY: `Permutation` is empty, types are equal but have a different lifetime + self.permutation = unsafe { std::mem::transmute::>, Vec>>(permutation) }; + std::mem::swap(container, &mut self.empty); + self.empty.clear(); + } +} + +/// Sort containers in-place, with specific implementations. +#[derive(Default, Debug)] +pub struct InPlaceSorter(); + +impl ContainerSorter> for InPlaceSorter +where + T: Ord + Clone, + R: Clone, +{ + #[inline] + fn sort(&mut self, container: &mut Vec<(T, R)>) { + container.sort_by(|(a, _), (b, _)| a.cmp(b)); + } +} + +impl ContainerSorter> for InPlaceSorter +where + D: Ord + Clone, + T: Ord + Clone, + R: Clone, +{ + #[inline] + fn sort(&mut self, target: &mut Vec<(D, T, R)>) { + target.sort_by(|(d1, t1, _), (d2, t2, _)| (d1, t1).cmp(&(d2, t2))); + } +} + +/// Layout of data to be consolidated. 
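+///
+/// A minimal sketch of the intended semantics, mirroring the `Vec<(D, R)>`
+/// implementation below and using the `ContainerConsolidator` defined later
+/// in this file: sort the container by key, then fold runs of equal keys by
+/// accumulating their diffs, dropping keys whose accumulated diff is zero.
+///
+/// ```ignore
+/// let mut input = vec![(1, 1), (2, 1), (1, 1)];
+/// let mut output = Vec::new();
+/// input.sort();
+/// ContainerConsolidator::consolidate_container(&mut input, &mut output);
+/// assert_eq!(output, vec![(1, 2), (2, 1)]);
+/// ```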
+// TODO: This could be split in two, to separate sorting and consolidation. +pub trait ConsolidateLayout: Container { + /// Key portion of data, essentially everything minus the diff + type Key<'a>: Eq where Self: 'a; + + /// GAT diff type. + type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a; + + /// Owned diff type. + type DiffOwned: for<'a> Semigroup>; + + /// Deconstruct an item into key and diff. + fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>); + + /// Push an element to a compatible container. + fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned); + + /// Compare two items by key to sort containers. + fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering; +} + +impl ConsolidateLayout for Vec<(D, R)> +where + D: Ord + Clone + 'static, + for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static, +{ + type Key<'a> = D where Self: 'a; + type Diff<'a> = R where Self: 'a; + type DiffOwned = R; + + fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + item + } + + fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { + item1.0.cmp(&item2.0) + } + + fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned) { + self.push((key, diff)); + } +} + +impl ConsolidateLayout for Vec<(D, T, R)> +where + D: Ord + Clone + 'static, + T: Ord + Clone + 'static, + for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static, +{ + type Key<'a> = (D, T) where Self: 'a; + type Diff<'a> = R where Self: 'a; + type DiffOwned = R; + + fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + ((data, time), diff) + } + + fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { + (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1)) + } + + fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) { + self.push((data, time, diff)); + } +} + +impl ConsolidateLayout for FlatStack, T, R>> +where + for<'a> K: Region + Push<::ReadItem<'a>> + Clone + 'static, + for<'a> K::ReadItem<'a>: Ord + Copy, + for<'a> V: Region + Push<::ReadItem<'a>> + Clone + 'static, + for<'a> V::ReadItem<'a>: Ord + Copy, + for<'a> T: Region + Push<::ReadItem<'a>> + Clone + 'static, + for<'a> T::ReadItem<'a>: Ord + Copy, + R: Region + Push<::Owned> + Clone + 'static, + for<'a> R::Owned: Semigroup>, +{ + type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a; + type Diff<'a> = R::ReadItem<'a> where Self: 'a; + type DiffOwned = R::Owned; + + fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + ((key, val, time), diff) + } + + fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering { + (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2))) + } + + fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { + self.copy(((key, value), time, diff)); + } +} + +/// Behavior for copying consolidation. +pub trait ConsolidateContainer { + /// Consolidate the contents of `container` and write the result to `target`. + fn consolidate_container(container: &mut C, target: &mut C); +} + +/// Container consolidator that requires the container's item to implement [`ConsolidateLayout`]. 
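+///
+/// The input must already be sorted by key, for example via a
+/// [`ContainerSorter`]; consolidation is then a single linear pass. A sketch
+/// matching the unit test below, where the two updates for key `1` cancel to
+/// zero and are dropped:
+///
+/// ```ignore
+/// let mut data = vec![(1, 1), (2, 1), (1, -1)];
+/// data.sort();
+/// let mut target = Vec::new();
+/// ContainerConsolidator::consolidate_container(&mut data, &mut target);
+/// assert_eq!(target, [(2, 1)]);
+/// ```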
+#[derive(Default, Debug)] +pub struct ContainerConsolidator; + +impl ConsolidateContainer for ContainerConsolidator +where + C: ConsolidateLayout, +{ + /// Consolidate the supplied container. + fn consolidate_container(container: &mut C, target: &mut C) { + let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None; + for item in container.drain() { + let (key, diff) = C::into_parts(item); + match &mut previous { + // Initial iteration. + None => previous = Some((key, diff.into_owned())), + Some((prevkey, d)) => { + // Second and following iteration. + if key == *prevkey { + d.plus_equals(&diff); + } else { + // Keys don't match, write down result if non-zero. + if !d.is_zero() { + // Unwrap because we checked for `Some` above. + let (prevkey, diff) = previous.take().unwrap(); + target.push_with_diff(prevkey, diff); + } + // Update `previous` + previous = Some((key, diff.into_owned())); + } + } + } + } + // Write any residual data. + if let Some((previtem, d)) = previous { + if !d.is_zero() { + target.push_with_diff(previtem, d); + } + } + } +} + + + #[cfg(test)] mod tests { use super::*; @@ -316,6 +531,62 @@ mod tests { for i in 0..1024 { assert_eq!((i, 0, 2), collected[i]); } - + } + + #[test] + fn test_consolidate_container() { + let mut data = vec![(1,1), (2, 1), (1, -1)]; + let mut target = Vec::default(); + data.sort(); + ContainerConsolidator::consolidate_container(&mut data, &mut target); + assert_eq!(target, [(2,1)]); + } + + #[cfg(not(debug_assertions))] + const LEN: usize = 256 << 10; + #[cfg(not(debug_assertions))] + const REPS: usize = 10 << 10; + + #[cfg(debug_assertions)] + const LEN: usize = 256 << 1; + #[cfg(debug_assertions)] + const REPS: usize = 10 << 1; + + #[test] + fn test_consolidator_duration() { + let mut data = Vec::with_capacity(LEN); + let mut data2 = Vec::with_capacity(LEN); + let mut target = Vec::new(); + let mut duration = std::time::Duration::default(); + for _ in 0..REPS { + data.clear(); + data2.clear(); + target.clear(); + data.extend((0..LEN).map(|i| (i/4, -2isize + ((i % 4) as isize)))); + data2.extend((0..LEN).map(|i| (i/4, -2isize + ((i % 4) as isize)))); + data.sort_by(|x,y| x.0.cmp(&y.0)); + let start = std::time::Instant::now(); + ContainerConsolidator::consolidate_container(&mut data, &mut target); + duration += start.elapsed(); + + consolidate(&mut data2); + assert_eq!(target, data2); + } + println!("elapsed consolidator {duration:?}"); + } + + #[test] + fn test_consolidator_duration_vec() { + let mut data = Vec::with_capacity(LEN); + let mut duration = std::time::Duration::default(); + for _ in 0..REPS { + data.clear(); + data.extend((0..LEN).map(|i| (i/4, -2isize + ((i % 4) as isize)))); + data.sort_by(|x,y| x.0.cmp(&y.0)); + let start = std::time::Instant::now(); + consolidate(&mut data); + duration += start.elapsed(); + } + println!("elapsed vec {duration:?}"); } } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 370d085a6..ef14f2b92 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -369,15 +369,6 @@ where Tr::Batch: Batch, Tr::Batcher: Batcher, ; - - /// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name. 
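+    // Migration sketch for a hypothetical external caller: `arrange_core` is
+    // no longer a trait method and is instead the free function invoked in the
+    // impls below, so `collection.arrange_core(pact, name)` becomes
+    // `arrange_core(&collection.inner, pact, name)`.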
- fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher, - ; } impl Arrange> for Collection @@ -395,17 +386,7 @@ where Tr::Batcher: Batcher>, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - - fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract>, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher>, - { - arrange_core(&self.inner, pact, name) + arrange_core(&self.inner, exchange, name) } } @@ -583,18 +564,7 @@ where Tr::Batcher: Batcher>, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - - fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract>, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher>, - { - self.map(|k| (k, ())) - .arrange_core(pact, name) + arrange_core(&self.map(|k| (k, ())).inner, exchange, name) } } diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs new file mode 100644 index 000000000..55ac911a5 --- /dev/null +++ b/src/trace/implementations/chunker.rs @@ -0,0 +1,325 @@ +//! Organize streams of data into sorted chunks. + +use std::marker::PhantomData; +use timely::communication::message::RefOrMut; +use timely::Container; +use timely::container::columnation::{Columnation, TimelyStack}; +use timely::container::{PushInto, SizableContainer}; +use crate::consolidation::{consolidate_updates, ConsolidateContainer, ContainerSorter}; +use crate::difference::Semigroup; + +/// Behavior to transform streams of data into sorted chunks of regular size. +pub trait Chunker { + /// Input container type. + type Input; + /// Output container type. + type Output; + + /// Accept a container and absorb its contents. The caller must + /// call [`extract`] or [`finish`] soon after pushing a container. + fn push_container(&mut self, container: RefOrMut); + + /// Extract all read data, leaving unfinished data behind. + fn extract(&mut self) -> Option; + + /// Unconditionally extract all data, leaving no unfinished data behind. + fn finish(&mut self) -> Option; +} + +/// Chunk a stream of vectors into chains of vectors. +pub struct VecChunker { + pending: Vec, + ready: Vec>, +} + +impl Default for VecChunker { + fn default() -> Self { + Self { + pending: Vec::default(), + ready: Vec::default(), + } + } +} + +impl VecChunker { + const BUFFER_SIZE_BYTES: usize = 8 << 10; + fn chunk_capacity(&self) -> usize { + let size = ::std::mem::size_of::(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + fn pending_capacity(&self) -> usize { + self.chunk_capacity() * 2 + } +} + +impl Chunker for VecChunker<((K, V), T, R)> +where + K: Ord + Clone, + V: Ord + Clone, + T: Ord + Clone, + R: Semigroup + Clone, +{ + type Input = Vec<((K, V), T, R)>; + type Output = Self::Input; + + fn push_container(&mut self, container: RefOrMut) { + // Ensure `self.pending` has the desired capacity. We should never have a larger capacity + // because we don't write more than capacity elements into the buffer. + if self.pending.capacity() < self.pending_capacity() { + self.pending.reserve(self.pending_capacity() - self.pending.len()); + } + + // Form chunks from what's in pending. 
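+        // (Sizing sketch: `BUFFER_SIZE_BYTES` is 8 << 10 = 8192 bytes, so for
+        // a 32-byte update such as `((u64, u64), u64, i64)` the chunk capacity
+        // is 8192 / 32 = 256 elements, and `pending` buffers twice that, 512,
+        // before each consolidation pass.)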
+ // This closure does the following: + // * If pending is full, consolidate. + // * If after consolidation it's more than half full, peel off chunks, + // leaving behind any partial chunk in pending. + let form_chunk = |this: &mut Self| { + if this.pending.len() == this.pending.capacity() { + consolidate_updates(&mut this.pending); + if this.pending.len() >= this.chunk_capacity() { + while this.pending.len() > this.chunk_capacity() { + let mut chunk = Vec::with_capacity(this.chunk_capacity()); + chunk.extend(this.pending.drain(..chunk.capacity())); + this.ready.push(chunk); + } + } + } + }; + + // `container` is either a shared reference or an owned allocations. + match container { + RefOrMut::Ref(vec) => { + let mut slice = &vec[..]; + while !slice.is_empty() { + let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len())); + slice = tail; + self.pending.extend_from_slice(head); + form_chunk(self); + } + } + RefOrMut::Mut(vec) => { + let mut drain = vec.drain(..).peekable(); + while drain.peek().is_some() { + self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len())); + form_chunk(self); + } + } + } + } + + fn extract(&mut self) -> Option { + self.ready.pop() + } + + fn finish(&mut self) -> Option { + if !self.pending.is_empty() { + consolidate_updates(&mut self.pending); + while !self.pending.is_empty() { + let mut chunk = Vec::with_capacity(self.chunk_capacity()); + chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity()))); + self.ready.push(chunk); + } + } + self.ready.pop() + } +} + +/// Chunk a stream of vectors into chains of vectors. +pub struct ColumnationChunker { + pending: Vec, + ready: Vec>, +} + +impl Default for ColumnationChunker { + fn default() -> Self { + Self { + pending: Vec::default(), + ready: Vec::default(), + } + } +} + +impl ColumnationChunker +where + T: Columnation, +{ + const BUFFER_SIZE_BYTES: usize = 64 << 10; + fn chunk_capacity(&self) -> usize { + let size = ::std::mem::size_of::(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + /// Buffer size for pending updates, currently 2 * [`Self::chunk_capacity`]. + fn pending_capacity(&self) -> usize { + self.chunk_capacity() * 2 + } +} + +impl Chunker for ColumnationChunker<((K, V), T, R)> +where + K: Columnation + Ord + Clone, + V: Columnation + Ord + Clone, + T: Columnation + Ord + Clone, + R: Columnation + Semigroup + Clone, +{ + type Input = Vec<((K, V), T, R)>; + type Output = TimelyStack<((K,V),T,R)>; + + fn push_container(&mut self, container: RefOrMut) { + // Ensure `self.pending` has the desired capacity. We should never have a larger capacity + // because we don't write more than capacity elements into the buffer. + if self.pending.capacity() < self.pending_capacity() { + self.pending.reserve(self.pending_capacity() - self.pending.len()); + } + + // Form chunks from what's in pending. + // This closure does the following: + // * If pending is full, consolidate. + // * If after consolidation it's more than half full, peel off chunks, + // leaving behind any partial chunk in pending. 
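+        // (Worked example with hypothetical numbers: with a chunk capacity of
+        // 1024 and 1536 updates surviving consolidation, one chunk of 1024 is
+        // peeled off and 512 remain in `pending`; exactly 1024 survivors would
+        // stay in `pending` until more input arrives or `finish` is called.)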
+ let form_chunk = |this: &mut Self| { + if this.pending.len() == this.pending.capacity() { + consolidate_updates(&mut this.pending); + if this.pending.len() >= this.chunk_capacity() { + while this.pending.len() > this.chunk_capacity() { + let mut chunk = TimelyStack::with_capacity(this.chunk_capacity()); + for item in this.pending.drain(..chunk.capacity()) { + chunk.copy(&item); + } + this.ready.push(chunk); + } + } + } + }; + + // `container` is either a shared reference or an owned allocations. + match container { + RefOrMut::Ref(vec) => { + let mut slice = &vec[..]; + while !slice.is_empty() { + let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len())); + slice = tail; + self.pending.extend_from_slice(head); + form_chunk(self); + } + } + RefOrMut::Mut(vec) => { + let mut drain = vec.drain(..).peekable(); + while drain.peek().is_some() { + self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len())); + form_chunk(self); + } + } + } + } + + fn extract(&mut self) -> Option { + self.ready.pop() + } + + fn finish(&mut self) -> Option { + consolidate_updates(&mut self.pending); + while !self.pending.is_empty() { + let mut chunk = TimelyStack::with_capacity(self.chunk_capacity()); + for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) { + chunk.copy(&item); + } + self.ready.push(chunk); + } + self.ready.pop() + } +} + +/// Chunk a stream of vectors into chains of vectors. +pub struct ContainerChunker +where + I: Container, + for<'a> O: SizableContainer + PushInto>, + Sorter: ContainerSorter, + Consolidator: ConsolidateContainer + ?Sized, +{ + pending: O, + empty: O, + ready: Vec, + sorter: Sorter, + _marker: PhantomData<(I, Consolidator)>, +} + +impl Default for ContainerChunker +where + I: Container, + for<'a> O: SizableContainer + PushInto>, + Sorter: ContainerSorter + Default, + Consolidator: ConsolidateContainer + ?Sized, +{ + fn default() -> Self { + Self { + pending: O::default(), + empty: O::default(), + ready: Vec::default(), + sorter: Sorter::default(), + _marker: PhantomData, + } + } +} + +impl Chunker for ContainerChunker +where + I: Container, + for<'a> O: SizableContainer + PushInto>, + Sorter: ContainerSorter, + Consolidator: ConsolidateContainer, +{ + type Input = I; + type Output = O; + + fn push_container(&mut self, container: RefOrMut) { + if self.pending.capacity() < O::preferred_capacity() { + self.pending.reserve(O::preferred_capacity() - self.pending.len()); + } + // TODO: This uses `IterRef`, which isn't optimal for containers that can move. 
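+        // (Flow sketch: items accumulate in `pending`; once it is full we
+        // sort, consolidate into `empty`, and swap the two. The consolidated
+        // `pending` is emitted as a ready chunk only if it remains more than
+        // half full; otherwise it keeps absorbing input so that emitted
+        // chunks stay reasonably full.)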
+ for item in container.iter() { + self.pending.push(item); + if self.pending.len() == self.pending.capacity() { + self.sorter.sort(&mut self.pending); + Consolidator::consolidate_container(&mut self.pending, &mut self.empty); + std::mem::swap(&mut self.pending, &mut self.empty); + self.empty.clear(); + if self.pending.len() > self.pending.capacity() / 2 { + self.ready.push(std::mem::take(&mut self.pending)); + } + } + } + } + + fn extract(&mut self) -> Option { + self.ready.pop() + } + + fn finish(&mut self) -> Option { + if !self.pending.is_empty() { + self.sorter.sort(&mut self.pending); + Consolidator::consolidate_container(&mut self.pending, &mut self.empty); + std::mem::swap(&mut self.pending, &mut self.empty); + self.empty.clear(); + if !self.pending.is_empty() { + self.ready.push(std::mem::take(&mut self.pending)); + } + } + self.ready.pop() + } +} diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index bb13cf650..8e4983aed 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,6 +1,7 @@ //! A general purpose `Batcher` implementation based on radix sort. use std::collections::VecDeque; +use std::marker::PhantomData; use timely::communication::message::RefOrMut; use timely::logging::WorkerIdentifier; @@ -9,16 +10,17 @@ use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; use timely::{Container, PartialOrder}; -use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder}; use crate::Data; +use crate::trace::implementations::chunker::Chunker; /// Creates batches from unordered tuples. -pub struct MergeBatcher +pub struct MergeBatcher where - M: Merger, + C: Chunker + Default, + M: Merger
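+// A hedged end-to-end sketch of the `Chunker` contract above (hypothetical
+// driver code; the update type is chosen for illustration): push a container,
+// drain full chunks with `extract`, then flush the remainder with `finish`.
+//
+//     let mut chunker = VecChunker::<((u64, u64), u64, i64)>::default();
+//     let mut updates = vec![((0, 0), 0, 1), ((0, 0), 0, 1), ((1, 2), 0, 1)];
+//     chunker.push_container(RefOrMut::Mut(&mut updates));
+//     let mut chunks = Vec::new();
+//     while let Some(chunk) = chunker.extract() { chunks.push(chunk); }
+//     if let Some(chunk) = chunker.finish() { chunks.push(chunk); }
+//     // One sorted, consolidated chunk: [((0, 0), 0, 2), ((1, 2), 0, 1)].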