Skip to content

Commit

Permalink
Consolidator on batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 1, 2024
1 parent 44dd276 commit bc1d102
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 7 deletions.
97 changes: 92 additions & 5 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,61 @@ where
}
}

/// Behavior to sort containers.
pub trait ContainerSorter<C> {
/// Sort `container`, possibly replacing the contents by a different object.
fn sort(&mut self, container: &mut C);
}

/// A generic container sorter for containers where the item implements [`ConsolidateLayout`].
pub struct ExternalContainerSorter<C: Container> {
/// Storage to permute item.
permutation: Vec<C::Item<'static>>,
/// Empty container to write results at.
empty: C,
}

impl<C> ContainerSorter<C> for ExternalContainerSorter<C>
where
for<'a> C: Container + PushInto<C::Item<'a>>,
for<'a> C::Item<'a>: ConsolidateLayout<C>,
{
fn sort(&mut self, container: &mut C) {
// SAFETY: `Permutation` is empty, types are equal but have a different lifetime
let mut permutation: Vec<C::Item<'_>> = unsafe { std::mem::transmute::<Vec<C::Item<'static>>, Vec<C::Item<'_>>>(std::mem::take(&mut self.permutation)) };

permutation.extend(container.drain());
permutation.sort_by(|a, b| a.key().cmp(&b.key()));

for item in permutation.drain(..) {
self.empty.push(item);
}

// SAFETY: `Permutation` is empty, types are equal but have a different lifetime
self.permutation = unsafe { std::mem::transmute::<Vec<C::Item<'_>>, Vec<C::Item<'static>>>(permutation) };
std::mem::swap(container, &mut self.empty);
self.empty.clear();
}
}

/// Sort containers in-place, with specific implementations.
#[derive(Default, Debug)]
pub struct InPlaceSorter();

impl<T: Ord, R> ContainerSorter<Vec<(T, R)>> for InPlaceSorter {
#[inline]
fn sort(&mut self, container: &mut Vec<(T, R)>) {
container.sort_by(|(a, _), (b, _)| a.cmp(b));
}
}

impl<D: Ord, T: Ord, R> ContainerSorter<Vec<(D, T, R)>> for InPlaceSorter {
#[inline]
fn sort(&mut self, container: &mut Vec<(D, T, R)>) {
container.sort_by(|(d1, t1, _), (d2, t2, _)| (d1, t1).cmp(&(d2, t2)));
}
}

/// Layout of data to be consolidated. Generic over containers to enable `push`.
pub trait ConsolidateLayout<C> {
/// Key to consolidate and sort.
Expand Down Expand Up @@ -257,11 +312,43 @@ impl<D: Ord, R: Semigroup + Clone> ConsolidateLayout<Vec<(D,R)>> for (D, R) {
}
}

trait ConsolidateContainer{
fn consolidate_container(container: &mut Self, target: &mut Self);
impl<D: Ord, T: Ord, R: Semigroup + Clone> ConsolidateLayout<Vec<(D, T, R)>> for (D, T, R) {
type Key<'a> = (&'a D, &'a T) where Self: 'a;
type DiffOwned = R;

#[inline]
fn key(&self) -> Self::Key<'_> {
(&self.0, &self.1)
}

#[inline]
fn diff(&self) -> Self::DiffOwned {
self.2.clone()
}

#[inline]
fn diff_plus_equals(&self, target: &mut Self::DiffOwned) {
target.plus_equals(&self.2);
}

#[inline]
fn push(self, diff: Self::DiffOwned, target: &mut Vec<(D, T, R)>) {
target.push((self.0, self.1, diff));
}
}

/// Behavior for copying consolidation.
pub trait ConsolidateContainer<C> {
/// Consolidate the contents of `container` and write the result to `target`.
fn consolidate_container(container: &mut C, target: &mut C);
}

/// Container consolidator that requires the container's item to implement [`ConsolidateLayout`].
#[derive(Default, Debug)]
pub struct ContainerConsolidator{
}

impl<C> ConsolidateContainer for C
impl<C> ConsolidateContainer<C> for ContainerConsolidator
where
C: Container,
for<'a> C::Item<'a>: ConsolidateLayout<C>,
Expand Down Expand Up @@ -416,7 +503,7 @@ mod tests {
let mut data = vec![(1,1), (2, 1), (1, -1)];
let mut target = Vec::default();
data.sort();
ConsolidateContainer::consolidate_container(&mut data, &mut target);
ContainerConsolidator::consolidate_container(&mut data, &mut target);
assert_eq!(target, [(2,1)]);
}

Expand Down Expand Up @@ -444,7 +531,7 @@ mod tests {
data2.extend((0..LEN).map(|i| (i/4, -2isize + ((i % 4) as isize))));
data.sort_by(|x,y| x.0.cmp(&y.0));
let start = std::time::Instant::now();
ConsolidateContainer::consolidate_container(&mut data, &mut target);
ContainerConsolidator::consolidate_container(&mut data, &mut target);
duration += start.elapsed();

consolidate(&mut data2);
Expand Down
8 changes: 7 additions & 1 deletion src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::{Container, PartialOrder};

use crate::consolidation::consolidate_updates;
use crate::consolidation::{consolidate_updates, ConsolidateContainer, ContainerConsolidator, ContainerSorter, InPlaceSorter};
use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder};
Expand Down Expand Up @@ -189,6 +189,10 @@ impl<M: Merger, T> Drop for MergeBatcher<M, T> {
pub trait Merger: Default {
/// The type of update containers received from inputs.
type Input;
/// TODO
type Sorter: ContainerSorter<Self::Input>;
/// TODO
type Consolidator: ConsolidateContainer<Self::Input>;
/// The internal representation of chunks of data.
type Chunk: Container;
/// The output type
Expand Down Expand Up @@ -283,6 +287,8 @@ where
type Input = Vec<((K, V), T, R)>;
type Chunk = Vec<((K, V), T, R)>;
type Output = Vec<((K, V), T, R)>;
type Sorter = InPlaceSorter;
type Consolidator = ContainerConsolidator;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down
4 changes: 3 additions & 1 deletion src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A general purpose `Batcher` implementation based on radix sort for TimelyStack.

use crate::consolidation::consolidate_updates;
use crate::consolidation::{consolidate_updates, ContainerConsolidator, InPlaceSorter};
use std::cmp::Ordering;
use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
Expand Down Expand Up @@ -68,6 +68,8 @@ where
type Input = Vec<((K, V), T, R)>;
type Chunk = TimelyStack<((K, V), T, R)>;
type Output = TimelyStack<((K, V), T, R)>;
type Sorter = InPlaceSorter;
type Consolidator = ContainerConsolidator;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down

0 comments on commit bc1d102

Please sign in to comment.