Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making stream joins extensible: A new Trait implementation for SHJ #8234

Merged
merged 7 commits into from
Nov 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 90 additions & 6 deletions datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,26 @@ pub fn record_visited_indices<T: ArrowPrimitiveType>(
}
}

/// `handle_state` macro simplifies the handling of state results within a stream join operation.
///
/// This macro is designed to process the result of a state-changing operation, typically
/// encountered in implementations of `EagerJoinStream`. It handles the `StreamJoinStateResult`
/// by matching its variants and executing corresponding actions. This macro is used to
/// streamline the code that deals with these state transitions, reducing boilerplate and
/// improving readability.
///
/// # Usage
///
/// - `Ok(StreamJoinStateResult::Continue)`: Continues the loop, indicating the stream join
/// operation should proceed to the next step.
/// - `Ok(StreamJoinStateResult::Ready(res))`: Returns a `Poll::Ready` with the result, either
/// yielding a value or indicating the stream is awaiting more data.
/// - `Err(e)`: Returns a `Poll::Ready` containing an error, signaling an issue during the stream
/// join operation.
///
/// # Arguments
///
/// * `$match_case`: An expression that evaluates to a `Result<StreamJoinStateResult<_>>`.
#[macro_export]
macro_rules! handle_state {
metesynnada marked this conversation as resolved.
Show resolved Hide resolved
($match_case:expr) => {
Expand All @@ -628,14 +648,43 @@ macro_rules! handle_state {
};
}

/// `handle_async_state` macro extends the `handle_state` macro for asynchronous contexts.
///
/// This macro adapts the `handle_state` macro for use in asynchronous operations, particularly
/// when dealing with `Poll` results within async traits like `EagerJoinStream`. It ensures
/// that the asynchronous state-changing function is polled and then passes the result to
/// `handle_state` for further processing.
///
/// # Usage
///
/// The macro first polls the provided asynchronous function using `poll_unpin` and then
/// processes the result using the `handle_state` macro.
///
/// # Arguments
///
/// * `$state_func`: An async function or future that returns a `Result<StreamJoinStateResult<_>>`.
/// * `$cx`: The context to be passed for polling, usually of type `&mut Context`.
///
#[macro_export]
macro_rules! handle_async_state {
($state_func:expr, $cx:expr) => {
$crate::handle_state!(ready!($state_func.poll_unpin($cx)))
};
}

/// Represents the possible results of a state in the join stream.
/// Represents the result of a stateful operation on `EagerJoinStream`.
///
/// This enum is typically used in scenarios where a stream join operation is performed.
/// It indicates whether the state produced a result that
/// is ready to be used (`Ready`) or if the operation requires continuation (`Continue`).
///
/// Variants:
/// - `Ready(T)`: Indicates that the operation has completed and produced a result of type `T`.
/// This variant holds the result that can be used immediately in further processing.
/// - `Continue`: Indicates that the operation is not yet complete and requires further
/// processing or more data. When this variant is returned, it typically means that the
/// current invocation of the state did not produce a final result, and the operation
/// should be invoked again later with more data and possibly with a different state.
pub enum StreamJoinStateResult<T> {
metesynnada marked this conversation as resolved.
Show resolved Hide resolved
Ready(T),
Continue,
Expand Down Expand Up @@ -669,9 +718,44 @@ pub enum EagerJoinStreamState {
BothExhausted { final_result: bool },
}

/// Represents the asynchronous trait for an eager join stream.
/// This trait defines the core methods for handling asynchronous join operations
/// between two streams (left and right).
/// `EagerJoinStream` is an asynchronous trait designed for managing incremental join operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// between two streams, such as those used in `SymmetricHashJoinExec` and `SortMergeJoinExec`.
/// Unlike traditional join approaches that require one side of the join to be fully materialized
/// before proceeding, `EagerJoinStream` facilitates more dynamic join operations by working with
/// streams as they emit data. This approach allows for more efficient processing, particularly in
/// scenarios where waiting for complete data materialization is not feasible or optimal. The trait
/// provides a robust framework for handling various states of the join process, ensuring that join
/// logic is efficiently executed as data becomes available from either stream.
///
/// Implementors of this trait can perform eager joins of data from two different asynchronous
/// streams, typically referred to as left and right streams. The trait provides a comprehensive
/// set of methods to control and execute the join process, leveraging the states defined in
/// `EagerJoinStreamState`. Methods are primarily focused on asynchronously fetching data batches
/// from each stream, processing them, and managing transitions between various states of the join.
///
/// This trait's default implementations use a state machine approach to navigate through
/// different stages of the join operation, handling data from both streams and determining
/// the completion of the join.
///
/// State Transitions:
/// - From `PullLeft` to `PullRight` or `LeftExhausted`:
/// - In `fetch_next_from_left_stream`, when fetching a batch from the left stream:
/// - On success (`Some(Ok(batch))`), state transitions to `PullRight` for processing the batch.
/// - On error (`Some(Err(e))`), the error is returned, and the state remains unchanged.
/// - On no data (`None`), state changes to `LeftExhausted`, returning `Continue` to proceed with the join process.
/// - From `PullRight` to `PullLeft` or `RightExhausted`:
/// - In `fetch_next_from_right_stream`, when fetching from the right stream:
/// - If a batch is available, state changes to `PullLeft` for processing.
/// - On error, the error is returned without changing the state.
/// - If right stream is exhausted (`None`), state transitions to `RightExhausted`, with a `Continue` result.
/// - Handling `RightExhausted` and `LeftExhausted`:
/// - Methods `handle_right_stream_end` and `handle_left_stream_end` manage scenarios when streams are exhausted:
/// - They attempt to continue processing with the other stream.
/// - If both streams are exhausted, state changes to `BothExhausted { final_result: false }`.
/// - Transition to `BothExhausted { final_result: true }`:
/// - Occurs in `prepare_for_final_results_after_exhaustion` when both streams are exhausted,
/// indicating completion of processing and availability of final results.
///
#[async_trait]
pub trait EagerJoinStream {
/// Implements the main polling logic for the join stream.
Expand Down Expand Up @@ -719,7 +803,7 @@ pub trait EagerJoinStream {
};
}
}
/// Asynchronously pulls the next batch from the right (probe) stream.
/// Asynchronously pulls the next batch from the right stream.
///
/// This default implementation checks for the next value in the right stream.
/// If a batch is found, the state is switched to `PullLeft`, and the batch handling
Expand All @@ -744,7 +828,7 @@ pub trait EagerJoinStream {
}
}

/// Asynchronously pulls the next batch from the left (build) stream.
/// Asynchronously pulls the next batch from the left stream.
///
/// This default implementation checks for the next value in the left stream.
/// If a batch is found, the state is switched to `PullRight`, and the batch handling
Expand Down
Loading