[SPARK-35442][SQL] Support propagate empty relation through aggregate/union #35149

Changes from 3 commits
AQEPropagateEmptyRelation.scala

@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL}
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys

 /**
@@ -32,14 +33,24 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
  */
 object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
   override protected def isEmpty(plan: LogicalPlan): Boolean =
-    super.isEmpty(plan) || getRowCount(plan).contains(0)
+    super.isEmpty(plan) || getEstimatedRowCount(plan).contains(0)

   override protected def nonEmpty(plan: LogicalPlan): Boolean =
-    super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)
+    super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)

-  private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
+  private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
Contributor (review comment on getEstimatedRowCount): let's document the assumptions:
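To sketch what such documentation could say (my phrasing, an assumption, not the PR author's eventual wording): the new case is only safe because a grouping aggregate over an empty input is itself empty, while a global aggregate is not.

```scala
// Hypothetical doc comment for getEstimatedRowCount (not from the PR):
// An aggregate with non-empty grouping expressions produces no output rows
// when its input is empty, so an empty, already-materialized query stage
// below such an aggregate makes the whole LogicalQueryStage empty.
// A global aggregate (groupingExpressions.isEmpty) always returns exactly
// one row even on empty input, hence the groupingExpressions.nonEmpty guard.
```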
     case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized =>
       stage.getRuntimeStatistics.rowCount

+    case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty &&
+      agg.child.isInstanceOf[QueryStageExec] =>
+      val stage = agg.child.asInstanceOf[QueryStageExec]
+      if (stage.isMaterialized) {
+        stage.getRuntimeStatistics.rowCount
+      } else {
+        None
+      }
+
     case _ => None
   }
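That assumption is easy to check directly. A minimal illustration (not part of the PR; `spark` is any active SparkSession, and the semantics are the same with AQE on or off):

```scala
import org.apache.spark.sql.functions.count

val empty = spark.range(10).where("id < 0")  // produces 0 rows at runtime

// Grouping aggregate over empty input: 0 rows, safe to propagate emptiness.
empty.groupBy("id").agg(count("*")).count()  // 0

// Global aggregate over empty input: still 1 row, must not be propagated.
empty.agg(count("*")).count()                // 1
```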
AdaptiveQueryExecSuite.scala

@@ -28,6 +28,7 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode, UnionExec}
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec

@@ -120,6 +121,12 @@ class AdaptiveQueryExecSuite
     }
   }

+  private def findTopLevelBaseAggregate(plan: SparkPlan): Seq[BaseAggregateExec] = {
+    collect(plan) {
+      case agg: BaseAggregateExec => agg
+    }
+  }
+
   private def findTopLevelSort(plan: SparkPlan): Seq[SortExec] = {
     collect(plan) {
       case s: SortExec => s

@@ -1406,6 +1413,60 @@ class AdaptiveQueryExecSuite
     }
   }

| test("SPARK-35442: Support propagate empty relation through aggregate") { | ||
| withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { | ||
| val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( | ||
| "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") | ||
| assert(findTopLevelBaeAggregate(plan1).size == 2) | ||
|
||
| assert(!plan1.isInstanceOf[LocalTableScanExec]) | ||
| assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) | ||
|
|
||
| val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( | ||
| "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1") | ||
| assert(findTopLevelBaeAggregate(plan2).size == 2) | ||
| assert(!plan2.isInstanceOf[LocalTableScanExec]) | ||
| assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) | ||
|
|
||
| val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult( | ||
| "SELECT count(*) FROM testData WHERE value = 'no_match'") | ||
| assert(findTopLevelBaeAggregate(plan3).size == 2) | ||
| assert(!plan3.isInstanceOf[LocalTableScanExec]) | ||
| assert(findTopLevelBaeAggregate(adaptivePlan3).size == 2) | ||
| assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec]) | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-35442: Support propagate empty relation through union") { | ||
| def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = { | ||
| assert( | ||
| collect(plan) { | ||
| case u: UnionExec => u | ||
| }.size == numUnion) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { | ||
| val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( | ||
| """ | ||
| |SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key | ||
| |UNION ALL | ||
| |SELECT key, 1 FROM testData | ||
| |""".stripMargin) | ||
| checkNumUnion(plan1, 1) | ||
| checkNumUnion(adaptivePlan1, 0) | ||
| assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) | ||
|
|
||
| val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( | ||
| """ | ||
| |SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key | ||
| |UNION ALL | ||
| |SELECT /*+ REPARTITION */ key, 1 FROM testData WHERE value = 'no_match' | ||
| |""".stripMargin) | ||
| checkNumUnion(plan2, 1) | ||
| checkNumUnion(adaptivePlan2, 0) | ||
| assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-32753: Only copy tags to node with no tags") { | ||
| withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { | ||
| withTempView("v1") { | ||
|
|
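The two new union tests above cover both possible outcomes of the rewrite. A sketch of the plan shapes involved (my summary, not actual test output):

```scala
// Test 1: only the aggregate branch is empty at runtime, so the Union is
// removed and the surviving scan branch is returned directly; that is why
// the final plan has no UnionExec but is not a LocalTableScanExec either.
//   Union(Aggregate(empty), Scan(testData))      => Scan(testData)

// Test 2: the REPARTITION hint forces a shuffle stage on the second branch,
// whose runtime row count is also 0, so every child is empty and the whole
// query collapses to an empty LocalTableScan.
//   Union(Aggregate(empty), Repartition(empty))  => LocalTableScan(empty)
```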
@@ -1794,7 +1855,8 @@ class AdaptiveQueryExecSuite
   test("SPARK-35239: Coalesce shuffle partition should handle empty input RDD") {
     withTable("t") {
       withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
+        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+        SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
         spark.sql("CREATE TABLE t (c1 int) USING PARQUET")
         val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT c1, count(*) FROM t GROUP BY c1")
         assert(

@@ -2261,7 +2323,8 @@ class AdaptiveQueryExecSuite
   test("SPARK-37742: AQE reads invalid InMemoryRelation stats and mistakenly plans BHJ") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584",
+      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
       // Spark estimates a string column as 20 bytes so with 60k rows, these relations should be
       // estimated at ~120m bytes which is greater than the broadcast join threshold.
       val joinKeyOne = "00112233445566778899"
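These two pre-existing tests depend on an empty intermediate result surviving into the final plan, which the new rule would now optimize away, so they exclude it by name instead of being rewritten. The general pattern for disabling a single AQE optimizer rule looks like this (a sketch, not taken from the PR):

```scala
// Keep AQE enabled but exclude only AQEPropagateEmptyRelation, so plans
// over empty inputs are left intact for the scenario under test.
withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
      AQEPropagateEmptyRelation.ruleName) {
  // ... run the query whose empty stages must not be propagated ...
}
```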
how do you bypass LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique, as we introduce a conflicting attr id here?

I checked whether the exprId is the same: the original attr is reused if it is, and a new Alias is created if it differs.

Actually the conflicting attr id is introduced by #29053; the original code is:

hmm, why doesn't LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique capture this...

In the current branch we only create an Alias if the old attr id differs from the new attr id, so it won't introduce a conflicting attr id. As for #29053, it seems no test covers this case, so it passed.

I think it's easy to trigger this bug: Union(A, B), where A is empty and we return B instead. @ulysses-you, can you help fix this bug? We have a common solution for it: QueryPlan.transformWithNewOutput