diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 99bc45fa9e9e8..1df812d1aa809 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -82,17 +82,30 @@ class QueryExecution(
     sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
   }
 
-  lazy val sparkPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
-    // Clone the logical plan here, in case the planner rules change the states of the logical plan.
-    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+  private def assertOptimized(): Unit = optimizedPlan
+
+  lazy val sparkPlan: SparkPlan = {
+    // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
+    // the planning phase
+    assertOptimized()
+    executePhase(QueryPlanningTracker.PLANNING) {
+      // Clone the logical plan here, in case the planner rules change the states of the logical
+      // plan.
+      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+    }
   }
 
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
-    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
-    // optimizing and planning.
-    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
+  lazy val executedPlan: SparkPlan = {
+    // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
+    // that the optimization time is not counted as part of the planning phase.
+    assertOptimized()
+    executePhase(QueryPlanningTracker.PLANNING) {
+      // clone the plan to avoid sharing the plan instance between different stages like analyzing,
+      // optimizing and planning.
+      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
index 987338cf6cbbf..5ff459513e848 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -58,4 +58,13 @@ class QueryPlanningTrackerEndToEndSuite extends StreamTest {
       StopStream)
   }
 
+  test("The start times should be in order: parsing <= analysis <= optimization <= planning") {
+    val df = spark.sql("select count(*) from range(1)")
+    df.queryExecution.executedPlan
+    val phases = df.queryExecution.tracker.phases
+    assert(phases("parsing").startTimeMs <= phases("analysis").startTimeMs)
+    assert(phases("analysis").startTimeMs <= phases("optimization").startTimeMs)
+    assert(phases("optimization").startTimeMs <= phases("planning").startTimeMs)
+  }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index b29e822add8bc..7ddf9d87a6aca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -237,7 +238,7 @@ object SparkPlanTest {
    * @param spark SqlContext used for execution of the plan
    */
   def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
-    val execution = new QueryExecution(spark.sparkSession, null) {
+    val execution = new QueryExecution(spark.sparkSession, LocalRelation(Nil)) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>
           val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap
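
Usage sketch (not part of the patch): with optimization materialized via assertOptimized() before the planning phase is tracked, the per-phase start times reported by the tracker follow parse/analyze/optimize/plan order, as the new test asserts. The snippet below only relies on queryExecution.tracker.phases and startTimeMs, which appear in the patch; the query and the spark-shell setting are illustrative.

// Minimal sketch, e.g. in spark-shell with an active SparkSession bound to `spark`.
val df = spark.sql("select count(*) from range(1)")
df.queryExecution.executedPlan  // forces analysis, optimization and planning

// Print the tracked phases in the order they started; after this patch the
// optimization phase starts (and is timed) before the planning phase.
df.queryExecution.tracker.phases.toSeq.sortBy(_._2.startTimeMs).foreach {
  case (phase, summary) => println(s"$phase started at ${summary.startTimeMs}")
}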