Skip to content

Commit

Permalink
Generic chunker, plus flatcontainer support
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 3, 2024
1 parent be698bc commit 0daf37c
Show file tree
Hide file tree
Showing 9 changed files with 1,046 additions and 239 deletions.
275 changes: 273 additions & 2 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.

use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::Data;
use crate::difference::Semigroup;
use crate::difference::{IsZero, Semigroup};
use crate::trace::cursor::IntoOwned;

/// Sorts and consolidates `vec`.
///
Expand Down Expand Up @@ -210,6 +215,216 @@ where
}
}

/// Behavior to sort containers.
///
/// `sort` takes `&mut self` so that implementations can keep scratch state between calls,
/// as [`ExternalContainerSorter`] does.
pub trait ContainerSorter<C> {
/// Sort the contents of `target`, possibly replacing the contents by a different object.
///
/// The order is defined by the implementation. After the call, `target` holds the
/// sorted items.
fn sort(&mut self, target: &mut C);
}

/// A generic container sorter for containers that implement [`ConsolidateLayout`].
///
/// Sorting goes through a scratch vector of items and a spare container; both are kept
/// in the sorter so they can be reused by later calls.
#[derive(Default)]
pub struct ExternalContainerSorter<C: Container> {
/// Storage to permute items. The `'static` lifetime is a placeholder: the sorter only
/// stores this vector while it is empty and re-types it for the duration of a sort.
permutation: Vec<C::Item<'static>>,
/// Empty container to write sorted results into, before it is swapped with the input.
empty: C,
}

impl<C> ContainerSorter<C> for ExternalContainerSorter<C>
where
for<'a> C: ConsolidateLayout + PushInto<C::Item<'a>>,
{
/// Sorts `container` out of place: its items are drained into a scratch vector, ordered
/// with [`ConsolidateLayout::cmp`], pushed into the spare container, and the spare
/// container is then swapped with `container`.
fn sort(&mut self, container: &mut C) {
// SAFETY: The taken vector holds no items: in the code visible here it is only ever
// stored back (below) after being fully drained. The source and target types differ
// only in the lifetime parameter of the item type.
let mut permutation: Vec<C::Item<'_>> = unsafe { std::mem::transmute::<Vec<C::Item<'static>>, Vec<C::Item<'_>>>(std::mem::take(&mut self.permutation)) };

permutation.extend(container.drain());
// `sort_by` is a stable sort, so items that compare equal keep their drained order.
permutation.sort_by(|a, b| C::cmp(a, b));

// `drain(..)` empties `permutation` while keeping the vector itself for reuse.
for item in permutation.drain(..) {
self.empty.push(item);
}

// SAFETY: `permutation` was fully drained above, so no item with a shorter lifetime is
// stored under the `'static` placeholder. The types differ only in that lifetime.
self.permutation = unsafe { std::mem::transmute::<Vec<C::Item<'_>>, Vec<C::Item<'static>>>(permutation) };
// Hand the sorted contents to the caller; the drained input container becomes the new
// spare and is cleared for the next call.
std::mem::swap(container, &mut self.empty);
self.empty.clear();
}
}

/// Sort containers in-place, with specific implementations.
///
/// Carries no state. [`ContainerSorter`] is implemented for vectors of pairs and of
/// triples; both implementations ignore the last tuple component when ordering.
#[derive(Default, Debug)]
pub struct InPlaceSorter();

impl<T, R> ContainerSorter<Vec<(T, R)>> for InPlaceSorter
where
    T: Ord + Clone,
    R: Clone,
{
    /// Stable-sorts `container` by the first component of each pair.
    ///
    /// The second component does not take part in the ordering, so pairs with equal
    /// first components keep their relative order.
    #[inline]
    fn sort(&mut self, container: &mut Vec<(T, R)>) {
        container.sort_by(|lhs, rhs| Ord::cmp(&lhs.0, &rhs.0));
    }
}

impl<D, T, R> ContainerSorter<Vec<(D, T, R)>> for InPlaceSorter
where
    D: Ord + Clone,
    T: Ord + Clone,
    R: Clone,
{
    /// Stable-sorts `target` by the first component, breaking ties with the second.
    ///
    /// The third component does not take part in the ordering, so triples that agree on
    /// the first two components keep their relative order.
    #[inline]
    fn sort(&mut self, target: &mut Vec<(D, T, R)>) {
        target.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0).then_with(|| lhs.1.cmp(&rhs.1)));
    }
}

/// Layout of data to be consolidated.
///
/// Describes how an item of the container splits into a key and a diff, how a key and an
/// owned diff are written back, and how two items are ordered by key.
// TODO: This could be split in two, to separate sorting and consolidation.
pub trait ConsolidateLayout: Container {
/// Key portion of data, essentially everything minus the diff.
type Key<'a>: Eq where Self: 'a;

/// GAT diff type, as obtained from an item of the container.
type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;

/// Owned diff type, into which values of [`Self::Diff`] can be accumulated.
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

/// Deconstruct an item into key and diff.
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

/// Push an element, given as key and owned diff, to a compatible container.
fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);

/// Compare two items by key to sort containers.
// NOTE(review): the `'a` lifetime parameter is not used by the signature; removing it
// would require changing every implementation at the same time.
fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
}

impl<D, R> ConsolidateLayout for Vec<(D, R)>
where
    D: Ord + Clone + 'static,
    for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
{
    type Key<'a> = D where Self: 'a;
    type Diff<'a> = R where Self: 'a;
    type DiffOwned = R;

    /// A pair already is `(key, diff)`; hand its two components back unchanged.
    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        let (data, diff) = item;
        (data, diff)
    }

    /// Order two pairs by their first component only.
    fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
        let (lhs, _) = item1;
        let (rhs, _) = item2;
        lhs.cmp(rhs)
    }

    /// Append `(key, diff)` to the vector.
    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned) {
        let update = (key, diff);
        self.push(update);
    }
}

impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
where
    D: Ord + Clone + 'static,
    T: Ord + Clone + 'static,
    for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
{
    type Key<'a> = (D, T) where Self: 'a;
    type Diff<'a> = R where Self: 'a;
    type DiffOwned = R;

    /// Regroup a `(data, time, diff)` triple into the key `(data, time)` and the diff.
    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        let (data, time, diff) = item;
        ((data, time), diff)
    }

    /// Order two triples by data first and time second; the diff is not consulted.
    fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
        item1.0.cmp(&item2.0).then_with(|| item1.1.cmp(&item2.1))
    }

    /// Append `(data, time, diff)` to the vector, taking data and time from `key`.
    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned) {
        let (data, time) = key;
        self.push((data, time, diff));
    }
}

impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
where
    for<'a> K: Region + Push<<K as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> K::ReadItem<'a>: Ord + Copy,
    for<'a> V: Region + Push<<V as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> V::ReadItem<'a>: Ord + Copy,
    for<'a> T: Region + Push<<T as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> T::ReadItem<'a>: Ord + Copy,
    R: Region + Push<<R as Region>::Owned> + Clone + 'static,
    for<'a> R::Owned: Semigroup<R::ReadItem<'a>>,
{
    type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a;
    type Diff<'a> = R::ReadItem<'a> where Self: 'a;
    type DiffOwned = R::Owned;

    /// Flatten a `((key, val), time, diff)` item into the key `(key, val, time)` and the diff.
    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        let ((key, val), time, diff) = item;
        ((key, val, time), diff)
    }

    /// Order two items by key, then value, then time; the diffs are not consulted.
    fn cmp<'a>(
        ((key1, val1), time1, _diff1): &Self::Item<'_>,
        ((key2, val2), time2, _diff2): &Self::Item<'_>,
    ) -> Ordering {
        // Reborrow every component so both sides end up with the same tuple type.
        let lhs = (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1));
        let rhs = (K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2));
        lhs.cmp(&rhs)
    }

    /// Copy `((key, value), time, diff)` into the stack, taking the first three from `key`.
    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned) {
        let (key, value, time) = key;
        self.copy(((key, value), time, diff));
    }
}

/// Behavior for copying consolidation.
pub trait ConsolidateContainer<C> {
/// Consolidate the contents of `container` and write the result to `target`.
///
/// NOTE(review): the implementation in this file only merges adjacent items with equal
/// keys, so `container` is presumably expected to be sorted by key — confirm with callers.
fn consolidate_container(container: &mut C, target: &mut C);
}

/// Container consolidator that requires the container to implement [`ConsolidateLayout`].
#[derive(Default, Debug)]
pub struct ContainerConsolidator;

impl<C> ConsolidateContainer<C> for ContainerConsolidator
where
    C: ConsolidateLayout,
{
    /// Consolidate the supplied container.
    ///
    /// Drains `container`, accumulates the diffs of consecutive items with equal keys, and
    /// pushes each key whose accumulated diff is non-zero to `target`. Only adjacent items
    /// are merged.
    fn consolidate_container(container: &mut C, target: &mut C) {
        // The run currently being accumulated: its key and the sum of its diffs so far.
        let mut run: Option<(C::Key<'_>, C::DiffOwned)> = None;
        for item in container.drain() {
            let (key, diff) = C::into_parts(item);

            // Same key as the open run: fold the diff in and move on.
            if let Some((run_key, total)) = run.as_mut() {
                if key == *run_key {
                    total.plus_equals(&diff);
                    continue;
                }
            }

            // First item or a new key: close the open run, keeping it only if non-zero.
            if let Some((run_key, total)) = run.take() {
                if !total.is_zero() {
                    target.push_with_diff(run_key, total);
                }
            }
            run = Some((key, diff.into_owned()));
        }

        // Write any residual data.
        if let Some((run_key, total)) = run {
            if !total.is_zero() {
                target.push_with_diff(run_key, total);
            }
        }
    }
}



#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -316,6 +531,62 @@ mod tests {
for i in 0..1024 {
assert_eq!((i, 0, 2), collected[i]);
}

}

/// Updates for key `1` cancel out, so only key `2` survives consolidation.
#[test]
fn test_consolidate_container() {
    let mut updates = vec![(1, 1), (2, 1), (1, -1)];
    // Bring equal keys next to each other before consolidating.
    updates.sort();
    let mut consolidated = Vec::default();
    ContainerConsolidator::consolidate_container(&mut updates, &mut consolidated);
    assert_eq!(consolidated, [(2, 1)]);
}

// Sizes used by the duration tests below: `LEN` is the number of updates generated per
// round and `REPS` the number of rounds. Builds without debug assertions use the larger
// values; builds with debug assertions use the smaller ones.
#[cfg(not(debug_assertions))]
const LEN: usize = 256 << 10;
#[cfg(not(debug_assertions))]
const REPS: usize = 10 << 10;

#[cfg(debug_assertions)]
const LEN: usize = 256 << 1;
#[cfg(debug_assertions)]
const REPS: usize = 10 << 1;

/// Times `ContainerConsolidator` over `REPS` rounds and checks every round against the
/// result of `consolidate` on the same updates.
#[test]
fn test_consolidator_duration() {
    // Four updates per key, with diffs -2, -1, 0 and 1.
    let updates = || (0..LEN).map(|i| (i / 4, -2isize + ((i % 4) as isize)));

    let mut input = Vec::with_capacity(LEN);
    let mut reference = Vec::with_capacity(LEN);
    let mut output = Vec::new();
    let mut duration = std::time::Duration::default();
    for _ in 0..REPS {
        input.clear();
        reference.clear();
        output.clear();
        input.extend(updates());
        reference.extend(updates());
        input.sort_by(|x, y| x.0.cmp(&y.0));

        // Only the consolidation itself is timed.
        let timer = std::time::Instant::now();
        ContainerConsolidator::consolidate_container(&mut input, &mut output);
        duration += timer.elapsed();

        consolidate(&mut reference);
        assert_eq!(output, reference);
    }
    println!("elapsed consolidator {duration:?}");
}

/// Times `consolidate` on a vector over `REPS` rounds and prints the accumulated duration.
#[test]
fn test_consolidator_duration_vec() {
    // Four updates per key, with diffs -2, -1, 0 and 1.
    let updates = || (0..LEN).map(|i| (i / 4, -2isize + ((i % 4) as isize)));

    let mut input = Vec::with_capacity(LEN);
    let mut duration = std::time::Duration::default();
    for _ in 0..REPS {
        input.clear();
        input.extend(updates());
        input.sort_by(|x, y| x.0.cmp(&y.0));

        // Only the consolidation itself is timed.
        let timer = std::time::Instant::now();
        consolidate(&mut input);
        duration += timer.elapsed();
    }
    println!("elapsed vec {duration:?}");
}
}
34 changes: 2 additions & 32 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,15 +369,6 @@ where
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=C>,
;

/// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name.
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, C>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=C>,
;
}

impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
Expand All @@ -395,17 +386,7 @@ where
Tr::Batcher: Batcher<Input=Vec<((K, V), G::Timestamp, R)>>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>>,
{
arrange_core(&self.inner, pact, name)
arrange_core(&self.inner, exchange, name)
}
}

Expand Down Expand Up @@ -583,18 +564,7 @@ where
Tr::Batcher: Batcher<Input=Vec<((K, ()), G::Timestamp, R)>>,
{
let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
}

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, Vec<((K,()),G::Timestamp,R)>>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Input=Vec<((K,()),G::Timestamp,R)>>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
arrange_core(&self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand Down
Loading

0 comments on commit 0daf37c

Please sign in to comment.