[SPARK-32730][SQL] Improve LeftSemi and Existence SortMergeJoin right side buffering #29572
|
Test build #128000 has finished for PR 29572 at commit
|
|
|
||
| // LEFT SEMI JOIN without bound condition does not use [[ExternalAppendOnlyUnsafeRowArray]] | ||
| // so should not cause any spill | ||
| assertNotSpilled(sparkContext, "left semi join") { |
Without this fix this UT fails.
|
Test build #128014 has finished for PR 29572 at commit
|
|
cc @cloud-fan, @maropu, @viirya |
| } | ||
|
|
||
| override def add(row: UnsafeRow): Unit = { | ||
| assert(buffer == null) |
Are you saying that ExternalAppendOnlyUnsafeRowArray will spill even if we only add one row?
No, its threshold parameters work as expected. ExternalAppendOnlyUnsafeRowArray just looked a bit heavyweight for this case, where we want to store only one row. But we could also use new ExternalAppendOnlyUnsafeRowArray(1, 1) here.
How did you test it if ExternalAppendOnlyUnsafeRowArray doesn't spill either?
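To make the threshold discussion above concrete, here is a toy model of a two-threshold append-only buffer. It is only a sketch under stated assumptions: `ToyExternalArray` is a hypothetical class, not Spark's `ExternalAppendOnlyUnsafeRowArray`, and it assumes rows stay in a plain in-memory buffer until `numRowsInMemoryBufferThreshold` is reached, after which they go to a spillable structure that spills once it holds `numRowsSpillThreshold` rows. It tracks counts only.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model; NOT Spark's ExternalAppendOnlyUnsafeRowArray.
// Assumption: rows stay in memory up to the first threshold, then move to a
// spillable structure that spills whenever it already holds `numRowsSpillThreshold` rows.
final class ToyExternalArray(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
  private val inMemory = ArrayBuffer.empty[String]
  private var usingSpillable = false
  private var rowsInSpillable = 0
  private var spills = 0

  // Models one insert into the spillable structure (row contents are not tracked).
  private def insertIntoSpillable(): Unit = {
    if (rowsInSpillable >= numRowsSpillThreshold) {
      spills += 1
      rowsInSpillable = 0
    }
    rowsInSpillable += 1
  }

  def add(row: String): Unit = {
    if (!usingSpillable && inMemory.size < numRowsInMemoryBufferThreshold) {
      inMemory += row
    } else {
      if (!usingSpillable) {
        // First overflow: hand the in-memory rows over to the spillable structure.
        usingSpillable = true
        inMemory.foreach(_ => insertIntoSpillable())
        inMemory.clear()
      }
      insertIntoSpillable()
    }
  }

  def spillCount: Int = spills
}
```

Under these assumptions, a `(1, 1)` buffer that receives a single row never touches the spill path, which is consistent with the reply above; only a second row would.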
| pageSizeBytes: Long, | ||
| numRowsInMemoryBufferThreshold: Int, | ||
| numRowsSpillThreshold: Int) extends Logging { | ||
| numRowsSpillThreshold: Int) extends AppendOnlyUnsafeRowArray with Logging { |
For a core component like this, I remember we rarely change its inheritance. It is easy to introduce a performance regression.
All right, in that case let's drop that new trait and stick to the important part.
689b5b7 to
d893580
Compare
d893580 to
037b876
Compare
|
Test build #128212 has finished for PR 29572 at commit
|
| private[this] val bufferedMatches = | ||
| new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) | ||
| private[this] val bufferedMatches: ExternalAppendOnlyUnsafeRowArray = | ||
| new ExternalAppendOnlyUnsafeRowArray(if (bufferFirstOnly) 1 else inMemoryThreshold, |
how does this change avoid spilling?
This change is not what avoids the spilling; the one in bufferMatchingRows() is. This change just avoids creating a buffer larger than 1.
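A minimal sketch of the idea behind the bufferMatchingRows() change, for readers without the diff open. `ToyRightSideScanner` is a hypothetical, simplified stand-in (rows are `(key, payload)` pairs, and the method takes the key as an argument, unlike the real scanner); it only illustrates that with the flag set, the whole matching group is still consumed but only its first row is kept.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the right-side scanner; NOT Spark's SortMergeJoinScanner.
// Rows are (key, payload) pairs and must arrive sorted by key.
final class ToyRightSideScanner(
    sortedRightRows: Iterator[(Int, String)],
    onlyBufferFirstMatch: Boolean = false) {

  private val right = sortedRightRows.buffered
  val bufferedMatches: ArrayBuffer[(Int, String)] = ArrayBuffer.empty

  /** Consumes every right row whose key equals `key`, keeping all of them or only the first. */
  def bufferMatchingRows(key: Int): Unit = {
    bufferedMatches.clear()
    // Skip right rows whose key is smaller than the current left key.
    while (right.hasNext && right.head._1 < key) right.next()
    // Walk the whole matching group so the iterator ends up past it either way.
    while (right.hasNext && right.head._1 == key) {
      val row = right.next()
      if (!onlyBufferFirstMatch || bufferedMatches.isEmpty) bufferedMatches += row
    }
  }
}
```

With the flag defaulting to `false`, existing call sites keep buffering every match, and a caller that only needs existence can pass something like `condition.isEmpty`, as the diff in this PR does.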
| spillThreshold: Int, | ||
| eagerCleanupResources: () => Unit) { | ||
| eagerCleanupResources: () => Unit, | ||
| bufferFirstOnly: Boolean) { |
nit: bufferFirstOnly -> matchedBufferFirstOnly? And, please add @param, too.
bufferFirstOnly: Boolean = false to avoid the unnecessary changes.
Thanks, fixed.
| private[this] val bufferedMatches = | ||
| new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) | ||
| private[this] val bufferedMatches: ExternalAppendOnlyUnsafeRowArray = | ||
| new ExternalAppendOnlyUnsafeRowArray(if (bufferFirstOnly) 1 else inMemoryThreshold, |
Do we need this change? How about just setting 1 on the caller side? https://github.com/apache/spark/pull/29572/files#diff-2649ee0724bbc5ee6a64d7de308c527eR255
There is another call site in existence join: https://github.com/apache/spark/pull/29572/files#diff-2649ee0724bbc5ee6a64d7de308c527eR336
| spillThreshold: Int, | ||
| eagerCleanupResources: () => Unit) { | ||
| eagerCleanupResources: () => Unit, | ||
| matchedBufferFirstOnly: Boolean = false) { |
how about onlyBufferFirstMatch?
Ok, renamed.
|
One more thing: please update the title/description, too (this PR is not only for LeftSemi, right?).
| inMemoryThreshold, | ||
| spillThreshold, | ||
| cleanupResources | ||
| ) |
nit: please avoid the unnecessary changes.
Sorry, reverted them.
Updated, thanks. |
|
Test build #128238 has finished for PR 29572 at commit
|
|
Test build #128243 has finished for PR 29572 at commit
|
|
Test build #128242 has finished for PR 29572 at commit
|
|
thanks, merging to master! |
|
Thanks for the review @cloud-fan, @maropu, @viirya. |
|
@peter-toth Nice fix! Could you share the perf difference when you run TPC-DS?
| spillThreshold, | ||
| cleanupResources | ||
| cleanupResources, | ||
| condition.isEmpty |
Thanks @peter-toth !
I think this could be also added to LeftAnti join, which is also only interested in the existence of a match and doesn't need to buffer them.
Thanks @juliuszsompolski, I think you are right. Shall I open a follow-up PR or a different ticket?
I think a followup PR is fine.
I've opened #29727
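For context on why LeftAnti fits the same pattern: without an extra join condition, LeftSemi, LeftAnti and Existence joins all depend only on whether at least one right-side row shares the left row's key. The sketch below shows just those semantics on plain integer keys; `ExistenceOnlyJoins` is a hypothetical helper that uses a `Set` lookup rather than a sort-merge scan, so it says nothing about how Spark executes these joins.

```scala
// Hypothetical illustration of join semantics only; not Spark code and not a sort-merge join.
object ExistenceOnlyJoins {
  // Keeps left keys that have at least one match on the right.
  def leftSemi(left: Seq[Int], right: Set[Int]): Seq[Int] =
    left.filter(right.contains)

  // Keeps left keys that have no match on the right.
  def leftAnti(left: Seq[Int], right: Set[Int]): Seq[Int] =
    left.filterNot(right.contains)

  // Keeps every left key and attaches an "exists" flag.
  def existence(left: Seq[Int], right: Set[Int]): Seq[(Int, Boolean)] =
    left.map(k => (k, right.contains(k)))
}
```

All three results are derived from the same boolean per left row, so none of them needs more than the first matching right-side row.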
Thanks @gatorsmile. Yes, I will try to run some benchmarks for this particular change and share the results. BTW, I have another PR open that brings ~30% improvement to some of the TPCDS queries: #28885 |
…de buffering
### What changes were proposed in this pull request?
This is a follow-up to #29572. LeftAnti SortMergeJoin should not buffer all matching right side rows when bound condition is empty, this is unnecessary and can lead to performance degradation especially when spilling happens.
### Why are the changes needed?
Performance improvement.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT.
Closes #29727 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin-followup.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
LeftSemi and Existence SortMergeJoin should not buffer all matching right-side rows when the bound condition is empty; doing so is unnecessary and can degrade performance, especially when spilling happens.
Why are the changes needed?
Performance improvement.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT and TPCDS benchmarks.