From 006cdf3262a20ebb8d3c01673f205cb6f6e44626 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 27 Mar 2020 03:16:33 +0900
Subject: [PATCH] Fix the metrics format for taskId and attemptId.

---
 .../sql/execution/ui/static/spark-sql-viz.js  |  2 +-
 .../sql/execution/metric/SQLMetrics.scala     | 12 ++++++------
 .../sql/execution/ui/ExecutionPage.scala      |  2 +-
 .../execution/metric/SQLMetricsSuite.scala    | 20 ++++++++++----------
 .../metric/SQLMetricsTestUtils.scala          | 18 +++++++++---------
 5 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
index 0fb7dab39146b..bb393d9f2c638 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
@@ -73,7 +73,7 @@ function setupTooltipForSparkPlanNode(nodeId) {
 
 // labelSeparator should be a non-graphical character in order not to affect the width of boxes.
 var labelSeparator = "\x01";
-var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$";
+var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$";
 
 /*
  * Helper function to pre-process the graph layout.
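The pattern above drops the standalone "attempt" token because the attempt ID is now folded into the stage part of the label ("stage 3.0" rather than "stage 3 (attempt 0)"). The pattern uses no JS-specific regex features, so its behavior against the new format can be sanity-checked from Scala too; a minimal sketch with a made-up node label (not part of the patch):

object VizPatternCheck {
  def main(args: Array[String]): Unit = {
    // Same pattern string as the updated spark-sql-viz.js above.
    val stageAndTask = "^(.*)(\\(stage.*task[^)]*\\))(.*)$".r
    // Hypothetical node label in the new format.
    "sort time total: 2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))" match {
      case stageAndTask(prefix, maxMetric, suffix) =>
        // The middle group captures the stage/task part used for tooltip splitting.
        assert(maxMetric == "(stage 3.0: task 217)")
        assert(suffix == ")")
      case other => sys.error(s"pattern did not match: $other")
    }
  }
}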
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 65aabe004d75b..1394e0f723733 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,7 +116,7 @@
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -126,7 +126,7 @@
     // duration(min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -134,7 +134,7 @@
   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
     val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -150,7 +150,7 @@
     // probe avg (min, med, max):
     // (1.2, 2.2, 6.3)
     val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"),
+    acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
       countFailedValues = false)
     acc
   }
@@ -169,11 +169,11 @@
    * and represent it in string for a SQL physical operator.
    */
   def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
-    // stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId))
+    // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
     val stringMetric = if (maxMetrics.isEmpty) {
       "(driver)"
     } else {
-      s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})"
+      s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
     }
     if (metricsType == SUM_METRIC) {
       val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
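The formatting change is easiest to see in isolation. Below is a minimal sketch that mirrors the branch in stringValue above; as the indices 1 to 3 imply, it assumes maxMetrics is laid out as [max value, stageId, attemptId, taskId], and it is not the real SQLMetric plumbing:

object MaxMetricSuffix {
  // Mirrors the branch in SQLMetrics.stringValue: an empty maxMetrics array
  // means the metric only has a driver-side value; otherwise it points at
  // the task that produced the max value.
  def suffix(maxMetrics: Array[Long]): String =
    if (maxMetrics.isEmpty) {
      "(driver)"
    } else {
      s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
    }

  def main(args: Array[String]): Unit = {
    assert(suffix(Array.empty[Long]) == "(driver)")
    // stage 3, attempt 0, task 217 now renders as "stage 3.0: task 217".
    assert(suffix(Array(100L, 3L, 0L, 217L)) == "(stage 3.0: task 217)")
  }
}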
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index d304369ca4f78..76bc7faf18d01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -73,7 +73,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging {
-          Show the Stage (Stage Attempt): Task ID that corresponds to the max metric
+          Show the Stage ID and Task ID that correspond to the max metric
     val metrics = sqlStore.executionMetrics(executionId)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 7d09577075d5d..11f93c8836500 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen (1)", Map(
-        "duration total (min, med, max (stageId (attemptId): taskId))" -> {
+        "duration total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(timingMetricPattern)
         })))), true)
   }
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
           aggregateMetricsPattern))
 
     val shuffleExpected2 = Map(
@@ -181,8 +181,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" +
-          " (attemptId): taskId))")
+        val probes =
+          metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
         // Extract min, med, max from the string and strip off everthing else.
         val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
         probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
@@ -231,13 +231,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = Seq(1, 3, 2).toDF("id").sort('id)
     testSparkPlanMetricsWithPredicates(df, 2, Map(
       0L -> (("Sort", Map(
-        "sort time total (min, med, max (stageId (attemptId): taskId))" -> {
+        "sort time total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(timingMetricPattern)
         },
-        "peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
+        "peak memory total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(sizeMetricPattern)
         },
-        "spill size total (min, med, max (stageId (attemptId): taskId))" -> {
+        "spill size total (min, med, max (stageId: taskId))" -> {
           _.toString.matches(sizeMetricPattern)
         })))
     ))
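The probe-extraction code above relies on the metric value having the shape "\n(min, med, max ...)", with the optional stage/task suffix always starting at the first " (". A worked example with a made-up value:

object ProbeStringParsing {
  def main(args: Array[String]): Unit = {
    // Hypothetical avg-hash-probe metric value in the new format.
    val probes = "\n(1.2, 2.2, 6.3 (stage 1.0: task 8))"
    // Drop the leading "\n(" and the outermost ")" ...
    val stripped = probes.stripPrefix("\n(").stripSuffix(")")
    // ... then cut everything from " (stage 1.0: task 8)" onward,
    // leaving just "1.2, 2.2, 6.3".
    val index = stripped.indexOf(" (", 0)
    val values = stripped.slice(0, index).split(", ")
    assert(values.sameElements(Array("1.2", "2.2", "6.3")))
  }
}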
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0c1148f7b82e4..766e7a9748510 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -41,27 +41,27 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
 
   protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
 
-  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0
-  // (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0:
+  // task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
   protected val sizeMetricPattern = {
     val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
-    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
     s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
   }
 
-  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3 (attempt
-  // 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0:
+  // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
   protected val timingMetricPattern = {
     val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
-    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
     s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
   }
 
   // Pattern of size SQLMetric value for Aggregate tests.
-  // e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )"
+  // e.g. "\n(1, 1, 0.9 (stage 1.0: task 8))" OR "\n(1, 1, 0.9)"
   protected val aggregateMetricsPattern = {
     val iters = "([0-9]+(\\.[0-9]+)?)"
-    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
     s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
   }
 
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
     }
 
     val totalNumBytesMetric = executedNode.metrics.find(
-      _.name == "written output total (min, med, max (stageId (attemptId): taskId))").get
+      _.name == "written output total (min, med, max (stageId: taskId))").get
     val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
       .split(" ").head.trim.toDouble
     assert(totalNumBytes > 0)
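All three patterns share the maxMetrics sub-pattern, which now expects "stage <stageId>.<attemptId>" instead of "stage <stageId> (attempt <attemptId>)". A quick self-check of sizeMetricPattern against sample values (made up, following the comments above):

object MetricPatternSelfCheck {
  def main(args: Array[String]): Unit = {
    // Same building blocks as sizeMetricPattern in SQLMetricsTestUtils.
    val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
    val sizeMetricPattern = s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
    // New format, with and without the optional stage/task suffix.
    assert("\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0: task 4))".matches(sizeMetricPattern))
    assert("\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)".matches(sizeMetricPattern))
    // The pre-patch format no longer matches.
    assert(!"\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0 (attempt 0): task 4))"
      .matches(sizeMetricPattern))
  }
}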