Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join #11041

Merged
merged 18 commits into from
Jun 22, 2024

Conversation

comphead
Copy link
Contributor

@comphead comphead commented Jun 21, 2024

Which issue does this PR close?

Closes #10886 .

Rationale for this change

There was an issue in SMJ when streamed rows handled concurrently. Race conditions for the self.streamed_batch.join_filter_matched_idxs lead SMJ producing duplicates because of unexpected event ordering caused by concurrency.

What changes are included in this PR?

Added an extra check when calculating the filter boolean mask and test methods to make fuzz test debugging easier in future

Are these changes tested?

Yes. There is no specifc test because of unstable test nature. The issue can be reproduced on main branch with high probability by running the test 1000 times

#[tokio::test]
async fn test_semi_join_1k_filtered() {
   for _ in 0 .. 1000 { 
     JoinFuzzTestCase::new(
          make_staggered_batches(1000),
          make_staggered_batches(1000),
          JoinType::LeftSemi,
          Some(Box::new(col_lt_col_filter)),
      )
      .run_test(&[JoinTestType::HjSmj], false)
      .await
  }
}

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Jun 21, 2024
let streamed_idx = streamed_indices.value(i);
if mask.value(i)
&& !seen_as_true
&& !matched_indices.contains(&streamed_idx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an actual fix!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the fix is correct. I roughly remember I have commented on such case. JoinType::LeftAnti below has considered matched_indices. Maybe for JoinType::LeftSem it is missing.

@comphead comphead requested a review from viirya June 21, 2024 02:15
Comment on lines +566 to 568
/// }
///
/// let ctx: SessionContext = SessionContext::new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indent looks weird.

@@ -576,6 +617,39 @@ impl JoinFuzzTestCase {

println!("The data {out_name} saved as parquet into {out_path}");
}

/// Read parquet files preserving partitions, i.e. 1 file -> 1 partition
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One partition is one batch here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct.

@@ -215,10 +215,6 @@ async fn test_semi_join_1k() {
.await
}

// The test is flaky
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

@alamb alamb merged commit 6c0e4fb into apache:main Jun 22, 2024
23 checks passed
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jun 22, 2024
…pache#11041)

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jun 22, 2024
…pache#11041)

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
…pache#11041)

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: disable fuzz test to avoid CI spontaneous failures

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* Minor: Add routine to debug join fuzz tests

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join

* SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SMJ producing different results than HashJoin when doing a semi join
3 participants