Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Pass data from batcher to builder by chunk #491

Merged
2 commits merged on May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ where

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;
use timely::container::PushInto;

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -292,7 +294,8 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -311,7 +314,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
6 changes: 2 additions & 4 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -282,9 +282,7 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
builder.push(&mut updates);
}
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);
Expand Down
23 changes: 15 additions & 8 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! to the key and the list of values.
//! The function is expected to populate a list of output values.

use timely::Container;
use timely::container::PushInto;
use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian};
Expand Down Expand Up @@ -252,7 +254,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
Expand All @@ -274,7 +276,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
;
}
Expand All @@ -293,7 +295,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -312,7 +314,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -454,6 +457,8 @@ where
builders.push(T2::Builder::new());
}

let mut buffer = Default::default();

// Cursors for navigating the input and output traces.
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
let source_storage = &source_storage;
Expand Down Expand Up @@ -531,7 +536,9 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push(((key.into_owned(), val), time, diff));
((key.into_owned(), val), time, diff).push_into(&mut buffer);
builders[index].push(&mut buffer);
buffer.clear();
}
}
}
Expand Down Expand Up @@ -648,7 +655,7 @@ where
where
F: Fn(C2::Val<'_>) -> V,
L: FnMut(
C1::Key<'a>,
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
Expand Down Expand Up @@ -728,7 +735,7 @@ mod history_replay {
where
F: Fn(C2::Val<'_>) -> V,
L: FnMut(
C1::Key<'a>,
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
Expand Down Expand Up @@ -1020,7 +1027,7 @@ mod history_replay {
new_interesting.push(next_time.clone());
debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
}


// Update `meet` to track the meet of each source of times.
meet = None;//T::maximum();
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ where
type Time = T;
type Input = Vec<((K, V), T, R)>;
type Chunk = Vec<((K, V), T, R)>;
type Output = ((K, V), T, R);
type Output = Vec<((K, V), T, R)>;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down Expand Up @@ -497,8 +497,8 @@ where
}
let mut builder = B::with_capacity(keys, vals, upds);

for datum in chain.drain(..).flatten() {
builder.push(datum);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
Expand Down
9 changes: 3 additions & 6 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
type Time = T;
type Input = Vec<((K, V), T, R)>;
type Chunk = TimelyStack<((K, V), T, R)>;
type Output = ((K, V), T, R);
type Output = TimelyStack<((K, V), T, R)>;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down Expand Up @@ -290,11 +290,8 @@ where
}
}
let mut builder = B::with_capacity(keys, vals, upds);

for chunk in chain.drain(..) {
for datum in chunk.iter() {
builder.copy(datum);
}
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
Expand Down
Loading