Skip to content

Commit

Permalink
Extract core merge algo into trait
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>

further generic seal

Signed-off-by: Moritz Hoffmann <[email protected]>

cleanup; use push_batches everywhere

Signed-off-by: Moritz Hoffmann <[email protected]>

oops

Signed-off-by: Moritz Hoffmann <[email protected]>

Batcher output is container

Signed-off-by: Moritz Hoffmann <[email protected]>

Merge batcher can accept containers

Signed-off-by: Moritz Hoffmann <[email protected]>

Builder::from_batches

Signed-off-by: Moritz Hoffmann <[email protected]>

Builder::push_batches takes a mutable reference of vec of c

Signed-off-by: Moritz Hoffmann <[email protected]>

relax trait bounds

Signed-off-by: Moritz Hoffmann <[email protected]>

More thoughts on types on streams

Signed-off-by: Moritz Hoffmann <[email protected]>

Internal merger batch type

Signed-off-by: Moritz Hoffmann <[email protected]>

remove seal

Signed-off-by: Moritz Hoffmann <[email protected]>

Merger input as associated type

Signed-off-by: Moritz Hoffmann <[email protected]>

Inline the sorter

Signed-off-by: Moritz Hoffmann <[email protected]>

No typedef

Signed-off-by: Moritz Hoffmann <[email protected]>

stash; documentation

Signed-off-by: Moritz Hoffmann <[email protected]>

stash mgmt

Signed-off-by: Moritz Hoffmann <[email protected]>

Better memory reuse

Signed-off-by: Moritz Hoffmann <[email protected]>

Columnated merger

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Apr 24, 2024
1 parent d18497c commit ca9d2e7
Show file tree
Hide file tree
Showing 9 changed files with 664 additions and 380 deletions.
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((T1::KeyOwned, V), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -303,7 +303,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((T1::KeyOwned,V), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
10 changes: 4 additions & 6 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -241,7 +241,6 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

use trace::cursor::MyTrait;
Expand Down Expand Up @@ -282,11 +281,10 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
}
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
let mut batches = vec![std::mem::take(&mut updates)];
let batch = Tr::Builder::from_batches(&mut batches, prev_frontier.borrow(), upper.borrow(), Antichain::from_elem(G::Timestamp::minimum()).borrow());
updates = batches.into_iter().next().unwrap_or_default();
prev_frontier.clone_from(&upper);

// Communicate `batch` to the arrangement and the stream.
Expand Down
23 changes: 17 additions & 6 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
/// .map(|x| (x, x))
/// .reduce_abelian::<_,_,ValSpine<_,_,_,_>>(
/// "Example",
/// Clone::clone,
/// Clone::clone,
/// move |_key, src, dst| dst.push((*src[0].0, 1))
/// )
/// .trace;
Expand All @@ -252,7 +252,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
Expand All @@ -274,7 +274,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
;
}
Expand All @@ -293,7 +293,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -312,7 +312,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((T1::KeyOwned, V), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -529,9 +529,20 @@ where
// (ii) that the buffers are time-ordered, and (iii) that the builders accept
// arbitrarily ordered times.
for index in 0 .. buffers.len() {
// TODO: This doesn't reuse allocations for `update`.
let mut update = Vec::with_capacity(1024);
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push(((key.into_owned(), val), time, diff));
update.push(((key.into_owned(), val), time, diff));
if update.len() == update.capacity() {
let mut chain = vec![update];
builders[index].push_batches(&mut chain);
update = chain.pop().unwrap_or_else(|| Vec::with_capacity(1024));
update.clear();
}
}
if !update.is_empty() {
builders[index].push_batches(&mut vec![update]);
}
}
}
Expand Down
Loading

0 comments on commit ca9d2e7

Please sign in to comment.