Skip to content

Commit

Permalink
fix: duplicate output for HashJoinExec in CollectLeft mode (#9757)
Browse files Browse the repository at this point in the history
* fix: duplicate output for HashJoinExec in CollectLeft mode

* address review comments

* test fix after merging main
  • Loading branch information
korowa authored Apr 21, 2024
1 parent fc34dac commit 70db5ea
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 117 deletions.
73 changes: 18 additions & 55 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,6 @@ impl PhysicalOptimizerRule for JoinSelection {
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
/// When the `ignore_threshold` is false, this function will also check left
/// and right sizes in bytes or rows.
///
/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
/// mode as is, but it can do so by changing the join type to [`JoinType::Right`]
/// and [`JoinType::RightAnti`], respectively.
fn try_collect_left(
hash_join: &HashJoinExec,
ignore_threshold: bool,
Expand All @@ -318,38 +313,20 @@ fn try_collect_left(
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let left = hash_join.left();
let right = hash_join.right();
let join_type = hash_join.join_type();

let left_can_collect = match join_type {
JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
JoinType::Inner
| JoinType::LeftSemi
| JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti => {
ignore_threshold
|| supports_collect_by_thresholds(
&**left,
threshold_byte_size,
threshold_num_rows,
)
}
};
let right_can_collect = match join_type {
JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
JoinType::Inner
| JoinType::RightSemi
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti => {
ignore_threshold
|| supports_collect_by_thresholds(
&**right,
threshold_byte_size,
threshold_num_rows,
)
}
};
let left_can_collect = ignore_threshold
|| supports_collect_by_thresholds(
&**left,
threshold_byte_size,
threshold_num_rows,
);
let right_can_collect = ignore_threshold
|| supports_collect_by_thresholds(
&**right,
threshold_byte_size,
threshold_num_rows,
);

match (left_can_collect, right_can_collect) {
(true, true) => {
if should_swap_join_order(&**left, &**right)?
Expand Down Expand Up @@ -916,9 +893,9 @@ mod tests_statistical {
}

#[tokio::test]
async fn test_left_join_with_swap() {
async fn test_left_join_no_swap() {
let (big, small) = create_big_and_small();
// Left out join should alway swap when the mode is PartitionMode::CollectLeft, even left side is small and right side is large

let join = Arc::new(
HashJoinExec::try_new(
Arc::clone(&small),
Expand All @@ -942,32 +919,18 @@ mod tests_statistical {
.optimize(join.clone(), &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
.as_any()
.downcast_ref::<ProjectionExec>()
.expect("A proj is required to swap columns back to their original order");

assert_eq!(swapping_projection.expr().len(), 2);
let (col, name) = &swapping_projection.expr()[0];
assert_eq!(name, "small_col");
assert_col_expr(col, "small_col", 1);
let (col, name) = &swapping_projection.expr()[1];
assert_eq!(name, "big_col");
assert_col_expr(col, "big_col", 0);

let swapped_join = swapping_projection
.input()
let swapped_join = optimized_join
.as_any()
.downcast_ref::<HashJoinExec>()
.expect("The type of the plan should not be changed");

assert_eq!(
swapped_join.left().statistics().unwrap().total_byte_size,
Precision::Inexact(2097152)
Precision::Inexact(8192)
);
assert_eq!(
swapped_join.right().statistics().unwrap().total_byte_size,
Precision::Inexact(8192)
Precision::Inexact(2097152)
);
crosscheck_plans(join.clone()).unwrap();
}
Expand Down
Loading

0 comments on commit 70db5ea

Please sign in to comment.