diff --git a/Cargo.toml b/Cargo.toml index 0cca0a4bd..711480041 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ timely = {workspace = true} [workspace.dependencies] #timely = { version = "0.12", default-features = false } -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } +timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "flatcontainer_storage", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } [features] diff --git a/src/consolidation.rs b/src/consolidation.rs index b9495d104..7b5dc5fd3 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -14,8 +14,6 @@ use std::cmp::Ordering; use std::collections::VecDeque; use timely::Container; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; -use timely::container::flatcontainer::{FlatStack, Push, Region}; -use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::Data; use crate::difference::{IsZero, Semigroup}; use crate::trace::cursor::IntoOwned; @@ -156,7 +154,7 @@ where // TODO: Can we replace `multiple` by a bool? #[cold] fn consolidate_and_flush_through(&mut self, multiple: usize) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = as SizableContainer>::preferred_capacity(); consolidate_updates(&mut self.current); let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); while drain.peek().is_some() { @@ -180,7 +178,7 @@ where /// Precondition: `current` is not allocated or has space for at least one element. #[inline] fn push_into(&mut self, item: P) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = as SizableContainer>::preferred_capacity(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); } @@ -280,31 +278,45 @@ where } } -impl ConsolidateLayout for FlatStack, T, R>> -where - for<'a> K: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> K::ReadItem<'a>: Ord + Copy, - for<'a> V: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> V::ReadItem<'a>: Ord + Copy, - for<'a> T: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> T::ReadItem<'a>: Ord + Copy, - R: Region + Push<::Owned> + Clone + 'static, - for<'a> R::Owned: Semigroup>, -{ - type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a; - type Diff<'a> = R::ReadItem<'a> where Self: 'a; - type DiffOwned = R::Owned; - - fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { - ((key, val, time), diff) - } +mod flatcontainer { + use std::cmp::Ordering; + + use timely::container::flatcontainer::{FlatStack, Push, Region}; + use timely::container::flatcontainer::impls::index::IndexContainer; + + use crate::consolidation::ConsolidateLayout; + use crate::difference::Semigroup; + use crate::trace::implementations::merge_batcher_flat::RegionUpdate; + + impl ConsolidateLayout for FlatStack + where + R: RegionUpdate + + Region + + Clone + + for<'a> Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::DiffOwned)> + + 'static, + for<'a> R::DiffOwned: Semigroup>, + for<'a> R::ReadItem<'a>: Copy, + S: IndexContainer + Clone + 'static, + { + type Key<'a> = (R::Key<'a>, R::Val<'a>, R::Time<'a>) where Self: 'a; + type Diff<'a> = R::Diff<'a> where Self: 'a; + type DiffOwned = R::DiffOwned; + + fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { + let (key, val, time, diff) = R::into_parts(item); + ((key, val, time), diff) + } - fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering { - (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2))) - } + fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { + let (key1, val1, time1, _diff1) = R::into_parts(*item1); + let (key2, val2, time2, _diff2) = R::into_parts(*item2); + (R::reborrow_key(key1), R::reborrow_val(val1), R::reborrow_time(time1)).cmp(&(R::reborrow_key(key2), R::reborrow_val(val2), R::reborrow_time(time2))) + } - fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { - self.copy(((key, value), time, diff)); + fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { + self.copy(((key, value), time, diff)); + } } } diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 527a614d0..2028a3034 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -4,7 +4,8 @@ use std::collections::VecDeque; use timely::communication::message::RefOrMut; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; -use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::container::{CapacityContainer, ContainerBuilder, PushInto}; + use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout}; use crate::difference::Semigroup; @@ -291,21 +292,19 @@ where impl<'a, Input, Output> PushInto> for ContainerChunker where Input: Container, - Output: SizableContainer - + ConsolidateLayout + Output: ConsolidateLayout + + CapacityContainer + PushInto> + PushInto>, { fn push_into(&mut self, container: RefOrMut<'a, Input>) { - if self.pending.capacity() < Output::preferred_capacity() { - self.pending.reserve(Output::preferred_capacity() - self.pending.len()); - } + self.pending.ensure_preferred_capacity(); let form_batch = |this: &mut Self| { - if this.pending.len() == this.pending.capacity() { + if this.pending.len() >= Output::preferred_capacity() { consolidate_container(&mut this.pending, &mut this.empty); std::mem::swap(&mut this.pending, &mut this.empty); this.empty.clear(); - if this.pending.len() > this.pending.capacity() / 2 { + if this.pending.len() > Output::preferred_capacity() / 2 { // Note that we're pushing non-full containers, which is a deviation from // other implementation. The reason for this is that we cannot extract // partial data from `this.pending`. We should revisit this in the future. @@ -332,7 +331,7 @@ where impl ContainerBuilder for ContainerChunker where - Output: SizableContainer + ConsolidateLayout, + Output: ConsolidateLayout, { type Container = Output; diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index ae8d2894f..3e9304f8b 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -5,6 +5,7 @@ use std::marker::PhantomData; use timely::progress::frontier::{Antichain, AntichainRef}; use timely::{Container, Data, PartialOrder}; use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; +use timely::container::flatcontainer::impls::index::IndexContainer; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::difference::{IsZero, Semigroup}; @@ -14,21 +15,25 @@ use crate::trace::cursor::IntoOwned; /// A merger for flat stacks. /// -/// `MC` is a [`Region`] that implements [`MergerChunk`]. -pub struct FlatcontainerMerger { - _marker: PhantomData, +/// `R` is a [`Region`] that implements [`RegionUpdate`]. +pub struct FlatcontainerMerger::Index>> { + _marker: PhantomData<(R, S)>, } -impl Default for FlatcontainerMerger { +impl Default for FlatcontainerMerger { fn default() -> Self { Self { _marker: PhantomData, } } } -impl FlatcontainerMerger { +impl FlatcontainerMerger +where + R: Region, + S: IndexContainer<::Index> + Clone + 'static, +{ const BUFFER_SIZE_BYTES: usize = 8 << 10; fn chunk_capacity(&self) -> usize { - let size = ::std::mem::size_of::(); + let size = ::std::mem::size_of::(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -40,95 +45,153 @@ impl FlatcontainerMerger { /// Helper to get pre-sized vector from the stash. #[inline] - fn empty(&self, stash: &mut Vec>) -> FlatStack { + fn empty(&self, stash: &mut Vec>) -> FlatStack { stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity())) } /// Helper to return a chunk to the stash. #[inline] - fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { + fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? - if chunk.capacity() == self.chunk_capacity() { - chunk.clear(); - stash.push(chunk); - } + // TODO: Ideally, we check that `chunk` has a shape (capacity) that fits, but flat + // containers don't have a concept of capacity on a `FlatStack`-level, only on individual + // regions. + chunk.clear(); + stash.push(chunk); } } /// Behavior to dissect items of chunks in the merge batcher -pub trait MergerChunk: Region { +pub trait RegionUpdate: Region { /// The key of the update - type Key<'a>: Ord where Self: 'a; + type Key<'a>: Copy + Ord where Self: 'a; /// The value of the update - type Val<'a>: Ord where Self: 'a; + type Val<'a>: Copy + Ord where Self: 'a; /// The time of the update - type Time<'a>: Ord where Self: 'a; + type Time<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::TimeOwned> where Self: 'a; /// The owned time type. type TimeOwned; /// The diff of the update - type Diff<'a> where Self: 'a; + type Diff<'a>: Copy + IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a; /// The owned diff type. type DiffOwned; /// Split a read item into its constituents. Must be cheap. fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>); + + /// Converts a key into one with a narrower lifetime. + #[must_use] + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> + where + Self: 'a; + + /// Converts a value into one with a narrower lifetime. + #[must_use] + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> + where + Self: 'a; + + /// Converts a time into one with a narrower lifetime. + #[must_use] + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> + where + Self: 'a; + + /// Converts a diff into one with a narrower lifetime. + #[must_use] + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> + where + Self: 'a; } -impl MergerChunk for TupleABCRegion, T, R> +impl RegionUpdate for TupleABCRegion, TR, RR> where - K: Region, - for<'a> K::ReadItem<'a>: Ord, - V: Region, - for<'a> V::ReadItem<'a>: Ord, - T: Region, - for<'a> T::ReadItem<'a>: Ord, - R: Region, + KR: Region, + for<'a> KR::ReadItem<'a>: Copy + Ord, + VR: Region, + for<'a> VR::ReadItem<'a>: Copy + Ord, + TR: Region, + for<'a> TR::ReadItem<'a>: Copy + Ord, + RR: Region, + for<'a> RR::ReadItem<'a>: Copy + Ord, { - type Key<'a> = K::ReadItem<'a> where Self: 'a; - type Val<'a> = V::ReadItem<'a> where Self: 'a; - type Time<'a> = T::ReadItem<'a> where Self: 'a; - type TimeOwned = T::Owned; - type Diff<'a> = R::ReadItem<'a> where Self: 'a; - type DiffOwned = R::Owned; + type Key<'a> = KR::ReadItem<'a> where Self: 'a; + type Val<'a> = VR::ReadItem<'a> where Self: 'a; + type Time<'a> = TR::ReadItem<'a> where Self: 'a; + type TimeOwned = TR::Owned; + type Diff<'a> = RR::ReadItem<'a> where Self: 'a; + type DiffOwned = RR::Owned; fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { (key, val, time, diff) } + + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> + where + Self: 'a + { + KR::reborrow(item) + } + + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> + where + Self: 'a + { + VR::reborrow(item) + } + + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> + where + Self: 'a + { + TR::reborrow(item) + } + + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> + where + Self: 'a + { + RR::reborrow(item) + } } -impl Merger for FlatcontainerMerger +impl Merger for FlatcontainerMerger where - for<'a> MC: MergerChunk + Clone + 'static - + ReserveItems<::ReadItem<'a>> - + Push<::ReadItem<'a>> - + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)> - + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>, - for<'a> MC::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=MC::TimeOwned>, - for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, - for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, - for<'a> MC::DiffOwned: Default + Semigroup + Semigroup> + Data, + for<'a> R: Region + + RegionUpdate + + Clone + + ReserveItems<::ReadItem<'a>> + + Push<::ReadItem<'a>> + + Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, &'a R::DiffOwned)> + + Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::Diff<'a>)> + + 'static, + for<'a> R::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=R::TimeOwned>, + for<'a> R::Diff<'a>: IntoOwned<'a, Owned = R::DiffOwned>, + for<'a> R::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, + for<'a> R::DiffOwned: Default + Semigroup + Semigroup> + Data, + S: IndexContainer<::Index> + Clone + 'static, { - type Time = MC::TimeOwned; - type Chunk = FlatStack; - type Output = FlatStack; + type Time = R::TimeOwned; + type Chunk = FlatStack; + type Output = FlatStack; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = >::from(list1.next().unwrap_or_default()); - let mut head2 = >::from(list2.next().unwrap_or_default()); + let mut head1 = >::from(list1.next().unwrap_or_default()); + let mut head2 = >::from(list2.next().unwrap_or_default()); let mut result = self.empty(stash); - let mut diff = MC::DiffOwned::default(); + let mut diff = R::DiffOwned::default(); // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { + while (result.len() < self.chunk_capacity()) && !head1.is_empty() && !head2.is_empty() { let cmp = { - let (key1, val1, time1, _diff) = MC::into_parts(head1.peek()); - let (key2, val2, time2, _diff) = MC::into_parts(head2.peek()); + let (key1, val1, time1, _diff) = R::into_parts(head1.peek()); + let (key2, val2, time2, _diff) = R::into_parts(head2.peek()); ((key1, val1), time1).cmp(&((key2, val2), time2)) }; // TODO: The following less/greater branches could plausibly be a good moment for @@ -142,8 +205,8 @@ where result.copy(head2.pop()); } Ordering::Equal => { - let (key, val, time1, diff1) = MC::into_parts(head1.pop()); - let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop()); + let (key, val, time1, diff1) = R::into_parts(head1.pop()); + let (_key, _val, _time2, diff2) = R::into_parts(head2.pop()); diff1.clone_onto(&mut diff); diff.plus_equals(&diff2); if !diff.is_zero() { @@ -153,7 +216,7 @@ where } } - if result.capacity() == result.len() { + if result.len() == self.chunk_capacity() { output.push(result); result = self.empty(stash); } @@ -169,7 +232,7 @@ where } while !head1.is_empty() { - let advance = result.capacity() - result.len(); + let advance = self.chunk_capacity().saturating_sub(result.len()); let iter = head1.iter().take(advance); result.reserve_items(iter.clone()); for item in iter { @@ -187,7 +250,7 @@ where self.recycle(head1.done(), stash); while !head2.is_empty() { - let advance = result.capacity() - result.len(); + let advance = self.chunk_capacity().saturating_sub(result.len()); let iter = head2.iter().take(advance); result.reserve_items(iter.clone()); for item in iter { @@ -214,16 +277,16 @@ where let mut ready = self.empty(stash); for buffer in merged { - for (key, val, time, diff) in buffer.iter().map(MC::into_parts) { + for (key, val, time, diff) in buffer.iter().map(R::into_parts) { if upper.less_equal(&time) { frontier.insert_with(&time, |time| (*time).into_owned()); - if keep.len() == keep.capacity() && !keep.is_empty() { + if keep.len() == self.chunk_capacity() && !keep.is_empty() { kept.push(keep); keep = self.empty(stash); } keep.copy(((key, val), time, diff)); } else { - if ready.len() == ready.capacity() && !ready.is_empty() { + if ready.len() == self.chunk_capacity() && !ready.is_empty() { readied.push(ready); ready = self.empty(stash); } @@ -254,7 +317,7 @@ where { let mut prev_keyval = None; for buffer in chain.iter() { - for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) { + for (key, val, time, _diff) in buffer.iter().map(R::into_parts) { if !upper.less_equal(&time) { if let Some((p_key, p_val)) = prev_keyval { debug_assert!(p_key <= key); @@ -295,18 +358,27 @@ where } } -struct FlatStackQueue { - list: FlatStack, +struct FlatStackQueue +{ + list: FlatStack, head: usize, } -impl Default for FlatStackQueue { +impl Default for FlatStackQueue +where + R: Region, + S: IndexContainer<::Index>, +{ fn default() -> Self { - Self::from(Default::default()) + Self::from(FlatStack::default()) } } -impl FlatStackQueue { +impl FlatStackQueue +where + R: Region, + S: IndexContainer<::Index>, +{ fn pop(&mut self) -> R::ReadItem<'_> { self.head += 1; self.list.get(self.head - 1) @@ -316,11 +388,11 @@ impl FlatStackQueue { self.list.get(self.head) } - fn from(list: FlatStack) -> Self { + fn from(list: FlatStack) -> Self { FlatStackQueue { list, head: 0 } } - fn done(self) -> FlatStack { + fn done(self) -> FlatStack { self.list } diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index e39522a7e..b9722b720 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -401,12 +401,13 @@ where mod flatcontainer { use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region}; - use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; + use timely::container::flatcontainer::impls::index::IndexContainer; use timely::progress::Timestamp; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update}; + use crate::trace::implementations::merge_batcher_flat::RegionUpdate; impl Update for FlatLayout where @@ -448,36 +449,31 @@ mod flatcontainer { type OffsetContainer = OffsetList; } - impl BuilderInput for FlatStack,T,R>> + impl BuilderInput for FlatStack where - K: Region + Clone + 'static, - V: Region + Clone + 'static, - T: Region + Clone + 'static, - R: Region + Clone + 'static, - for<'a> K::ReadItem<'a>: Copy + Ord, - for<'a> V::ReadItem<'a>: Copy + Ord, - for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R::ReadItem<'a>: Copy + Ord, + R: RegionUpdate + Region + Clone + 'static, KBC: BatchContainer, VBC: BatchContainer, - for<'a> KBC::ReadItem<'a>: PartialEq>, - for<'a> VBC::ReadItem<'a>: PartialEq>, + for<'a> KBC::ReadItem<'a>: PartialEq>, + for<'a> VBC::ReadItem<'a>: PartialEq>, + S: IndexContainer + Clone + 'static, { - type Key<'a> = K::ReadItem<'a>; - type Val<'a> = V::ReadItem<'a>; - type Time = T::Owned; - type Diff = R::Owned; + type Key<'a> = R::Key<'a>; + type Val<'a> = R::Val<'a>; + type Time = R::TimeOwned; + type Diff = R::DiffOwned; - fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + let (key, val, time, diff) = R::into_parts(item); (key, val, time.into_owned(), diff.into_owned()) } fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool { - KBC::reborrow(other) == K::reborrow(*this) + KBC::reborrow(other) == R::reborrow_key(*this) } fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool { - VBC::reborrow(other) == V::reborrow(*this) + VBC::reborrow(other) == R::reborrow_val(*this) } } } @@ -620,12 +616,14 @@ pub mod containers { mod flatcontainer { use timely::container::flatcontainer::{FlatStack, Push, Region}; + use timely::container::flatcontainer::impls::index::IndexContainer; use crate::trace::implementations::BatchContainer; - impl BatchContainer for FlatStack + impl BatchContainer for FlatStack where for<'a> R: Region + Push<::ReadItem<'a>> + 'static, for<'a> R::ReadItem<'a>: Copy + Ord, + S: IndexContainer + Clone + 'static, { type Owned = R::Owned; type ReadItem<'a> = R::ReadItem<'a>; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 573e9c287..42e58ca66 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -17,7 +17,7 @@ use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunke use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnationMerger; -use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk}; +use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, RegionUpdate}; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; @@ -42,10 +42,10 @@ pub type ColValSpine = Spine< >; /// A trace implementation backed by flatcontainer storage. -pub type FlatValSpine = Spine< +pub type FlatValSpine::Index>> = Spine< Rc>, - MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, - RcBuilder>>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + RcBuilder>>, >; /// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. @@ -74,7 +74,7 @@ pub type ColKeySpine = Spine< /// A trace implementation backed by flatcontainer storage. pub type FlatKeySpine = Spine< Rc>, - MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, RcBuilder>>, >; diff --git a/tests/bfs.rs b/tests/bfs.rs index 58471947b..1d0b1ec08 100644 --- a/tests/bfs.rs +++ b/tests/bfs.rs @@ -300,12 +300,13 @@ where for<'a> G::Timestamp: PartialOrder<<::Region as Region>::ReadItem<'a>>, ::Region: Region + Push, for<'a> as RegionPreference>::Region: Region> + Push<< as RegionPreference>::Region as Region>::ReadItem<'a>>, - ::Region: Clone + Ord, + ::Region: Clone + Ord + for<'a> ReserveItems<&'a G::Timestamp>, for<'a> FlatProductRegion<::Region, MirrorRegion>: Push<&'a Product>, for<'a> ::Region, MirrorRegion> as Region>::ReadItem<'a>: Copy + Ord + Debug, Product: for<'a> PartialOrder<< as RegionPreference>::Region as Region>::ReadItem<'a>>, for<'a> < as RegionPreference>::Region as Region>::ReadItem<'a>: PartialOrder>, for<'a> as RegionPreference>::Region: ReserveItems<< as RegionPreference>::Region as Region>::ReadItem<'a>>, + for<'a> as RegionPreference>::Region: ReserveItems<&'a < as RegionPreference>::Region as Region>::ReadItem<'a>>, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0));