From 4e7d02b3881984d46ad3781f1ed29bdf7cb27072 Mon Sep 17 00:00:00 2001 From: caican Date: Fri, 19 Nov 2021 14:31:39 +0800 Subject: [PATCH 01/12] [SPARK-37383][SQL]Print the parsing time for each phase of a SQL --- .../sql/catalyst/QueryPlanningTracker.scala | 23 +++++++++++++++++-- .../spark/sql/execution/QueryExecution.scala | 4 +++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 35551d8ba77d..eaaa67bb7933 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.catalyst -import scala.collection.JavaConverters._ +import org.apache.spark.internal.Logging +import scala.collection.JavaConverters._ import org.apache.spark.util.BoundedPriorityQueue @@ -90,7 +91,7 @@ object QueryPlanningTracker { } -class QueryPlanningTracker { +class QueryPlanningTracker extends Logging { import QueryPlanningTracker._ @@ -120,6 +121,24 @@ class QueryPlanningTracker { ret } + /** + * print out the timeSpent for each phase of a SQL + */ + def logTimeSpent(): Unit = { + var totalTimeSpent = 0L + val timeSpentSummary: StringBuffer = new StringBuffer() + Seq(QueryPlanningTracker.PARSING, QueryPlanningTracker.ANALYSIS, + QueryPlanningTracker.OPTIMIZATION, QueryPlanningTracker.PLANNING).foreach { phase => + val duration = phasesMap.getOrDefault(phase, new PhaseSummary(-1, -1)).durationMs + timeSpentSummary.append(s"phase: $phase, timeSpent: $duration ms\n") + totalTimeSpent += duration + } + logInfo( + s""" + |Query planning time spent:\n ${timeSpentSummary.toString} + |Total time spent: $totalTimeSpent""".stripMargin) + } + /** * Record a specific invocation of a rule. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index bb1b0ca3b645..55c9c658f112 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -164,11 +164,13 @@ class QueryExecution( // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure // that the optimization time is not counted as part of the planning phase. assertOptimized() - executePhase(QueryPlanningTracker.PLANNING) { + val plan = executePhase(QueryPlanningTracker.PLANNING) { // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. 
QueryExecution.prepareForExecution(preparations, sparkPlan.clone()) } + tracker.logTimeSpent() + plan } /** From 74e6d35f0c3972d3ef3e45a4579f854466d557c1 Mon Sep 17 00:00:00 2001 From: caican Date: Mon, 7 Mar 2022 12:59:50 +0800 Subject: [PATCH 02/12] [SPARK-37383][SQL]update --- .../spark/sql/catalyst/QueryPlanningTracker.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index eaaa67bb7933..aca40806e5e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -17,11 +17,10 @@ package org.apache.spark.sql.catalyst -import org.apache.spark.internal.Logging - import scala.collection.JavaConverters._ -import org.apache.spark.util.BoundedPriorityQueue +import org.apache.spark.internal.Logging +import org.apache.spark.util.BoundedPriorityQueue /** * A simple utility for tracking runtime and associated stats in query planning. @@ -134,9 +133,9 @@ class QueryPlanningTracker extends Logging { totalTimeSpent += duration } logInfo( - s""" - |Query planning time spent:\n ${timeSpentSummary.toString} - |Total time spent: $totalTimeSpent""".stripMargin) + s"""Query planning time spent:\n ${timeSpentSummary.toString} + |Total time spent: $totalTimeSpent ms + """.stripMargin) } /** From d741418e4454ec810e246ee4503452a8a5f0e16b Mon Sep 17 00:00:00 2001 From: caican Date: Mon, 7 Mar 2022 13:36:44 +0800 Subject: [PATCH 03/12] [SPARK-37383][SQL]update --- .../org/apache/spark/sql/catalyst/QueryPlanningTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index aca40806e5e0..e2a9a81968c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -134,7 +134,7 @@ class QueryPlanningTracker extends Logging { } logInfo( s"""Query planning time spent:\n ${timeSpentSummary.toString} - |Total time spent: $totalTimeSpent ms + |Total time spent: $totalTimeSpent ms. """.stripMargin) } From fb2095b14f82b15e536d649b3e0ea264af7c27fe Mon Sep 17 00:00:00 2001 From: caican Date: Tue, 8 Mar 2022 10:23:48 +0800 Subject: [PATCH 04/12] [SPARK-37383][SQL]update --- .../org/apache/spark/sql/catalyst/QueryPlanningTracker.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index e2a9a81968c0..18a9532db438 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.util.BoundedPriorityQueue + /** * A simple utility for tracking runtime and associated stats in query planning. 
* @@ -125,7 +126,7 @@ class QueryPlanningTracker extends Logging { */ def logTimeSpent(): Unit = { var totalTimeSpent = 0L - val timeSpentSummary: StringBuffer = new StringBuffer() + val timeSpentSummary: StringBuilder = new StringBuilder() Seq(QueryPlanningTracker.PARSING, QueryPlanningTracker.ANALYSIS, QueryPlanningTracker.OPTIMIZATION, QueryPlanningTracker.PLANNING).foreach { phase => val duration = phasesMap.getOrDefault(phase, new PhaseSummary(-1, -1)).durationMs From 0d6b33a0019705e0dfbb6d97aed0701fd516fb30 Mon Sep 17 00:00:00 2001 From: caican Date: Thu, 10 Mar 2022 20:43:34 +0800 Subject: [PATCH 05/12] [SPARK-37383][SQL]update --- .../sql/catalyst/QueryPlanningTracker.scala | 11 ++----- .../spark/sql/execution/QueryExecution.scala | 10 ++++-- .../sql/execution/ui/AllExecutionsPage.scala | 31 ++++++++++++++++++- .../execution/ui/SQLAppStatusListener.scala | 3 ++ .../sql/execution/ui/SQLAppStatusStore.scala | 1 + .../spark/sql/execution/ui/SQLListener.scala | 6 ++++ 6 files changed, 51 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 18a9532db438..54891969059d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -124,19 +124,14 @@ class QueryPlanningTracker extends Logging { /** * print out the timeSpent for each phase of a SQL */ - def logTimeSpent(): Unit = { - var totalTimeSpent = 0L + def acquireParsingTime(): String = { val timeSpentSummary: StringBuilder = new StringBuilder() Seq(QueryPlanningTracker.PARSING, QueryPlanningTracker.ANALYSIS, QueryPlanningTracker.OPTIMIZATION, QueryPlanningTracker.PLANNING).foreach { phase => val duration = phasesMap.getOrDefault(phase, new PhaseSummary(-1, -1)).durationMs - timeSpentSummary.append(s"phase: $phase, timeSpent: $duration ms\n") - totalTimeSpent += duration + timeSpentSummary.append(s"$phase: $duration ms\n") } - logInfo( - s"""Query planning time spent:\n ${timeSpentSummary.toString} - |Total time spent: $totalTimeSpent ms. 
- """.stripMargin) + timeSpentSummary.toString() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 60b9b28b4a11..c925f1ec50f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CreateTableAsSelect, CTERelationDef, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} +import org.apache.spark.sql.execution.ui.SparkListenerSQLParsingEnd import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.util.Utils @@ -169,7 +170,12 @@ class QueryExecution( // optimizing and planning. QueryExecution.prepareForExecution(preparations, sparkPlan.clone()) } - tracker.logTimeSpent() + sparkSession.sparkContext.listenerBus.post( + SparkListenerSQLParsingEnd( + sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong, + tracker.acquireParsingTime() + ) + ) plan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 71bfc98b9eeb..34634f426812 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -248,7 +248,9 @@ private[ui] class ExecutionPagedTable( ("Description", true, None), ("Submitted", true, None), ("Duration", true, Some("Time from query submission to completion (or if still executing," + - "time since submission)"))) ++ { + "time since submission)")), + ("ParsingTime", true, Some("Time spent for parsing SQL on every phase " + + "about parse, analyze, optimize, planning"))) ++ { if (showRunningJobs && showSucceededJobs && showFailedJobs) { Seq( ("Running Job IDs", true, None), @@ -293,6 +295,9 @@ private[ui] class ExecutionPagedTable( {UIUtils.formatDuration(duration)} + + parsingTimeCell(executionUIData) + {if (showRunningJobs) { {jobLinks(executionTableRow.runningJobData)} @@ -334,6 +339,29 @@ private[ui] class ExecutionPagedTable(
    <td>{desc}{details}</td>
  }

+  private def parsingTimeCell(execution: SQLExecutionUIData): Seq[Node] = {
+    val details = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stage-details collapsed">
+        <pre>{execution.parsingTime}</pre>
+      </div>
+    } else {
+      Nil
+    }
+
+    val desc = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
+      {execution.parsingTime}
+    } else {
+      {"No ParsingTime"}
+    }
+
+    <td>{desc}{details}</td>
+ } + private def jobURL(request: HttpServletRequest, jobId: Long): String = "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) @@ -406,6 +434,7 @@ private[ui] class ExecutionDataSource( case "Description" => Ordering.by(_.executionUIData.description) case "Submitted" => Ordering.by(_.executionUIData.submissionTime) case "Duration" => Ordering.by(_.duration) + case "ParsingTime" => Ordering.by(_.executionUIData.parsingTime) case "Job IDs" | "Succeeded Job IDs" => Ordering by (_.completedJobData.headOption) case "Running Job IDs" => Ordering.by(_.runningJobData.headOption) case "Failed Job IDs" => Ordering.by(_.failedJobData.headOption) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index d892dbdc2316..33d17c5796fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -97,6 +97,7 @@ class SQLAppStatusListener( executionData.metrics = sqlStoreData.metrics executionData.submissionTime = sqlStoreData.submissionTime executionData.completionTime = sqlStoreData.completionTime + executionData.parsingTime = sqlStoreData.parsingTime executionData.jobs = sqlStoreData.jobs executionData.stages = sqlStoreData.stages executionData.metricsValues = sqlStoreData.metricValues @@ -485,6 +486,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { var metrics = Seq[SQLPlanMetric]() var submissionTime = -1L var completionTime: Option[Date] = None + var parsingTime: String = null var jobs = Map[Int, JobExecutionStatus]() var stages = Set[Int]() @@ -506,6 +508,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { metrics, submissionTime, completionTime, + parsingTime, jobs, stages, metricsValues) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 7c3315e3d76e..5af81deacd82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -90,6 +90,7 @@ class SQLExecutionUIData( val metrics: Seq[SQLPlanMetric], val submissionTime: Long, val completionTime: Option[Date], + val parsingTime: String, @JsonDeserialize(keyAs = classOf[Integer]) val jobs: Map[Int, JobExecutionStatus], @JsonDeserialize(contentAs = classOf[Integer]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 26805e135b77..134b865ff2e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -51,6 +51,12 @@ case class SparkListenerSQLExecutionStart( modifiedConfigs: Map[String, String] = Map.empty) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerSQLParsingEnd( + executionId: Long, + parsingTime: String) + extends SparkListenerEvent + @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent { From d9dbce966fbf1f0ea81fca5923c4761696e46bcb Mon Sep 17 00:00:00 2001 From: caican Date: Thu, 10 Mar 2022 20:45:53 +0800 Subject: [PATCH 06/12] 
[SPARK-37383][SQL]update --- .../org/apache/spark/sql/catalyst/QueryPlanningTracker.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 54891969059d..bfc09d1aaf4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst import scala.collection.JavaConverters._ -import org.apache.spark.internal.Logging import org.apache.spark.util.BoundedPriorityQueue @@ -91,7 +90,7 @@ object QueryPlanningTracker { } -class QueryPlanningTracker extends Logging { +class QueryPlanningTracker { import QueryPlanningTracker._ From c7bb365f6f7eb94c04085c5e4ff2e2c0d344bb0e Mon Sep 17 00:00:00 2001 From: caican Date: Fri, 11 Mar 2022 11:04:29 +0800 Subject: [PATCH 07/12] [SPARK-37383][SQL]update --- .../apache/spark/sql/execution/QueryExecution.scala | 12 ++---------- .../apache/spark/sql/execution/SQLExecution.scala | 1 + .../sql/execution/ui/SQLAppStatusListener.scala | 3 ++- .../apache/spark/sql/execution/ui/SQLListener.scala | 7 +------ .../spark/sql/execution/SQLJsonProtocolSuite.scala | 2 +- .../history/SQLEventFilterBuilderSuite.scala | 3 ++- .../history/SQLLiveEntitiesEventFilterSuite.scala | 4 ++-- .../sql/execution/ui/AllExecutionsPageSuite.scala | 1 + .../execution/ui/MetricsAggregationBenchmark.scala | 1 + .../sql/execution/ui/SQLAppStatusListenerSuite.scala | 12 +++++++++++- 10 files changed, 24 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index c925f1ec50f1..1b089943a680 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CreateTableAsSelect, CTERelationDef, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} -import org.apache.spark.sql.execution.ui.SparkListenerSQLParsingEnd import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.util.Utils @@ -165,18 +164,11 @@ class QueryExecution( // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure // that the optimization time is 
not counted as part of the planning phase. assertOptimized() - val plan = executePhase(QueryPlanningTracker.PLANNING) { + executePhase(QueryPlanningTracker.PLANNING) { // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. QueryExecution.prepareForExecution(preparations, sparkPlan.clone()) } - sparkSession.sparkContext.listenerBus.post( - SparkListenerSQLParsingEnd( - sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong, - tracker.acquireParsingTime() - ) - ) - plan } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 748f75b18626..ea520d64c6c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -105,6 +105,7 @@ object SQLExecution { // will be caught and reported in the `SparkListenerSQLExecutionEnd` sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), time = System.currentTimeMillis(), + queryExecution.tracker.acquireParsingTime(), redactedConfigs)) body } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 33d17c5796fa..a9c538afbbbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -338,7 +338,7 @@ class SQLAppStatusListener( private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event + physicalPlanDescription, sparkPlanInfo, time, parsingTime, modifiedConfigs) = event val planGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = planGraph.allNodes.flatMap { node => @@ -358,6 +358,7 @@ class SQLAppStatusListener( exec.modifiedConfigs = modifiedConfigs exec.metrics = sqlPlanMetrics exec.submissionTime = time + exec.parsingTime = parsingTime update(exec) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 134b865ff2e7..3c539c1400eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -48,15 +48,10 @@ case class SparkListenerSQLExecutionStart( physicalPlanDescription: String, sparkPlanInfo: SparkPlanInfo, time: Long, + parsingTime: String, modifiedConfigs: Map[String, String] = Map.empty) extends SparkListenerEvent -@DeveloperApi -case class SparkListenerSQLParsingEnd( - executionId: Long, - parsingTime: String) - extends SparkListenerEvent - @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index 55f171342249..980222614c45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -61,7 
+61,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { if (newExecutionStartEvent) { val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, - Map("k1" -> "v1")) + "parsingTime", Map("k1" -> "v1")) assert(reconstructedEvent == expectedEvent) } else { val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala index 090c149886a8..943ef3356d63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala @@ -58,7 +58,8 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite { // Start SQL Execution listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan", - new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty)) + new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, + "parsingTime", Map.empty)) time += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala index 724df8ebe8bf..870d24fc2e14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala @@ -42,7 +42,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { // Verifying with finished SQL execution 1 assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1", - "plan", null, 0, Map.empty))) + "plan", null, 0, "parsingTime", Map.empty))) assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0))) assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null))) assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty))) @@ -89,7 +89,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { // Verifying with live SQL execution 2 assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2", - "plan", null, 0, Map.empty))) + "plan", null, 0, "parsingTime", Map.empty))) assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0))) assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null))) assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala index 1f5cbb0e19ec..db0b8b0a7fcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala @@ -113,6 +113,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + df.queryExecution.tracker.acquireParsingTime(), Map.empty)) 
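// Note (annotation, not part of the diff): acquireParsingTime() renders one
// "<phase>: <duration> ms" line per planning phase, in the fixed order
// parsing, analysis, optimization, planning, for example:
//   parsing: 0 ms
//   analysis: 34 ms
//   optimization: 71 ms
//   planning: 153 ms
// A phase that was never measured falls back to PhaseSummary(-1, -1), whose
// durationMs is 0, so the summary never throws on a missing phase.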
listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index aa3988ae37e4..b03298e5bbfa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -80,6 +80,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase { getClass().getName(), planInfo, System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty) val executionEnd = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 9b5b532d3ecd..7c9e2effb667 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -199,6 +199,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( @@ -346,7 +347,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val listener = new SparkListener { override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { - case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) => + case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _, _) => assert(expected.forall(planDescription.contains)) checkDone = true case _ => // ignore other events @@ -389,6 +390,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -419,6 +421,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -460,6 +463,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -490,6 +494,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) @@ -521,6 
+526,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) var stageId = 0 @@ -661,6 +667,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), time, + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( @@ -670,6 +677,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), time, + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) // Stop execution 2 before execution 1 @@ -687,6 +695,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), time, + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) assert(statusStore.executionsCount === 2) assert(statusStore.execution(2) === None) @@ -723,6 +732,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, oldPlan, System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( From ea79c1e1b3164937d50d6d23c52a1e06bc3859f5 Mon Sep 17 00:00:00 2001 From: caican Date: Thu, 10 Mar 2022 20:43:34 +0800 Subject: [PATCH 08/12] [SPARK-37383][SQL]update --- .../sql/catalyst/QueryPlanningTracker.scala | 14 +++------ .../spark/sql/execution/QueryExecution.scala | 4 +-- .../spark/sql/execution/SQLExecution.scala | 1 + .../sql/execution/ui/AllExecutionsPage.scala | 31 ++++++++++++++++++- .../execution/ui/SQLAppStatusListener.scala | 6 +++- .../sql/execution/ui/SQLAppStatusStore.scala | 1 + .../spark/sql/execution/ui/SQLListener.scala | 1 + .../sql/execution/SQLJsonProtocolSuite.scala | 2 +- .../history/SQLEventFilterBuilderSuite.scala | 3 +- .../SQLLiveEntitiesEventFilterSuite.scala | 4 +-- .../execution/ui/AllExecutionsPageSuite.scala | 1 + .../ui/MetricsAggregationBenchmark.scala | 1 + .../ui/SQLAppStatusListenerSuite.scala | 12 ++++++- 13 files changed, 61 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 18a9532db438..bfc09d1aaf4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst import scala.collection.JavaConverters._ -import org.apache.spark.internal.Logging import org.apache.spark.util.BoundedPriorityQueue @@ -91,7 +90,7 @@ object QueryPlanningTracker { } -class QueryPlanningTracker extends Logging { +class QueryPlanningTracker { import QueryPlanningTracker._ @@ -124,19 +123,14 @@ class QueryPlanningTracker extends Logging { /** * print out the timeSpent for each phase of a SQL */ - def logTimeSpent(): Unit = { - var totalTimeSpent = 0L + def acquireParsingTime(): String = { val timeSpentSummary: StringBuilder = 
new StringBuilder() Seq(QueryPlanningTracker.PARSING, QueryPlanningTracker.ANALYSIS, QueryPlanningTracker.OPTIMIZATION, QueryPlanningTracker.PLANNING).foreach { phase => val duration = phasesMap.getOrDefault(phase, new PhaseSummary(-1, -1)).durationMs - timeSpentSummary.append(s"phase: $phase, timeSpent: $duration ms\n") - totalTimeSpent += duration + timeSpentSummary.append(s"$phase: $duration ms\n") } - logInfo( - s"""Query planning time spent:\n ${timeSpentSummary.toString} - |Total time spent: $totalTimeSpent ms. - """.stripMargin) + timeSpentSummary.toString() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 60b9b28b4a11..1b089943a680 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -164,13 +164,11 @@ class QueryExecution( // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure // that the optimization time is not counted as part of the planning phase. assertOptimized() - val plan = executePhase(QueryPlanningTracker.PLANNING) { + executePhase(QueryPlanningTracker.PLANNING) { // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. QueryExecution.prepareForExecution(preparations, sparkPlan.clone()) } - tracker.logTimeSpent() - plan } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 748f75b18626..ea520d64c6c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -105,6 +105,7 @@ object SQLExecution { // will be caught and reported in the `SparkListenerSQLExecutionEnd` sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), time = System.currentTimeMillis(), + queryExecution.tracker.acquireParsingTime(), redactedConfigs)) body } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 71bfc98b9eeb..34634f426812 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -248,7 +248,9 @@ private[ui] class ExecutionPagedTable( ("Description", true, None), ("Submitted", true, None), ("Duration", true, Some("Time from query submission to completion (or if still executing," + - "time since submission)"))) ++ { + "time since submission)")), + ("ParsingTime", true, Some("Time spent for parsing SQL on every phase " + + "about parse, analyze, optimize, planning"))) ++ { if (showRunningJobs && showSucceededJobs && showFailedJobs) { Seq( ("Running Job IDs", true, None), @@ -293,6 +295,9 @@ private[ui] class ExecutionPagedTable( {UIUtils.formatDuration(duration)} + + parsingTimeCell(executionUIData) + {if (showRunningJobs) { {jobLinks(executionTableRow.runningJobData)} @@ -334,6 +339,29 @@ private[ui] class ExecutionPagedTable(
    <td>{desc}{details}</td>
  }

+  private def parsingTimeCell(execution: SQLExecutionUIData): Seq[Node] = {
+    val details = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stage-details collapsed">
+        <pre>{execution.parsingTime}</pre>
+      </div>
+    } else {
+      Nil
+    }
+
+    val desc = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
+      {execution.parsingTime}
+    } else {
+      {"No ParsingTime"}
+    }
+
+    <td>{desc}{details}</td>
+ } + private def jobURL(request: HttpServletRequest, jobId: Long): String = "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) @@ -406,6 +434,7 @@ private[ui] class ExecutionDataSource( case "Description" => Ordering.by(_.executionUIData.description) case "Submitted" => Ordering.by(_.executionUIData.submissionTime) case "Duration" => Ordering.by(_.duration) + case "ParsingTime" => Ordering.by(_.executionUIData.parsingTime) case "Job IDs" | "Succeeded Job IDs" => Ordering by (_.completedJobData.headOption) case "Running Job IDs" => Ordering.by(_.runningJobData.headOption) case "Failed Job IDs" => Ordering.by(_.failedJobData.headOption) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index d892dbdc2316..a9c538afbbbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -97,6 +97,7 @@ class SQLAppStatusListener( executionData.metrics = sqlStoreData.metrics executionData.submissionTime = sqlStoreData.submissionTime executionData.completionTime = sqlStoreData.completionTime + executionData.parsingTime = sqlStoreData.parsingTime executionData.jobs = sqlStoreData.jobs executionData.stages = sqlStoreData.stages executionData.metricsValues = sqlStoreData.metricValues @@ -337,7 +338,7 @@ class SQLAppStatusListener( private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event + physicalPlanDescription, sparkPlanInfo, time, parsingTime, modifiedConfigs) = event val planGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = planGraph.allNodes.flatMap { node => @@ -357,6 +358,7 @@ class SQLAppStatusListener( exec.modifiedConfigs = modifiedConfigs exec.metrics = sqlPlanMetrics exec.submissionTime = time + exec.parsingTime = parsingTime update(exec) } @@ -485,6 +487,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { var metrics = Seq[SQLPlanMetric]() var submissionTime = -1L var completionTime: Option[Date] = None + var parsingTime: String = null var jobs = Map[Int, JobExecutionStatus]() var stages = Set[Int]() @@ -506,6 +509,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { metrics, submissionTime, completionTime, + parsingTime, jobs, stages, metricsValues) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 7c3315e3d76e..5af81deacd82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -90,6 +90,7 @@ class SQLExecutionUIData( val metrics: Seq[SQLPlanMetric], val submissionTime: Long, val completionTime: Option[Date], + val parsingTime: String, @JsonDeserialize(keyAs = classOf[Integer]) val jobs: Map[Int, JobExecutionStatus], @JsonDeserialize(contentAs = classOf[Integer]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 26805e135b77..3c539c1400eb 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -48,6 +48,7 @@ case class SparkListenerSQLExecutionStart( physicalPlanDescription: String, sparkPlanInfo: SparkPlanInfo, time: Long, + parsingTime: String, modifiedConfigs: Map[String, String] = Map.empty) extends SparkListenerEvent diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index 55f171342249..980222614c45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -61,7 +61,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { if (newExecutionStartEvent) { val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, - Map("k1" -> "v1")) + "parsingTime", Map("k1" -> "v1")) assert(reconstructedEvent == expectedEvent) } else { val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala index 090c149886a8..943ef3356d63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala @@ -58,7 +58,8 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite { // Start SQL Execution listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan", - new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty)) + new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, + "parsingTime", Map.empty)) time += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala index 724df8ebe8bf..870d24fc2e14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala @@ -42,7 +42,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { // Verifying with finished SQL execution 1 assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1", - "plan", null, 0, Map.empty))) + "plan", null, 0, "parsingTime", Map.empty))) assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0))) assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null))) assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty))) @@ -89,7 +89,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { // Verifying with live SQL execution 2 assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2", - "plan", null, 0, Map.empty))) + "plan", null, 0, "parsingTime", Map.empty))) assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0))) assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", 
null))) assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala index 1f5cbb0e19ec..db0b8b0a7fcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala @@ -113,6 +113,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + df.queryExecution.tracker.acquireParsingTime(), Map.empty)) listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index aa3988ae37e4..b03298e5bbfa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -80,6 +80,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase { getClass().getName(), planInfo, System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty) val executionEnd = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 9b5b532d3ecd..7c9e2effb667 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -199,6 +199,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( @@ -346,7 +347,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val listener = new SparkListener { override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { - case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) => + case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _, _) => assert(expected.forall(planDescription.contains)) checkDone = true case _ => // ignore other events @@ -389,6 +390,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -419,6 +421,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) 
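// Note (annotation, not part of the diff): the fixture string above mimics what
// QueryPlanningTracker produces once its phases have been measured, e.g.:
//   val tracker = new QueryPlanningTracker
//   tracker.measurePhase(QueryPlanningTracker.PARSING) { /* run the parser */ }
//   tracker.acquireParsingTime()  // => "parsing: <n> ms\n" ... one line per phase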
listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -460,6 +463,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -490,6 +494,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) @@ -521,6 +526,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) var stageId = 0 @@ -661,6 +667,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), time, + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( @@ -670,6 +677,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), time, + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) // Stop execution 2 before execution 1 @@ -687,6 +695,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), time, + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) assert(statusStore.executionsCount === 2) assert(statusStore.execution(2) === None) @@ -723,6 +732,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils df.queryExecution.toString, oldPlan, System.currentTimeMillis(), + "parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms", Map.empty)) listener.onJobStart(SparkListenerJobStart( From f3954908adbf054c633946cb5edeed25735578f8 Mon Sep 17 00:00:00 2001 From: caican Date: Fri, 11 Mar 2022 12:51:43 +0800 Subject: [PATCH 09/12] [SPARK-37383][SQL]update --- .../org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala | 1 + .../org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index 980222614c45..22fa842b9e38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -51,6 +51,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { | "metrics":[] | }, | "time":0, + | "parsingTime":"", | "modifiedConfigs": { | "k1":"v1" | } diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala index 
11201aadf67f..e9e90a23b526 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala @@ -89,6 +89,7 @@ object SqlResourceSuite { metrics = metrics, submissionTime = 1586768888233L, completionTime = Some(new Date(1586768888999L)), + parsingTime = "parsing: 0 ms\nanalysis: 34 ms\noptimization:71 ms\nplanning: 153 ms", jobs = Map[Int, JobExecutionStatus]( 0 -> JobExecutionStatus.SUCCEEDED, 1 -> JobExecutionStatus.SUCCEEDED), From c7af8084454bf805e3cce4a8e6e3a672539d417d Mon Sep 17 00:00:00 2001 From: caican Date: Fri, 11 Mar 2022 15:02:24 +0800 Subject: [PATCH 10/12] [SPARK-37383][SQL]update --- .../apache/spark/sql/execution/ui/AllExecutionsPage.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 34634f426812..1f30ad22990c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -296,7 +296,7 @@ private[ui] class ExecutionPagedTable( {UIUtils.formatDuration(duration)} - parsingTimeCell(executionUIData) + {parsingTimeCell(executionUIData)} {if (showRunningJobs) { @@ -346,15 +346,14 @@ private[ui] class ExecutionPagedTable( +details ++ } else { Nil } val desc = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) { - {execution.parsingTime} + {execution.parsingTime.substring(0, execution.parsingTime.indexOf('\n'))} } else { {"No ParsingTime"} } From 2d42b10dadf622908bf061e4a69fa0e67a2c2469 Mon Sep 17 00:00:00 2001 From: caican Date: Mon, 14 Mar 2022 10:57:17 +0800 Subject: [PATCH 11/12] [SPARK-37383][SQL]update --- .../apache/spark/sql/execution/SQLJsonProtocolSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index 22fa842b9e38..d3938c5143e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -66,7 +66,8 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { assert(reconstructedEvent == expectedEvent) } else { val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", - "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0) + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), + 0, "parsingTime") assert(reconstructedEvent == expectedOldEvent) } } @@ -104,5 +105,6 @@ private case class OldVersionSQLExecutionStart( details: String, physicalPlanDescription: String, sparkPlanInfo: SparkPlanInfo, - time: Long) + time: Long, + parsingTime: String) extends SparkListenerEvent From 32f4efc0ed1eed2fc251971edfd2f9e829822f80 Mon Sep 17 00:00:00 2001 From: caican Date: Mon, 14 Mar 2022 14:01:06 +0800 Subject: [PATCH 12/12] [SPARK-37383][SQL]update --- .../org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index d3938c5143e5..226784f5c547 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -62,12 +62,12 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { if (newExecutionStartEvent) { val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, - "parsingTime", Map("k1" -> "v1")) + "", Map("k1" -> "v1")) assert(reconstructedEvent == expectedEvent) } else { val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), - 0, "parsingTime") + 0, "") assert(reconstructedEvent == expectedOldEvent) } }
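Usage note: once this series is applied, the per-phase planning breakdown travels
with SparkListenerSQLExecutionStart, so it can be consumed from any SparkListener
without scraping the SQL UI. A minimal sketch (the listener class name and log
format below are illustrative, not part of the patch):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

    // Logs the per-phase planning times carried by the extended start event.
    class PlanningTimeListener extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart =>
          // parsingTime holds one "<phase>: <n> ms" line per planning phase.
          println(s"SQL execution ${e.executionId} planning time:\n${e.parsingTime}")
        case _ => // ignore other events
      }
    }

    // Registration, assuming an active SparkSession named `spark`:
    //   spark.sparkContext.addSparkListener(new PlanningTimeListener)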