# [SPARK-35442][SQL] Support propagate empty relation through aggregate/union #35149
## Conversation
```scala
val outputAliases = outputs.map { case (newAttr, oldAttr) =>
  val newExplicitMetadata =
    if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
  Alias(newAttr, oldAttr.name)(explicitMetadata = newExplicitMetadata)
```
A small bug fix; the previous code was:

```scala
Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
```

The alias should not reuse the same expr id as the attribute it references, as the integrity check `LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique` requires.

A negative case:

```sql
SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key
UNION ALL
SELECT key, 1 FROM testData
```

```scala
override protected def nonEmpty(plan: LogicalPlan): Boolean =
  super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)

private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
```
I'd suggest renaming this to `getEstimatedRowCount`, with a comment saying that the value can be over-estimated when the row count is not 0, to match the new change.
```scala
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
  val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
    "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
  assert(findTopLevelBaseAggregate(plan1).size == 2)
```
nit: I think it's clearer to check `assert(!plan1.isInstanceOf[LocalTableScanExec])`
And we can remove this now.
```diff
   super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)

-  private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
+  private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
```
let's document the assumptions: 0 means the plan must produce 0 rows; a positive value means an estimated row count, which can be over-estimated.
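For illustration, a minimal sketch of the documented contract the reviewer asks for. The object name and the placeholder match arm are hypothetical, not the PR's actual code; the real logic lives in `AQEPropagateEmptyRelation` and reads runtime statistics from materialized query stages.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch: it only shows where the documented assumptions would go.
object EstimatedRowCountSketch {
  /**
   * Assumptions to document, per the review comment:
   *  - Some(0) means the plan is guaranteed to produce 0 rows.
   *  - A positive value is only an estimate and can be over-estimated, so it may
   *    prove non-emptiness (`exists(_ > 0)`) but never emptiness.
   */
  def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
    // e.g. read the runtime row count of a materialized shuffle query stage here
    case _ => None
  }
}
```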
thank you @cloud-fan, updated.
thanks, merging to master!
```scala
} else {
  val newExplicitMetadata =
    if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
  Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
```
how do you bypass `LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique`, since we introduce a conflicting attr id here?
I check whether the exprId is the same: reuse the original attr if it is, and create a new alias if it is different.
```scala
if (newAttr.exprId == oldAttr.exprId) {
  newAttr
} else {
  val newExplicitMetadata =
    if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
  Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
}
```
Actually, the conflicting attr id was introduced by #29053; the original code is:
```scala
val outputAliases = outputs.map { case (newAttr, oldAttr) =>
  val newExplicitMetadata =
    if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
  Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
```
hmm, why doesn't `LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique` capture this...
In the current branch we only create an `Alias` if the old attr id differs from the new attr id, so it won't introduce a conflicting attr id.
For #29053, it seems no test covers this case, so it passed.
I think it's easy to trigger this bug: `Union(A, B)` where A is empty and we return B instead. @ulysses-you can you help to fix this bug? We have a common solution for it: `QueryPlan.transformWithNewOutput`
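For reference, a rough sketch of the pattern being suggested, assuming the `transformUpWithNewOutput` variant of that API; this is not the PR's actual code, and the names `pruneEmptyUnionChildren` and `isEmpty` are illustrative. The idea is to return the surviving child with its own attributes and hand back an old-to-new attribute mapping, so parent operators get rewritten instead of the old expr ids being reused.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}

object EmptyUnionSketch {
  // Illustrative only: keep the single non-empty child of a Union and let
  // transformUpWithNewOutput rewrite references to the Union's old attributes.
  def pruneEmptyUnionChildren(
      plan: LogicalPlan,
      isEmpty: LogicalPlan => Boolean): LogicalPlan = {
    plan.transformUpWithNewOutput {
      case u @ Union(children, _, _) if children.count(c => !isEmpty(c)) == 1 =>
        val survivor = children.find(c => !isEmpty(c)).get
        // (oldAttr, newAttr) pairs: parents referring to the Union's output
        // are rewritten to the survivor's attributes with their own expr ids.
        val attrMapping: Seq[(Attribute, Attribute)] = u.output.zip(survivor.output)
        survivor -> attrMapping
    }
  }
}
```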
…/union

### What changes were proposed in this pull request?
- Add `LogicalQueryStage(_, agg: BaseAggregateExec)` check in `AQEPropagateEmptyRelation`
- Add `LeafNode` check in `PropagateEmptyRelationBase`, so we can eliminate `LogicalQueryStage` to `LocalRelation`
- Unify the `applyFunc` and `commonApplyFunc` in `PropagateEmptyRelationBase`

### Why are the changes needed?
The Aggregate in AQE is different with others, the `LogicalQueryStage` looks like `LogicalQueryStage(Aggregate, BaseAggregate)`. We should handle this case specially. Logically, if the Aggregate grouping expression is not empty, we can eliminate it safely.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add new test in `AdaptiveQueryExecSuite`:
- `Support propagate empty relation through aggregate`
- `Support propagate empty relation through union`

Closes apache#35149 from ulysses-you/SPARK-35442-GA-SPARK.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan @ulysses-you Sorry to disturb you in a PR that has been merged, but I find that this PR may have some negative effects. I used databricks spark-sql-perf + Spark 3.3 in spark-shell to run 3TB TPCDS q24a or q24b; the test code is as follows:

```scala
val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
val databaseName = "tpcds_database"
val scaleFactor = "3072"
val format = "parquet"

import com.databricks.spark.sql.perf.tpcds.TPCDSTables
// just used to load tables
val tables = new TPCDSTables(
  spark.sqlContext,
  dsdgenDir = "./tpcds-kit/tools",
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false,
  useStringForDate = false)

spark.sql(s"create database $databaseName")
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPCDS 24a
val result = spark.sql(""" with ssales as
(select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
        i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
 from store_sales, store_returns, store, item, customer, customer_address
 where ss_ticket_number = sr_ticket_number
   and ss_item_sk = sr_item_sk
   and ss_customer_sk = c_customer_sk
   and ss_item_sk = i_item_sk
   and ss_store_sk = s_store_sk
   and c_birth_country = upper(ca_country)
   and s_zip = ca_zip
   and s_market_id = 8
 group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
          i_current_price, i_manager_id, i_units, i_size)
select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
from ssales
where i_color = 'pale'
group by c_last_name, c_first_name, s_store_name
having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()

sc.stop()
```

The above test may fail, as shown in the screenshot. The DAG corresponding to the SQL is as follows (screenshot), and the details are as follows (screenshot). Since I saw this, I have already filed a JIRA: https://issues.apache.org/jira/browse/SPARK-40278

PS: I'm not sure whether master has fixed this issue
hi @LuciferYang, thank you for the test. It is a known issue: when the plan converts to empty in AQE once one side of a shuffle stage is materialized, there may still be other stages running. The plan is actually finished, so the driver is unblocked, and if the driver exits, the running stages will be cancelled, as you see. It does not affect the result; those are just some unused running stages. I have a related PR #34316 that tries to fix this issue, but it is not easy.
@ulysses-you Do we have plans to continue #34316? Should we temporarily add a switch or some documentation for this feature? This may leave users puzzled.
So the problem is we have confusing error messages in the log? The query itself is fine, right?
No, the problem is that the SQL is classified as failed in the Spark UI.
Hi @ulysses-you, do you have time to investigate why the query is marked as failed in the UI? This seems like something that is fixable.
@cloud-fan For the above demo program, if there are other jobs to be executed afterwards, the leftover running stages will affect them. That is to say, a completed SQL will affect the running efficiency of the next SQL. This shows up when using databricks spark-sql-perf to run the TPCDS power test with all queries.
This is not worse than before. Without this optimization, the first query would run longer and wait for the jobs to complete. At least we allow the second query to kick off earlier now. It's a further improvement to cancel the unused running jobs.
@cloud-fan In general, the unused stages will run until finished, but if the driver stops after the plan has finished, the running stages will be cancelled. That causes the failed stages in the UI.
Therefore, when the last statement of the app is such a case, it is likely to show as failed in the UI?
makes sense
do you mean "before" the plan finishes? Anyway, can we detect this case and not mark the query as failed? e.g. if the query is already finished, ignore canceled stage events that arrive later.
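As a thought experiment on that suggestion, and not Spark's actual fix, here is a hedged listener sketch that remembers which SQL executions have already ended and flags job-end events that arrive for them afterwards. The class name is made up; the event classes and the `spark.sql.execution.id` job property are, to my knowledge, what Spark exposes, but treat them as assumptions.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

// Sketch only: track SQL executions that already ended so that job-end events
// arriving afterwards (e.g. cancellations of unused leftover stages) can be told
// apart from real failures. A real fix would live inside Spark's SQL status listener.
class LateCancellationTracker extends SparkListener {
  private val finishedExecutions = mutable.Set[Long]()
  private val jobToExecution = mutable.Map[Int, Long]()
  val lateJobEnds = mutable.Set[Int]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Jobs launched for a SQL query carry its execution id in the job properties.
    Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
      .foreach(id => jobToExecution(jobStart.jobId) = id.toLong)
  }

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionEnd => finishedExecutions += e.executionId
    case _ =>
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // If the owning execution already finished, this end event should not count
    // against the query's status.
    if (jobToExecution.get(jobEnd.jobId).exists(finishedExecutions.contains)) {
      lateJobEnds += jobEnd.jobId
    }
  }
}
```

Registering it would go through the usual `sparkContext.addSparkListener(new LateCancellationTracker)`.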
### What changes were proposed in this pull request?
- Add `LogicalQueryStage(_, agg: BaseAggregateExec)` check in `AQEPropagateEmptyRelation`
- Add `LeafNode` check in `PropagateEmptyRelationBase`, so we can eliminate `LogicalQueryStage` to `LocalRelation`
- Unify the `applyFunc` and `commonApplyFunc` in `PropagateEmptyRelationBase`

### Why are the changes needed?
The Aggregate in AQE is different from the others: its `LogicalQueryStage` looks like `LogicalQueryStage(Aggregate, BaseAggregate)`, so we should handle this case specially. Logically, if the Aggregate grouping expression is not empty, we can eliminate it safely.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
Add new tests in `AdaptiveQueryExecSuite`:
- `Support propagate empty relation through aggregate`
- `Support propagate empty relation through union`
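For context, a sketch of what the new aggregate test plausibly looks like inside `AdaptiveQueryExecSuite`; `runAdaptiveAndVerifyResult` and `withSQLConf` are the suite's existing utilities, and the exact assertions in the merged PR may differ.

```scala
// Sketch, assuming it sits inside AdaptiveQueryExecSuite with its usual imports.
test("Support propagate empty relation through aggregate") {
  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
    val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
      "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
    // The initially planned query is not empty yet.
    assert(!plan.isInstanceOf[LocalTableScanExec])
    // After AQE materializes the empty shuffle stage, the empty relation is
    // propagated through the aggregate (its grouping keys are non-empty), so the
    // whole query collapses to an empty local scan.
    assert(adaptivePlan.isInstanceOf[LocalTableScanExec])
  }
}
```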