Skip to content

Conversation

@andygrove
Copy link
Member

What changes were proposed in this pull request?

This PR adds support for plugins to be able to provide columnar exchanges when AQE is enabled.

The main changes are:

  • In AdaptiveSparkPlanExec, the newQueryStage method now optimizes the Exchange node rather than just optimizing its children. This allows plugins to participate in optimizing and replacing the Exchange node itself.
  • New abstract base classes have been introduced for ShuffleExchange and BroadcastExchange so that Spark can work with replaced versions of these operators.

Why are the changes needed?

The changes are needed so that plugins can provide columnar exchage operators when AQE is enabled.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests have been updated as part of this PR to test replacing broadcast and shuffle exchanges when AQE is enabled.

@tgravescs
Copy link
Contributor

ok to test

assert(found == 111)

val found = collectPlanSteps(df.queryExecution.executedPlan).sum
assert(found == 11121)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nice to comment what 11121 equals in terms of the execs - MyBroadcastExchangeExec, etc..

@tgravescs
Copy link
Contributor

overall approach makes sense to me. It would be great to get some feedback from people working on AQE - @maryannxue @cloud-fan

@tgravescs
Copy link
Contributor

org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite is failing @andygrove please take a look

@SparkQA
Copy link

SparkQA commented Jul 16, 2020

Test build #125984 has finished for PR 29134 at commit 635fb1f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Test build #125999 has finished for PR 29134 at commit 2fd2c08.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
abstract class ShuffleExchange extends Exchange {
def shuffleId: Int
def getNumMappers: Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get this through MapOutputTracker.

* enabled.
*/
abstract class ShuffleExchange extends Exchange {
def shuffleId: Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is available in mapOutputStats

val optimizedPlan = applyPhysicalRules(e.child, queryStageOptimizerRules)
// apply optimizer rules to the Exchange node and its children, allowing plugins to be
// able to replace the Exchange node itself
val optimizedPlan = applyPhysicalRules(e, queryStageOptimizerRules)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we guarantee the top node is still an Exchange after applying physical rules?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safer to use s.withNewChildren(optimizedChildPlan) than adding special handling in those physical rules: https://github.com/apache/spark/pull/29134/files#diff-a30c7a6fcdcdd13e57135fd04d05f3b7R115-R117

That saves you the trouble of worrying about certain assumptions being broken in an arbitrary rule.

val leftParts = if (isLeftSkew && !isLeftCoalesced) {
val reducerId = leftPartSpec.startReducerIndex
val skewSpecs = createSkewPartitionSpecs(
left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get shuffleId from mapStats.

@cloud-fan
Copy link
Contributor

Hi @andygrove and @tgravescs , actually we have done something similar in our fork, to support columnar exchange. Our version supports broadcast as well, and also works with AQE disabled. Do you mind if I push our version out and ask you to review it? Thanks in advance!

@tgravescs
Copy link
Contributor

@cloud-fan that sounds good, push it out and we'll take a look.

@tgravescs
Copy link
Contributor

@cloud-fan do you know when you might have PR up?

@andygrove
Copy link
Member Author

Replaced by #29262

@andygrove andygrove deleted the support-columnar-exchanges branch July 28, 2020 14:20
asfgit pushed a commit that referenced this pull request Jul 29, 2020
### What changes were proposed in this pull request?

This PR adds abstract classes for shuffle and broadcast, so that users can provide their columnar implementations.

This PR updates several places to use the abstract exchange classes, and also update `AdaptiveSparkPlanExec` so that the columnar rules can see exchange nodes.

This is an alternative of #29134 .
Close #29134

### Why are the changes needed?

To allow columnar exchanges.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #29262 from cloud-fan/columnar.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants