Skip to content

Conversation

@ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented Feb 10, 2022

What changes were proposed in this pull request?

  • Add a new rule OptimizeOneMaxRowPlan in normal Optimizer and AQE Optimizer.
  • Move the similar optimization of EliminateSorts into OptimizeOneMaxRowPlan, also update its comment and test

Why are the changes needed?

Optimize the plan if its max row is equal to or less than 1 in these cases:

  • if the max rows of the child of sort less than or equal to 1, remove the sort
  • if the max rows per partition of the child of local sort less than or equal to 1,
    remove the local sort
  • if the max rows of the child of aggregate less than or equal to 1 and its child and
    it's grouping only(include the rewritten distinct plan), convert aggregate to project
  • if the max rows of the child of aggregate less than or equal to 1,
    set distinct to false in all aggregate expression

Does this PR introduce any user-facing change?

no, only change the plan

How was this patch tested?

  • Add a new test OptimizeOneMaxRowPlanSuite for normal optimizer
  • Add test in AdaptiveQueryExecSuite for AQE optimizer

@github-actions github-actions bot added the SQL label Feb 10, 2022
@HyukjinKwon
Copy link
Member

cc @maryannxue FYI

@ulysses-you ulysses-you force-pushed the SPARK-38162 branch 2 times, most recently from 5db385c to 530aec8 Compare February 11, 2022 03:57
@ulysses-you
Copy link
Contributor Author

previous test failed caused by bug, now is ok. cc @HyukjinKwon @cloud-fan @maryannxue

@zhengruifeng
Copy link
Contributor

A weird case is Sample with withReplacement=true. The underlying SampleExec may output more rows than maxRows.

see #35250

* - if the child of aggregate max rows less than or equal to 1, set distinct to false in all
* aggregate expression
*/
object OptimizeOneRowPlan extends Rule[LogicalPlan] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put it in a new file?


/**
* The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
* - if the child of sort max rows less than or equal to 1, remove the sort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* - if the child of sort max rows less than or equal to 1, remove the sort
* - if the max rows of the child of sort less than or equal to 1, remove the sort

case Sort(_, _, child) if maxRowNotLargerThanOne(child) => child
case Sort(_, false, child) if maxRowPerPartitionNotLargerThanOne(child) => child
case agg @ Aggregate(_, _, child) if agg.groupOnly &&
agg.outputSet.subsetOf(child.outputSet) && maxRowNotLargerThanOne(child) => child
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this change the query plan output columns? I think a clear idea is: if child outputs at most one row, we can turn group-only aggregate into a project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's more general. updated

}

override def apply(plan: LogicalPlan): LogicalPlan = {
plan.transformDownWithPruning(_.containsAnyPattern(SORT, AGGREGATE), ruleId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this rule removes node, I think transform up should be more efficient.

_.containsPattern(SORT))(applyLocally)

private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
case Sort(_, _, child) if child.maxRows.exists(_ <= 1L) => recursiveRemoveSort(child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, are you sure the new rule can fully cover this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is. The EliminateLimits only run Once , and the added rule run fixedPoint. It's no harmful since we have transformWithPruning

val aliasedExprs = aggregateExprs.map {
case ne: NamedExpression => ne
case e => Alias(e, e.toString)()
case e => UnresolvedAlias(e)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems a small bug in test, the name will be an unresolved string if there is no alias specified.


/**
* The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
* - if the max rows of the child of sort less than or equal to 1, remove the sort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* - if the max rows of the child of sort less than or equal to 1, remove the sort
* - if the max rows of the child of sort is less than or equal to 1, remove the sort

*/
object OptimizeOneRowPlan extends Rule[LogicalPlan] {
private def maxRowNotLargerThanOne(plan: LogicalPlan): Boolean = {
plan.maxRows.exists(_ <= 1L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the code itself is simple and clean, we don't need to create a method for it.

test("SPARK-38162: Optimize one row plan in AQE Optimizer") {
withTempView("v") {
spark.sparkContext.parallelize(
(1 to 4).map(i => TestData( i, i.toString)), 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(1 to 4).map(i => TestData( i, i.toString)), 2)
(1 to 4).map(i => TestData(i, i.toString)), 2)

// convert group only aggregate to project
val (origin2, adaptive2) = runAdaptiveAndVerifyResult(
"""
|SELECT distinct c1 FROM (SELECT /*+ repartition(c1) */ * FROM v where c1 = 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if there is no /*+ repartition(c1) */?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nothing happens, the aggregate node is inside the logical query stage, so we can not optimize it at logical side:

LogicalQueryStage(logicalAgg: Aggregate, physicalAgg: BaseAggregateExec)

And the plan inside physicalAgg:

BaseAggregateExec final
  ShuffleQueryStage
    Exchange
      BaseAggregateExec partial

// remove distinct in aggregate
val (origin3, adaptive3) = runAdaptiveAndVerifyResult(
"""
|SELECT sum(distinct c1) FROM (SELECT /*+ repartition(c1) */ * FROM v where c1 = 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in b425156 Feb 23, 2022
@ulysses-you ulysses-you deleted the SPARK-38162 branch February 23, 2022 13:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants