From 3edbebea0cbec0e22d0907b3a5d5b3e605cf384f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 27 Jun 2024 15:39:31 -0400 Subject: [PATCH 1/7] Define ConsolidateLayout for FlatStack in terms of MergerChunk I'm not sure about this. It's a gross change as it pulls behavior across unrelated places, but I'm also not sure how to implement without effectively duplicating the `MergerChunk`. What might work is a trait that extends `MergerChunk` and just adds the `cmp_without_diff` function. Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 33 ++++++++++--------- .../implementations/merge_batcher_flat.rs | 7 ++++ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index b9495d104..e309b1a77 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -15,10 +15,10 @@ use std::collections::VecDeque; use timely::Container; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; use timely::container::flatcontainer::{FlatStack, Push, Region}; -use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::Data; use crate::difference::{IsZero, Semigroup}; use crate::trace::cursor::IntoOwned; +use crate::trace::implementations::merge_batcher_flat::MergerChunk; /// Sorts and consolidates `vec`. /// @@ -280,27 +280,28 @@ where } } -impl ConsolidateLayout for FlatStack, T, R>> +impl ConsolidateLayout for FlatStack where - for<'a> K: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> K::ReadItem<'a>: Ord + Copy, - for<'a> V: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> V::ReadItem<'a>: Ord + Copy, - for<'a> T: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> T::ReadItem<'a>: Ord + Copy, - R: Region + Push<::Owned> + Clone + 'static, - for<'a> R::Owned: Semigroup>, + MC: MergerChunk + + Region + + Clone + + for<'a> Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::DiffOwned)> + + 'static, + for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, + for<'a> MC::DiffOwned: Semigroup>, + for<'a> MC::ReadItem<'a>: Copy, { - type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a; - type Diff<'a> = R::ReadItem<'a> where Self: 'a; - type DiffOwned = R::Owned; + type Key<'a> = (MC::Key<'a>, MC::Val<'a>, MC::Time<'a>) where Self: 'a; + type Diff<'a> = MC::Diff<'a> where Self: 'a; + type DiffOwned = MC::DiffOwned; - fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + let (key, val, time, diff) = MC::into_parts(item); ((key, val, time), diff) } - fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering { - (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2))) + fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { + MC::cmp_without_diff(*item1, *item2) } fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index ae8d2894f..713483a76 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -72,6 +72,9 @@ pub trait MergerChunk: Region { /// Split a read item into its constituents. Must be cheap. fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>); + + /// Compare two items, ignoring the diff. + fn cmp_without_diff<'a, 'b>(item1: Self::ReadItem<'a>, item2: Self::ReadItem<'b>) -> Ordering; } impl MergerChunk for TupleABCRegion, T, R> @@ -94,6 +97,10 @@ where fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { (key, val, time, diff) } + + fn cmp_without_diff<'a, 'b>(((key1, val1), time1, _diff1): Self::ReadItem<'a>, ((key2, val2), time2, _diff2): Self::ReadItem<'b>) -> Ordering { + (K::reborrow(key1), V::reborrow(val1), T::reborrow(time1)).cmp(&(K::reborrow(key2), V::reborrow(val2), T::reborrow(time2))) + } } impl Merger for FlatcontainerMerger From 4601ef40b2305bbee73955f7fa715202f4db02a1 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 27 Jun 2024 16:28:56 -0400 Subject: [PATCH 2/7] Same thing for BuilderInput Signed-off-by: Moritz Hoffmann --- .../implementations/merge_batcher_flat.rs | 26 ++++++++++++++ src/trace/implementations/mod.rs | 36 +++++++++---------- 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index 713483a76..2efbe83ea 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -75,6 +75,18 @@ pub trait MergerChunk: Region { /// Compare two items, ignoring the diff. fn cmp_without_diff<'a, 'b>(item1: Self::ReadItem<'a>, item2: Self::ReadItem<'b>) -> Ordering; + + /// Converts a key into one with a narrower lifetime. + #[must_use] + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> + where + Self: 'a; + + /// Converts a value into one with a narrower lifetime. + #[must_use] + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> + where + Self: 'a; } impl MergerChunk for TupleABCRegion, T, R> @@ -101,6 +113,20 @@ where fn cmp_without_diff<'a, 'b>(((key1, val1), time1, _diff1): Self::ReadItem<'a>, ((key2, val2), time2, _diff2): Self::ReadItem<'b>) -> Ordering { (K::reborrow(key1), V::reborrow(val1), T::reborrow(time1)).cmp(&(K::reborrow(key2), V::reborrow(val2), T::reborrow(time2))) } + + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> + where + Self: 'a + { + K::reborrow(item) + } + + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> + where + Self: 'a + { + V::reborrow(item) + } } impl Merger for FlatcontainerMerger diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index e39522a7e..8e25f3dd9 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -401,12 +401,12 @@ where mod flatcontainer { use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region}; - use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use timely::progress::Timestamp; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update}; + use crate::trace::implementations::merge_batcher_flat::MergerChunk; impl Update for FlatLayout where @@ -448,36 +448,34 @@ mod flatcontainer { type OffsetContainer = OffsetList; } - impl BuilderInput for FlatStack,T,R>> + impl BuilderInput for FlatStack where - K: Region + Clone + 'static, - V: Region + Clone + 'static, - T: Region + Clone + 'static, - R: Region + Clone + 'static, - for<'a> K::ReadItem<'a>: Copy + Ord, - for<'a> V::ReadItem<'a>: Copy + Ord, - for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R::ReadItem<'a>: Copy + Ord, + MC: MergerChunk + Region + Clone + 'static, + for<'a> MC::Key<'a>: Copy, + for<'a> MC::Val<'a>: Copy, + for<'a> MC::Time<'a>: IntoOwned<'a, Owned = MC::TimeOwned>, + for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, KBC: BatchContainer, VBC: BatchContainer, - for<'a> KBC::ReadItem<'a>: PartialEq>, - for<'a> VBC::ReadItem<'a>: PartialEq>, + for<'a> KBC::ReadItem<'a>: PartialEq>, + for<'a> VBC::ReadItem<'a>: PartialEq>, { - type Key<'a> = K::ReadItem<'a>; - type Val<'a> = V::ReadItem<'a>; - type Time = T::Owned; - type Diff = R::Owned; + type Key<'a> = MC::Key<'a>; + type Val<'a> = MC::Val<'a>; + type Time = MC::TimeOwned; + type Diff = MC::DiffOwned; - fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + let (key, val, time, diff) = MC::into_parts(item); (key, val, time.into_owned(), diff.into_owned()) } fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool { - KBC::reborrow(other) == K::reborrow(*this) + KBC::reborrow(other) == MC::reborrow_key(*this) } fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool { - VBC::reborrow(other) == V::reborrow(*this) + VBC::reborrow(other) == MC::reborrow_val(*this) } } } From a47f020d7662d0fad240847c5edc70e6cbb91c16 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 27 Jun 2024 21:17:10 -0400 Subject: [PATCH 3/7] Generalize MergerChunk Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 9 +-- .../implementations/merge_batcher_flat.rs | 56 +++++++++++++------ src/trace/implementations/mod.rs | 8 +-- src/trace/implementations/ord_neu.rs | 6 +- 4 files changed, 48 insertions(+), 31 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index e309b1a77..9f339e29b 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -18,7 +18,7 @@ use timely::container::flatcontainer::{FlatStack, Push, Region}; use crate::Data; use crate::difference::{IsZero, Semigroup}; use crate::trace::cursor::IntoOwned; -use crate::trace::implementations::merge_batcher_flat::MergerChunk; +use crate::trace::implementations::merge_batcher_flat::RegionUpdate; /// Sorts and consolidates `vec`. /// @@ -282,12 +282,11 @@ where impl ConsolidateLayout for FlatStack where - MC: MergerChunk + MC: RegionUpdate + Region + Clone + for<'a> Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::DiffOwned)> + 'static, - for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, for<'a> MC::DiffOwned: Semigroup>, for<'a> MC::ReadItem<'a>: Copy, { @@ -301,7 +300,9 @@ where } fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { - MC::cmp_without_diff(*item1, *item2) + let (key1, val1, time1, _diff1) = MC::into_parts(*item1); + let (key2, val2, time2, _diff2) = MC::into_parts(*item2); + (MC::reborrow_key(key1), MC::reborrow_val(val1), MC::reborrow_time(time1)).cmp(&(MC::reborrow_key(key2), MC::reborrow_val(val2), MC::reborrow_time(time2))) } fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index 2efbe83ea..9e10fca69 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -14,7 +14,7 @@ use crate::trace::cursor::IntoOwned; /// A merger for flat stacks. /// -/// `MC` is a [`Region`] that implements [`MergerChunk`]. +/// `MC` is a [`Region`] that implements [`RegionUpdate`]. pub struct FlatcontainerMerger { _marker: PhantomData, } @@ -56,26 +56,23 @@ impl FlatcontainerMerger { } /// Behavior to dissect items of chunks in the merge batcher -pub trait MergerChunk: Region { +pub trait RegionUpdate: Region { /// The key of the update - type Key<'a>: Ord where Self: 'a; + type Key<'a>: Copy + Ord where Self: 'a; /// The value of the update - type Val<'a>: Ord where Self: 'a; + type Val<'a>: Copy + Ord where Self: 'a; /// The time of the update - type Time<'a>: Ord where Self: 'a; + type Time<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::TimeOwned> where Self: 'a; /// The owned time type. type TimeOwned; /// The diff of the update - type Diff<'a> where Self: 'a; + type Diff<'a>: Copy + IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a; /// The owned diff type. type DiffOwned; /// Split a read item into its constituents. Must be cheap. fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>); - /// Compare two items, ignoring the diff. - fn cmp_without_diff<'a, 'b>(item1: Self::ReadItem<'a>, item2: Self::ReadItem<'b>) -> Ordering; - /// Converts a key into one with a narrower lifetime. #[must_use] fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> @@ -87,17 +84,30 @@ pub trait MergerChunk: Region { fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> where Self: 'a; + + /// Converts a time into one with a narrower lifetime. + #[must_use] + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> + where + Self: 'a; + + /// Converts a diff into one with a narrower lifetime. + #[must_use] + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> + where + Self: 'a; } -impl MergerChunk for TupleABCRegion, T, R> +impl RegionUpdate for TupleABCRegion, T, R> where K: Region, - for<'a> K::ReadItem<'a>: Ord, + for<'a> K::ReadItem<'a>: Copy + Ord, V: Region, - for<'a> V::ReadItem<'a>: Ord, + for<'a> V::ReadItem<'a>: Copy + Ord, T: Region, - for<'a> T::ReadItem<'a>: Ord, + for<'a> T::ReadItem<'a>: Copy + Ord, R: Region, + for<'a> R::ReadItem<'a>: Copy + Ord, { type Key<'a> = K::ReadItem<'a> where Self: 'a; type Val<'a> = V::ReadItem<'a> where Self: 'a; @@ -110,10 +120,6 @@ where (key, val, time, diff) } - fn cmp_without_diff<'a, 'b>(((key1, val1), time1, _diff1): Self::ReadItem<'a>, ((key2, val2), time2, _diff2): Self::ReadItem<'b>) -> Ordering { - (K::reborrow(key1), V::reborrow(val1), T::reborrow(time1)).cmp(&(K::reborrow(key2), V::reborrow(val2), T::reborrow(time2))) - } - fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> where Self: 'a @@ -127,11 +133,25 @@ where { V::reborrow(item) } + + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> + where + Self: 'a + { + T::reborrow(item) + } + + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> + where + Self: 'a + { + R::reborrow(item) + } } impl Merger for FlatcontainerMerger where - for<'a> MC: MergerChunk + Clone + 'static + for<'a> MC: RegionUpdate + Clone + 'static + ReserveItems<::ReadItem<'a>> + Push<::ReadItem<'a>> + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)> diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 8e25f3dd9..7c7fc74a6 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -406,7 +406,7 @@ mod flatcontainer { use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update}; - use crate::trace::implementations::merge_batcher_flat::MergerChunk; + use crate::trace::implementations::merge_batcher_flat::RegionUpdate; impl Update for FlatLayout where @@ -450,11 +450,7 @@ mod flatcontainer { impl BuilderInput for FlatStack where - MC: MergerChunk + Region + Clone + 'static, - for<'a> MC::Key<'a>: Copy, - for<'a> MC::Val<'a>: Copy, - for<'a> MC::Time<'a>: IntoOwned<'a, Owned = MC::TimeOwned>, - for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, + MC: RegionUpdate + Region + Clone + 'static, KBC: BatchContainer, VBC: BatchContainer, for<'a> KBC::ReadItem<'a>: PartialEq>, diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 573e9c287..b13a41ff6 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -17,7 +17,7 @@ use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunke use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnationMerger; -use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk}; +use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, RegionUpdate}; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; @@ -44,7 +44,7 @@ pub type ColValSpine = Spine< /// A trace implementation backed by flatcontainer storage. pub type FlatValSpine = Spine< Rc>, - MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, RcBuilder>>, >; @@ -74,7 +74,7 @@ pub type ColKeySpine = Spine< /// A trace implementation backed by flatcontainer storage. pub type FlatKeySpine = Spine< Rc>, - MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, RcBuilder>>, >; From 7d3ddaac0655a0e760d1373cf8625eb6d549d56b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 2 Jul 2024 09:34:15 -0400 Subject: [PATCH 4/7] Renaming Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 24 ++--- .../implementations/merge_batcher_flat.rs | 101 +++++++++--------- src/trace/implementations/mod.rs | 22 ++-- 3 files changed, 75 insertions(+), 72 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index 9f339e29b..feac6cafa 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -280,29 +280,29 @@ where } } -impl ConsolidateLayout for FlatStack +impl ConsolidateLayout for FlatStack where - MC: RegionUpdate + R: RegionUpdate + Region + Clone - + for<'a> Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::DiffOwned)> + + for<'a> Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::DiffOwned)> + 'static, - for<'a> MC::DiffOwned: Semigroup>, - for<'a> MC::ReadItem<'a>: Copy, + for<'a> R::DiffOwned: Semigroup>, + for<'a> R::ReadItem<'a>: Copy, { - type Key<'a> = (MC::Key<'a>, MC::Val<'a>, MC::Time<'a>) where Self: 'a; - type Diff<'a> = MC::Diff<'a> where Self: 'a; - type DiffOwned = MC::DiffOwned; + type Key<'a> = (R::Key<'a>, R::Val<'a>, R::Time<'a>) where Self: 'a; + type Diff<'a> = R::Diff<'a> where Self: 'a; + type DiffOwned = R::DiffOwned; fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { - let (key, val, time, diff) = MC::into_parts(item); + let (key, val, time, diff) = R::into_parts(item); ((key, val, time), diff) } fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { - let (key1, val1, time1, _diff1) = MC::into_parts(*item1); - let (key2, val2, time2, _diff2) = MC::into_parts(*item2); - (MC::reborrow_key(key1), MC::reborrow_val(val1), MC::reborrow_time(time1)).cmp(&(MC::reborrow_key(key2), MC::reborrow_val(val2), MC::reborrow_time(time2))) + let (key1, val1, time1, _diff1) = R::into_parts(*item1); + let (key2, val2, time2, _diff2) = R::into_parts(*item2); + (R::reborrow_key(key1), R::reborrow_val(val1), R::reborrow_time(time1)).cmp(&(R::reborrow_key(key2), R::reborrow_val(val2), R::reborrow_time(time2))) } fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index 9e10fca69..f313f2829 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -14,21 +14,21 @@ use crate::trace::cursor::IntoOwned; /// A merger for flat stacks. /// -/// `MC` is a [`Region`] that implements [`RegionUpdate`]. -pub struct FlatcontainerMerger { - _marker: PhantomData, +/// `R` is a [`Region`] that implements [`RegionUpdate`]. +pub struct FlatcontainerMerger { + _marker: PhantomData, } -impl Default for FlatcontainerMerger { +impl Default for FlatcontainerMerger { fn default() -> Self { Self { _marker: PhantomData, } } } -impl FlatcontainerMerger { +impl FlatcontainerMerger { const BUFFER_SIZE_BYTES: usize = 8 << 10; fn chunk_capacity(&self) -> usize { - let size = ::std::mem::size_of::(); + let size = ::std::mem::size_of::(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -40,13 +40,13 @@ impl FlatcontainerMerger { /// Helper to get pre-sized vector from the stash. #[inline] - fn empty(&self, stash: &mut Vec>) -> FlatStack { + fn empty(&self, stash: &mut Vec>) -> FlatStack { stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity())) } /// Helper to return a chunk to the stash. #[inline] - fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { + fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? if chunk.capacity() == self.chunk_capacity() { chunk.clear(); @@ -98,23 +98,23 @@ pub trait RegionUpdate: Region { Self: 'a; } -impl RegionUpdate for TupleABCRegion, T, R> +impl RegionUpdate for TupleABCRegion, TR, RR> where - K: Region, - for<'a> K::ReadItem<'a>: Copy + Ord, - V: Region, - for<'a> V::ReadItem<'a>: Copy + Ord, - T: Region, - for<'a> T::ReadItem<'a>: Copy + Ord, - R: Region, - for<'a> R::ReadItem<'a>: Copy + Ord, + KR: Region, + for<'a> KR::ReadItem<'a>: Copy + Ord, + VR: Region, + for<'a> VR::ReadItem<'a>: Copy + Ord, + TR: Region, + for<'a> TR::ReadItem<'a>: Copy + Ord, + RR: Region, + for<'a> RR::ReadItem<'a>: Copy + Ord, { - type Key<'a> = K::ReadItem<'a> where Self: 'a; - type Val<'a> = V::ReadItem<'a> where Self: 'a; - type Time<'a> = T::ReadItem<'a> where Self: 'a; - type TimeOwned = T::Owned; - type Diff<'a> = R::ReadItem<'a> where Self: 'a; - type DiffOwned = R::Owned; + type Key<'a> = KR::ReadItem<'a> where Self: 'a; + type Val<'a> = VR::ReadItem<'a> where Self: 'a; + type Time<'a> = TR::ReadItem<'a> where Self: 'a; + type TimeOwned = TR::Owned; + type Diff<'a> = RR::ReadItem<'a> where Self: 'a; + type DiffOwned = RR::Owned; fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { (key, val, time, diff) @@ -124,64 +124,67 @@ where where Self: 'a { - K::reborrow(item) + KR::reborrow(item) } fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> where Self: 'a { - V::reborrow(item) + VR::reborrow(item) } fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> where Self: 'a { - T::reborrow(item) + TR::reborrow(item) } fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> where Self: 'a { - R::reborrow(item) + RR::reborrow(item) } } -impl Merger for FlatcontainerMerger +impl Merger for FlatcontainerMerger where - for<'a> MC: RegionUpdate + Clone + 'static - + ReserveItems<::ReadItem<'a>> - + Push<::ReadItem<'a>> - + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)> - + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>, - for<'a> MC::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=MC::TimeOwned>, - for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, - for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, - for<'a> MC::DiffOwned: Default + Semigroup + Semigroup> + Data, + for<'a> R: Region + + RegionUpdate + + Clone + + ReserveItems<::ReadItem<'a>> + + Push<::ReadItem<'a>> + + Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, &'a R::DiffOwned)> + + Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::Diff<'a>)> + + 'static, + for<'a> R::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=R::TimeOwned>, + for<'a> R::Diff<'a>: IntoOwned<'a, Owned = R::DiffOwned>, + for<'a> R::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, + for<'a> R::DiffOwned: Default + Semigroup + Semigroup> + Data, { - type Time = MC::TimeOwned; - type Chunk = FlatStack; - type Output = FlatStack; + type Time = R::TimeOwned; + type Chunk = FlatStack; + type Output = FlatStack; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = >::from(list1.next().unwrap_or_default()); - let mut head2 = >::from(list2.next().unwrap_or_default()); + let mut head1 = >::from(list1.next().unwrap_or_default()); + let mut head2 = >::from(list2.next().unwrap_or_default()); let mut result = self.empty(stash); - let mut diff = MC::DiffOwned::default(); + let mut diff = R::DiffOwned::default(); // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { let cmp = { - let (key1, val1, time1, _diff) = MC::into_parts(head1.peek()); - let (key2, val2, time2, _diff) = MC::into_parts(head2.peek()); + let (key1, val1, time1, _diff) = R::into_parts(head1.peek()); + let (key2, val2, time2, _diff) = R::into_parts(head2.peek()); ((key1, val1), time1).cmp(&((key2, val2), time2)) }; // TODO: The following less/greater branches could plausibly be a good moment for @@ -195,8 +198,8 @@ where result.copy(head2.pop()); } Ordering::Equal => { - let (key, val, time1, diff1) = MC::into_parts(head1.pop()); - let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop()); + let (key, val, time1, diff1) = R::into_parts(head1.pop()); + let (_key, _val, _time2, diff2) = R::into_parts(head2.pop()); diff1.clone_onto(&mut diff); diff.plus_equals(&diff2); if !diff.is_zero() { @@ -267,7 +270,7 @@ where let mut ready = self.empty(stash); for buffer in merged { - for (key, val, time, diff) in buffer.iter().map(MC::into_parts) { + for (key, val, time, diff) in buffer.iter().map(R::into_parts) { if upper.less_equal(&time) { frontier.insert_with(&time, |time| (*time).into_owned()); if keep.len() == keep.capacity() && !keep.is_empty() { @@ -307,7 +310,7 @@ where { let mut prev_keyval = None; for buffer in chain.iter() { - for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) { + for (key, val, time, _diff) in buffer.iter().map(R::into_parts) { if !upper.less_equal(&time) { if let Some((p_key, p_val)) = prev_keyval { debug_assert!(p_key <= key); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 7c7fc74a6..b494299ef 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -448,30 +448,30 @@ mod flatcontainer { type OffsetContainer = OffsetList; } - impl BuilderInput for FlatStack + impl BuilderInput for FlatStack where - MC: RegionUpdate + Region + Clone + 'static, + R: RegionUpdate + Region + Clone + 'static, KBC: BatchContainer, VBC: BatchContainer, - for<'a> KBC::ReadItem<'a>: PartialEq>, - for<'a> VBC::ReadItem<'a>: PartialEq>, + for<'a> KBC::ReadItem<'a>: PartialEq>, + for<'a> VBC::ReadItem<'a>: PartialEq>, { - type Key<'a> = MC::Key<'a>; - type Val<'a> = MC::Val<'a>; - type Time = MC::TimeOwned; - type Diff = MC::DiffOwned; + type Key<'a> = R::Key<'a>; + type Val<'a> = R::Val<'a>; + type Time = R::TimeOwned; + type Diff = R::DiffOwned; fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { - let (key, val, time, diff) = MC::into_parts(item); + let (key, val, time, diff) = R::into_parts(item); (key, val, time.into_owned(), diff.into_owned()) } fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool { - KBC::reborrow(other) == MC::reborrow_key(*this) + KBC::reborrow(other) == R::reborrow_key(*this) } fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool { - VBC::reborrow(other) == MC::reborrow_val(*this) + VBC::reborrow(other) == R::reborrow_val(*this) } } } From 0697f7a519247a2f2ca08bad28a59617f3fcce72 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 8 Jul 2024 14:00:07 -0400 Subject: [PATCH 5/7] FlatStack generic storage Signed-off-by: Moritz Hoffmann --- Cargo.toml | 2 +- src/consolidation.rs | 79 +++++++++++++------ src/trace/implementations/chunker.rs | 15 ++-- .../implementations/merge_batcher_flat.rs | 69 +++++++++------- src/trace/implementations/mod.rs | 8 +- src/trace/implementations/ord_neu.rs | 6 +- tests/bfs.rs | 3 +- 7 files changed, 116 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0cca0a4bd..711480041 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ timely = {workspace = true} [workspace.dependencies] #timely = { version = "0.12", default-features = false } -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } +timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "flatcontainer_storage", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } [features] diff --git a/src/consolidation.rs b/src/consolidation.rs index feac6cafa..143e3a01e 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -14,11 +14,9 @@ use std::cmp::Ordering; use std::collections::VecDeque; use timely::Container; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; -use timely::container::flatcontainer::{FlatStack, Push, Region}; use crate::Data; use crate::difference::{IsZero, Semigroup}; use crate::trace::cursor::IntoOwned; -use crate::trace::implementations::merge_batcher_flat::RegionUpdate; /// Sorts and consolidates `vec`. /// @@ -255,6 +253,12 @@ pub trait ConsolidateLayout: Container { /// Compare two items by key to sort containers. fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering; + + /// The preferred capacity + fn preferred_capacity() -> usize; + + /// Ensure that the container has sufficient capacity to absorb `preferred_capacity` elements. + fn ensure_preferred_capacity(&mut self); } impl ConsolidateLayout for Vec<(D, T, R)> @@ -278,35 +282,66 @@ where fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) { self.push((data, time, diff)); } + + fn preferred_capacity() -> usize { + ::preferred_capacity() + } + + #[inline] + fn ensure_preferred_capacity(&mut self) { + if self.capacity() < ::preferred_capacity() { + self.reserve(::preferred_capacity() - self.capacity()); + } + } } -impl ConsolidateLayout for FlatStack -where - R: RegionUpdate +mod flatcontainer { + use std::cmp::Ordering; + + use timely::container::flatcontainer::{FlatStack, Push, Region}; + use timely::container::flatcontainer::impls::index::IndexContainer; + + use crate::consolidation::ConsolidateLayout; + use crate::difference::Semigroup; + use crate::trace::implementations::merge_batcher_flat::RegionUpdate; + + impl ConsolidateLayout for FlatStack + where + R: RegionUpdate + Region + Clone + for<'a> Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::DiffOwned)> + 'static, - for<'a> R::DiffOwned: Semigroup>, - for<'a> R::ReadItem<'a>: Copy, -{ - type Key<'a> = (R::Key<'a>, R::Val<'a>, R::Time<'a>) where Self: 'a; - type Diff<'a> = R::Diff<'a> where Self: 'a; - type DiffOwned = R::DiffOwned; + for<'a> R::DiffOwned: Semigroup>, + for<'a> R::ReadItem<'a>: Copy, + S: IndexContainer + Clone + 'static, + { + type Key<'a> = (R::Key<'a>, R::Val<'a>, R::Time<'a>) where Self: 'a; + type Diff<'a> = R::Diff<'a> where Self: 'a; + type DiffOwned = R::DiffOwned; + + fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + let (key, val, time, diff) = R::into_parts(item); + ((key, val, time), diff) + } - fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { - let (key, val, time, diff) = R::into_parts(item); - ((key, val, time), diff) - } + fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { + let (key1, val1, time1, _diff1) = R::into_parts(*item1); + let (key2, val2, time2, _diff2) = R::into_parts(*item2); + (R::reborrow_key(key1), R::reborrow_val(val1), R::reborrow_time(time1)).cmp(&(R::reborrow_key(key2), R::reborrow_val(val2), R::reborrow_time(time2))) + } - fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { - let (key1, val1, time1, _diff1) = R::into_parts(*item1); - let (key2, val2, time2, _diff2) = R::into_parts(*item2); - (R::reborrow_key(key1), R::reborrow_val(val1), R::reborrow_time(time1)).cmp(&(R::reborrow_key(key2), R::reborrow_val(val2), R::reborrow_time(time2))) - } + fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { + self.copy(((key, value), time, diff)); + } - fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { - self.copy(((key, value), time, diff)); + fn preferred_capacity() -> usize { + 1024 + } + + fn ensure_preferred_capacity(&mut self) { + // Nop + } } } diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 527a614d0..79172767e 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -4,7 +4,7 @@ use std::collections::VecDeque; use timely::communication::message::RefOrMut; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; -use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::container::{ContainerBuilder, PushInto}; use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout}; use crate::difference::Semigroup; @@ -291,21 +291,18 @@ where impl<'a, Input, Output> PushInto> for ContainerChunker where Input: Container, - Output: SizableContainer - + ConsolidateLayout + Output: ConsolidateLayout + PushInto> + PushInto>, { fn push_into(&mut self, container: RefOrMut<'a, Input>) { - if self.pending.capacity() < Output::preferred_capacity() { - self.pending.reserve(Output::preferred_capacity() - self.pending.len()); - } + self.pending.ensure_preferred_capacity(); let form_batch = |this: &mut Self| { - if this.pending.len() == this.pending.capacity() { + if this.pending.len() >= Output::preferred_capacity() { consolidate_container(&mut this.pending, &mut this.empty); std::mem::swap(&mut this.pending, &mut this.empty); this.empty.clear(); - if this.pending.len() > this.pending.capacity() / 2 { + if this.pending.len() > Output::preferred_capacity() / 2 { // Note that we're pushing non-full containers, which is a deviation from // other implementation. The reason for this is that we cannot extract // partial data from `this.pending`. We should revisit this in the future. @@ -332,7 +329,7 @@ where impl ContainerBuilder for ContainerChunker where - Output: SizableContainer + ConsolidateLayout, + Output: ConsolidateLayout, { type Container = Output; diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index f313f2829..237a74c04 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -5,6 +5,7 @@ use std::marker::PhantomData; use timely::progress::frontier::{Antichain, AntichainRef}; use timely::{Container, Data, PartialOrder}; use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; +use timely::container::flatcontainer::impls::index::IndexContainer; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::difference::{IsZero, Semigroup}; @@ -15,17 +16,21 @@ use crate::trace::cursor::IntoOwned; /// A merger for flat stacks. /// /// `R` is a [`Region`] that implements [`RegionUpdate`]. -pub struct FlatcontainerMerger { - _marker: PhantomData, +pub struct FlatcontainerMerger::Index>> { + _marker: PhantomData<(R, S)>, } -impl Default for FlatcontainerMerger { +impl Default for FlatcontainerMerger { fn default() -> Self { Self { _marker: PhantomData, } } } -impl FlatcontainerMerger { +impl FlatcontainerMerger +where + R: Region, + S: IndexContainer<::Index> + Clone + 'static, +{ const BUFFER_SIZE_BYTES: usize = 8 << 10; fn chunk_capacity(&self) -> usize { let size = ::std::mem::size_of::(); @@ -40,18 +45,16 @@ impl FlatcontainerMerger { /// Helper to get pre-sized vector from the stash. #[inline] - fn empty(&self, stash: &mut Vec>) -> FlatStack { + fn empty(&self, stash: &mut Vec>) -> FlatStack { stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity())) } /// Helper to return a chunk to the stash. #[inline] - fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { + fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? - if chunk.capacity() == self.chunk_capacity() { - chunk.clear(); - stash.push(chunk); - } + chunk.clear(); + stash.push(chunk); } } @@ -149,7 +152,7 @@ where } } -impl Merger for FlatcontainerMerger +impl Merger for FlatcontainerMerger where for<'a> R: Region + RegionUpdate @@ -163,17 +166,18 @@ where for<'a> R::Diff<'a>: IntoOwned<'a, Owned = R::DiffOwned>, for<'a> R::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, for<'a> R::DiffOwned: Default + Semigroup + Semigroup> + Data, + S: IndexContainer<::Index> + Clone + 'static, { type Time = R::TimeOwned; - type Chunk = FlatStack; - type Output = FlatStack; + type Chunk = FlatStack; + type Output = FlatStack; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = >::from(list1.next().unwrap_or_default()); - let mut head2 = >::from(list2.next().unwrap_or_default()); + let mut head1 = >::from(list1.next().unwrap_or_default()); + let mut head2 = >::from(list2.next().unwrap_or_default()); let mut result = self.empty(stash); @@ -181,7 +185,7 @@ where // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { + while (result.len() < self.chunk_capacity()) && !head1.is_empty() && !head2.is_empty() { let cmp = { let (key1, val1, time1, _diff) = R::into_parts(head1.peek()); let (key2, val2, time2, _diff) = R::into_parts(head2.peek()); @@ -209,7 +213,7 @@ where } } - if result.capacity() == result.len() { + if result.len() == self.chunk_capacity() { output.push(result); result = self.empty(stash); } @@ -225,7 +229,7 @@ where } while !head1.is_empty() { - let advance = result.capacity() - result.len(); + let advance = self.chunk_capacity().saturating_sub(result.len()); let iter = head1.iter().take(advance); result.reserve_items(iter.clone()); for item in iter { @@ -243,7 +247,7 @@ where self.recycle(head1.done(), stash); while !head2.is_empty() { - let advance = result.capacity() - result.len(); + let advance = self.chunk_capacity().saturating_sub(result.len()); let iter = head2.iter().take(advance); result.reserve_items(iter.clone()); for item in iter { @@ -273,13 +277,13 @@ where for (key, val, time, diff) in buffer.iter().map(R::into_parts) { if upper.less_equal(&time) { frontier.insert_with(&time, |time| (*time).into_owned()); - if keep.len() == keep.capacity() && !keep.is_empty() { + if keep.len() == self.chunk_capacity() && !keep.is_empty() { kept.push(keep); keep = self.empty(stash); } keep.copy(((key, val), time, diff)); } else { - if ready.len() == ready.capacity() && !ready.is_empty() { + if ready.len() == self.chunk_capacity() && !ready.is_empty() { readied.push(ready); ready = self.empty(stash); } @@ -351,18 +355,27 @@ where } } -struct FlatStackQueue { - list: FlatStack, +struct FlatStackQueue +{ + list: FlatStack, head: usize, } -impl Default for FlatStackQueue { +impl Default for FlatStackQueue +where + R: Region, + S: IndexContainer<::Index>, +{ fn default() -> Self { - Self::from(Default::default()) + Self::from(FlatStack::default()) } } -impl FlatStackQueue { +impl FlatStackQueue +where + R: Region, + S: IndexContainer<::Index>, +{ fn pop(&mut self) -> R::ReadItem<'_> { self.head += 1; self.list.get(self.head - 1) @@ -372,11 +385,11 @@ impl FlatStackQueue { self.list.get(self.head) } - fn from(list: FlatStack) -> Self { + fn from(list: FlatStack) -> Self { FlatStackQueue { list, head: 0 } } - fn done(self) -> FlatStack { + fn done(self) -> FlatStack { self.list } diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index b494299ef..b9722b720 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -401,6 +401,7 @@ where mod flatcontainer { use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region}; + use timely::container::flatcontainer::impls::index::IndexContainer; use timely::progress::Timestamp; use crate::difference::Semigroup; @@ -448,13 +449,14 @@ mod flatcontainer { type OffsetContainer = OffsetList; } - impl BuilderInput for FlatStack + impl BuilderInput for FlatStack where R: RegionUpdate + Region + Clone + 'static, KBC: BatchContainer, VBC: BatchContainer, for<'a> KBC::ReadItem<'a>: PartialEq>, for<'a> VBC::ReadItem<'a>: PartialEq>, + S: IndexContainer + Clone + 'static, { type Key<'a> = R::Key<'a>; type Val<'a> = R::Val<'a>; @@ -614,12 +616,14 @@ pub mod containers { mod flatcontainer { use timely::container::flatcontainer::{FlatStack, Push, Region}; + use timely::container::flatcontainer::impls::index::IndexContainer; use crate::trace::implementations::BatchContainer; - impl BatchContainer for FlatStack + impl BatchContainer for FlatStack where for<'a> R: Region + Push<::ReadItem<'a>> + 'static, for<'a> R::ReadItem<'a>: Copy + Ord, + S: IndexContainer + Clone + 'static, { type Owned = R::Owned; type ReadItem<'a> = R::ReadItem<'a>; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index b13a41ff6..42e58ca66 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -42,10 +42,10 @@ pub type ColValSpine = Spine< >; /// A trace implementation backed by flatcontainer storage. -pub type FlatValSpine = Spine< +pub type FlatValSpine::Index>> = Spine< Rc>, - MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, - RcBuilder>>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + RcBuilder>>, >; /// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. diff --git a/tests/bfs.rs b/tests/bfs.rs index 58471947b..1d0b1ec08 100644 --- a/tests/bfs.rs +++ b/tests/bfs.rs @@ -300,12 +300,13 @@ where for<'a> G::Timestamp: PartialOrder<<::Region as Region>::ReadItem<'a>>, ::Region: Region + Push, for<'a> as RegionPreference>::Region: Region> + Push<< as RegionPreference>::Region as Region>::ReadItem<'a>>, - ::Region: Clone + Ord, + ::Region: Clone + Ord + for<'a> ReserveItems<&'a G::Timestamp>, for<'a> FlatProductRegion<::Region, MirrorRegion>: Push<&'a Product>, for<'a> ::Region, MirrorRegion> as Region>::ReadItem<'a>: Copy + Ord + Debug, Product: for<'a> PartialOrder<< as RegionPreference>::Region as Region>::ReadItem<'a>>, for<'a> < as RegionPreference>::Region as Region>::ReadItem<'a>: PartialOrder>, for<'a> as RegionPreference>::Region: ReserveItems<< as RegionPreference>::Region as Region>::ReadItem<'a>>, + for<'a> as RegionPreference>::Region: ReserveItems<&'a < as RegionPreference>::Region as Region>::ReadItem<'a>>, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); From 8d476ef7b2d4cef67153033002ea0e6acb029630 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 8 Jul 2024 16:00:33 -0400 Subject: [PATCH 6/7] Expand comments Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 10 +++++++++- src/trace/implementations/merge_batcher_flat.rs | 3 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index 143e3a01e..af81ee344 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -336,11 +336,19 @@ mod flatcontainer { } fn preferred_capacity() -> usize { + // We don't have a good way to present any pre-defined capacity here, since it's a + // concept foreign to flat containers. Each region might have a capacity, but overall + // the concept of capacity does not exist. For this reason, we just hardcode a number, + // which seems to work reasonably well. + // + // We should revisit this if/once we have an abstraction that can express a capacity + // for `FlatStack`, but we arent' there yet. 1024 } fn ensure_preferred_capacity(&mut self) { - // Nop + // Nop, same reasoning as for `preferred_capacity`. We don't know how to ensure capacity + // for a certain number of elements. } } } diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index 237a74c04..3e9304f8b 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -53,6 +53,9 @@ where #[inline] fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? + // TODO: Ideally, we check that `chunk` has a shape (capacity) that fits, but flat + // containers don't have a concept of capacity on a `FlatStack`-level, only on individual + // regions. chunk.clear(); stash.push(chunk); } From bb5836a50e5233795cf3efcc042f8945f78b4350 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 9 Jul 2024 10:19:13 -0400 Subject: [PATCH 7/7] CapacityContainer Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 37 ++-------------------------- src/trace/implementations/chunker.rs | 4 ++- 2 files changed, 5 insertions(+), 36 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index af81ee344..7b5dc5fd3 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -154,7 +154,7 @@ where // TODO: Can we replace `multiple` by a bool? #[cold] fn consolidate_and_flush_through(&mut self, multiple: usize) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = as SizableContainer>::preferred_capacity(); consolidate_updates(&mut self.current); let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); while drain.peek().is_some() { @@ -178,7 +178,7 @@ where /// Precondition: `current` is not allocated or has space for at least one element. #[inline] fn push_into(&mut self, item: P) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = as SizableContainer>::preferred_capacity(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); } @@ -253,12 +253,6 @@ pub trait ConsolidateLayout: Container { /// Compare two items by key to sort containers. fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering; - - /// The preferred capacity - fn preferred_capacity() -> usize; - - /// Ensure that the container has sufficient capacity to absorb `preferred_capacity` elements. - fn ensure_preferred_capacity(&mut self); } impl ConsolidateLayout for Vec<(D, T, R)> @@ -282,17 +276,6 @@ where fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) { self.push((data, time, diff)); } - - fn preferred_capacity() -> usize { - ::preferred_capacity() - } - - #[inline] - fn ensure_preferred_capacity(&mut self) { - if self.capacity() < ::preferred_capacity() { - self.reserve(::preferred_capacity() - self.capacity()); - } - } } mod flatcontainer { @@ -334,22 +317,6 @@ mod flatcontainer { fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { self.copy(((key, value), time, diff)); } - - fn preferred_capacity() -> usize { - // We don't have a good way to present any pre-defined capacity here, since it's a - // concept foreign to flat containers. Each region might have a capacity, but overall - // the concept of capacity does not exist. For this reason, we just hardcode a number, - // which seems to work reasonably well. - // - // We should revisit this if/once we have an abstraction that can express a capacity - // for `FlatStack`, but we arent' there yet. - 1024 - } - - fn ensure_preferred_capacity(&mut self) { - // Nop, same reasoning as for `preferred_capacity`. We don't know how to ensure capacity - // for a certain number of elements. - } } } diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 79172767e..2028a3034 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -4,7 +4,8 @@ use std::collections::VecDeque; use timely::communication::message::RefOrMut; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; -use timely::container::{ContainerBuilder, PushInto}; +use timely::container::{CapacityContainer, ContainerBuilder, PushInto}; + use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout}; use crate::difference::Semigroup; @@ -292,6 +293,7 @@ impl<'a, Input, Output> PushInto> for ContainerChunker> + PushInto>, {