Skip to content

Commit

Permalink
SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join (apache#11041)
Browse files Browse the repository at this point in the history

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join
  • Loading branch information
comphead authored and xinlifoobar committed Jun 22, 2024
1 parent 7f55951 commit c96415a
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 18 deletions.
104 changes: 89 additions & 15 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,6 @@ async fn test_semi_join_1k() {
.await
}

// The test is flaky
// https://github.com/apache/datafusion/issues/10886
// SMJ produces 1 more row in the output
#[ignore]
#[tokio::test]
async fn test_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
Expand Down Expand Up @@ -442,18 +438,45 @@ impl JoinFuzzTestCase {

if debug {
println!("The debug is ON. Input data will be saved");
let out_dir_name = &format!("fuzz_test_debug_batch_size_{batch_size}");
Self::save_as_parquet(&self.input1, out_dir_name, "input1");
Self::save_as_parquet(&self.input2, out_dir_name, "input2");
let fuzz_debug = "fuzz_test_debug";
std::fs::remove_dir_all(fuzz_debug).unwrap_or(());
std::fs::create_dir_all(fuzz_debug).unwrap();
let out_dir_name = &format!("{fuzz_debug}/batch_size_{batch_size}");
Self::save_partitioned_batches_as_parquet(
&self.input1,
out_dir_name,
"input1",
);
Self::save_partitioned_batches_as_parquet(
&self.input2,
out_dir_name,
"input2",
);

if join_tests.contains(&JoinTestType::NljHj) {
Self::save_as_parquet(&nlj_collected, out_dir_name, "nlj");
Self::save_as_parquet(&hj_collected, out_dir_name, "hj");
Self::save_partitioned_batches_as_parquet(
&nlj_collected,
out_dir_name,
"nlj",
);
Self::save_partitioned_batches_as_parquet(
&hj_collected,
out_dir_name,
"hj",
);
}

if join_tests.contains(&JoinTestType::HjSmj) {
Self::save_as_parquet(&hj_collected, out_dir_name, "hj");
Self::save_as_parquet(&smj_collected, out_dir_name, "smj");
Self::save_partitioned_batches_as_parquet(
&hj_collected,
out_dir_name,
"hj",
);
Self::save_partitioned_batches_as_parquet(
&smj_collected,
out_dir_name,
"smj",
);
}
}

Expand Down Expand Up @@ -527,11 +550,26 @@ impl JoinFuzzTestCase {
/// as a parquet files preserving partitioning.
/// Once the data is saved it is possible to run a custom test on top of the saved data and debug
///
/// #[tokio::test]
/// async fn test1() {
/// let left: Vec<RecordBatch> = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input1").await.unwrap();
/// let right: Vec<RecordBatch> = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input2").await.unwrap();
///
/// JoinFuzzTestCase::new(
/// left,
/// right,
/// JoinType::LeftSemi,
/// Some(Box::new(col_lt_col_filter)),
/// )
/// .run_test(&[JoinTestType::HjSmj], false)
/// .await
/// }
///
/// let ctx: SessionContext = SessionContext::new();
/// let df = ctx
/// .read_parquet(
/// "/tmp/input1/*.parquet",
/// ParquetReadOptions::default(),
/// datafusion::prelude::ParquetReadOptions::default(),
/// )
/// .await
/// .unwrap();
Expand All @@ -540,7 +578,7 @@ impl JoinFuzzTestCase {
/// let df = ctx
/// .read_parquet(
/// "/tmp/input2/*.parquet",
/// ParquetReadOptions::default(),
/// datafusion::prelude::ParquetReadOptions::default(),
/// )
/// .await
/// .unwrap();
Expand All @@ -554,8 +592,11 @@ impl JoinFuzzTestCase {
/// )
/// .run_test()
/// .await
/// }
fn save_as_parquet(input: &[RecordBatch], output_dir: &str, out_name: &str) {
fn save_partitioned_batches_as_parquet(
input: &[RecordBatch],
output_dir: &str,
out_name: &str,
) {
let out_path = &format!("{output_dir}/{out_name}");
std::fs::remove_dir_all(out_path).unwrap_or(());
std::fs::create_dir_all(out_path).unwrap();
Expand All @@ -576,6 +617,39 @@ impl JoinFuzzTestCase {

println!("The data {out_name} saved as parquet into {out_path}");
}

/// Read parquet files preserving partitions, i.e. 1 file -> 1 partition
/// Files can be of different sizes
/// The method can be useful to read partitions that have been saved by
/// `save_partitioned_batches_as_parquet` for test debugging purposes
#[allow(dead_code)]
async fn load_partitioned_batches_from_parquet(
    dir: &str,
) -> std::io::Result<Vec<RecordBatch>> {
    let ctx: SessionContext = SessionContext::new();
    let mut result: Vec<RecordBatch> = Vec::new();

    // One parquet file per partition: read each file separately and
    // accumulate its record batches into a single flat vector.
    // NOTE(review): read_dir yields entries in platform-dependent order,
    // so partition ordering is not guaranteed — confirm callers don't rely on it.
    for dir_entry in std::fs::read_dir(dir)? {
        let file_path = dir_entry?.path();

        // Skip subdirectories and anything that is not a regular file.
        if !file_path.is_file() {
            continue;
        }

        let mut partition_batches = ctx
            .read_parquet(
                file_path.to_str().unwrap(),
                datafusion::prelude::ParquetReadOptions::default(),
            )
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();

        result.append(&mut partition_batches);
    }

    Ok(result)
}
}

/// Return randomly sized record batches with:
Expand Down
24 changes: 21 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,17 +1532,21 @@ fn get_filtered_join_mask(
for i in 0..streamed_indices_length {
// LeftSemi respects only first true values for specific streaming index,
// others true values for the same index must be false
if mask.value(i) && !seen_as_true {
let streamed_idx = streamed_indices.value(i);
if mask.value(i)
&& !seen_as_true
&& !matched_indices.contains(&streamed_idx)
{
seen_as_true = true;
corrected_mask.append_value(true);
filter_matched_indices.push(streamed_indices.value(i));
filter_matched_indices.push(streamed_idx);
} else {
corrected_mask.append_value(false);
}

// if switched to next streaming index(e.g. from 0 to 1, or from 1 to 2), we reset seen_as_true flag
if i < streamed_indices_length - 1
&& streamed_indices.value(i) != streamed_indices.value(i + 1)
&& streamed_idx != streamed_indices.value(i + 1)
{
seen_as_true = false;
}
Expand Down Expand Up @@ -2940,6 +2944,20 @@ mod tests {
))
);

assert_eq!(
get_filtered_join_mask(
LeftSemi,
&UInt64Array::from(vec![0, 0, 0, 1, 1, 1]),
&BooleanArray::from(vec![true, false, false, false, false, true]),
&HashSet::from_iter(vec![1]),
&0,
),
Some((
BooleanArray::from(vec![true, false, false, false, false, false]),
vec![0]
))
);

Ok(())
}

Expand Down

0 comments on commit c96415a

Please sign in to comment.