[SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer #32602
PropagateEmptyRelation.scala (modified)
@@ -26,59 +26,46 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL}

 /**
- * Collapse plans consisting empty local relations generated by [[PruneFilters]].
- * 1. Binary(or Higher)-node Logical Plans
- *    - Union with all empty children.
- *    - Join with one or two empty children (including Intersect/Except).
- * 2. Unary-node Logical Plans
- *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Join with false condition.
- *    - Aggregate with all empty children and at least one grouping expression.
- *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ * The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with
+ * empty or non-empty relations:
+ * 1. Binary-node Logical Plans
+ *    - Join with one or two empty children (including Intersect/Except).
+ *    - Left semi Join
+ *      Right side is non-empty and condition is empty. Eliminate join to its left side.
+ *    - Left anti join
+ *      Right side is non-empty and condition is empty. Eliminate join to an empty
+ *      [[LocalRelation]].
+ * 2. Unary-node Logical Plans
+ *    - Limit/Repartition with all empty children.
+ *    - Aggregate with all empty children and at least one grouping expression.
+ *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
-object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
-  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport {
+  protected def isEmpty(plan: LogicalPlan): Boolean = plan match {
     case p: LocalRelation => p.data.isEmpty
     case _ => false
   }

-  private def empty(plan: LogicalPlan) =
+  protected def nonEmpty(plan: LogicalPlan): Boolean = plan match {
+    case p: LocalRelation => p.data.nonEmpty
+    case _ => false
+  }
+
+  protected def empty(plan: LogicalPlan): LocalRelation =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

   // Construct a project list from plan's output, while the value is always NULL.
   private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
     plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }

-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
-    _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
-    case p: Union if p.children.exists(isEmptyLocalRelation) =>
-      val newChildren = p.children.filterNot(isEmptyLocalRelation)
-      if (newChildren.isEmpty) {
-        empty(p)
-      } else {
-        val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
-        val outputs = newPlan.output.zip(p.output)
-        // the original Union may produce different output attributes than the new one so we alias
-        // them if needed
-        if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
-          newPlan
-        } else {
-          val outputAliases = outputs.map { case (newAttr, oldAttr) =>
-            val newExplicitMetadata =
-              if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
-            Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
-          }
-          Project(outputAliases, newPlan)
-        }
-      }
-
+  protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
     // Joins on empty LocalRelations generated from streaming sources are not eliminated
     // as stateful streaming joins need to perform other state management operations other than
     // just processing the input data.
     case p @ Join(_, _, joinType, conditionOpt, _)
       if !p.children.exists(_.isStreaming) =>
-      val isLeftEmpty = isEmptyLocalRelation(p.left)
-      val isRightEmpty = isEmptyLocalRelation(p.right)
+      val isLeftEmpty = isEmpty(p.left)
+      val isRightEmpty = isEmpty(p.right)
       val isFalseCondition = conditionOpt match {
         case Some(FalseLiteral) => true
         case _ => false

@@ -103,14 +90,15 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit | |
| Project(nullValueProjectList(p.left) ++ p.right.output, p.right) | ||
| case _ => p | ||
| } | ||
| } else if (joinType == LeftSemi && conditionOpt.isEmpty && nonEmpty(p.right)) { | ||
| p.left | ||
| } else if (joinType == LeftAnti && conditionOpt.isEmpty && nonEmpty(p.right)) { | ||
| empty(p) | ||
| } else { | ||
| p | ||
| } | ||
|
|
||
| case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { | ||
| case _: Project => empty(p) | ||
| case _: Filter => empty(p) | ||
| case _: Sample => empty(p) | ||
| case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) => p match { | ||
| case _: Sort => empty(p) | ||
| case _: GlobalLimit if !p.isStreaming => empty(p) | ||
| case _: LocalLimit if !p.isStreaming => empty(p) | ||
|
|
@@ -137,3 +125,55 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit | |
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * This rule runs in the normal optimizer and optimizes more cases | ||
| * compared to [[PropagateEmptyRelationBase]]: | ||
| * 1. Higher-node Logical Plans | ||
| * - Union with all empty children. | ||
| * 2. Unary-node Logical Plans | ||
| * - Project/Filter/Sample with all empty children. | ||
| * | ||
| * The reason why we don't apply this rule at AQE optimizer side is: the benefit is not big enough | ||
| * and it may introduce extra exchanges. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After more thought, I think this is a big performance issue if we can't propagate empty relations through project/filter which are quite common. The risk of introducing new shuffles is relatively small compared to this. @ulysses-you can we move all the logic to |
||
| */ | ||
| object PropagateEmptyRelation extends PropagateEmptyRelationBase { | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private def applyFunc: PartialFunction[LogicalPlan, LogicalPlan] = { | ||
| case p: Union if p.children.exists(isEmpty) => | ||
| val newChildren = p.children.filterNot(isEmpty) | ||
| if (newChildren.isEmpty) { | ||
| empty(p) | ||
| } else { | ||
| val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head | ||
| val outputs = newPlan.output.zip(p.output) | ||
| // the original Union may produce different output attributes than the new one so we alias | ||
| // them if needed | ||
| if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) { | ||
| newPlan | ||
| } else { | ||
| val outputAliases = outputs.map { case (newAttr, oldAttr) => | ||
| val newExplicitMetadata = | ||
| if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None | ||
| Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata) | ||
| } | ||
| Project(outputAliases, newPlan) | ||
| } | ||
| } | ||
|
|
||
| case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) && canPropagate(p) => | ||
| empty(p) | ||
| } | ||
|
|
||
| // extract the pattern avoid conflict with commonApplyFunc | ||
| private def canPropagate(plan: LogicalPlan): Boolean = plan match { | ||
| case _: Project => true | ||
| case _: Filter => true | ||
| case _: Sample => true | ||
| case _ => false | ||
| } | ||
|
|
||
| override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( | ||
| _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { | ||
| applyFunc.orElse(commonApplyFunc) | ||
| } | ||
| } | ||
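
To see what the normal-optimizer rule does end to end, here is a minimal spark-shell sketch (not taken from this PR or its tests; the DataFrames and column names are illustrative). An always-false filter is first rewritten into an empty LocalRelation by PruneFilters, and PropagateEmptyRelation then folds the join and projection built on top of it down to a single empty relation:

```scala
// Minimal sketch, assuming a running SparkSession named `spark` (e.g. in spark-shell).
val emptySide = spark.range(10).where("false")   // PruneFilters turns this into an empty LocalRelation
val joined = spark.range(5)
  .join(emptySide, "id")                         // inner join whose right child is empty
  .selectExpr("id * 2 AS doubled")               // projection on top of the join

// PropagateEmptyRelation should collapse the optimized plan to a single empty
// LocalRelation; no Join or Project remains.
println(joined.queryExecution.optimizedPlan)
```
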
AQEPropagateEmptyRelation.scala (new file)
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
+import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
+
+/**
+ * This rule runs in the AQE optimizer and optimizes more cases
+ * compared to [[PropagateEmptyRelationBase]]:
+ * 1. Join is single column NULL-aware anti join (NAAJ)
+ *    Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
+ *    empty [[LocalRelation]].
+ */
+object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
+  override protected def isEmpty(plan: LogicalPlan): Boolean =
+    super.isEmpty(plan) || getRowCount(plan).contains(0)
+
+  override protected def nonEmpty(plan: LogicalPlan): Boolean =
+    super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)
+
+  private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
+    case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
+      stage.getRuntimeStatistics.rowCount
+    case _ => None
+  }
+
+  private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match {
+    case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
+      if stage.resultOption.get().isDefined =>
+      stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys
+    case _ => false
+  }
+
+  private def eliminateSingleColumnNullAwareAntiJoin: PartialFunction[LogicalPlan, LogicalPlan] = {
+    case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) =>
+      empty(j)
+  }
+
+  // TODO we need use transformUpWithPruning instead of transformUp
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {

[Inline review comment — Contributor (PR author)] @cloud-fan , if we want to use […]

[Inline review comment — Contributor] yea it's OK as it's not a regression. cc @sigmod

[Inline review comment — Contributor (PR author)] @cloud-fan seems we don't need to use […]

+    eliminateSingleColumnNullAwareAntiJoin.orElse(commonApplyFunc)
+  }
+}
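
The AQE variant, by contrast, can only use information that becomes available once a query stage has materialized. Below is a hedged sketch of the kind of query it targets (table sizes, configs, and column names are illustrative and not from this PR; whether the empty side is caught at planning time or only at runtime depends on what the static optimizer can prove):

```scala
// Illustrative sketch, assuming a running SparkSession named `spark`.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // force a shuffle join for the example

val left  = spark.range(0, 1000000).selectExpr("id", "id % 7 AS k")
// This filter is not provably empty at planning time but yields zero rows when executed.
val right = spark.range(0, 1000000).where("id < 0").selectExpr("id AS k")

// Once the right-hand stage materializes and reports rowCount == 0,
// getRowCount(plan).contains(0) holds and AQEPropagateEmptyRelation can replace
// the join with an empty LocalRelation during re-optimization, skipping the remaining stages.
left.join(right, "k").collect()
```

The NAAJ branch works the same way but keys off the materialized broadcast side: if the built HashedRelation is HashedRelationWithAllNullKeys, the single-column null-aware anti join can never emit a row, so the whole join is likewise replaced with an empty LocalRelation.
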
This file was deleted.