@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, RepartitionOperation, Statistics}
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOGICAL_QUERY_STAGE, REPARTITION_OPERATION, TreePattern}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec

/**
* The LogicalPlan wrapper for a [[QueryStageExec]], or a snippet of physical plan containing
@@ -53,8 +54,14 @@ case class LogicalQueryStage(
override def computeStats(): Statistics = {
// TODO this is not accurate when there is other physical nodes above QueryStageExec.
Member

According to the PR description, do we still have issues with the other physical nodes?

Fix data issue. OptimizeOneRowPlan will use stats to remove Aggregate:

Member Author

I've checked other physical nodes and none of them seem to have this issue.

Member

Then, shall we remove this comment?

Member Author

I think we should keep it; new physical nodes may be added in the future.

Contributor

This comment should still be valid for an aggregation over a non-empty query stage, as this PR only changes the empty case.

     val physicalStats = physicalPlan.collectFirst {
-      case s: QueryStageExec => s
-    }.flatMap(_.computeStats())
+      case a: BaseAggregateExec if a.groupingExpressions.isEmpty =>
+        a.collectFirst {
+          case s: QueryStageExec => s.computeStats()
+        }.flatten.map { stat =>
+          if (stat.rowCount.contains(0)) stat.copy(rowCount = Some(1)) else stat
+        }
+      case s: QueryStageExec => s.computeStats()
+    }.flatten
     if (physicalStats.isDefined) {
       logDebug(s"Physical stats available as ${physicalStats.get} for plan: $physicalPlan")
     } else {
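
Note: the hunk above can be read as a small rule. A global aggregate (one with no grouping expressions) still emits exactly one row when its input is empty, so a row count of 0 reported by the query stage below it would understate the aggregate's output. The following is a minimal, Spark-free sketch of that rule. `Stats`, `Node`, `Stage`, `Aggregate`, `Other`, and `StatsSketch` are hypothetical stand-ins invented for illustration and are not Spark classes; the sketch also assumes a single-child plan chain.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

sealed trait Node
final case class Stage(stats: Option[Stats]) extends Node                       // plays the role of a query stage
final case class Aggregate(groupingKeys: Seq[String], child: Node) extends Node // plays the role of an aggregate
final case class Other(child: Node) extends Node                                // any other single-child node

object StatsSketch {
  // Stats of the first stage found at or below `node`, ignoring everything above it.
  private def stageStats(node: Node): Option[Stats] = node match {
    case Stage(stats)        => stats
    case Aggregate(_, child) => stageStats(child)
    case Other(child)        => stageStats(child)
  }

  // Walk down from the top. The first global aggregate or stage decides the result.
  def computeStats(node: Node): Option[Stats] = node match {
    case Aggregate(keys, child) if keys.isEmpty =>
      // A global aggregate over empty input still produces one row,
      // so a reported row count of 0 is bumped to 1.
      stageStats(child).map { s =>
        if (s.rowCount.contains(BigInt(0))) s.copy(rowCount = Some(BigInt(1))) else s
      }
    case Stage(stats)        => stats
    case Aggregate(_, child) => computeStats(child) // grouped aggregate: keep looking below
    case Other(child)        => computeStats(child)
  }
}
```

In this sketch, a non-empty stage under a global aggregate keeps its stage-level stats unchanged, which matches the review remark that only the empty case is adjusted.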
@@ -2841,6 +2841,14 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-44040: Fix compute stats when AggregateExec nodes above QueryStageExec") {
+    val emptyDf = spark.range(1).where("false")
+    val aggDf1 = emptyDf.agg(sum("id").as("id")).withColumn("name", lit("df1"))
+    val aggDf2 = emptyDf.agg(sum("id").as("id")).withColumn("name", lit("df2"))
+    val unionDF = aggDf1.union(aggDf2)
+    checkAnswer(unionDF.select("id").distinct, Seq(Row(null)))
+  }
}

/**