Minor: Improve HashJoinExec documentation (#7953)
* Minor: Improve `HashJoinExec` documentation

* Apply suggestions from code review

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
alamb and viirya committed Oct 29, 2023
1 parent f388a2b commit 4a91ce9
Showing 1 changed file with 126 additions and 12 deletions.
138 changes: 126 additions & 12 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Defines the join plan for executing partitions in parallel and then joining the results
//! into a set of partitions.
//! [`HashJoinExec`] Partitioned Hash Join Operator

use std::fmt;
use std::mem::size_of;
@@ -78,29 +77,140 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};

type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);

/// Join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter applied after the
/// join.
///
/// Filter expression expected to contain non-equality predicates that can not be pushed
/// down to any of join inputs.
/// In case of outer join, filter applied to only matched rows.
/// # Join Expressions
///
/// This implementation is optimized for evaluating equijoin predicates
/// (`<col1> = <col2>`), which are represented as a list of `Column` pairs
/// in [`Self::on`].
///
/// Non-equality predicates, which cannot be pushed down to either join input
/// (e.g. `<col1> != <col2>`), are known as "filter expressions" and are
/// evaluated after the equijoin predicates.
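As an illustration (not part of this patch): for a condition such as `t1.a = t2.b AND t1.c > t2.d`, the equijoin predicate `t1.a = t2.b` becomes a `(left_col, right_col)` entry in `on`, while `t1.c > t2.d` must be carried as a filter expression and evaluated after matching. A minimal sketch of building such an `on` list, assuming the `Column::new(name, index)` constructor from the `datafusion-physical-expr` crate:

```rust
use datafusion_physical_expr::expressions::Column;

fn main() {
    // Each entry pairs a column from the left (build) input with one from the
    // right (probe) input; the index is the column's position in that input's
    // schema. A non-equality predicate would instead become a separate filter
    // expression (`JoinFilter`) applied after the equijoin keys are matched.
    let on: Vec<(Column, Column)> = vec![(Column::new("a", 0), Column::new("b", 0))];
    assert_eq!(on.len(), 1);
}
```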
///
/// # "Build Side" vs "Probe Side"
///
/// HashJoin takes two inputs, which are referred to as the "build" and the
/// "probe". The build side is the first child, and the probe side is the second
/// child.
///
/// The two inputs are treated differently and it is VERY important that the
/// *smaller* input is placed on the build side to minimize the work of creating
/// the hash table.
///
/// ```text
/// ┌───────────┐
/// │ HashJoin │
/// │ │
/// └───────────┘
/// │ │
/// ┌─────┘ └─────┐
/// ▼ ▼
/// ┌────────────┐ ┌─────────────┐
/// │ Input │ │ Input │
/// │ [0] │ │ [1] │
/// └────────────┘ └─────────────┘
///
/// "build side" "probe side"
/// ```
///
/// Execution proceeds in 2 stages:
///
/// 1. the **build phase** where a hash table is created from the tuples of the
/// build side.
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
/// through, checking for matches of the join keys in the hash table.
///
/// ```text
/// ┌────────────────┐ ┌────────────────┐
/// │ ┌─────────┐ │ │ ┌─────────┐ │
/// │ │ Hash │ │ │ │ Hash │ │
/// │ │ Table │ │ │ │ Table │ │
/// │ │(keys are│ │ │ │(keys are│ │
/// │ │equi join│ │ │ │equi join│ │ Stage 2: batches from
/// Stage 1: the │ │columns) │ │ │ │columns) │ │ the probe side are
/// *entire* build │ │ │ │ │ │ │ │ streamed through, and
/// side is read │ └─────────┘ │ │ └─────────┘ │ checked against the
/// into the hash │ ▲ │ │ ▲ │ contents of the hash
/// table │ HashJoin │ │ HashJoin │ table
/// └──────┼─────────┘ └──────────┼─────┘
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │ │
///
/// │ │
/// ┌────────────┐ ┌────────────┐
/// │RecordBatch │ │RecordBatch │
/// └────────────┘ └────────────┘
/// ┌────────────┐ ┌────────────┐
/// │RecordBatch │ │RecordBatch │
/// └────────────┘ └────────────┘
/// ... ...
/// ┌────────────┐ ┌────────────┐
/// │RecordBatch │ │RecordBatch │
/// └────────────┘ └────────────┘
///
/// build side probe side
///
/// ```
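As a rough sketch of these two phases (plain `std` collections standing in for the actual `JoinHashMap` and Arrow `RecordBatch` types, with an inner join on a single integer key assumed for simplicity):

```rust
use std::collections::HashMap;

// Simplified inner hash join on a single integer key; this sketches only the
// two phases described above, not the actual DataFusion implementation.
fn hash_join(build: &[(i64, &str)], probe: &[(i64, &str)]) -> Vec<(String, String)> {
    // Stage 1 (build phase): read the *entire* build side into a hash table
    // keyed by the equijoin column.
    let mut table: HashMap<i64, Vec<&str>> = HashMap::new();
    for &(key, payload) in build {
        table.entry(key).or_default().push(payload);
    }

    // Stage 2 (probe phase): stream the probe side through, looking up each
    // probe row's key and emitting one output row per match.
    let mut out = Vec::new();
    for &(key, payload) in probe {
        if let Some(matches) = table.get(&key) {
            for &m in matches {
                out.push((m.to_string(), payload.to_string()));
            }
        }
    }
    out
}

fn main() {
    let build = [(1, "build-1"), (2, "build-2")];
    let probe = [(2, "probe-a"), (3, "probe-b"), (2, "probe-c")];
    assert_eq!(
        hash_join(&build, &probe),
        vec![
            ("build-2".to_string(), "probe-a".to_string()),
            ("build-2".to_string(), "probe-c".to_string()),
        ]
    );
}
```

Note that the entire build side must be loaded into the hash table before any probe-side batch can produce output, which is why placing the smaller input on the build side matters so much.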
///
/// # Example "Optimal" Plans
///
/// The difference between the inputs means that for a classic "Star Schema
/// Query", the optimal plan will be a **"Right Deep Tree"**. A Star Schema
/// Query is one where there is one large "fact" table and several smaller
/// "dimension" tables, joined on `Foreign Key = Primary Key` predicates.
///
/// A "Right Deep Tree" looks like this large table as the probe side on the
/// lowest join:
///
/// ```text
/// ┌───────────┐
/// │ HashJoin │
/// │ │
/// └───────────┘
/// │ │
/// ┌───────┘ └──────────┐
/// ▼ ▼
/// ┌───────────────┐ ┌───────────┐
/// │ small table 1 │ │ HashJoin │
/// │ "dimension" │ │ │
/// └───────────────┘ └───┬───┬───┘
/// ┌──────────┘ └───────┐
/// │ │
/// ▼ ▼
/// ┌───────────────┐ ┌───────────┐
/// │ small table 2 │ │ HashJoin │
/// │ "dimension" │ │ │
/// └───────────────┘ └───┬───┬───┘
/// ┌────────┘ └────────┐
/// │ │
/// ▼ ▼
/// ┌───────────────┐ ┌───────────────┐
/// │ small table 3 │ │ large table │
/// │ "dimension" │ │ "fact" │
/// └───────────────┘ └───────────────┘
/// ```
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
pub left: Arc<dyn ExecutionPlan>,
/// right (probe) side which are filtered by the hash table
pub right: Arc<dyn ExecutionPlan>,
/// Set of common columns used to join on
/// Set of equijoin columns from the relations: `(left_col, right_col)`
pub on: Vec<(Column, Column)>,
/// Filters which are applied while finding matching rows
pub filter: Option<JoinFilter>,
/// How the join is performed
/// How the join is performed (`OUTER`, `INNER`, etc)
pub join_type: JoinType,
/// The schema once the join is applied
/// The output schema for the join
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Shares the `RandomState` for the hashing algorithm
/// Shares the `RandomState` for the hashing algorithm
random_state: RandomState,
/// Output order
output_order: Option<Vec<PhysicalSortExpr>>,
@@ -110,12 +220,16 @@ pub struct HashJoinExec {
metrics: ExecutionPlanMetricsSet,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// If null_equals_null is true, null == null else null != null
/// Null matching behavior: If `null_equals_null` is true, rows that have
/// `null`s in both left and right equijoin columns will be matched.
/// Otherwise, rows that have `null`s in the join columns will not be
/// matched and thus will not appear in the output.
pub null_equals_null: bool,
}
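As an aside (not part of this commit), the `null_equals_null` behavior described above can be pictured with a plain-Rust analogue using `Option` keys, where `None` stands for SQL `NULL`:

```rust
// Compare two nullable join keys under both matching modes; a sketch only.
fn keys_match(left: Option<i64>, right: Option<i64>, null_equals_null: bool) -> bool {
    match (left, right) {
        (Some(l), Some(r)) => l == r,
        // Two NULL keys only match when `null_equals_null` is true.
        (None, None) => null_equals_null,
        // A NULL key never matches a non-NULL key.
        _ => false,
    }
}

fn main() {
    assert!(keys_match(Some(1), Some(1), false));
    assert!(!keys_match(None, None, false)); // default SQL `=` semantics
    assert!(keys_match(None, None, true)); // null-safe matching
}
```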

impl HashJoinExec {
/// Tries to create a new [HashJoinExec].
///
/// # Error
/// This function errors when it is not possible to join the left and right sides on keys `on`.
pub fn try_new(
