From 7c4eec9ae820b1643aaf7e364f0a42540c9c6524 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 13 Jun 2024 11:36:52 -0400 Subject: [PATCH] Adjust for timely TimelyDataflow/timely-dataflow#569 (#511) This fixes a potential bug in the consolidating container builder where it would not clear buffers that were exposed to the user. There is no obligation for the user to call clear, so call it when recycling. Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 58 +++++++++++++++--------------------- src/operators/consolidate.rs | 2 +- src/operators/join.rs | 19 +++++------- 3 files changed, 33 insertions(+), 46 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index 1409da072..47573eed8 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -149,31 +149,32 @@ where { /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements. // TODO: Can we replace `multiple` by a bool? + #[cold] fn consolidate_and_flush_through(&mut self, multiple: usize) { let preferred_capacity = >::preferred_capacity(); consolidate_updates(&mut self.current); let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); while drain.peek().is_some() { let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity)); + container.clear(); container.extend((&mut drain).take(preferred_capacity)); self.outbound.push_back(container); } } } -impl ContainerBuilder for ConsolidatingContainerBuilder> +impl PushInto

for ConsolidatingContainerBuilder> where D: Data, T: Data, R: Semigroup+'static, + Vec<(D, T, R)>: PushInto

, { - type Container = Vec<(D,T,R)>; - /// Push an element. /// /// Precondition: `current` is not allocated or has space for at least one element. #[inline] - fn push

(&mut self, item: P) where Self::Container: PushInto

{ + fn push_into(&mut self, item: P) { let preferred_capacity = >::preferred_capacity(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); @@ -184,13 +185,17 @@ where self.consolidate_and_flush_through(preferred_capacity); } } +} - fn push_container(&mut self, container: &mut Self::Container) { - for item in container.drain(..) { - self.push(item); - } - } +impl ContainerBuilder for ConsolidatingContainerBuilder> +where + D: Data, + T: Data, + R: Semigroup+'static, +{ + type Container = Vec<(D,T,R)>; + #[inline] fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> { if let Some(container) = self.outbound.pop_front() { self.empty.push(container); @@ -200,12 +205,15 @@ where } } + #[inline] fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> { - // Flush all - self.consolidate_and_flush_through(1); - // Remove all but two elements from the stash of empty to avoid memory leaks. We retain - // two to match `current` capacity. - self.empty.truncate(2); + if !self.current.is_empty() { + // Flush all + self.consolidate_and_flush_through(1); + // Remove all but two elements from the stash of empty to avoid memory leaks. We retain + // two to match `current` capacity. + self.empty.truncate(2); + } self.extract() } } @@ -281,13 +289,13 @@ mod tests { fn test_consolidating_container_builder() { let mut ccb = >>::default(); for _ in 0..1024 { - ccb.push((0, 0, 0)); + ccb.push_into((0, 0, 0)); } assert_eq!(ccb.extract(), None); assert_eq!(ccb.finish(), None); for i in 0..1024 { - ccb.push((i, 0, 1)); + ccb.push_into((i, 0, 1)); } let mut collected = Vec::default(); @@ -299,23 +307,5 @@ mod tests { for i in 0..1024 { assert_eq!((i, 0, 1), collected[i]); } - - ccb = Default::default(); - ccb.push_container(&mut Vec::default()); - assert_eq!(ccb.extract(), None); - assert_eq!(ccb.finish(), None); - - ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1)))); - ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1)))); - collected.clear(); - while let Some(container) = ccb.finish() { - collected.append(container); - } - // The output happens to be sorted, but it's not guaranteed. - consolidate_updates(&mut collected); - for i in 0..1024 { - assert_eq!((i, 0, 2), collected[i]); - } - } } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 715b96efc..4d89d7a86 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -101,7 +101,7 @@ where move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - output.session_with_builder(&time).give_container(&mut vector); + output.session_with_builder(&time).give_iterator(vector.drain(..)); }) } }) diff --git a/src/operators/join.rs b/src/operators/join.rs index d5723878e..cfa7cbb94 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -6,7 +6,7 @@ use std::cmp::Ordering; use timely::Container; -use timely::container::{ContainerBuilder, SizableContainer, PushInto}; +use timely::container::{ContainerBuilder, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; use timely::dataflow::{Scope, StreamCore}; @@ -329,16 +329,6 @@ pub struct EffortBuilder(pub std::cell::Cell, pub CB); impl ContainerBuilder for EffortBuilder { type Container = CB::Container; - #[inline] - fn push(&mut self, item: T) where Self::Container: SizableContainer + PushInto { - self.1.push(item) - } - - #[inline] - fn push_container(&mut self, container: &mut Self::Container) { - self.1.push_container(container) - } - #[inline] fn extract(&mut self) -> Option<&mut Self::Container> { let extracted = self.1.extract(); @@ -354,6 +344,13 @@ impl ContainerBuilder for EffortBuilder { } } +impl, D> PushInto for EffortBuilder { + #[inline] + fn push_into(&mut self, item: D) { + self.1.push_into(item); + } +} + /// An equijoin of two traces, sharing a common key type. /// /// This method exists to provide join functionality without opinions on the specific input types, keys and values,