Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define flat container types in terms of regions #514

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ fn main() {
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<String,_,isize,_>>();
let keys = keys.arrange::<FlatKeySpine<String,_,isize,_>>();
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpineDefault;
let data = data.arrange::<FlatKeySpineDefault<String,usize,isize, _>>();
let keys = keys.arrange::<FlatKeySpineDefault<String,usize,isize,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
65 changes: 36 additions & 29 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@ use timely::{Container, Data, PartialOrder};
use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};

use crate::difference::Semigroup;
use crate::difference::{IsZero, Semigroup};
use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;
use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks. `T` describes the
pub struct FlatcontainerMerger<T, R, MC> {
_marker: PhantomData<(T, R, MC)>,
/// A merger for flat stacks.
///
/// `MC` is a [`Region`] that implements [`MergerChunk`].
///
/// The `Merger` implementation for this type works on chunks of type
/// `FlatStack<MC>` and takes its time and diff types from the region itself
/// (`MC::TimeOwned` and `MC::DiffOwned`), so no separate time or diff type
/// parameters are needed.
pub struct FlatcontainerMerger<MC> {
// `MC` is used only at the type level; the struct stores no data.
_marker: PhantomData<MC>,
}

impl<T, R, MC> Default for FlatcontainerMerger<T, R, MC> {
// Available for any `MC`: the impl places no bounds on the type parameter,
// because the only field is a `PhantomData` marker.
impl<MC> Default for FlatcontainerMerger<MC> {
fn default() -> Self {
Self { _marker: PhantomData, }
}
}

impl<T, R, MC: Region> FlatcontainerMerger<T, R, MC> {
impl<MC: Region> FlatcontainerMerger<MC> {
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<MC::Index>();
Expand Down Expand Up @@ -61,8 +63,12 @@ pub trait MergerChunk: Region {
type Val<'a>: Ord where Self: 'a;
/// The time of the update
type Time<'a>: Ord where Self: 'a;
/// The owned time type.
type TimeOwned;
/// The diff of the update
type Diff<'a> where Self: 'a;
/// The owned diff type.
type DiffOwned;

/// Split a read item into its constituents. Must be cheap.
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>);
Expand All @@ -81,47 +87,48 @@ where
type Key<'a> = K::ReadItem<'a> where Self: 'a;
type Val<'a> = V::ReadItem<'a> where Self: 'a;
type Time<'a> = T::ReadItem<'a> where Self: 'a;
type TimeOwned = T::Owned;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;

fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
(key, val, time, diff)
}
}

impl<T, R, FR> Merger for FlatcontainerMerger<T, R, FR>
impl<MC> Merger for FlatcontainerMerger<MC>
where
for<'a> T: Ord + PartialOrder + PartialOrder<FR::Time<'a>> + Data,
for<'a> R: Default + Semigroup + Semigroup<FR::Diff<'a>> + Data,
for<'a> FR: MergerChunk + Clone + 'static
+ ReserveItems<<FR as Region>::ReadItem<'a>>
+ Push<<FR as Region>::ReadItem<'a>>
+ Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, &'a R)>
+ Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, FR::Diff<'a>)>,
for<'a> FR::Time<'a>: PartialOrder<T> + Copy + IntoOwned<'a, Owned=T>,
for<'a> FR::Diff<'a>: IntoOwned<'a, Owned=R>,
for<'a> FR::ReadItem<'a>: std::fmt::Debug,
for<'a> MC: MergerChunk + Clone + 'static
+ ReserveItems<<MC as Region>::ReadItem<'a>>
+ Push<<MC as Region>::ReadItem<'a>>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>,
for<'a> MC::Time<'a>: PartialOrder<MC::TimeOwned> + Copy + IntoOwned<'a, Owned=MC::TimeOwned>,
for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>,
for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder<MC::Time<'a>> + Data,
for<'a> MC::DiffOwned: Default + Semigroup + Semigroup<MC::Diff<'a>> + Data,
{
type Time = T;
type Chunk = FlatStack<FR>;
type Output = FlatStack<FR>;
type Time = MC::TimeOwned;
type Chunk = FlatStack<MC>;
type Output = FlatStack<MC>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = <FlatStackQueue<FR>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<FR>>::from(list2.next().unwrap_or_default());
let mut head1 = <FlatStackQueue<MC>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<MC>>::from(list2.next().unwrap_or_default());

let mut result = self.empty(stash);

let mut diff = R::default();
let mut diff = MC::DiffOwned::default();

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let (key1, val1, time1, _diff) = FR::into_parts(head1.peek());
let (key2, val2, time2, _diff) = FR::into_parts(head2.peek());
let (key1, val1, time1, _diff) = MC::into_parts(head1.peek());
let (key2, val2, time2, _diff) = MC::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
};
// TODO: The following less/greater branches could plausibly be a good moment for
Expand All @@ -135,8 +142,8 @@ where
result.copy(head2.pop());
}
Ordering::Equal => {
let (key, val, time1, diff1) = FR::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = FR::into_parts(head2.pop());
let (key, val, time1, diff1) = MC::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop());
diff1.clone_onto(&mut diff);
diff.plus_equals(&diff2);
if !diff.is_zero() {
Expand Down Expand Up @@ -207,7 +214,7 @@ where
let mut ready = self.empty(stash);

for buffer in merged {
for (key, val, time, diff) in buffer.iter().map(FR::into_parts) {
for (key, val, time, diff) in buffer.iter().map(MC::into_parts) {
if upper.less_equal(&time) {
frontier.insert_with(&time, |time| (*time).into_owned());
if keep.len() == keep.capacity() && !keep.is_empty() {
Expand Down Expand Up @@ -247,7 +254,7 @@ where
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for (key, val, time, _diff) in buffer.iter().map(FR::into_parts) {
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
debug_assert!(p_key <= key);
Expand Down
81 changes: 50 additions & 31 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ where
type OffsetContainer = OffsetList;
}

/// A layout based on timely stacks
pub struct FlatLayout<U: Update> {
phantom: std::marker::PhantomData<U>,
/// A layout based on flat containers.
///
/// `K`, `V`, `T`, and `R` are the regions used for keys, values, times, and
/// diffs respectively. The `Layout` implementation for this type stores each
/// of them in a `FlatStack` over the corresponding region, and the `Update`
/// implementation uses each region's `Owned` type as the key, value, time,
/// and diff type.
pub struct FlatLayout<K, V, T, R> {
// Type-level marker only; a `FlatLayout` carries no data.
phantom: std::marker::PhantomData<(K, V, T, R)>,
}

/// A type with a preferred container.
Expand Down Expand Up @@ -400,49 +400,68 @@ where
}

mod flatcontainer {
use timely::container::flatcontainer::{Containerized, FlatStack, IntoOwned, Push, Region};
use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use timely::progress::Timestamp;

use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update};

impl<U: Update> Layout for FlatLayout<U>
impl<K, V, T, R> Update for FlatLayout<K, V, T, R>
where
U::Key: Containerized,
for<'a> <U::Key as Containerized>::Region: Push<U::Key> + Push<<<U::Key as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Key as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Val: Containerized,
for<'a> <U::Val as Containerized>::Region: Push<U::Val> + Push<<<U::Val as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Val as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Time: Containerized,
<U::Time as Containerized>::Region: Region<Owned=U::Time>,
for<'a> <U::Time as Containerized>::Region: Push<U::Time> + Push<<<U::Time as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Time as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
U::Diff: Containerized,
<U::Diff as Containerized>::Region: Region<Owned=U::Diff>,
for<'a> <U::Diff as Containerized>::Region: Push<U::Diff> + Push<<<U::Diff as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<U::Diff as Containerized>::Region as Region>::ReadItem<'a>: Copy + Ord,
K: Region,
V: Region,
T: Region,
R: Region,
K::Owned: Ord + Clone + 'static,
V::Owned: Ord + Clone + 'static,
T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
R::Owned: Ord + Semigroup + 'static,
{
type Target = U;
type KeyContainer = FlatStack<<U::Key as Containerized>::Region>;
type ValContainer = FlatStack<<U::Val as Containerized>::Region>;
type TimeContainer = FlatStack<<U::Time as Containerized>::Region>;
type DiffContainer = FlatStack<<U::Diff as Containerized>::Region>;
type Key = K::Owned;
type Val = V::Owned;
type Time = T::Owned;
type Diff = R::Owned;
}

/// `Layout` for [`FlatLayout`]: keys, values, times, and diffs are each stored
/// in a `FlatStack` over the regions `K`, `V`, `T`, and `R`; offsets use
/// `OffsetList`.
///
/// Each region must accept pushes of both its `Owned` type and its own
/// `ReadItem`, and every `ReadItem` must be `Copy + Ord`.
impl<K, V, T, R> Layout for FlatLayout<K, V, T, R>
where
K: Region + Push<<K as Region>::Owned> + for<'a> Push<<K as Region>::ReadItem<'a>> + 'static,
V: Region + Push<<V as Region>::Owned> + for<'a> Push<<V as Region>::ReadItem<'a>> + 'static,
T: Region + Push<<T as Region>::Owned> + for<'a> Push<<T as Region>::ReadItem<'a>> + 'static,
R: Region + Push<<R as Region>::Owned> + for<'a> Push<<R as Region>::ReadItem<'a>> + 'static,
K::Owned: Ord + Clone + 'static,
V::Owned: Ord + Clone + 'static,
T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
R::Owned: Ord + Semigroup + 'static,
for<'a> K::ReadItem<'a>: Copy + Ord,
for<'a> V::ReadItem<'a>: Copy + Ord,
for<'a> T::ReadItem<'a>: Copy + Ord,
for<'a> R::ReadItem<'a>: Copy + Ord,
{
// The layout is its own `Update` target: the key/val/time/diff types are
// those of the `Update` impl for `FlatLayout` (the regions' `Owned` types).
type Target = Self;
type KeyContainer = FlatStack<K>;
type ValContainer = FlatStack<V>;
type TimeContainer = FlatStack<T>;
type DiffContainer = FlatStack<R>;
type OffsetContainer = OffsetList;
}

impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for FlatStack<TupleABCRegion<TupleABRegion<K,V>,T,R>>
where
K: Region + Clone + 'static,
V: Region + Clone + 'static,
T: Region + Clone + 'static,
R: Region + Clone + 'static,
for<'a> K::ReadItem<'a>: Copy + Ord,
KBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<K::ReadItem<'a>>,
for<'a> V: Region + Clone + 'static,
for<'a> V::ReadItem<'a>: Copy + Ord,
VBC: BatchContainer,
for<'a> VBC::ReadItem<'a>: PartialEq<V::ReadItem<'a>>,
for<'a> T: Region + Clone + 'static,
for<'a> T::ReadItem<'a>: Copy + Ord,
for<'a> R: Region + Clone + 'static,
for<'a> R::ReadItem<'a>: Copy + Ord,
KBC: BatchContainer,
VBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<K::ReadItem<'a>>,
for<'a> VBC::ReadItem<'a>: PartialEq<V::ReadItem<'a>>,
{
type Key<'a> = K::ReadItem<'a>;
type Val<'a> = V::ReadItem<'a>;
Expand Down
43 changes: 24 additions & 19 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
use std::rc::Rc;
use timely::container::columnation::{TimelyStack};
use timely::container::flatcontainer::{Containerized, FlatStack};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};

use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk};
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -41,15 +42,17 @@ pub type ColValSpine<K, V, T, R> = Spine<
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<K, V, T, R, C> = Spine<
Rc<OrdValBatch<FlatLayout<((K,V),T,R)>>>,
MergeBatcher<
C,
ContainerChunker<FlatStack<<((K,V),T,R) as Containerized>::Region>>,
FlatcontainerMerger<T, R, <((K,V),T,R) as Containerized>::Region>,
T,
>,
RcBuilder<OrdValBuilder<FlatLayout<((K,V),T,R)>, FlatStack<<((K,V),T,R) as Containerized>::Region>>>,
pub type FlatValSpine<L, R, C> = Spine<
Rc<OrdValBatch<L>>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>,
RcBuilder<OrdValBuilder<L, FlatStack<R>>>,
>;

/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout.
///
/// `K`, `V`, `T`, and `R` are the key, value, time, and diff types; each is
/// mapped to its region through [`Containerized`]. Those four regions
/// parameterize the [`FlatLayout`], and are also combined into the
/// `((key, val), time, diff)` tuple region passed to [`FlatValSpine`] as its
/// region argument. `C` is forwarded unchanged to [`FlatValSpine`], where it
/// is the first type argument of the `MergeBatcher`.
pub type FlatValSpineDefault<K, V, T, R, C> = FlatValSpine<
FlatLayout<<K as Containerized>::Region, <V as Containerized>::Region, <T as Containerized>::Region, <R as Containerized>::Region>,
TupleABCRegion<TupleABRegion<<K as Containerized>::Region, <V as Containerized>::Region>, <T as Containerized>::Region, <R as Containerized>::Region>,
C,
>;

/// A trace implementation using a spine of ordered lists.
Expand All @@ -69,15 +72,17 @@ pub type ColKeySpine<K, T, R> = Spine<
>;

/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<K, T, R, C> = Spine<
Rc<OrdKeyBatch<FlatLayout<((K,()),T,R)>>>,
MergeBatcher<
C,
ContainerChunker<FlatStack<<((K,()),T,R) as Containerized>::Region>>,
FlatcontainerMerger<T, R, <((K,()),T,R) as Containerized>::Region>,
T,
>,
RcBuilder<OrdKeyBuilder<FlatLayout<((K,()),T,R)>, FlatStack<<((K,()),T,R) as Containerized>::Region>>>,
pub type FlatKeySpine<L, R, C> = Spine<
Rc<OrdKeyBatch<L>>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>,
RcBuilder<OrdKeyBuilder<L, FlatStack<R>>>,
>;

/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout.
///
/// `K`, `T`, and `R` are the key, time, and diff types; each is mapped to its
/// region through [`Containerized`]. There is no value type parameter: the
/// value position is filled with the region of `()`. The resulting regions
/// parameterize the [`FlatLayout`], and are also combined into the
/// `((key, ()), time, diff)` tuple region passed to [`FlatKeySpine`] as its
/// region argument. `C` is forwarded unchanged to [`FlatKeySpine`], where it
/// is the first type argument of the `MergeBatcher`.
pub type FlatKeySpineDefault<K,T,R, C> = FlatKeySpine<
FlatLayout<<K as Containerized>::Region, <() as Containerized>::Region, <T as Containerized>::Region, <R as Containerized>::Region>,
TupleABCRegion<TupleABRegion<<K as Containerized>::Region, <() as Containerized>::Region>, <T as Containerized>::Region, <R as Containerized>::Region>,
C,
>;

/// A trace implementation backed by columnar storage.
Expand Down
8 changes: 4 additions & 4 deletions tests/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord_neu::{FlatKeySpine, FlatValSpine};
use differential_dataflow::trace::implementations::ord_neu::{FlatKeySpineDefault, FlatValSpineDefault};

type Node = usize;
type Edge = (Node, Node);
Expand Down Expand Up @@ -246,8 +246,8 @@ fn bfs_differential_flat(
let (edge_input, edges) = scope.new_collection();

let c = bfs_flat(&edges, &roots).map(|(_, dist)| (dist, ()));
let arranged = c.arrange::<FlatKeySpine<usize, _, isize, Vec<((usize, ()), _, _)>>>();
type T2 = FlatValSpine<usize, isize, usize, isize, Vec<((usize, isize), usize, isize)>>;
let arranged = c.arrange::<FlatKeySpineDefault<usize, usize, isize, Vec<((usize, ()), _, _)>>>();
type T2 = FlatValSpineDefault<usize, isize, usize, isize, Vec<((usize, isize), usize, isize)>>;
let reduced = arranged.reduce_abelian::<_, _, _, T2>("Count", |_k, s, t| {
t.push((s[0].1.clone(), isize::from(1i8)))
});
Expand Down Expand Up @@ -315,7 +315,7 @@ where
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());

type Spine<K, V, T, R = isize> = FlatValSpine<K, V, T, R, Vec<((K, V), T, R)>>;
type Spine<K, V, T, R = isize> = FlatValSpineDefault<K, V, T, R, Vec<((K, V), T, R)>>;
let arranged1 = inner.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
let arranged2 = edges.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
arranged1
Expand Down