diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
index 35551d8ba77d..bfc09d1aaf4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -120,6 +120,19 @@ class QueryPlanningTracker {
     ret
   }
 
+  /**
+   * Returns a summary of the time spent in each query planning phase of a SQL statement.
+   */
+  def acquireParsingTime(): String = {
+    val timeSpentSummary: StringBuilder = new StringBuilder()
+    Seq(QueryPlanningTracker.PARSING, QueryPlanningTracker.ANALYSIS,
+      QueryPlanningTracker.OPTIMIZATION, QueryPlanningTracker.PLANNING).foreach { phase =>
+      val duration = phasesMap.getOrDefault(phase, new PhaseSummary(-1, -1)).durationMs
+      timeSpentSummary.append(s"$phase: $duration ms\n")
+    }
+    timeSpentSummary.toString()
+  }
+
   /**
    * Record a specific invocation of a rule.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 748f75b18626..ea520d64c6c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -105,6 +105,7 @@ object SQLExecution {
           // will be caught and reported in the `SparkListenerSQLExecutionEnd`
           sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
           time = System.currentTimeMillis(),
+          queryExecution.tracker.acquireParsingTime(),
           redactedConfigs))
         body
       } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 71bfc98b9eeb..1f30ad22990c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -248,7 +248,9 @@ private[ui] class ExecutionPagedTable(
       ("Description", true, None),
       ("Submitted", true, None),
       ("Duration", true, Some("Time from query submission to completion (or if still executing," +
-        "time since submission)"))) ++ {
+        "time since submission)")),
+      ("ParsingTime", true, Some("Time spent in each query planning phase " +
+        "(parsing, analysis, optimization and planning)"))) ++ {
       if (showRunningJobs && showSucceededJobs && showFailedJobs) {
         Seq(
           ("Running Job IDs", true, None),
@@ -293,6 +295,9 @@ private[ui] class ExecutionPagedTable(
       <td sorttable_customkey={duration.toString}>
         {UIUtils.formatDuration(duration)}
       </td>
+      <td>
+        {parsingTimeCell(executionUIData)}
+      </td>
       {if (showRunningJobs) {
         <td>
           {jobLinks(executionTableRow.runningJobData)}
@@ -334,6 +339,28 @@ private[ui] class ExecutionPagedTable(
     <div>{desc}{details}</div>
   }
 
+  private def parsingTimeCell(execution: SQLExecutionUIData): Seq[Node] = {
+    val details = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stage-details collapsed">
+        <pre>{execution.parsingTime}</pre>
+      </div>
+    } else {
+      Nil
+    }
+
+    val desc = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
+      <span>{execution.parsingTime.substring(0, execution.parsingTime.indexOf('\n'))}</span>
+    } else {
+      <span>{"No ParsingTime"}</span>
+    }
+
+    <div>{desc}{details}</div>
+  }
+
   private def jobURL(request: HttpServletRequest, jobId: Long): String =
     "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId)
 
@@ -406,6 +433,7 @@ private[ui] class ExecutionDataSource(
       case "Description" => Ordering.by(_.executionUIData.description)
       case "Submitted" => Ordering.by(_.executionUIData.submissionTime)
       case "Duration" => Ordering.by(_.duration)
+      case "ParsingTime" => Ordering.by(_.executionUIData.parsingTime)
       case "Job IDs" | "Succeeded Job IDs" => Ordering by (_.completedJobData.headOption)
       case "Running Job IDs" => Ordering.by(_.runningJobData.headOption)
       case "Failed Job IDs" => Ordering.by(_.failedJobData.headOption)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index d892dbdc2316..a9c538afbbbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -97,6 +97,7 @@ class SQLAppStatusListener(
         executionData.metrics = sqlStoreData.metrics
         executionData.submissionTime = sqlStoreData.submissionTime
         executionData.completionTime = sqlStoreData.completionTime
+        executionData.parsingTime = sqlStoreData.parsingTime
         executionData.jobs = sqlStoreData.jobs
         executionData.stages = sqlStoreData.stages
         executionData.metricsValues = sqlStoreData.metricValues
@@ -337,7 +338,7 @@ class SQLAppStatusListener(
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
     val SparkListenerSQLExecutionStart(executionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event
+      physicalPlanDescription, sparkPlanInfo, time, parsingTime, modifiedConfigs) = event
 
     val planGraph = SparkPlanGraph(sparkPlanInfo)
     val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
@@ -357,6 +358,7 @@ class SQLAppStatusListener(
     exec.modifiedConfigs = modifiedConfigs
     exec.metrics = sqlPlanMetrics
     exec.submissionTime = time
+    exec.parsingTime = parsingTime
     update(exec)
   }
 
@@ -485,6 +487,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var metrics = Seq[SQLPlanMetric]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
+  var parsingTime: String = null
   var jobs = Map[Int, JobExecutionStatus]()
   var stages = Set[Int]()
 
@@ -506,6 +509,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metrics,
       submissionTime,
       completionTime,
+      parsingTime,
       jobs,
       stages,
       metricsValues)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 7c3315e3d76e..5af81deacd82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -90,6 +90,7 @@ class SQLExecutionUIData(
     val metrics: Seq[SQLPlanMetric],
     val submissionTime: Long,
     val completionTime: Option[Date],
+    val parsingTime: String,
     @JsonDeserialize(keyAs = classOf[Integer])
     val jobs: Map[Int, JobExecutionStatus],
     @JsonDeserialize(contentAs = classOf[Integer])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 26805e135b77..3c539c1400eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -48,6 +48,7 @@ case class SparkListenerSQLExecutionStart(
     physicalPlanDescription: String,
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
+    parsingTime: String,
     modifiedConfigs: Map[String, String] = Map.empty)
   extends SparkListenerEvent
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index 55f171342249..226784f5c547 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -51,6 +51,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
        |    "metrics":[]
        |  },
        |  "time":0,
+       |  "parsingTime":"",
        |  "modifiedConfigs": {
        |    "k1":"v1"
        |  }
@@ -61,11 +62,12 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
     if (newExecutionStartEvent) {
       val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail",
         "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
-        Map("k1" -> "v1"))
+        "", Map("k1" -> "v1"))
       assert(reconstructedEvent == expectedEvent)
     } else {
       val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail",
-        "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
+        "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil),
+        0, "")
       assert(reconstructedEvent == expectedOldEvent)
     }
   }
@@ -103,5 +105,6 @@ private case class OldVersionSQLExecutionStart(
     details: String,
     physicalPlanDescription: String,
     sparkPlanInfo: SparkPlanInfo,
-    time: Long)
+    time: Long,
+    parsingTime: String)
   extends SparkListenerEvent
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
index 090c149886a8..943ef3356d63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -58,7 +58,8 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {
 
     // Start SQL Execution
     listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
-      new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
+      new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time,
+      "parsingTime", Map.empty))
 
     time += 1
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
index 724df8ebe8bf..870d24fc2e14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -42,7 +42,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
 
     // Verifying with finished SQL execution 1
     assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1",
-      "plan", null, 0, Map.empty)))
+      "plan", null, 0, "parsingTime", Map.empty)))
     assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
     assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
     assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty)))
@@ -89,7 +89,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
 
     // Verifying with live SQL execution 2
     assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2",
-      "plan", null, 0, Map.empty)))
+      "plan", null, 0, "parsingTime", Map.empty)))
     assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
     assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
     assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 1f5cbb0e19ec..db0b8b0a7fcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -113,6 +113,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
       System.currentTimeMillis(),
+      df.queryExecution.tracker.acquireParsingTime(),
       Map.empty))
     listener.onOtherEvent(SparkListenerSQLExecutionEnd(
       executionId, System.currentTimeMillis()))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index aa3988ae37e4..b03298e5bbfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -80,6 +80,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
       getClass().getName(),
       planInfo,
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
       Map.empty)
     val executionEnd = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9b5b532d3ecd..7c9e2effb667 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -199,6 +199,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
 
    listener.onJobStart(SparkListenerJobStart(
@@ -346,7 +347,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
    val listener = new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = {
        event match {
-          case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) =>
+          case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _, _) =>
            assert(expected.forall(planDescription.contains))
            checkDone = true
          case _ => // ignore other events
@@ -389,6 +390,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
    listener.onJobStart(SparkListenerJobStart(
      jobId = 0,
@@ -419,6 +421,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
    listener.onJobStart(SparkListenerJobStart(
      jobId = 0,
@@ -460,6 +463,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
    listener.onJobStart(SparkListenerJobStart(
      jobId = 0,
@@ -490,6 +494,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
      executionId, System.currentTimeMillis()))
@@ -521,6 +526,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
 
    var stageId = 0
@@ -661,6 +667,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      time,
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
    time += 1
    listener.onOtherEvent(SparkListenerSQLExecutionStart(
@@ -670,6 +677,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      time,
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
 
    // Stop execution 2 before execution 1
@@ -687,6 +695,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
      time,
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
    assert(statusStore.executionsCount === 2)
    assert(statusStore.execution(2) === None)
@@ -723,6 +732,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
      df.queryExecution.toString,
      oldPlan,
      System.currentTimeMillis(),
+      "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      Map.empty))
 
    listener.onJobStart(SparkListenerJobStart(
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index 11201aadf67f..e9e90a23b526 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -89,6 +89,7 @@ object SqlResourceSuite {
      metrics = metrics,
      submissionTime = 1586768888233L,
      completionTime = Some(new Date(1586768888999L)),
+      parsingTime = "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
      jobs = Map[Int, JobExecutionStatus](
        0 -> JobExecutionStatus.SUCCEEDED,
        1 -> JobExecutionStatus.SUCCEEDED),
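
For reference, a minimal sketch of how the extended event could be consumed once this patch is applied. The listener name and the logging are illustrative only (not part of the patch); `parsingTime` is the new field added to `SparkListenerSQLExecutionStart` above, carrying the multi-line summary built by `QueryPlanningTracker.acquireParsingTime()`.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Illustrative listener: logs the per-phase planning summary carried by the
// extended SparkListenerSQLExecutionStart event.
class PlanningTimeLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      // `e.parsingTime` is the string produced by acquireParsingTime(), e.g.
      //   parsing: 0 ms
      //   analysis: 34 ms
      //   optimization: 71 ms
      //   planning: 153 ms
      println(s"SQL execution ${e.executionId} planning phases:\n${e.parsingTime}")
    case _ => // ignore other events
  }
}

// Registration, assuming an active SparkSession named `spark`:
// spark.sparkContext.addSparkListener(new PlanningTimeLogger)
```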