-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32332][SQL] Make it possible to implement columnar exchanges #29262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| } | ||
|
|
||
| private def newQueryStage(e: Exchange): QueryStageExec = { | ||
| val optimizedPlan = applyPhysicalRules(e.child, queryStageOptimizerRules) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's important to hide the exchange nodes from the stage optimizer rules, as that's the assumption of these rules.
Then we don't need https://github.com/apache/spark/pull/29134/files#diff-a30c7a6fcdcdd13e57135fd04d05f3b7R115
|
Thanks @cloud-fan I will test our AQE POC with these changes. |
|
Test build #126660 has finished for PR 29262 at commit
|
|
Thanks @cloud-fan. I have tested these changes both with Spark 3.1 and also back ported to the 3.0 branch and everything is working well, so LGTM. I wish I had thought to separate the rules out into stage creation and post-stage creation. That made things much simpler. cc @tgravescs |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Show resolved
Hide resolved
...core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
Outdated
Show resolved
Hide resolved
|
Test build #126728 has finished for PR 29262 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
Outdated
Show resolved
Hide resolved
|
Test build #126750 has finished for PR 29262 at commit
|
|
test this please |
tgravescs
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, pending jenkins
|
Test build #126775 has finished for PR 29262 at commit
|
|
test this please |
|
Test build #126779 has finished for PR 29262 at commit
|
|
I merged this to master, unfortunately wouldn't pick clean to branch-3.0. @cloud-fan would you want to put up PR for branch-3.0? Otherwise Andy or myself can. |
|
Test build #126785 has finished for PR 29262 at commit
|
|
Does it qualify a backport? It's kind of a new feature. |
|
I went back and forth on that as I can see it both ways. I ended up filing it as a bug as it prevents us from properly doing columnar processing (introduced in 3.0.0) with AQE. I also thought it was all internal and a fairly isolated path and was hoping to get it into 3.0.1 if others didn't disagree since users are starting to to use AQE in 3.0. what do you think? |
|
I agree it's not that risky(the code is the same as before, just with one more abstraction layer: exchange-like). I don't have a strong opinion here. @andygrove can you help to create a backport PR? |
|
Sure, I'll create a PR this morning. Thanks @cloud-fan and @tgravescs. |
What changes were proposed in this pull request?
This PR adds abstract classes for shuffle and broadcast, so that users can provide their columnar implementations.
This PR updates several places to use the abstract exchange classes, and also update
AdaptiveSparkPlanExecso that the columnar rules can see exchange nodes.This is an alternative of #29134 .
Close #29134
Why are the changes needed?
To allow columnar exchanges.
Does this PR introduce any user-facing change?
no
How was this patch tested?
new tests