diff --git a/src/consolidation.rs b/src/consolidation.rs index 1409da072..47573eed8 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -149,31 +149,32 @@ where { /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements. // TODO: Can we replace `multiple` by a bool? + #[cold] fn consolidate_and_flush_through(&mut self, multiple: usize) { let preferred_capacity = >::preferred_capacity(); consolidate_updates(&mut self.current); let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); while drain.peek().is_some() { let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity)); + container.clear(); container.extend((&mut drain).take(preferred_capacity)); self.outbound.push_back(container); } } } -impl ContainerBuilder for ConsolidatingContainerBuilder> +impl PushInto

for ConsolidatingContainerBuilder> where D: Data, T: Data, R: Semigroup+'static, + Vec<(D, T, R)>: PushInto

, { - type Container = Vec<(D,T,R)>; - /// Push an element. /// /// Precondition: `current` is not allocated or has space for at least one element. #[inline] - fn push

(&mut self, item: P) where Self::Container: PushInto

{ + fn push_into(&mut self, item: P) { let preferred_capacity = >::preferred_capacity(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); @@ -184,13 +185,17 @@ where self.consolidate_and_flush_through(preferred_capacity); } } +} - fn push_container(&mut self, container: &mut Self::Container) { - for item in container.drain(..) { - self.push(item); - } - } +impl ContainerBuilder for ConsolidatingContainerBuilder> +where + D: Data, + T: Data, + R: Semigroup+'static, +{ + type Container = Vec<(D,T,R)>; + #[inline] fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> { if let Some(container) = self.outbound.pop_front() { self.empty.push(container); @@ -200,12 +205,15 @@ where } } + #[inline] fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> { - // Flush all - self.consolidate_and_flush_through(1); - // Remove all but two elements from the stash of empty to avoid memory leaks. We retain - // two to match `current` capacity. - self.empty.truncate(2); + if !self.current.is_empty() { + // Flush all + self.consolidate_and_flush_through(1); + // Remove all but two elements from the stash of empty to avoid memory leaks. We retain + // two to match `current` capacity. + self.empty.truncate(2); + } self.extract() } } @@ -281,13 +289,13 @@ mod tests { fn test_consolidating_container_builder() { let mut ccb = >>::default(); for _ in 0..1024 { - ccb.push((0, 0, 0)); + ccb.push_into((0, 0, 0)); } assert_eq!(ccb.extract(), None); assert_eq!(ccb.finish(), None); for i in 0..1024 { - ccb.push((i, 0, 1)); + ccb.push_into((i, 0, 1)); } let mut collected = Vec::default(); @@ -299,23 +307,5 @@ mod tests { for i in 0..1024 { assert_eq!((i, 0, 1), collected[i]); } - - ccb = Default::default(); - ccb.push_container(&mut Vec::default()); - assert_eq!(ccb.extract(), None); - assert_eq!(ccb.finish(), None); - - ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1)))); - ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1)))); - collected.clear(); - while let Some(container) = ccb.finish() { - collected.append(container); - } - // The output happens to be sorted, but it's not guaranteed. - consolidate_updates(&mut collected); - for i in 0..1024 { - assert_eq!((i, 0, 2), collected[i]); - } - } } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 715b96efc..4d89d7a86 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -101,7 +101,7 @@ where move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - output.session_with_builder(&time).give_container(&mut vector); + output.session_with_builder(&time).give_iterator(vector.drain(..)); }) } }) diff --git a/src/operators/join.rs b/src/operators/join.rs index d5723878e..cfa7cbb94 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -6,7 +6,7 @@ use std::cmp::Ordering; use timely::Container; -use timely::container::{ContainerBuilder, SizableContainer, PushInto}; +use timely::container::{ContainerBuilder, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; use timely::dataflow::{Scope, StreamCore}; @@ -329,16 +329,6 @@ pub struct EffortBuilder(pub std::cell::Cell, pub CB); impl ContainerBuilder for EffortBuilder { type Container = CB::Container; - #[inline] - fn push(&mut self, item: T) where Self::Container: SizableContainer + PushInto { - self.1.push(item) - } - - #[inline] - fn push_container(&mut self, container: &mut Self::Container) { - self.1.push_container(container) - } - #[inline] fn extract(&mut self) -> Option<&mut Self::Container> { let extracted = self.1.extract(); @@ -354,6 +344,13 @@ impl ContainerBuilder for EffortBuilder { } } +impl, D> PushInto for EffortBuilder { + #[inline] + fn push_into(&mut self, item: D) { + self.1.push_into(item); + } +} + /// An equijoin of two traces, sharing a common key type. /// /// This method exists to provide join functionality without opinions on the specific input types, keys and values,