From 6c0e4fb5d9ac7a0a2f2b91f8b88d21f0bc0b4424 Mon Sep 17 00:00:00 2001
From: Oleks V
Date: Sat, 22 Jun 2024 05:05:13 -0700
Subject: [PATCH] SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join (#11041)

* Fix: Sort Merge Join crashes on TPCH Q21
* Fix LeftAnti SMJ join when the join filter is set
* rm dbg
* Minor: disable fuzz test to avoid CI spontaneous failures
* Minor: Add routine to debug join fuzz tests
* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join
---
 datafusion/core/tests/fuzz_cases/join_fuzz.rs | 104 +++++++++++++++---
 .../src/joins/sort_merge_join.rs              |  24 +++-
 2 files changed, 110 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 5fdf02079496..17dbf3a0ff28 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -215,10 +215,6 @@ async fn test_semi_join_1k() {
     .await
 }
 
-// The test is flaky
-// https://github.com/apache/datafusion/issues/10886
-// SMJ produces 1 more row in the output
-#[ignore]
 #[tokio::test]
 async fn test_semi_join_1k_filtered() {
     JoinFuzzTestCase::new(
@@ -442,18 +438,45 @@ impl JoinFuzzTestCase {
 
         if debug {
             println!("The debug is ON. Input data will be saved");
-            let out_dir_name = &format!("fuzz_test_debug_batch_size_{batch_size}");
-            Self::save_as_parquet(&self.input1, out_dir_name, "input1");
-            Self::save_as_parquet(&self.input2, out_dir_name, "input2");
+            let fuzz_debug = "fuzz_test_debug";
+            std::fs::remove_dir_all(fuzz_debug).unwrap_or(());
+            std::fs::create_dir_all(fuzz_debug).unwrap();
+            let out_dir_name = &format!("{fuzz_debug}/batch_size_{batch_size}");
+            Self::save_partitioned_batches_as_parquet(
+                &self.input1,
+                out_dir_name,
+                "input1",
+            );
+            Self::save_partitioned_batches_as_parquet(
+                &self.input2,
+                out_dir_name,
+                "input2",
+            );
 
             if join_tests.contains(&JoinTestType::NljHj) {
-                Self::save_as_parquet(&nlj_collected, out_dir_name, "nlj");
-                Self::save_as_parquet(&hj_collected, out_dir_name, "hj");
+                Self::save_partitioned_batches_as_parquet(
+                    &nlj_collected,
+                    out_dir_name,
+                    "nlj",
+                );
+                Self::save_partitioned_batches_as_parquet(
+                    &hj_collected,
+                    out_dir_name,
+                    "hj",
+                );
             }
 
             if join_tests.contains(&JoinTestType::HjSmj) {
-                Self::save_as_parquet(&hj_collected, out_dir_name, "hj");
-                Self::save_as_parquet(&smj_collected, out_dir_name, "smj");
+                Self::save_partitioned_batches_as_parquet(
+                    &hj_collected,
+                    out_dir_name,
+                    "hj",
+                );
+                Self::save_partitioned_batches_as_parquet(
+                    &smj_collected,
+                    out_dir_name,
+                    "smj",
+                );
             }
         }
 
@@ -527,11 +550,26 @@ impl JoinFuzzTestCase {
     /// as a parquet files preserving partitioning.
     /// Once the data is saved it is possible to run a custom test on top of the saved data and debug
     ///
+    /// #[tokio::test]
+    /// async fn test1() {
+    ///     let left: Vec<RecordBatch> = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input1").await.unwrap();
+    ///     let right: Vec<RecordBatch> = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input2").await.unwrap();
+    ///
+    ///     JoinFuzzTestCase::new(
+    ///         left,
+    ///         right,
+    ///         JoinType::LeftSemi,
+    ///         Some(Box::new(col_lt_col_filter)),
+    ///     )
+    ///     .run_test(&[JoinTestType::HjSmj], false)
+    ///     .await
+    /// }
+    ///
     /// let ctx: SessionContext = SessionContext::new();
     /// let df = ctx
     ///     .read_parquet(
     ///         "/tmp/input1/*.parquet",
-    ///         ParquetReadOptions::default(),
+    ///         datafusion::prelude::ParquetReadOptions::default(),
     ///     )
     ///     .await
     ///     .unwrap();
@@ -540,7 +578,7 @@ impl JoinFuzzTestCase {
     /// let df = ctx
     ///     .read_parquet(
     ///         "/tmp/input2/*.parquet",
-    ///         ParquetReadOptions::default(),
+    ///         datafusion::prelude::ParquetReadOptions::default(),
     ///     )
     ///     .await
     ///     .unwrap();
@@ -554,8 +592,11 @@ impl JoinFuzzTestCase {
     ///     )
     ///     .run_test()
     ///     .await
-    /// }
-    fn save_as_parquet(input: &[RecordBatch], output_dir: &str, out_name: &str) {
+    fn save_partitioned_batches_as_parquet(
+        input: &[RecordBatch],
+        output_dir: &str,
+        out_name: &str,
+    ) {
         let out_path = &format!("{output_dir}/{out_name}");
         std::fs::remove_dir_all(out_path).unwrap_or(());
         std::fs::create_dir_all(out_path).unwrap();
@@ -576,6 +617,39 @@ impl JoinFuzzTestCase {
 
         println!("The data {out_name} saved as parquet into {out_path}");
     }
+
+    /// Read parquet files preserving partitions, i.e. 1 file -> 1 partition.
+    /// Files can be of different sizes.
+    /// The method can be useful to read partitions that have been saved by
+    /// `save_partitioned_batches_as_parquet` for test debugging purposes.
+    #[allow(dead_code)]
+    async fn load_partitioned_batches_from_parquet(
+        dir: &str,
+    ) -> std::io::Result<Vec<RecordBatch>> {
+        let ctx: SessionContext = SessionContext::new();
+        let mut batches: Vec<RecordBatch> = vec![];
+
+        for entry in std::fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+
+            if path.is_file() {
+                let mut batch = ctx
+                    .read_parquet(
+                        path.to_str().unwrap(),
+                        datafusion::prelude::ParquetReadOptions::default(),
+                    )
+                    .await
+                    .unwrap()
+                    .collect()
+                    .await
+                    .unwrap();
+
+                batches.append(&mut batch);
+            }
+        }
+        Ok(batches)
+    }
 }
 
 /// Return randomly sized record batches with:
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 420fab51da39..91b2151d32e7 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1532,17 +1532,21 @@ fn get_filtered_join_mask(
         for i in 0..streamed_indices_length {
             // LeftSemi respects only first true values for specific streaming index,
             // others true values for the same index must be false
-            if mask.value(i) && !seen_as_true {
+            let streamed_idx = streamed_indices.value(i);
+            if mask.value(i)
+                && !seen_as_true
+                && !matched_indices.contains(&streamed_idx)
+            {
                 seen_as_true = true;
                 corrected_mask.append_value(true);
-                filter_matched_indices.push(streamed_indices.value(i));
+                filter_matched_indices.push(streamed_idx);
             } else {
                 corrected_mask.append_value(false);
             }
 
             // if switched to next streaming index(e.g. from 0 to 1, or from 1 to 2), we reset seen_as_true flag
             if i < streamed_indices_length - 1
-                && streamed_indices.value(i) != streamed_indices.value(i + 1)
+                && streamed_idx != streamed_indices.value(i + 1)
             {
                 seen_as_true = false;
             }
@@ -2940,6 +2944,20 @@ mod tests {
             ))
         );
 
+        assert_eq!(
+            get_filtered_join_mask(
+                LeftSemi,
+                &UInt64Array::from(vec![0, 0, 0, 1, 1, 1]),
+                &BooleanArray::from(vec![true, false, false, false, false, true]),
+                &HashSet::from_iter(vec![1]),
+                &0,
+            ),
+            Some((
+                BooleanArray::from(vec![true, false, false, false, false, false]),
+                vec![0]
+            ))
+        );
+
         Ok(())
     }
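// Illustrative sketch (not part of the patch above): a minimal, standalone
// version of the LeftSemi mask correction that the sort_merge_join.rs hunk
// implements. The function name `left_semi_corrected_mask` and the plain
// slice/Vec/HashSet types are simplifications assumed here; the real
// `get_filtered_join_mask` operates on arrow arrays. It shows why the added
// `matched_indices.contains(&streamed_idx)` check matters: a streamed row that
// already produced a LEFT SEMI match against an earlier buffered batch must
// not be emitted again when a later batch also passes the join filter.
use std::collections::HashSet;

fn left_semi_corrected_mask(
    streamed_indices: &[u64],
    filter_mask: &[bool],
    already_matched: &HashSet<u64>,
) -> (Vec<bool>, Vec<u64>) {
    let mut corrected = Vec::with_capacity(filter_mask.len());
    let mut newly_matched = Vec::new();
    let mut seen_as_true = false;

    for i in 0..streamed_indices.len() {
        let idx = streamed_indices[i];
        // Keep only the first filter-passing pair per streamed row, and skip
        // rows that were already matched while scanning a previous batch.
        if filter_mask[i] && !seen_as_true && !already_matched.contains(&idx) {
            seen_as_true = true;
            corrected.push(true);
            newly_matched.push(idx);
        } else {
            corrected.push(false);
        }
        // Reset the flag when moving on to the next streamed row.
        if i + 1 < streamed_indices.len() && idx != streamed_indices[i + 1] {
            seen_as_true = false;
        }
    }
    (corrected, newly_matched)
}

fn main() {
    // Mirrors the new unit test in the patch: streamed row 1 was already
    // matched by an earlier batch, so its trailing `true` must be suppressed.
    let (mask, matched) = left_semi_corrected_mask(
        &[0, 0, 0, 1, 1, 1],
        &[true, false, false, false, false, true],
        &HashSet::from([1]),
    );
    assert_eq!(mask, vec![true, false, false, false, false, false]);
    assert_eq!(matched, vec![0]);
    println!("corrected mask: {mask:?}, newly matched: {matched:?}");
}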