Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge batcher generic over containers #474

Merged
merged 5 commits into from
Apr 26, 2024

Conversation

antiguru
Copy link
Member

@antiguru antiguru commented Apr 17, 2024

Merge batcher that's generic over input containers and internal chains, with specific implementations.

Ideas

At the moment, a merge batcher receives a stream of vectors. It consolidates the input vectors, and inserts them into its queue structure. When sealing, it extracts ready data and presents it record-by-record to the builder. It inserts future updates into its queue.

This introduces several opportunities to introduce containers:

  • The input the batcher: The stream of updates received from a timely operator. The batcher requires each batch to be consolidated, i.e., sorted and differences for same data accumulated. It can either take consolidated data, or consolidate it itself. We're missing an abstraction that allows the batcher to consolidate containers, and often containers are read-only, or we only get to look at the data.
    • We could sort by permuting a Vec<usize> and copying into a new container in sorted order.
    • Consolidating requires addition, which either requires storage or mutability, which is odd for read-only containers. Not sure what to do here.
  • The internal queue structure. We need to traverse the elements in order, and compare on data and time, accumulating diffs as we go. This requires specific knowledge about items of a container, which motivates moving the bulk of the implementation behind a trait to avoid generic parameters.
  • Ready data presented to the builder. We have an opportunity to present all data to the builder at once, which relieves it from incrementally building batches. It unlocks "looking ahead" in the input data.

@antiguru antiguru force-pushed the container_merge_batcher branch 4 times, most recently from fc409d5 to ca9d2e7 Compare April 24, 2024 19:25
This change splits the default merge batcher implementation into a type
that maintains the outer part of its algorithm, specifically knows how
to maintain chains, and an inner part that knows how to maintain the
individual batches in chains. The benefit is that the outer part does
not need to know about the contents of the containers it holds on to
because that's encapsulated in the inner trait's implementation.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru changed the title WIP: Merge batcher generic over containers Merge batcher generic over containers Apr 24, 2024
@antiguru antiguru marked this pull request as ready for review April 24, 2024 19:54
Copy link
Member

@frankmcsherry frankmcsherry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This generally looks good! I left some comments from our review, one of which is correctness-y, but others are nits that we can clean up as you like. On bonus ask is that perhaps we could find a name other than Batch to avoid clashing with pre-existing uses. We discussed Block or Chunk, neither of which are especially more insightful .. but if another name presents itself amazing! :D

src/trace/implementations/merge_batcher.rs Outdated Show resolved Hide resolved
src/trace/implementations/merge_batcher.rs Outdated Show resolved Hide resolved
for mut buffer in merged {
for (data, time, diff) in buffer.drain(..) {
if upper.less_equal(&time) {
frontier.insert(time.clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider insert_ref here to avoid a clone! :D

src/trace/implementations/merge_batcher_col.rs Outdated Show resolved Hide resolved
src/trace/implementations/merge_batcher_col.rs Outdated Show resolved Hide resolved
Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
Copy link
Member

@frankmcsherry frankmcsherry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read through, and it all seems plausible! Hard to be 100% certain, but it seems like a great path forward.

let form_chain = |this: &mut Self, final_chain: &mut Vec<Self::Chunk>, stash: &mut _| {
if this.pending.len() == this.pending.capacity() {
consolidate_updates(&mut this.pending);
if this.pending.len() > this.pending.capacity() / 2 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, but I think this can be >=. More generally, I think we are looking for this.pending.len() >= this.chunk_capacity(), if we ever end up not maintaining exactly twice the capacity. Idk if it's worth switching over to reveal the intent. If that is the intent (I inferred it, but it could be wrong).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, and I had the same hunch at some point. Changed it to what you suggest, because it seems to be easier to reason about. The amount of data we compact isn't affected by this because we'll merge the chains at some point anyways.

src/trace/implementations/merge_batcher_col.rs Outdated Show resolved Hide resolved
Signed-off-by: Moritz Hoffmann <[email protected]>
/// TODO
type Time;
/// TODO
fn accept(&mut self, batch: RefOrMut<C>, stash: &mut Vec<Self::Batch>) -> Self::Batch;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return type should probably be an iterator over batches.

src/trace/implementations/merge_batcher.rs Outdated Show resolved Hide resolved
src/trace/implementations/merge_batcher.rs Outdated Show resolved Hide resolved
src/trace/implementations/merge_batcher_col.rs Outdated Show resolved Hide resolved
src/trace/implementations/merge_batcher_col.rs Outdated Show resolved Hide resolved
let form_chain = |this: &mut Self, final_chain: &mut Vec<Self::Chunk>, stash: &mut _| {
if this.pending.len() == this.pending.capacity() {
consolidate_updates(&mut this.pending);
if this.pending.len() > this.pending.capacity() / 2 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, and I had the same hunch at some point. Changed it to what you suggest, because it seems to be easier to reason about. The amount of data we compact isn't affected by this because we'll merge the chains at some point anyways.

@antiguru antiguru merged commit b281e50 into TimelyDataflow:master Apr 26, 2024
7 checks passed
@antiguru antiguru deleted the container_merge_batcher branch May 7, 2024 19:08
This was referenced Oct 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants