Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
This fixes a potential bug in the consolidating container builder where it
would not clear buffers that were exposed to the user. There is no
obligation for the user to call clear, so call it when recycling.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 13, 2024
1 parent 4a0f7bb commit 7c4eec9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
58 changes: 24 additions & 34 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,32 @@ where
{
/// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
// TODO: Can we replace `multiple` by a bool?
#[cold]
fn consolidate_and_flush_through(&mut self, multiple: usize) {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
consolidate_updates(&mut self.current);
let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
while drain.peek().is_some() {
let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
container.clear();
container.extend((&mut drain).take(preferred_capacity));
self.outbound.push_back(container);
}
}
}

impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
D: Data,
T: Data,
R: Semigroup+'static,
Vec<(D, T, R)>: PushInto<P>,
{
type Container = Vec<(D,T,R)>;

/// Push an element.
///
/// Precondition: `current` is not allocated or has space for at least one element.
#[inline]
fn push<P>(&mut self, item: P) where Self::Container: PushInto<P> {
fn push_into(&mut self, item: P) {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
if self.current.capacity() < preferred_capacity * 2 {
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
Expand All @@ -184,13 +185,17 @@ where
self.consolidate_and_flush_through(preferred_capacity);
}
}
}

fn push_container(&mut self, container: &mut Self::Container) {
for item in container.drain(..) {
self.push(item);
}
}
impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
D: Data,
T: Data,
R: Semigroup+'static,
{
type Container = Vec<(D,T,R)>;

#[inline]
fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
if let Some(container) = self.outbound.pop_front() {
self.empty.push(container);
Expand All @@ -200,12 +205,15 @@ where
}
}

#[inline]
fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
// Flush all
self.consolidate_and_flush_through(1);
// Remove all but two elements from the stash of empty to avoid memory leaks. We retain
// two to match `current` capacity.
self.empty.truncate(2);
if !self.current.is_empty() {
// Flush all
self.consolidate_and_flush_through(1);
// Remove all but two elements from the stash of empty to avoid memory leaks. We retain
// two to match `current` capacity.
self.empty.truncate(2);
}
self.extract()
}
}
Expand Down Expand Up @@ -281,13 +289,13 @@ mod tests {
fn test_consolidating_container_builder() {
let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
for _ in 0..1024 {
ccb.push((0, 0, 0));
ccb.push_into((0, 0, 0));
}
assert_eq!(ccb.extract(), None);
assert_eq!(ccb.finish(), None);

for i in 0..1024 {
ccb.push((i, 0, 1));
ccb.push_into((i, 0, 1));
}

let mut collected = Vec::default();
Expand All @@ -299,23 +307,5 @@ mod tests {
for i in 0..1024 {
assert_eq!((i, 0, 1), collected[i]);
}

ccb = Default::default();
ccb.push_container(&mut Vec::default());
assert_eq!(ccb.extract(), None);
assert_eq!(ccb.finish(), None);

ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1))));
ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1))));
collected.clear();
while let Some(container) = ccb.finish() {
collected.append(container);
}
// The output happens to be sorted, but it's not guaranteed.
consolidate_updates(&mut collected);
for i in 0..1024 {
assert_eq!((i, 0, 2), collected[i]);
}

}
}
2 changes: 1 addition & 1 deletion src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
output.session_with_builder(&time).give_container(&mut vector);
output.session_with_builder(&time).give_iterator(vector.drain(..));
})
}
})
Expand Down
19 changes: 8 additions & 11 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::cmp::Ordering;
use timely::Container;

use timely::container::{ContainerBuilder, SizableContainer, PushInto};
use timely::container::{ContainerBuilder, PushInto};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};
Expand Down Expand Up @@ -329,16 +329,6 @@ pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);
impl<CB: ContainerBuilder> ContainerBuilder for EffortBuilder<CB> {
type Container = CB::Container;

#[inline]
fn push<T>(&mut self, item: T) where Self::Container: SizableContainer + PushInto<T> {
self.1.push(item)
}

#[inline]
fn push_container(&mut self, container: &mut Self::Container) {
self.1.push_container(container)
}

#[inline]
fn extract(&mut self) -> Option<&mut Self::Container> {
let extracted = self.1.extract();
Expand All @@ -354,6 +344,13 @@ impl<CB: ContainerBuilder> ContainerBuilder for EffortBuilder<CB> {
}
}

impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
#[inline]
fn push_into(&mut self, item: D) {
self.1.push_into(item);
}
}

/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions on the specific input types, keys and values,
Expand Down

0 comments on commit 7c4eec9

Please sign in to comment.