Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making stream joins extensible: A new Trait implementation for SHJ #8234

Merged
merged 7 commits into from
Nov 20, 2023
Merged

Making stream joins extensible: A new Trait implementation for SHJ #8234

merged 7 commits into from
Nov 20, 2023

Conversation

metesynnada
Copy link
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

In our ongoing efforts to increase join use cases in our Datafusion, this PR introduces a trait: EagerJoinStream. This trait is designed to provide a more structured and efficient way to implement more join use cases in future, ensuring better maintainability and ease of use.

  • EagerJoinStream: This trait ensures that all join operations are evaluated eagerly, providing faster response times for scenarios where immediate results are required.

What changes are included in this PR?

This PR includes the implementation of the EagerJoinStream trait, along with necessary modifications to existing code to integrate these new traits. The changes are as follows:

  1. Implementation of EagerJoinStream: This trait is implemented to support eager evaluation of join operations.
  2. Code restructure: stream_hash_utils.rs introduced to separate responsibility on HJ and SHJ. It becomes easier to maintain.
  3. Integration and Refactoring: Existing code has been refactored and integrated with these new traits to ensure seamless operation and maintain compatibility.
  4. Proto support for SHJ: datafusion.proto file and ser/de features are updated to support SHJ in proto.

Changes are mostly code restructuring and proto implementations, instead of adding new functionality to the joins.

Are these changes tested?

Yes, comprehensive tests have been added to cover the new functionality introduced by these traits. The tests ensure the new features' correctness, performance, and reliability.

Are there any user-facing changes?

NA

@alamb
Copy link
Contributor

alamb commented Nov 16, 2023

I plan to review this PR, hopefully later today

@ozankabak
Copy link
Contributor

For context: This enables downstream users (like us) to implement various join algorithms without duplicating a bunch of code every time.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @metesynnada -- this looks like a nice refactoring to me. I had some suggestions on documentation and API design but I think they could also be done as follow on PRs

I had some questions:

  1. Do you plan to add new join implementations to DataFusion, and if so are your plans written anywhere? A trait is a nice way to keep specialized implementation in other crates as well.
  2. Is it possible to extend "eager join" to MergeJoin? Would that even be a good idea?
  3. I don't understand how always reading alternately from left and right inputs would work (I left a more detailed question / comment below)

datafusion/physical-plan/src/joins/stream_join_utils.rs Outdated Show resolved Hide resolved
datafusion/physical-plan/src/joins/stream_join_utils.rs Outdated Show resolved Hide resolved
/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1)
/// ---------------------
/// ```
pub struct JoinHashMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to move this structure anyways perhaps we can put it into its own module (e.g datafusion/physical-plan/src/joins/join_hash_map.rs or something)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only struct that hash join uses different from other joins, this is why I put it under utils. I can create a new folder as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it will be a single struct in the new file. We can split more similar structs into a new file in future.

/// # Returns
///
/// * `Result<StreamJoinStateResult<Option<RecordBatch>>>` - The state result after pulling the batch.
async fn fetch_next_from_right_stream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am somewhat confused by this default implementation as it implies that join will always "ping pong" back and forth between fetching left and right inputs, while in realty I think the details of how the stream is implemented and how the join keys are distributed across batches could require fetching multiple batches from one (or both) inputs before progress can be made.

I am thinking of a join on a = b where all the rows in the batch have the same join key, for example:

Batch 1

a
100
100

Batch 2

a
100
100

Batch 3

a
100
200

Wouldn't the symmetric hash join have to read all three batches to find the next join key (200) before reading a batch from the other input / producing output?

I didn't see how the symmetric hash handles this case, so I must be missing something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both this and previous implementations buffer rows on both sides, adhering to deletion criteria set by the interval library; this aspect remains unchanged.

The core proposal focuses on retaining control over the SendableRecordBatch streams instead of merging them with futures::select. Additionally, there are plans to develop more efficient yielding strategies in the future. The suggested alternation strategy is expected to advance into sophisticated load-balancing techniques. This characteristic is fundamental to the realization of these advanced features.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both this and previous implementations buffer rows on both sides, adhering to deletion criteria set by the interval library; this aspect remains unchanged.

I agree this PR doesn't seem to change any behavior.

Additionally, there are plans to develop more efficient yielding strategies in the future. The suggested alternation strategy is expected to advance into sophisticated load-balancing techniques.

Are these plans described anywhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these plans described anywhere?

Not yet, but hopefully soon. We will continue publishing blog posts about this stuff in Datafusion and will talk about future goodies there :)

@metesynnada
Copy link
Contributor Author

Thank you @metesynnada -- this looks like a nice refactoring to me. I had some suggestions on documentation and API design but I think they could also be done as follow on PRs

I had some questions:

  1. Do you plan to add new join implementations to DataFusion, and if so are your plans written anywhere? A trait is a nice way to keep specialized implementation in other crates as well.
  2. Is it possible to extend "eager join" to MergeJoin? Would that even be a good idea?
  3. I don't understand how always reading alternately from left and right inputs would work (I left a more detailed question / comment below)
  1. While we have additional joins planned for our roadmap, we haven't announced these publicly yet.
  2. Thank you for the suggestion – it's a great idea!
  3. To reiterate, our current implementation of streaming joins is just the beginning. The polling mechanism you see now is foundational, serving as a template for more advanced features and enhancements we plan to introduce moving forward.

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing @alamb. We enriched the comments per your suggestions and will short open an issue to track the work on SortMergeJoin, which will be addressed in a follow-on PR.

@metesynnada
Copy link
Contributor Author

SortMergeJoin: #8273

@ozankabak ozankabak merged commit 2156dde into apache:main Nov 20, 2023
22 checks passed
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the improved comments 🙏

/// Represents the asynchronous trait for an eager join stream.
/// This trait defines the core methods for handling asynchronous join operations
/// between two streams (left and right).
/// `EagerJoinStream` is an asynchronous trait designed for managing incremental join operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants