From 2740a7a0ed7c88b1c35534be3053b7afd75ce93c Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 10:27:10 -0500
Subject: [PATCH 1/9] Refactor HashJoinExec to progressively accumulate
 dynamic filter bounds instead of computing them after data is accumulated

This should unblock attempts to do spilling and should be no better or worse
than the current approach in terms of performance / complexity
---
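Notes: the key change is that min/max bounds for the dynamic filter are now
folded into accumulators batch-by-batch as the build side streams in, rather
than computed in one pass over the fully collected build-side arrays. Roughly,
the pattern is the following (a minimal sketch using the same accumulator
APIs; the function name and its inputs are illustrative, not code from this
patch):

    use arrow::array::ArrayRef;
    use arrow::datatypes::DataType;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::Accumulator;
    use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};

    /// Fold per-batch updates into min/max accumulators. Only two scalars
    /// are retained between batches, so batches never have to be kept
    /// around (or concatenated) just to compute bounds.
    fn progressive_bounds(
        batches: &[ArrayRef],
        data_type: &DataType,
    ) -> Result<(ScalarValue, ScalarValue)> {
        let mut min = MinAccumulator::try_new(data_type)?;
        let mut max = MaxAccumulator::try_new(data_type)?;
        for array in batches {
            min.update_batch(std::slice::from_ref(array))?;
            max.update_batch(std::slice::from_ref(array))?;
        }
        Ok((min.evaluate()?, max.evaluate()?))
    }

This is what makes spilling feasible later: the bounds no longer depend on
every build-side batch being resident at the end of the build phase.
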
 datafusion/physical-plan/Cargo.toml           |   1 +
 .../physical-plan/src/joins/hash_join/exec.rs | 124 +++++++++++++-----
 2 files changed, 89 insertions(+), 36 deletions(-)

diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 830a159d5eb15..b8a02187d36b2 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -53,6 +53,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index b184bd341306e..4ccdeb43a7c36 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -54,7 +54,7 @@ use crate::{
     PlanProperties, SendableRecordBatchStream, Statistics,
 };
 
-use arrow::array::{Array, ArrayRef, BooleanBufferBuilder};
+use arrow::array::{ArrayRef, BooleanBufferBuilder};
 use arrow::compute::concat_batches;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -63,11 +63,11 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
     internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
-    ScalarValue,
 };
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
-use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
+use datafusion_expr::Accumulator;
+use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::equivalence::{
     join_equivalence_properties, ProjectionMapping,
 };
@@ -77,6 +77,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 use ahash::RandomState;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::TryStreamExt;
+use itertools::izip;
 use parking_lot::Mutex;
 
 /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
@@ -1188,28 +1189,6 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
-    arrays
-        .iter()
-        .map(|array| {
-            if array.is_empty() {
-                // Return NULL values for empty arrays
-                return Ok(ColumnBounds::new(
-                    ScalarValue::try_from(array.data_type())?,
-                    ScalarValue::try_from(array.data_type())?,
-                ));
-            }
-
-            // Use Arrow kernels for efficient min/max computation
-            let min_val = min_batch(array)?;
-            let max_val = max_batch(array)?;
-
-            Ok(ColumnBounds::new(min_val, max_val))
-        })
-        .collect()
-}
-
 #[expect(clippy::too_many_arguments)]
 /// Collects all batches from the left (build) side stream and creates a hash map for joining.
 ///
 /// This function is responsible for:
 /// 1. Consuming the entire left stream and collecting all batches into memory
 /// 2. Building a hash map from the join key columns for efficient probe operations
 /// 3. Computing bounds for dynamic filter pushdown (if enabled)
 /// 4. Preparing visited indices bitmap for certain join types
 ///
 /// # Parameters
 /// * `random_state` - Random state for consistent hashing across partitions
 /// * `left_stream` - Stream of record batches from the build side
 /// * `on_left` - Physical expressions for the left side join keys
 /// * `metrics` - Metrics collector for tracking memory usage and row counts
 /// * `reservation` - Memory reservation tracker for the hash table and data
 /// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
 /// * `probe_threads_count` - Number of threads that will probe this hash table
 /// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering
 ///
 /// # Dynamic Filter Coordination
 /// When `should_compute_bounds` is true, this function computes the min/max bounds
 /// for each join key column but does NOT update the dynamic filter. Instead, the
 /// bounds are stored in the returned `JoinLeftData` and later coordinated by
 /// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
 /// before updating the filter exactly once.
 ///
 /// # Returns
 /// `JoinLeftData` containing the hash map, consolidated batch, join key values,
 /// visited indices bitmap, and computed bounds (if requested).
+
+/// State for collecting the build-side data during hash join
+struct BuildSideState {
+    batches: Vec<RecordBatch>,
+    num_rows: usize,
+    metrics: BuildProbeJoinMetrics,
+    reservation: MemoryReservation,
+    min_accumulators: Vec<MinAccumulator>,
+    max_accumulators: Vec<MaxAccumulator>,
+    on_left: Vec<Arc<dyn PhysicalExpr>>,
+}
+
 async fn collect_left_input(
     random_state: RandomState,
     left_stream: SendableRecordBatchStream,
@@ -1251,27 +1242,78 @@
 ) -> Result<JoinLeftData> {
     let schema = left_stream.schema();
 
+    let (min_accumulators, max_accumulators) = if should_compute_bounds {
+        let data_types = on_left
+            .iter()
+            .map(|expr| expr.data_type(&schema))
+            .collect::<Result<Vec<_>>>()?;
+        (
+            data_types
+                .iter()
+                .map(|data_type| MinAccumulator::try_new(data_type))
+                .collect::<Result<Vec<_>>>()?,
+            data_types
+                .iter()
+                .map(|data_type| MaxAccumulator::try_new(data_type))
+                .collect::<Result<Vec<_>>>()?,
+        )
+    } else {
+        (Vec::new(), Vec::new())
+    };
+
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = (Vec::new(), 0, metrics, reservation);
-    let (batches, num_rows, metrics, mut reservation) = left_stream
-        .try_fold(initial, |mut acc, batch| async {
+    let initial = BuildSideState {
+        batches: Vec::new(),
+        num_rows: 0,
+        metrics,
+        reservation,
+        min_accumulators,
+        max_accumulators,
+        on_left: on_left.clone(),
+    };
+
+    let state = left_stream
+        .try_fold(initial, |mut state, batch| async move {
+            // Update accumulators if computing bounds
+            for (min_accumulator, max_accumulator, on_expr) in izip!(
+                &mut state.min_accumulators,
+                &mut state.max_accumulators,
+                &state.on_left
+            ) {
+                let array = on_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+                min_accumulator.update_batch(&[array.clone()])?;
+                max_accumulator.update_batch(&[array])?;
+            }
+
+            // Decide if we spill or not
             let batch_size = get_record_batch_memory_size(&batch);
             // Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            state.reservation.try_grow(batch_size)?;
             // Update metrics
-            acc.2.build_mem_used.add(batch_size);
-            acc.2.build_input_batches.add(1);
-            acc.2.build_input_rows.add(batch.num_rows());
+            state.metrics.build_mem_used.add(batch_size);
+            state.metrics.build_input_batches.add(1);
+            state.metrics.build_input_rows.add(batch.num_rows());
             // Update row count
-            acc.1 += batch.num_rows();
+            state.num_rows += batch.num_rows();
             // Push batch to output
-            acc.0.push(batch);
-            Ok(acc)
+            state.batches.push(batch);
+            Ok(state)
         })
         .await?;
 
+    // Extract fields from state
+    let BuildSideState {
+        batches,
+        num_rows,
+        metrics,
+        mut reservation,
+        min_accumulators,
+        max_accumulators,
+        on_left: _,
+    } = state;
+
     // Estimation of memory size, required for hashtable, prior to allocation.
     // Final result can be verified using `RawTable.allocation_info()`
     let fixed_size_u32 = size_of::<u32>();
@@ -1339,7 +1381,17 @@
 
     // Compute bounds for dynamic filter if enabled
     let bounds = if should_compute_bounds && num_rows > 0 {
-        Some(compute_bounds(&left_values)?)
+        let bounds = min_accumulators
+            .into_iter()
+            .zip(max_accumulators.into_iter())
+            .map(|(mut min_accumulator, mut max_accumulator)| {
+                Ok(ColumnBounds::new(
+                    min_accumulator.evaluate()?,
+                    max_accumulator.evaluate()?,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Some(bounds)
     } else {
         None
     };

From 25be14a556131525c701adc8410e1c7abd22a3b5 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 10:30:27 -0500
Subject: [PATCH 2/9] remove unused crate

---
 Cargo.lock                          | 1 -
 datafusion/physical-plan/Cargo.toml | 1 -
 2 files changed, 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 933a0ee44a76d..c295aa208e71b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2559,7 +2559,6 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions-aggregate",
- "datafusion-functions-aggregate-common",
  "datafusion-functions-window",
  "datafusion-functions-window-common",
  "datafusion-physical-expr",
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index b8a02187d36b2..01e4b1899a86a 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -54,7 +54,6 @@ datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
-datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-expr-common = { workspace = true }

From 9cc484fedeabe4f1596f6af995b734e5e36244e9 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 10:56:59 -0500
Subject: [PATCH 3/9] fix

---
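Notes: the "fix" here is for dictionary-encoded join keys. The Min/Max
accumulators accept dictionary-encoded input batches, but they must be
constructed with the dictionary's value type rather than the dictionary type
itself, hence the recursive unwrap added below. A small self-contained
illustration (the `unwrap_dict` name is ours; the patch calls the helper
`dictionary_value_type`):

    use arrow_schema::DataType;

    // Same shape as the helper this patch introduces.
    fn unwrap_dict(data_type: &DataType) -> DataType {
        match data_type {
            DataType::Dictionary(_, value_type) => unwrap_dict(value_type.as_ref()),
            _ => data_type.clone(),
        }
    }

    fn main() {
        // A dictionary-encoded Utf8 key: Int32 indices into Utf8 values.
        let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        assert_eq!(unwrap_dict(&dict), DataType::Utf8); // accumulate as Utf8
        assert_eq!(unwrap_dict(&DataType::Int64), DataType::Int64); // non-dictionary unchanged
    }

The switch to `std::slice::from_ref(&array)` in the same commit just avoids
cloning the `ArrayRef` when passing a one-element slice to `update_batch`.
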
 .../physical-plan/src/joins/hash_join/exec.rs | 21 ++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 4ccdeb43a7c36..11cd7e849fa72 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -59,6 +59,7 @@ use arrow::compute::concat_batches;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
+use arrow_schema::DataType;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
@@ -1189,7 +1190,6 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-#[expect(clippy::too_many_arguments)]
 /// Collects all batches from the left (build) side stream and creates a hash map for joining.
 ///
 /// This function is responsible for:
@@ -1218,7 +1218,6 @@ impl ExecutionPlan for HashJoinExec {
 /// # Returns
 /// `JoinLeftData` containing the hash map, consolidated batch, join key values,
 /// visited indices bitmap, and computed bounds (if requested).
-
 /// State for collecting the build-side data during hash join
 struct BuildSideState {
     batches: Vec<RecordBatch>,
     num_rows: usize,
     metrics: BuildProbeJoinMetrics,
     reservation: MemoryReservation,
     min_accumulators: Vec<MinAccumulator>,
     max_accumulators: Vec<MaxAccumulator>,
     on_left: Vec<Arc<dyn PhysicalExpr>>,
 }
 
+fn dictionary_value_type(data_type: &DataType) -> DataType {
+    match data_type {
+        DataType::Dictionary(_, value_type) => dictionary_value_type(value_type.as_ref()),
+        _ => data_type.clone(),
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
 async fn collect_left_input(
     random_state: RandomState,
     left_stream: SendableRecordBatchStream,
@@ -1245,16 +1252,16 @@
     let (min_accumulators, max_accumulators) = if should_compute_bounds {
         let data_types = on_left
             .iter()
-            .map(|expr| expr.data_type(&schema))
+            .map(|expr| expr.data_type(&schema).map(|dt| dictionary_value_type(&dt)))
             .collect::<Result<Vec<_>>>()?;
         (
             data_types
                 .iter()
-                .map(|data_type| MinAccumulator::try_new(data_type))
+                .map(MinAccumulator::try_new)
                 .collect::<Result<Vec<_>>>()?,
             data_types
                 .iter()
-                .map(|data_type| MaxAccumulator::try_new(data_type))
+                .map(MaxAccumulator::try_new)
                 .collect::<Result<Vec<_>>>()?,
         )
@@ -1283,8 +1290,8 @@
                 &state.on_left
             ) {
                 let array = on_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
-                min_accumulator.update_batch(&[array.clone()])?;
-                max_accumulator.update_batch(&[array])?;
+                min_accumulator.update_batch(std::slice::from_ref(&array))?;
+                max_accumulator.update_batch(std::slice::from_ref(&array))?;
             }
 
             // Decide if we spill or not

From 77787f2fcb314437b3fbcf372c34a9a100b018ab Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:06:09 -0500
Subject: [PATCH 4/9] fix

---
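Notes: min/max tracking becomes `Option`-gated instead of a pair of
possibly-empty `Vec`s. `None` makes "bounds computation disabled" explicit,
the per-batch update loop is skipped entirely when disabled, and the final
bounds extraction can pattern-match on whether accumulation happened at all.
A stand-in sketch of the shape (the `f64` "accumulator" is purely
illustrative, not the patch's types):

    struct State {
        accumulators: Option<Vec<f64>>, // None = tracking disabled
    }

    impl State {
        fn update(&mut self, values: &[f64]) {
            if let Some(accs) = &mut self.accumulators {
                for acc in accs.iter_mut() {
                    for v in values {
                        *acc = acc.max(*v); // running max per tracked column
                    }
                }
            }
        }
    }

    fn main() {
        let mut off = State { accumulators: None };
        off.update(&[1.0, 2.0]); // no-op: tracking is off
        let mut on = State { accumulators: Some(vec![f64::MIN]) };
        on.update(&[1.0, 2.0]);
        assert_eq!(on.accumulators, Some(vec![2.0]));
    }
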
 .../physical-plan/src/joins/hash_join/exec.rs | 126 +++++++++++-------
 1 file changed, 76 insertions(+), 50 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 11cd7e849fa72..f5d68335d812d 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1224,11 +1224,55 @@ struct BuildSideState {
     num_rows: usize,
     metrics: BuildProbeJoinMetrics,
     reservation: MemoryReservation,
-    min_accumulators: Vec<MinAccumulator>,
-    max_accumulators: Vec<MaxAccumulator>,
+    min_accumulators: Option<Vec<MinAccumulator>>,
+    max_accumulators: Option<Vec<MaxAccumulator>>,
     on_left: Vec<Arc<dyn PhysicalExpr>>,
 }
 
+impl BuildSideState {
+    /// Create a new BuildSideState with optional accumulators for bounds computation
+    fn try_new(
+        metrics: BuildProbeJoinMetrics,
+        reservation: MemoryReservation,
+        on_left: Vec<Arc<dyn PhysicalExpr>>,
+        schema: &SchemaRef,
+        should_compute_bounds: bool,
+    ) -> Result<Self> {
+        let (min_accumulators, max_accumulators) = if should_compute_bounds {
+            let data_types = on_left
+                .iter()
+                .map(|expr| expr.data_type(schema).map(|dt| dictionary_value_type(&dt)))
+                .collect::<Result<Vec<_>>>()?;
+            (
+                Some(
+                    data_types
+                        .iter()
+                        .map(MinAccumulator::try_new)
+                        .collect::<Result<Vec<_>>>()?,
+                ),
+                Some(
+                    data_types
+                        .iter()
+                        .map(MaxAccumulator::try_new)
+                        .collect::<Result<Vec<_>>>()?,
+                ),
+            )
+        } else {
+            (None, None)
+        };
+
+        Ok(Self {
+            batches: Vec::new(),
+            num_rows: 0,
+            metrics,
+            reservation,
+            min_accumulators,
+            max_accumulators,
+            on_left,
+        })
+    }
+}
+
 fn dictionary_value_type(data_type: &DataType) -> DataType {
     match data_type {
         DataType::Dictionary(_, value_type) => dictionary_value_type(value_type.as_ref()),
         _ => data_type.clone(),
     }
@@ -1249,49 +1293,30 @@
 ) -> Result<JoinLeftData> {
     let schema = left_stream.schema();
 
-    let (min_accumulators, max_accumulators) = if should_compute_bounds {
-        let data_types = on_left
-            .iter()
-            .map(|expr| expr.data_type(&schema).map(|dt| dictionary_value_type(&dt)))
-            .collect::<Result<Vec<_>>>()?;
-        (
-            data_types
-                .iter()
-                .map(MinAccumulator::try_new)
-                .collect::<Result<Vec<_>>>()?,
-            data_types
-                .iter()
-                .map(MaxAccumulator::try_new)
-                .collect::<Result<Vec<_>>>()?,
-        )
-    } else {
-        (Vec::new(), Vec::new())
-    };
-
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = BuildSideState {
-        batches: Vec::new(),
-        num_rows: 0,
+    let initial = BuildSideState::try_new(
         metrics,
         reservation,
-        min_accumulators,
-        max_accumulators,
-        on_left: on_left.clone(),
-    };
+        on_left.clone(),
+        &schema,
+        should_compute_bounds,
+    )?;
 
     let state = left_stream
         .try_fold(initial, |mut state, batch| async move {
             // Update accumulators if computing bounds
-            for (min_accumulator, max_accumulator, on_expr) in izip!(
-                &mut state.min_accumulators,
-                &mut state.max_accumulators,
-                &state.on_left
-            ) {
-                let array = on_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
-                min_accumulator.update_batch(std::slice::from_ref(&array))?;
-                max_accumulator.update_batch(std::slice::from_ref(&array))?;
+            if let (Some(ref mut min_accumulators), Some(ref mut max_accumulators)) =
+                (&mut state.min_accumulators, &mut state.max_accumulators)
+            {
+                for (min_accumulator, max_accumulator, on_expr) in
+                    izip!(min_accumulators, max_accumulators, &state.on_left)
+                {
+                    let array = on_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+                    min_accumulator.update_batch(std::slice::from_ref(&array))?;
+                    max_accumulator.update_batch(std::slice::from_ref(&array))?;
+                }
             }
 
             // Decide if we spill or not
@@ -1387,20 +1412,21 @@
         .collect::<Result<Vec<_>>>()?;
 
     // Compute bounds for dynamic filter if enabled
-    let bounds = if should_compute_bounds && num_rows > 0 {
-        let bounds = min_accumulators
-            .into_iter()
-            .zip(max_accumulators.into_iter())
-            .map(|(mut min_accumulator, mut max_accumulator)| {
-                Ok(ColumnBounds::new(
-                    min_accumulator.evaluate()?,
-                    max_accumulator.evaluate()?,
-                ))
-            })
-            .collect::<Result<Vec<_>>>()?;
-        Some(bounds)
-    } else {
-        None
+    let bounds = match (min_accumulators, max_accumulators) {
+        (Some(min_accumulators), Some(max_accumulators)) if num_rows > 0 => {
+            let bounds = min_accumulators
+                .into_iter()
+                .zip(max_accumulators.into_iter())
+                .map(|(mut min_accumulator, mut max_accumulator)| {
+                    Ok(ColumnBounds::new(
+                        min_accumulator.evaluate()?,
+                        max_accumulator.evaluate()?,
+                    ))
+                })
+                .collect::<Result<Vec<_>>>()?;
+            Some(bounds)
+        }
+        _ => None,
     };

From d16b01191f623334b44a13a01314692267e2e6c4 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:30:48 -0500
Subject: [PATCH 5/9] docstrings

---
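Notes: mostly documentation, but this commit also folds each
(expression, min accumulator, max accumulator) triple into a single
`CollectLeftAccumulator` with a try_new / update_batch / evaluate lifecycle.
An illustrative driver loop, not code from the patch (assumes `on_left`,
`schema`, and `batches` in scope plus `std::sync::Arc` and
`datafusion_common::Result`; `evaluate` consumes the accumulator, so it can
only be called once, after the last batch):

    let mut accumulators = on_left
        .iter()
        .map(|expr| CollectLeftAccumulator::try_new(Arc::clone(expr), &schema))
        .collect::<Result<Vec<_>>>()?;
    for batch in &batches {
        for acc in accumulators.iter_mut() {
            acc.update_batch(batch)?; // evaluates the key expr, feeds min and max
        }
    }
    let bounds: Vec<ColumnBounds> = accumulators
        .into_iter()
        .map(CollectLeftAccumulator::evaluate) // by value: consumes each accumulator
        .collect::<Result<Vec<_>>>()?;
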
 .../physical-plan/src/joins/hash_join/exec.rs | 204 +++++++++++++-------
 1 file changed, 123 insertions(+), 81 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index f5d68335d812d..8ff8d7099bca7 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -78,7 +78,6 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 use ahash::RandomState;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::TryStreamExt;
-use itertools::izip;
 use parking_lot::Mutex;
 
 /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
@@ -1190,43 +1189,93 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-/// Collects all batches from the left (build) side stream and creates a hash map for joining.
-///
-/// This function is responsible for:
-/// 1. Consuming the entire left stream and collecting all batches into memory
-/// 2. Building a hash map from the join key columns for efficient probe operations
-/// 3. Computing bounds for dynamic filter pushdown (if enabled)
-/// 4. Preparing visited indices bitmap for certain join types
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
 ///
-/// # Parameters
-/// * `random_state` - Random state for consistent hashing across partitions
-/// * `left_stream` - Stream of record batches from the build side
-/// * `on_left` - Physical expressions for the left side join keys
-/// * `metrics` - Metrics collector for tracking memory usage and row counts
-/// * `reservation` - Memory reservation tracker for the hash table and data
-/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
-/// * `probe_threads_count` - Number of threads that will probe this hash table
-/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
 ///
-/// # Dynamic Filter Coordination
-/// When `should_compute_bounds` is true, this function computes the min/max bounds
-/// for each join key column but does NOT update the dynamic filter. Instead, the
-/// bounds are stored in the returned `JoinLeftData` and later coordinated by
-/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
-/// before updating the filter exactly once.
-///
-/// # Returns
-/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
-/// visited indices bitmap, and computed bounds (if requested).
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,
+    /// Accumulator for tracking the maximum value across all batches
+    max: MaxAccumulator,
+}
+
+impl CollectLeftAccumulator {
+    /// Creates a new accumulator for tracking bounds of a join key expression.
+    ///
+    /// # Arguments
+    /// * `expr` - The physical expression to track bounds for
+    /// * `schema` - The schema of the input data
+    ///
+    /// # Returns
+    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
+    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
+        /// Recursively unwraps dictionary types to get the underlying value type.
+        fn dictionary_value_type(data_type: &DataType) -> DataType {
+            match data_type {
+                DataType::Dictionary(_, value_type) => {
+                    dictionary_value_type(value_type.as_ref())
+                }
+                _ => data_type.clone(),
+            }
+        }
+
+        let data_type = expr
+            .data_type(schema)
+            // Min/Max can operate on dictionary data but expect to be initialized with the underlaying value type
+            .map(|dt| dictionary_value_type(&dt))?;
+        Ok(Self {
+            expr,
+            min: MinAccumulator::try_new(&data_type)?,
+            max: MaxAccumulator::try_new(&data_type)?,
+        })
+    }
+
+    /// Updates the accumulators with values from a new batch.
+    ///
+    /// Evaluates the expression on the batch and updates both min and max
+    /// accumulators with the resulting values.
+    ///
+    /// # Arguments
+    /// * `batch` - The record batch to process
+    ///
+    /// # Returns
+    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
+    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
+        self.min.update_batch(std::slice::from_ref(&array))?;
+        self.max.update_batch(std::slice::from_ref(&array))?;
+        Ok(())
+    }
+
+    /// Finalizes the accumulation and returns the computed bounds.
+    ///
+    /// Consumes self to extract the final min and max values from the accumulators.
+    ///
+    /// # Returns
+    /// The `ColumnBounds` containing the minimum and maximum values observed
+    fn evaluate(mut self) -> Result<ColumnBounds> {
+        Ok(ColumnBounds::new(
+            self.min.evaluate()?,
+            self.max.evaluate()?,
+        ))
+    }
+}
+
 /// State for collecting the build-side data during hash join
 struct BuildSideState {
     batches: Vec<RecordBatch>,
     num_rows: usize,
     metrics: BuildProbeJoinMetrics,
     reservation: MemoryReservation,
-    min_accumulators: Option<Vec<MinAccumulator>>,
-    max_accumulators: Option<Vec<MaxAccumulator>>,
-    on_left: Vec<Arc<dyn PhysicalExpr>>,
+    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
 }
 
 impl BuildSideState {
@@ -1238,27 +1287,15 @@ impl BuildSideState {
         schema: &SchemaRef,
         should_compute_bounds: bool,
     ) -> Result<Self> {
-        let (min_accumulators, max_accumulators) = if should_compute_bounds {
-            let data_types = on_left
-                .iter()
-                .map(|expr| expr.data_type(schema).map(|dt| dictionary_value_type(&dt)))
-                .collect::<Result<Vec<_>>>()?;
-            (
-                Some(
-                    data_types
-                        .iter()
-                        .map(MinAccumulator::try_new)
-                        .collect::<Result<Vec<_>>>()?,
-                ),
-                Some(
-                    data_types
-                        .iter()
-                        .map(MaxAccumulator::try_new)
-                        .collect::<Result<Vec<_>>>()?,
-                ),
+        let bounds_accumulators = if should_compute_bounds {
+            Some(
+                on_left
+                    .iter()
+                    .map(|expr| CollectLeftAccumulator::try_new(expr.clone(), schema))
+                    .collect::<Result<Vec<_>>>()?,
             )
         } else {
-            (None, None)
+            None
         };
 
         Ok(Self {
@@ -1266,20 +1303,39 @@ impl BuildSideState {
             num_rows: 0,
             metrics,
             reservation,
-            min_accumulators,
-            max_accumulators,
-            on_left,
+            bounds_accumulators,
         })
     }
 }
 
-fn dictionary_value_type(data_type: &DataType) -> DataType {
-    match data_type {
-        DataType::Dictionary(_, value_type) => dictionary_value_type(value_type.as_ref()),
-        _ => data_type.clone(),
-    }
-}
-
+/// Collects all batches from the left (build) side stream and creates a hash map for joining.
+///
+/// This function is responsible for:
+/// 1. Consuming the entire left stream and collecting all batches into memory
+/// 2. Building a hash map from the join key columns for efficient probe operations
+/// 3. Computing bounds for dynamic filter pushdown (if enabled)
+/// 4. Preparing visited indices bitmap for certain join types
+///
+/// # Parameters
+/// * `random_state` - Random state for consistent hashing across partitions
+/// * `left_stream` - Stream of record batches from the build side
+/// * `on_left` - Physical expressions for the left side join keys
+/// * `metrics` - Metrics collector for tracking memory usage and row counts
+/// * `reservation` - Memory reservation tracker for the hash table and data
+/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
+/// * `probe_threads_count` - Number of threads that will probe this hash table
+/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering
+///
+/// # Dynamic Filter Coordination
+/// When `should_compute_bounds` is true, this function computes the min/max bounds
+/// for each join key column but does NOT update the dynamic filter. Instead, the
+/// bounds are stored in the returned `JoinLeftData` and later coordinated by
+/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
+/// before updating the filter exactly once.
+///
+/// # Returns
+/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
+/// visited indices bitmap, and computed bounds (if requested).
 #[allow(clippy::too_many_arguments)]
 async fn collect_left_input(
     random_state: RandomState,
     left_stream: SendableRecordBatchStream,
@@ -1307,15 +1363,9 @@
     let state = left_stream
         .try_fold(initial, |mut state, batch| async move {
             // Update accumulators if computing bounds
-            if let (Some(ref mut min_accumulators), Some(ref mut max_accumulators)) =
-                (&mut state.min_accumulators, &mut state.max_accumulators)
-            {
-                for (min_accumulator, max_accumulator, on_expr) in
-                    izip!(min_accumulators, max_accumulators, &state.on_left)
-                {
-                    let array = on_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
-                    min_accumulator.update_batch(std::slice::from_ref(&array))?;
-                    max_accumulator.update_batch(std::slice::from_ref(&array))?;
+            if let Some(ref mut accumulators) = state.bounds_accumulators {
+                for accumulator in accumulators {
+                    accumulator.update_batch(&batch)?;
                 }
             }
 
             // Decide if we spill or not
@@ -1341,9 +1391,7 @@
         num_rows,
         metrics,
         mut reservation,
-        min_accumulators,
-        max_accumulators,
-        on_left: _,
+        bounds_accumulators,
     } = state;
 
     // Estimation of memory size, required for hashtable, prior to allocation.
@@ -1412,17 +1460,11 @@
         .collect::<Result<Vec<_>>>()?;
 
     // Compute bounds for dynamic filter if enabled
-    let bounds = match (min_accumulators, max_accumulators) {
-        (Some(min_accumulators), Some(max_accumulators)) if num_rows > 0 => {
-            let bounds = min_accumulators
+    let bounds = match bounds_accumulators {
+        Some(accumulators) if num_rows > 0 => {
+            let bounds = accumulators
                 .into_iter()
-                .zip(max_accumulators.into_iter())
-                .map(|(mut min_accumulator, mut max_accumulator)| {
-                    Ok(ColumnBounds::new(
-                        min_accumulator.evaluate()?,
-                        max_accumulator.evaluate()?,
-                    ))
-                })
+                .map(CollectLeftAccumulator::evaluate)
                 .collect::<Result<Vec<_>>>()?;
             Some(bounds)
         }

From 671949a12ef856ff5eac9fdf0764de2daeff2195 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:33:48 -0500
Subject: [PATCH 6/9] fix typo

---
 datafusion/physical-plan/src/joins/hash_join/exec.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 8ff8d7099bca7..a05dea42e7515 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1229,7 +1229,7 @@ impl CollectLeftAccumulator {
 
         let data_type = expr
             .data_type(schema)
-            // Min/Max can operate on dictionary data but expect to be initialized with the underlaying value type
+            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
             .map(|dt| dictionary_value_type(&dt))?;
         Ok(Self {
             expr,

From 556482403a7a9b6809886f300d964ff0c2c0f431 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:36:56 -0500
Subject: [PATCH 7/9] simplify

---
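Notes: `bool::then` plus `Option::transpose` replaces the if/else here.
`then` builds `Some(Result<Vec<_>>)` only when bounds are enabled, and
`transpose` flips the `Option<Result<_>>` into `Result<Option<_>>` so the
constructor can apply a single `?`. The same idiom on a simpler type (names
and values illustrative):

    fn build(enabled: bool) -> Result<Option<Vec<i32>>, String> {
        enabled
            .then(|| (0..3).map(|i| Ok(i * 2)).collect::<Result<Vec<_>, String>>())
            .transpose()
    }

    fn main() {
        assert_eq!(build(false), Ok(None));
        assert_eq!(build(true), Ok(Some(vec![0, 2, 4])));
    }
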
 .../physical-plan/src/joins/hash_join/exec.rs | 20 ++++++++-----------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index a05dea42e7515..364473caa2f46 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1287,23 +1287,19 @@ impl BuildSideState {
         schema: &SchemaRef,
         should_compute_bounds: bool,
     ) -> Result<Self> {
-        let bounds_accumulators = if should_compute_bounds {
-            Some(
-                on_left
-                    .iter()
-                    .map(|expr| CollectLeftAccumulator::try_new(expr.clone(), schema))
-                    .collect::<Result<Vec<_>>>()?,
-            )
-        } else {
-            None
-        };
-
         Ok(Self {
             batches: Vec::new(),
             num_rows: 0,
             metrics,
             reservation,
-            bounds_accumulators,
+            bounds_accumulators: should_compute_bounds
+                .then(|| {
+                    on_left
+                        .iter()
+                        .map(|expr| CollectLeftAccumulator::try_new(Arc::clone(&expr), schema))
+                        .collect::<Result<Vec<_>>>()
+                })
+                .transpose()?,
         })
     }
 }

From e6c2b754c1d59522314259658f272b412ee40589 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:37:04 -0500
Subject: [PATCH 8/9] fmt

---
 datafusion/physical-plan/src/joins/hash_join/exec.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 364473caa2f46..fa363b2912b8d 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1296,7 +1296,9 @@ impl BuildSideState {
                 .then(|| {
                     on_left
                         .iter()
-                        .map(|expr| CollectLeftAccumulator::try_new(Arc::clone(&expr), schema))
+                        .map(|expr| {
+                            CollectLeftAccumulator::try_new(Arc::clone(&expr), schema)
+                        })
                         .collect::<Result<Vec<_>>>()
                 })
                 .transpose()?,

From b54f71f80bbd25bdb8647d4acb0f5ce36422d925 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:54:40 -0500
Subject: [PATCH 9/9] lint

---
 datafusion/physical-plan/src/joins/hash_join/exec.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index fa363b2912b8d..2c23a9c8c5d14 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1297,7 +1297,7 @@ impl BuildSideState {
                 on_left
                     .iter()
                     .map(|expr| {
-                        CollectLeftAccumulator::try_new(Arc::clone(&expr), schema)
+                        CollectLeftAccumulator::try_new(Arc::clone(expr), schema)
                     })
                     .collect::<Result<Vec<_>>>()
                 })