sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
import org.apache.spark.sql.internal.SQLConf
@@ -219,10 +220,19 @@ case class AdaptiveSparkPlanExec(
}

private def getExecutionId: Option[Long] = {
// If the `QueryExecution` does not match the current execution ID, it means the execution ID
// belongs to another (parent) query, and we should not call update UI in this query.
Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
.map(_.toLong)
}

private def isCurrentExecution(executionId: Option[Long]): Boolean = {
// If the `QueryExecution` does not match the current execution ID, it means the execution ID
// belongs to another (parent) query, and we should update the metrics instead of the plan
// for this query.
executionId.exists(SQLExecution.getQueryExecution(_) eq context.qe)
}

private def shouldUpdateMetrics(executionId: Option[Long]): Boolean = {
isSubquery || !isCurrentExecution(executionId)
}

def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
@@ -235,6 +245,7 @@ case class AdaptiveSparkPlanExec(
// created in the middle of the execution.
context.session.withActive {
val executionId = getExecutionId
val shouldUpdateSQLMetrics = shouldUpdateMetrics(executionId)
// Use inputPlan logicalLink here in case some top level physical nodes may be removed
// during `initialPlan`
var currentLogicalPlan = inputPlan.logicalLink.get
@@ -246,7 +257,8 @@
currentPhysicalPlan = result.newPlan
if (result.newStages.nonEmpty) {
stagesToReplace = result.newStages ++ stagesToReplace
executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
executionId.foreach(onUpdatePlanAndMetrics(
_, result.newStages.map(_.plan), shouldUpdateSQLMetrics))

// SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting
// for tasks to be scheduled and leading to broadcast timeout.
@@ -334,18 +346,34 @@
postStageCreationRules(supportsColumnar),
Some((planChangeLogger, "AQE Post Stage Creation")))
_isFinalPlan = true
executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
executionId.foreach(onUpdatePlanAndMetrics(
_, Seq(currentPhysicalPlan), shouldUpdateSQLMetrics))
currentPhysicalPlan
}
}

private def shouldUpdateFinalPlan(executionId: Long): Boolean = {
// Two cases require another UI update of the final plan:
// 1. Subqueries that don't belong to any query stage of the main query execute after the
// last UI update in `getFinalPhysicalPlan`.
// 2. InMemoryTableScanExec may contain another AdaptiveSparkPlanExec that changes its query
// plan after `getFinalPhysicalPlan`.
// So we need to update the UI here again to make sure the newly generated nodes of those
// plans are reflected.
!isSubquery && isCurrentExecution(Some(executionId)) && currentPhysicalPlan.exists {
case c: InMemoryTableScanExec
if c.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
case p if p.subqueries.nonEmpty => true
case _ => false
}
}

// Use a lazy val to avoid this being called more than once.
@transient private lazy val finalPlanUpdate: Unit = {
// Subqueries that don't belong to any query stage of the main query will execute after the
// last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure
// the newly generated nodes of those subqueries are updated.
if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
getExecutionId.foreach { executionId =>
if (shouldUpdateFinalPlan(executionId)) {
onUpdatePlanAndMetrics(executionId, Seq.empty)
}
}
logOnLevel(s"Final plan:\n$currentPhysicalPlan")
}
@@ -703,10 +731,16 @@ case class AdaptiveSparkPlanExec(
}

/**
* Notify the listeners of the physical plan change.
* Notify the listeners of the physical plan change and metrics change.
*/
private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = {
if (isSubquery) {
private def onUpdatePlanAndMetrics(
executionId: Long,
newSubPlans: Seq[SparkPlan],
shouldUpdateSQLMetrics: Boolean = false): Unit = {
if (shouldUpdateSQLMetrics) {
// Update SQL metrics for two cases:
// 1. This is inside InMemoryTableScanExec
// 2. This is a subquery
// When executing subqueries, we can't update the query plan in the UI as the
// UI doesn't support partial update yet. However, the subquery may have been
// optimized into a different plan and we must let the UI know the SQL metrics
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -31,13 +31,14 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, Exchange, REPARTITION_BY_COL, REPARTITION_BY_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionStart}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionStart}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -59,6 +60,7 @@ class AdaptiveQueryExecSuite

private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = {
var finalPlanCnt = 0
var hasMetricsEvent = false
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
@@ -67,6 +69,8 @@
"AdaptiveSparkPlan isFinalPlan=true")) {
finalPlanCnt += 1
}
case _: SparkListenerSQLAdaptiveSQLMetricUpdates =>
hasMetricsEvent = true
case _ => // ignore other events
}
}
@@ -83,13 +87,19 @@
}
val planAfter = dfAdaptive.queryExecution.executedPlan
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val adaptiveSparkPlanExec = planAfter.asInstanceOf[AdaptiveSparkPlanExec]
val adaptivePlan = adaptiveSparkPlanExec.executedPlan

spark.sparkContext.listenerBus.waitUntilEmpty()
// AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries that
// AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries/IMR that
// exist out of query stages.
val expectedFinalPlanCnt = adaptivePlan.find(_.subqueries.nonEmpty).map(_ => 2).getOrElse(1)
val subqueriesOrIMR = findInMemoryTable(adaptiveSparkPlanExec).nonEmpty ||
adaptivePlan.exists(_.subqueries.nonEmpty)
val expectedFinalPlanCnt = if (subqueriesOrIMR) 2 else 1
assert(finalPlanCnt == expectedFinalPlanCnt)
val expectedMetrics = findInMemoryTable(adaptiveSparkPlanExec).nonEmpty ||
subqueriesAll(adaptiveSparkPlanExec).nonEmpty
assert(hasMetricsEvent == expectedMetrics)
spark.sparkContext.removeSparkListener(listener)

val exchanges = adaptivePlan.collect {
@@ -160,6 +170,13 @@
}
}

private def findInMemoryTable(plan: SparkPlan): Seq[InMemoryTableScanExec] = {
collect(plan) {
case c: InMemoryTableScanExec
if c.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => c
}
}

private def checkNumLocalShuffleReads(
plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
val numShuffles = collect(plan) {
@@ -2700,6 +2717,21 @@
assert(df.rdd.getNumPartitions == 3)
}
}

test("SPARK-41214: Fix AQE cache does not update plan and metrics") {
Member:
AdaptiveSparkPlan nodes are injected for the following use-cases:
1. At the parent query level, as the root node of the SparkPlan,
2. AQE under InMemoryRelation,
3. Subqueries.
Does it make sense to have a UT that also covers the combined subquery + AQE-under-IMR case?
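A minimal sketch of what such a combined test could look like, assuming the runAdaptiveAndVerifyResult helper and the CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING conf used in this suite; the view name, data, and scalar subquery shape are illustrative only:

test("SPARK-41214: cached relation combined with a subquery (illustrative sketch)") {
  withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
    // An AdaptiveSparkPlanExec under InMemoryRelation, as in the test below.
    val df = Seq((1, "Employee_1"), (2, "Employee_2")).toDF("id", "name")
      .groupBy($"name").count().withColumnRenamed("count", "cnt")
    df.cache().createOrReplaceTempView("v_cached")
    // The scalar subquery is planned outside the main query stages, so both the
    // IMR case and the subquery case exercise the extra plan/metrics update.
    runAdaptiveAndVerifyResult(
      "SELECT * FROM v_cached WHERE cnt >= (SELECT max(cnt) FROM v_cached)")
  }
}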

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
val arr = Seq(
(1, "Employee_1", "Department_1"),
(2, "Employee_2", "Department_2"))
val df = arr.toDF("id", "name", "department").filter($"id" < 3).groupBy($"name").count()
df.cache().createOrReplaceTempView("v1")
val arr2 = Seq((1, "Employee_1", "Department_1"))
val df2 = arr2.toDF("id", "name", "department").filter($"id" > 0).groupBy($"name").count()
df2.cache().createOrReplaceTempView("v2")

runAdaptiveAndVerifyResult("SELECT * FROM v1 JOIN v2 on v1.name = v2.name")
Member:
HashAggregateExec node metrics were coming up empty before this fix:
https://issues.apache.org/jira/secure/attachment/13052914/DAG%20when%20AQE%3DON%20and%20AQECachedDFSupport%3DON%20without%20fix.png

Does it make sense to also verify the HashAggregateExec metrics (on the nodes feeding the InMemoryRelation) for robustness? For example, the HashAggregateExec "number of output rows" metric does not change between test runs.
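A rough sketch of how such a check could look, assuming the adaptive plan returned by runAdaptiveAndVerifyResult, the collect helper from AdaptiveSparkPlanHelper, and the numOutputRows metric key; it asserts on the driver-side metric values rather than on the UI events, and the exact expectations are illustrative:

// Hypothetical follow-up assertion on the aggregates inside the cached relations.
val (_, adaptivePlan) =
  runAdaptiveAndVerifyResult("SELECT * FROM v1 JOIN v2 ON v1.name = v2.name")
val cachedAggRowCounts = collect(adaptivePlan) {
  case scan: InMemoryTableScanExec => scan.relation.cachedPlan
}.flatMap { cachedPlan =>
  collect(cachedPlan) {
    case agg: BaseAggregateExec if agg.metrics.contains("numOutputRows") =>
      agg.metrics("numOutputRows").value
  }
}
// The cached aggregates were executed, so their row counts should be populated.
assert(cachedAggRowCounts.nonEmpty && cachedAggRowCounts.forall(_ > 0))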

Contributor:

does this test check metrics?

Contributor Author:

added one more assert with SparkListenerSQLAdaptiveSQLMetricUpdates

}
}
}

/**