Skip to content

Commit

Permalink
Define flat container types in terms of regions (#514)
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 24, 2024
1 parent bda68cf commit 9fac226
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 86 deletions.
6 changes: 3 additions & 3 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ fn main() {
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<String,_,isize,_>>();
let keys = keys.arrange::<FlatKeySpine<String,_,isize,_>>();
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpineDefault;
let data = data.arrange::<FlatKeySpineDefault<String,usize,isize, _>>();
let keys = keys.arrange::<FlatKeySpineDefault<String,usize,isize,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
65 changes: 36 additions & 29 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@ use timely::{Container, Data, PartialOrder};
use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};

use crate::difference::Semigroup;
use crate::difference::{IsZero, Semigroup};
use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;
use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks. `T` describes the
pub struct FlatcontainerMerger<T, R, MC> {
_marker: PhantomData<(T, R, MC)>,
/// A merger for flat stacks.
///
/// `MC` is a [`Region`] that implements [`MergerChunk`].
pub struct FlatcontainerMerger<MC> {
_marker: PhantomData<MC>,
}

impl<T, R, MC> Default for FlatcontainerMerger<T, R, MC> {
impl<MC> Default for FlatcontainerMerger<MC> {
fn default() -> Self {
Self { _marker: PhantomData, }
}
}

impl<T, R, MC: Region> FlatcontainerMerger<T, R, MC> {
impl<MC: Region> FlatcontainerMerger<MC> {
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<MC::Index>();
Expand Down Expand Up @@ -61,8 +63,12 @@ pub trait MergerChunk: Region {
type Val<'a>: Ord where Self: 'a;
/// The time of the update
type Time<'a>: Ord where Self: 'a;
/// The owned time type.
type TimeOwned;
/// The diff of the update
type Diff<'a> where Self: 'a;
/// The owned diff type.
type DiffOwned;

/// Split a read item into its constituents. Must be cheap.
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>);
Expand All @@ -81,47 +87,48 @@ where
type Key<'a> = K::ReadItem<'a> where Self: 'a;
type Val<'a> = V::ReadItem<'a> where Self: 'a;
type Time<'a> = T::ReadItem<'a> where Self: 'a;
type TimeOwned = T::Owned;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;

fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
(key, val, time, diff)
}
}

impl<T, R, FR> Merger for FlatcontainerMerger<T, R, FR>
impl<MC> Merger for FlatcontainerMerger<MC>
where
for<'a> T: Ord + PartialOrder + PartialOrder<FR::Time<'a>> + Data,
for<'a> R: Default + Semigroup + Semigroup<FR::Diff<'a>> + Data,
for<'a> FR: MergerChunk + Clone + 'static
+ ReserveItems<<FR as Region>::ReadItem<'a>>
+ Push<<FR as Region>::ReadItem<'a>>
+ Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, &'a R)>
+ Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, FR::Diff<'a>)>,
for<'a> FR::Time<'a>: PartialOrder<T> + Copy + IntoOwned<'a, Owned=T>,
for<'a> FR::Diff<'a>: IntoOwned<'a, Owned=R>,
for<'a> FR::ReadItem<'a>: std::fmt::Debug,
for<'a> MC: MergerChunk + Clone + 'static
+ ReserveItems<<MC as Region>::ReadItem<'a>>
+ Push<<MC as Region>::ReadItem<'a>>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>,
for<'a> MC::Time<'a>: PartialOrder<MC::TimeOwned> + Copy + IntoOwned<'a, Owned=MC::TimeOwned>,
for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>,
for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder<MC::Time<'a>> + Data,
for<'a> MC::DiffOwned: Default + Semigroup + Semigroup<MC::Diff<'a>> + Data,
{
type Time = T;
type Chunk = FlatStack<FR>;
type Output = FlatStack<FR>;
type Time = MC::TimeOwned;
type Chunk = FlatStack<MC>;
type Output = FlatStack<MC>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = <FlatStackQueue<FR>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<FR>>::from(list2.next().unwrap_or_default());
let mut head1 = <FlatStackQueue<MC>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<MC>>::from(list2.next().unwrap_or_default());

let mut result = self.empty(stash);

let mut diff = R::default();
let mut diff = MC::DiffOwned::default();

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let (key1, val1, time1, _diff) = FR::into_parts(head1.peek());
let (key2, val2, time2, _diff) = FR::into_parts(head2.peek());
let (key1, val1, time1, _diff) = MC::into_parts(head1.peek());
let (key2, val2, time2, _diff) = MC::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
};
// TODO: The following less/greater branches could plausibly be a good moment for
Expand All @@ -135,8 +142,8 @@ where
result.copy(head2.pop());
}
Ordering::Equal => {
let (key, val, time1, diff1) = FR::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = FR::into_parts(head2.pop());
let (key, val, time1, diff1) = MC::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop());
diff1.clone_onto(&mut diff);
diff.plus_equals(&diff2);
if !diff.is_zero() {
Expand Down Expand Up @@ -207,7 +214,7 @@ where
let mut ready = self.empty(stash);

for buffer in merged {
for (key, val, time, diff) in buffer.iter().map(FR::into_parts) {
for (key, val, time, diff) in buffer.iter().map(MC::into_parts) {
if upper.less_equal(&time) {
frontier.insert_with(&time, |time| (*time).into_owned());
if keep.len() == keep.capacity() && !keep.is_empty() {
Expand Down Expand Up @@ -247,7 +254,7 @@ where
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for (key, val, time, _diff) in buffer.iter().map(FR::into_parts) {
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
debug_assert!(p_key <= key);
Expand Down
81 changes: 50 additions & 31 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ where
type OffsetContainer = OffsetList;
}

/// A layout based on timely stacks
pub struct FlatLayout<U: Update> {
phantom: std::marker::PhantomData<U>,
/// A layout based on flat containers.
pub struct FlatLayout<K, V, T, R> {
phantom: std::marker::PhantomData<(K, V, T, R)>,
}

/// A type with a preferred container.
Expand Down Expand Up @@ -400,49 +400,68 @@ where
}

mod flatcontainer {
use timely::container::flatcontainer::{Containerized, FlatStack, IntoOwned, Push, Region};
use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use timely::progress::Timestamp;

use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update};

impl<U: Update> Layout for FlatLayout<U>
impl<K, V, T, R> Update for FlatLayout<K, V, T, R>
where
U::Key: Containerized,
for<'a> <U::Key as Containerized>::Region: Push<U::Key> + Push<<<U::Key as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Key as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Val: Containerized,
for<'a> <U::Val as Containerized>::Region: Push<U::Val> + Push<<<U::Val as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Val as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Time: Containerized,
<U::Time as Containerized>::Region: Region<Owned=U::Time>,
for<'a> <U::Time as Containerized>::Region: Push<U::Time> + Push<<<U::Time as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Time as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Diff: Containerized,
<U::Diff as Containerized>::Region: Region<Owned=U::Diff>,
for<'a> <U::Diff as Containerized>::Region: Push<U::Diff> + Push<<<U::Diff as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Diff as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
K: Region,
V: Region,
T: Region,
R: Region,
K::Owned: Ord + Clone + 'static,
V::Owned: Ord + Clone + 'static,
T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
R::Owned: Ord + Semigroup + 'static,
{
type Target = U;
type KeyContainer = FlatStack<<U::Key as Containerized>::Region>;
type ValContainer = FlatStack<<U::Val as Containerized>::Region>;
type TimeContainer = FlatStack<<U::Time as Containerized>::Region>;
type DiffContainer = FlatStack<<U::Diff as Containerized>::Region>;
type Key = K::Owned;
type Val = V::Owned;
type Time = T::Owned;
type Diff = R::Owned;
}

impl<K, V, T, R> Layout for FlatLayout<K, V, T, R>
where
K: Region + Push<<K as Region>::Owned> + for<'a> Push<<K as Region>::ReadItem<'a>> + 'static,
V: Region + Push<<V as Region>::Owned> + for<'a> Push<<V as Region>::ReadItem<'a>> + 'static,
T: Region + Push<<T as Region>::Owned> + for<'a> Push<<T as Region>::ReadItem<'a>> + 'static,
R: Region + Push<<R as Region>::Owned> + for<'a> Push<<R as Region>::ReadItem<'a>> + 'static,
K::Owned: Ord + Clone + 'static,
V::Owned: Ord + Clone + 'static,
T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
R::Owned: Ord + Semigroup + 'static,
for<'a> K::ReadItem<'a>: Copy + Ord,
for<'a> V::ReadItem<'a>: Copy + Ord,
for<'a> T::ReadItem<'a>: Copy + Ord,
for<'a> R::ReadItem<'a>: Copy + Ord,
{
type Target = Self;
type KeyContainer = FlatStack<K>;
type ValContainer = FlatStack<V>;
type TimeContainer = FlatStack<T>;
type DiffContainer = FlatStack<R>;
type OffsetContainer = OffsetList;
}

impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for FlatStack<TupleABCRegion<TupleABRegion<K,V>,T,R>>
where
K: Region + Clone + 'static,
V: Region + Clone + 'static,
T: Region + Clone + 'static,
R: Region + Clone + 'static,
for<'a> K::ReadItem<'a>: Copy + Ord,
KBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<K::ReadItem<'a>>,
for<'a> V: Region + Clone + 'static,
for<'a> V::ReadItem<'a>: Copy + Ord,
VBC: BatchContainer,
for<'a> VBC::ReadItem<'a>: PartialEq<V::ReadItem<'a>>,
for<'a> T: Region + Clone + 'static,
for<'a> T::ReadItem<'a>: Copy + Ord,
for<'a> R: Region + Clone + 'static,
for<'a> R::ReadItem<'a>: Copy + Ord,
KBC: BatchContainer,
VBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<K::ReadItem<'a>>,
for<'a> VBC::ReadItem<'a>: PartialEq<V::ReadItem<'a>>,
{
type Key<'a> = K::ReadItem<'a>;
type Val<'a> = V::ReadItem<'a>;
Expand Down
43 changes: 24 additions & 19 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
use std::rc::Rc;
use timely::container::columnation::{TimelyStack};
use timely::container::flatcontainer::{Containerized, FlatStack};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};

use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk};
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -41,15 +42,17 @@ pub type ColValSpine<K, V, T, R> = Spine<
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<K, V, T, R, C> = Spine<
Rc<OrdValBatch<FlatLayout<((K,V),T,R)>>>,
MergeBatcher<
C,
ContainerChunker<FlatStack<<((K,V),T,R) as Containerized>::Region>>,
FlatcontainerMerger<T, R, <((K,V),T,R) as Containerized>::Region>,
T,
>,
RcBuilder<OrdValBuilder<FlatLayout<((K,V),T,R)>, FlatStack<<((K,V),T,R) as Containerized>::Region>>>,
pub type FlatValSpine<L, R, C> = Spine<
Rc<OrdValBatch<L>>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>,
RcBuilder<OrdValBuilder<L, FlatStack<R>>>,
>;

/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout.
pub type FlatValSpineDefault<K, V, T, R, C> = FlatValSpine<
FlatLayout<<K as Containerized>::Region, <V as Containerized>::Region, <T as Containerized>::Region, <R as Containerized>::Region>,
TupleABCRegion<TupleABRegion<<K as Containerized>::Region, <V as Containerized>::Region>, <T as Containerized>::Region, <R as Containerized>::Region>,
C,
>;

/// A trace implementation using a spine of ordered lists.
Expand All @@ -69,15 +72,17 @@ pub type ColKeySpine<K, T, R> = Spine<
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<K, T, R, C> = Spine<
Rc<OrdKeyBatch<FlatLayout<((K,()),T,R)>>>,
MergeBatcher<
C,
ContainerChunker<FlatStack<<((K,()),T,R) as Containerized>::Region>>,
FlatcontainerMerger<T, R, <((K,()),T,R) as Containerized>::Region>,
T,
>,
RcBuilder<OrdKeyBuilder<FlatLayout<((K,()),T,R)>, FlatStack<<((K,()),T,R) as Containerized>::Region>>>,
pub type FlatKeySpine<L, R, C> = Spine<
Rc<OrdKeyBatch<L>>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>,
RcBuilder<OrdKeyBuilder<L, FlatStack<R>>>,
>;

/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout.
pub type FlatKeySpineDefault<K,T,R, C> = FlatKeySpine<
FlatLayout<<K as Containerized>::Region, <() as Containerized>::Region, <T as Containerized>::Region, <R as Containerized>::Region>,
TupleABCRegion<TupleABRegion<<K as Containerized>::Region, <() as Containerized>::Region>, <T as Containerized>::Region, <R as Containerized>::Region>,
C,
>;

/// A trace implementation backed by columnar storage.
Expand Down
8 changes: 4 additions & 4 deletions tests/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord_neu::{FlatKeySpine, FlatValSpine};
use differential_dataflow::trace::implementations::ord_neu::{FlatKeySpineDefault, FlatValSpineDefault};

type Node = usize;
type Edge = (Node, Node);
Expand Down Expand Up @@ -246,8 +246,8 @@ fn bfs_differential_flat(
let (edge_input, edges) = scope.new_collection();

let c = bfs_flat(&edges, &roots).map(|(_, dist)| (dist, ()));
let arranged = c.arrange::<FlatKeySpine<usize, _, isize, Vec<((usize, ()), _, _)>>>();
type T2 = FlatValSpine<usize, isize, usize, isize, Vec<((usize, isize), usize, isize)>>;
let arranged = c.arrange::<FlatKeySpineDefault<usize, usize, isize, Vec<((usize, ()), _, _)>>>();
type T2 = FlatValSpineDefault<usize, isize, usize, isize, Vec<((usize, isize), usize, isize)>>;
let reduced = arranged.reduce_abelian::<_, _, _, T2>("Count", |_k, s, t| {
t.push((s[0].1.clone(), isize::from(1i8)))
});
Expand Down Expand Up @@ -315,7 +315,7 @@ where
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());

type Spine<K, V, T, R = isize> = FlatValSpine<K, V, T, R, Vec<((K, V), T, R)>>;
type Spine<K, V, T, R = isize> = FlatValSpineDefault<K, V, T, R, Vec<((K, V), T, R)>>;
let arranged1 = inner.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
let arranged2 = edges.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
arranged1
Expand Down

0 comments on commit 9fac226

Please sign in to comment.