diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 1cb2b100e2d6..a3d20b97d1ab 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -825,27 +825,27 @@ fn estimate_join_cardinality( right_stats: Statistics, on: &JoinOn, ) -> Option { + let (left_col_stats, right_col_stats) = on + .iter() + .map(|(left, right)| { + match ( + left.as_any().downcast_ref::(), + right.as_any().downcast_ref::(), + ) { + (Some(left), Some(right)) => ( + left_stats.column_statistics[left.index()].clone(), + right_stats.column_statistics[right.index()].clone(), + ), + _ => ( + ColumnStatistics::new_unknown(), + ColumnStatistics::new_unknown(), + ), + } + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + match join_type { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let (left_col_stats, right_col_stats) = on - .iter() - .map(|(left, right)| { - match ( - left.as_any().downcast_ref::(), - right.as_any().downcast_ref::(), - ) { - (Some(left), Some(right)) => ( - left_stats.column_statistics[left.index()].clone(), - right_stats.column_statistics[right.index()].clone(), - ), - _ => ( - ColumnStatistics::new_unknown(), - ColumnStatistics::new_unknown(), - ), - } - }) - .unzip::<_, _, Vec<_>, Vec<_>>(); - let ij_cardinality = estimate_inner_join_cardinality( Statistics { num_rows: left_stats.num_rows.clone(), @@ -888,10 +888,38 @@ fn estimate_join_cardinality( }) } - JoinType::LeftSemi - | JoinType::RightSemi - | JoinType::LeftAnti - | JoinType::RightAnti => None, + // For SemiJoins estimation result is either zero, in cases when inputs + // are non-overlapping according to statistics, or equal to number of rows + // for outer input + JoinType::LeftSemi | JoinType::RightSemi => { + let (outer_stats, inner_stats) = match join_type { + JoinType::LeftSemi => (left_stats, right_stats), + _ => (right_stats, left_stats), + }; + let cardinality = match estimate_disjoint_inputs(&outer_stats, &inner_stats) { + Some(estimation) => *estimation.get_value()?, + None => *outer_stats.num_rows.get_value()?, + }; + + Some(PartialJoinStatistics { + num_rows: cardinality, + column_statistics: outer_stats.column_statistics, + }) + } + + // For AntiJoins estimation always equals to outer statistics, as + // non-overlapping inputs won't affect estimation + JoinType::LeftAnti | JoinType::RightAnti => { + let outer_stats = match join_type { + JoinType::LeftAnti => left_stats, + _ => right_stats, + }; + + Some(PartialJoinStatistics { + num_rows: *outer_stats.num_rows.get_value()?, + column_statistics: outer_stats.column_statistics, + }) + } } } @@ -903,6 +931,11 @@ fn estimate_inner_join_cardinality( left_stats: Statistics, right_stats: Statistics, ) -> Option> { + // Immediatedly return if inputs considered as non-overlapping + if let Some(estimation) = estimate_disjoint_inputs(&left_stats, &right_stats) { + return Some(estimation); + }; + // The algorithm here is partly based on the non-histogram selectivity estimation // from Spark's Catalyst optimizer. let mut join_selectivity = Precision::Absent; @@ -911,30 +944,13 @@ fn estimate_inner_join_cardinality( .iter() .zip(right_stats.column_statistics.iter()) { - // If there is no overlap in any of the join columns, this means the join - // itself is disjoint and the cardinality is 0. Though we can only assume - // this when the statistics are exact (since it is a very strong assumption). - if left_stat.min_value.get_value()? > right_stat.max_value.get_value()? { - return Some( - if left_stat.min_value.is_exact().unwrap_or(false) - && right_stat.max_value.is_exact().unwrap_or(false) - { - Precision::Exact(0) - } else { - Precision::Inexact(0) - }, - ); - } - if left_stat.max_value.get_value()? < right_stat.min_value.get_value()? { - return Some( - if left_stat.max_value.is_exact().unwrap_or(false) - && right_stat.min_value.is_exact().unwrap_or(false) - { - Precision::Exact(0) - } else { - Precision::Inexact(0) - }, - ); + // Break if any of statistics bounds are undefined + if left_stat.min_value.get_value().is_none() + || left_stat.max_value.get_value().is_none() + || right_stat.min_value.get_value().is_none() + || right_stat.max_value.get_value().is_none() + { + return None; } let left_max_distinct = max_distinct_count(&left_stats.num_rows, left_stat); @@ -968,6 +984,58 @@ fn estimate_inner_join_cardinality( } } +/// Estimates if inputs are non-overlapping, using input statistics. +/// If inputs are disjoint, returns zero estimation, otherwise returns None +fn estimate_disjoint_inputs( + left_stats: &Statistics, + right_stats: &Statistics, +) -> Option> { + for (left_stat, right_stat) in left_stats + .column_statistics + .iter() + .zip(right_stats.column_statistics.iter()) + { + // If there is no overlap in any of the join columns, this means the join + // itself is disjoint and the cardinality is 0. Though we can only assume + // this when the statistics are exact (since it is a very strong assumption). + let left_min_val = left_stat.min_value.get_value(); + let right_max_val = right_stat.max_value.get_value(); + if left_min_val.is_some() + && right_max_val.is_some() + && left_min_val > right_max_val + { + return Some( + if left_stat.min_value.is_exact().unwrap_or(false) + && right_stat.max_value.is_exact().unwrap_or(false) + { + Precision::Exact(0) + } else { + Precision::Inexact(0) + }, + ); + } + + let left_max_val = left_stat.max_value.get_value(); + let right_min_val = right_stat.min_value.get_value(); + if left_max_val.is_some() + && right_min_val.is_some() + && left_max_val < right_min_val + { + return Some( + if left_stat.max_value.is_exact().unwrap_or(false) + && right_stat.min_value.is_exact().unwrap_or(false) + { + Precision::Exact(0) + } else { + Precision::Inexact(0) + }, + ); + } + } + + None +} + /// Estimate the number of maximum distinct values that can be present in the /// given column from its statistics. If distinct_count is available, uses it /// directly. Otherwise, if the column is numeric and has min/max values, it @@ -1716,9 +1784,11 @@ mod tests { #[test] fn test_inner_join_cardinality_single_column() -> Result<()> { let cases: Vec<(PartialStats, PartialStats, Option>)> = vec![ - // ----------------------------------------------------------------------------- - // | left(rows, min, max, distinct), right(rows, min, max, distinct), expected | - // ----------------------------------------------------------------------------- + // ------------------------------------------------ + // | left(rows, min, max, distinct, null_count), | + // | right(rows, min, max, distinct, null_count), | + // | expected, | + // ------------------------------------------------ // Cardinality computation // ======================= @@ -1824,6 +1894,11 @@ mod tests { None, ), // Non overlapping min/max (when exact=False). + ( + (10, Absent, Inexact(4), Absent, Absent), + (10, Inexact(5), Absent, Absent, Absent), + Some(Inexact(0)), + ), ( (10, Inexact(0), Inexact(10), Absent, Absent), (10, Inexact(11), Inexact(20), Absent, Absent), @@ -2106,6 +2181,204 @@ mod tests { Ok(()) } + #[test] + fn test_anti_semi_join_cardinality() -> Result<()> { + let cases: Vec<(JoinType, PartialStats, PartialStats, Option)> = vec![ + // ------------------------------------------------ + // | join_type , | + // | left(rows, min, max, distinct, null_count), | + // | right(rows, min, max, distinct, null_count), | + // | expected, | + // ------------------------------------------------ + + // Cardinality computation + // ======================= + ( + JoinType::LeftSemi, + (50, Inexact(10), Inexact(20), Absent, Absent), + (10, Inexact(15), Inexact(25), Absent, Absent), + Some(50), + ), + ( + JoinType::RightSemi, + (50, Inexact(10), Inexact(20), Absent, Absent), + (10, Inexact(15), Inexact(25), Absent, Absent), + Some(10), + ), + ( + JoinType::LeftSemi, + (10, Absent, Absent, Absent, Absent), + (50, Absent, Absent, Absent, Absent), + Some(10), + ), + ( + JoinType::LeftSemi, + (50, Inexact(10), Inexact(20), Absent, Absent), + (10, Inexact(30), Inexact(40), Absent, Absent), + Some(0), + ), + ( + JoinType::LeftSemi, + (50, Inexact(10), Absent, Absent, Absent), + (10, Absent, Inexact(5), Absent, Absent), + Some(0), + ), + ( + JoinType::LeftSemi, + (50, Absent, Inexact(20), Absent, Absent), + (10, Inexact(30), Absent, Absent, Absent), + Some(0), + ), + ( + JoinType::LeftAnti, + (50, Inexact(10), Inexact(20), Absent, Absent), + (10, Inexact(15), Inexact(25), Absent, Absent), + Some(50), + ), + ( + JoinType::RightAnti, + (50, Inexact(10), Inexact(20), Absent, Absent), + (10, Inexact(15), Inexact(25), Absent, Absent), + Some(10), + ), + ( + JoinType::LeftAnti, + (10, Absent, Absent, Absent, Absent), + (50, Absent, Absent, Absent, Absent), + Some(10), + ), + ( + JoinType::LeftAnti, + (50, Inexact(10), Inexact(20), Absent, Absent), + (10, Inexact(30), Inexact(40), Absent, Absent), + Some(50), + ), + ( + JoinType::LeftAnti, + (50, Inexact(10), Absent, Absent, Absent), + (10, Absent, Inexact(5), Absent, Absent), + Some(50), + ), + ( + JoinType::LeftAnti, + (50, Absent, Inexact(20), Absent, Absent), + (10, Inexact(30), Absent, Absent, Absent), + Some(50), + ), + ]; + + let join_on = vec![( + Arc::new(Column::new("l_col", 0)) as _, + Arc::new(Column::new("r_col", 0)) as _, + )]; + + for (join_type, outer_info, inner_info, expected) in cases { + let outer_num_rows = outer_info.0; + let outer_col_stats = vec![create_column_stats( + outer_info.1, + outer_info.2, + outer_info.3, + outer_info.4, + )]; + + let inner_num_rows = inner_info.0; + let inner_col_stats = vec![create_column_stats( + inner_info.1, + inner_info.2, + inner_info.3, + inner_info.4, + )]; + + let output_cardinality = estimate_join_cardinality( + &join_type, + Statistics { + num_rows: Inexact(outer_num_rows), + total_byte_size: Absent, + column_statistics: outer_col_stats, + }, + Statistics { + num_rows: Inexact(inner_num_rows), + total_byte_size: Absent, + column_statistics: inner_col_stats, + }, + &join_on, + ) + .map(|cardinality| cardinality.num_rows); + + assert_eq!( + output_cardinality, expected, + "failure for join_type: {}", + join_type + ); + } + + Ok(()) + } + + #[test] + fn test_semi_join_cardinality_absent_rows() -> Result<()> { + let dummy_column_stats = + vec![create_column_stats(Absent, Absent, Absent, Absent)]; + let join_on = vec![( + Arc::new(Column::new("l_col", 0)) as _, + Arc::new(Column::new("r_col", 0)) as _, + )]; + + let absent_outer_estimation = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Absent, + total_byte_size: Absent, + column_statistics: dummy_column_stats.clone(), + }, + Statistics { + num_rows: Exact(10), + total_byte_size: Absent, + column_statistics: dummy_column_stats.clone(), + }, + &join_on, + ); + assert!( + absent_outer_estimation.is_none(), + "Expected \"None\" esimated SemiJoin cardinality for absent outer num_rows" + ); + + let absent_inner_estimation = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(500), + total_byte_size: Absent, + column_statistics: dummy_column_stats.clone(), + }, + Statistics { + num_rows: Absent, + total_byte_size: Absent, + column_statistics: dummy_column_stats.clone(), + }, + &join_on, + ).expect("Expected non-empty PartialJoinStatistics for SemiJoin with absent inner num_rows"); + + assert_eq!(absent_inner_estimation.num_rows, 500, "Expected outer.num_rows esimated SemiJoin cardinality for absent inner num_rows"); + + let absent_inner_estimation = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Absent, + total_byte_size: Absent, + column_statistics: dummy_column_stats.clone(), + }, + Statistics { + num_rows: Absent, + total_byte_size: Absent, + column_statistics: dummy_column_stats.clone(), + }, + &join_on, + ); + assert!(absent_inner_estimation.is_none(), "Expected \"None\" esimated SemiJoin cardinality for absent outer and inner num_rows"); + + Ok(()) + } + #[test] fn test_calculate_join_output_ordering() -> Result<()> { let options = SortOptions::default();