Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ class QueryPlanningTracker {
ret
}

/**
* print out the timeSpent for each phase of a SQL
*/
def acquireParsingTime(): String = {
val timeSpentSummary: StringBuilder = new StringBuilder()
Seq(QueryPlanningTracker.PARSING, QueryPlanningTracker.ANALYSIS,
QueryPlanningTracker.OPTIMIZATION, QueryPlanningTracker.PLANNING).foreach { phase =>
val duration = phasesMap.getOrDefault(phase, new PhaseSummary(-1, -1)).durationMs
timeSpentSummary.append(s"$phase: $duration ms\n")
}
timeSpentSummary.toString()
}

/**
* Record a specific invocation of a rule.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ object SQLExecution {
// will be caught and reported in the `SparkListenerSQLExecutionEnd`
sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
time = System.currentTimeMillis(),
queryExecution.tracker.acquireParsingTime(),
redactedConfigs))
body
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ private[ui] class ExecutionPagedTable(
("Description", true, None),
("Submitted", true, None),
("Duration", true, Some("Time from query submission to completion (or if still executing," +
"time since submission)"))) ++ {
"time since submission)")),
("ParsingTime", true, Some("Time spent for parsing SQL on every phase " +
"about parse, analyze, optimize, planning"))) ++ {
if (showRunningJobs && showSucceededJobs && showFailedJobs) {
Seq(
("Running Job IDs", true, None),
Expand Down Expand Up @@ -293,6 +295,9 @@ private[ui] class ExecutionPagedTable(
<td sorttable_customkey={duration.toString}>
{UIUtils.formatDuration(duration)}
</td>
<td>
{parsingTimeCell(executionUIData)}
</td>
{if (showRunningJobs) {
<td>
{jobLinks(executionTableRow.runningJobData)}
Expand Down Expand Up @@ -334,6 +339,28 @@ private[ui] class ExecutionPagedTable(
<div>{desc}{details}</div>
}

private def parsingTimeCell(execution: SQLExecutionUIData): Seq[Node] = {
val details = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
<span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stage-details collapsed">
<pre>{execution.parsingTime}</pre>
</div>
} else {
Nil
}

val desc = if (execution.parsingTime != null && execution.parsingTime.nonEmpty) {
{execution.parsingTime.substring(0, execution.parsingTime.indexOf('\n'))}
} else {
{"No ParsingTime"}
}

<div>{desc}{details}</div>
}

private def jobURL(request: HttpServletRequest, jobId: Long): String =
"%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId)

Expand Down Expand Up @@ -406,6 +433,7 @@ private[ui] class ExecutionDataSource(
case "Description" => Ordering.by(_.executionUIData.description)
case "Submitted" => Ordering.by(_.executionUIData.submissionTime)
case "Duration" => Ordering.by(_.duration)
case "ParsingTime" => Ordering.by(_.executionUIData.parsingTime)
case "Job IDs" | "Succeeded Job IDs" => Ordering by (_.completedJobData.headOption)
case "Running Job IDs" => Ordering.by(_.runningJobData.headOption)
case "Failed Job IDs" => Ordering.by(_.failedJobData.headOption)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class SQLAppStatusListener(
executionData.metrics = sqlStoreData.metrics
executionData.submissionTime = sqlStoreData.submissionTime
executionData.completionTime = sqlStoreData.completionTime
executionData.parsingTime = sqlStoreData.parsingTime
executionData.jobs = sqlStoreData.jobs
executionData.stages = sqlStoreData.stages
executionData.metricsValues = sqlStoreData.metricValues
Expand Down Expand Up @@ -337,7 +338,7 @@ class SQLAppStatusListener(

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event
physicalPlanDescription, sparkPlanInfo, time, parsingTime, modifiedConfigs) = event

val planGraph = SparkPlanGraph(sparkPlanInfo)
val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
Expand All @@ -357,6 +358,7 @@ class SQLAppStatusListener(
exec.modifiedConfigs = modifiedConfigs
exec.metrics = sqlPlanMetrics
exec.submissionTime = time
exec.parsingTime = parsingTime
update(exec)
}

Expand Down Expand Up @@ -485,6 +487,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var metrics = Seq[SQLPlanMetric]()
var submissionTime = -1L
var completionTime: Option[Date] = None
var parsingTime: String = null

var jobs = Map[Int, JobExecutionStatus]()
var stages = Set[Int]()
Expand All @@ -506,6 +509,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
metrics,
submissionTime,
completionTime,
parsingTime,
jobs,
stages,
metricsValues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class SQLExecutionUIData(
val metrics: Seq[SQLPlanMetric],
val submissionTime: Long,
val completionTime: Option[Date],
val parsingTime: String,
@JsonDeserialize(keyAs = classOf[Integer])
val jobs: Map[Int, JobExecutionStatus],
@JsonDeserialize(contentAs = classOf[Integer])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ case class SparkListenerSQLExecutionStart(
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long,
parsingTime: String,
modifiedConfigs: Map[String, String] = Map.empty)
extends SparkListenerEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
| "metrics":[]
| },
| "time":0,
| "parsingTime":"",
| "modifiedConfigs": {
| "k1":"v1"
| }
Expand All @@ -61,11 +62,12 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
if (newExecutionStartEvent) {
val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail",
"test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
Map("k1" -> "v1"))
"", Map("k1" -> "v1"))
assert(reconstructedEvent == expectedEvent)
} else {
val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail",
"test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
"test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil),
0, "")
assert(reconstructedEvent == expectedOldEvent)
}
}
Expand Down Expand Up @@ -103,5 +105,6 @@ private case class OldVersionSQLExecutionStart(
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long)
time: Long,
parsingTime: String)
extends SparkListenerEvent
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {

// Start SQL Execution
listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time,
"parsingTime", Map.empty))

time += 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {

// Verifying with finished SQL execution 1
assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1",
"plan", null, 0, Map.empty)))
"plan", null, 0, "parsingTime", Map.empty)))
assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty)))
Expand Down Expand Up @@ -89,7 +89,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {

// Verifying with live SQL execution 2
assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2",
"plan", null, 0, Map.empty)))
"plan", null, 0, "parsingTime", Map.empty)))
assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
df.queryExecution.tracker.acquireParsingTime(),
Map.empty))
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
getClass().getName(),
planInfo,
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty)

val executionEnd = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))

listener.onJobStart(SparkListenerJobStart(
Expand Down Expand Up @@ -346,7 +347,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) =>
case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _, _) =>
assert(expected.forall(planDescription.contains))
checkDone = true
case _ => // ignore other events
Expand Down Expand Up @@ -389,6 +390,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -419,6 +421,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -460,6 +463,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -490,6 +494,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
Expand Down Expand Up @@ -521,6 +526,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))

var stageId = 0
Expand Down Expand Up @@ -661,6 +667,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
time,
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
Expand All @@ -670,6 +677,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
time,
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))

// Stop execution 2 before execution 1
Expand All @@ -687,6 +695,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
time,
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))
assert(statusStore.executionsCount === 2)
assert(statusStore.execution(2) === None)
Expand Down Expand Up @@ -723,6 +732,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
df.queryExecution.toString,
oldPlan,
System.currentTimeMillis(),
"parsing: 0 ms\nanalysis: 34 ms\noptimization: 71 ms\nplanning: 153 ms",
Map.empty))

listener.onJobStart(SparkListenerJobStart(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ object SqlResourceSuite {
metrics = metrics,
submissionTime = 1586768888233L,
completionTime = Some(new Date(1586768888999L)),
parsingTime = "parsing: 0 ms\nanalysis: 34 ms\noptimization:71 ms\nplanning: 153 ms",
jobs = Map[Int, JobExecutionStatus](
0 -> JobExecutionStatus.SUCCEEDED,
1 -> JobExecutionStatus.SUCCEEDED),
Expand Down