diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
index 0fb7dab39146b..bb393d9f2c638 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
@@ -73,7 +73,7 @@ function setupTooltipForSparkPlanNode(nodeId) {
// labelSeparator should be a non-graphical character in order not to affect the width of boxes.
var labelSeparator = "\x01";
-var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$";
+var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$";
/*
* Helper function to pre-process the graph layout.
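For context only (not part of the patch): a minimal Scala sketch of the label shape the updated stageAndTaskMetricsPattern is meant to isolate; the same pattern string compiles as a Java/Scala regex, and the sample label below is made up.

object StageTaskPatternSketch {
  def main(args: Array[String]): Unit = {
    // Same pattern string as the JS constant above; the new suffix no longer
    // contains the literal word "attempt", so the pattern only anchors on
    // "(stage ... task ...)".
    val p = "^(.*)(\\(stage.*task[^)]*\\))(.*)$".r
    val label = "duration total (min, med, max): 2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))"
    label match {
      case p(_, stageTask, _) => println(stageTask) // "(stage 3.0: task 217)"
      case _ => println("no match")
    }
  }
}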
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 65aabe004d75b..1394e0f723733 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,7 +116,7 @@ object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -126,7 +126,7 @@ object SQLMetrics {
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -134,7 +134,7 @@ object SQLMetrics {
def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
// Same with createTimingMetric, just normalize the unit of time to millisecond.
val acc = new SQLMetric(NS_TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -150,7 +150,7 @@ object SQLMetrics {
// probe avg (min, med, max):
// (1.2, 2.2, 6.3)
val acc = new SQLMetric(AVERAGE_METRIC)
- acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -169,11 +169,11 @@ object SQLMetrics {
* and represent it in string for a SQL physical operator.
*/
def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
- // stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId))
+ // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
val stringMetric = if (maxMetrics.isEmpty) {
"(driver)"
} else {
- s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})"
+ s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
}
if (metricsType == SUM_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
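As a rough illustration (not part of the patch), the new max-metric suffix produced by stringValue can be reproduced as below; the index layout of maxMetrics (value, stageId, attemptId, taskId) is taken from the code above, and the sample numbers are made up.

object MaxMetricSuffixSketch {
  // Mirrors the suffix construction in stringValue; yields "(driver)" when no
  // per-task maximum is available.
  def suffix(maxMetrics: Array[Long]): String =
    if (maxMetrics.isEmpty) "(driver)"
    else s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"

  def main(args: Array[String]): Unit = {
    println(suffix(Array.empty[Long]))         // (driver)
    println(suffix(Array(100L, 3L, 0L, 217L))) // (stage 3.0: task 217)
  }
}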
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index d304369ca4f78..76bc7faf18d01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -73,7 +73,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
- Show the Stage (Stage Attempt): Task ID that corresponds to the max metric
+ Show the Stage ID and Task ID that correspond to the max metric
val metrics = sqlStore.executionMetrics(executionId)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 7d09577075d5d..11f93c8836500 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val ds = spark.range(10).filter('id < 5)
testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
0L -> (("WholeStageCodegen (1)", Map(
- "duration total (min, med, max (stageId (attemptId): taskId))" -> {
+ "duration total (min, med, max (stageId: taskId))" -> {
_.toString.matches(timingMetricPattern)
})))), true)
}
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern),
Map("number of output rows" -> 1L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern))
val shuffleExpected1 = Map(
"records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df2 = testData2.groupBy('a).count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern),
Map("number of output rows" -> 3L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern))
val shuffleExpected2 = Map(
@@ -181,8 +181,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" +
- " (attemptId): taskId))")
+ val probes =
+ metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
// Extract min, med, max from the string and strip off everything else.
val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
@@ -231,13 +231,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
- "sort time total (min, med, max (stageId (attemptId): taskId))" -> {
+ "sort time total (min, med, max (stageId: taskId))" -> {
_.toString.matches(timingMetricPattern)
},
- "peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
+ "peak memory total (min, med, max (stageId: taskId))" -> {
_.toString.matches(sizeMetricPattern)
},
- "spill size total (min, med, max (stageId (attemptId): taskId))" -> {
+ "spill size total (min, med, max (stageId: taskId))" -> {
_.toString.matches(sizeMetricPattern)
})))
))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0c1148f7b82e4..766e7a9748510 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -41,27 +41,27 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
- // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0
- // (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+ // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0:
+ // task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
protected val sizeMetricPattern = {
val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
- val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+ val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
}
- // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3 (attempt
- // 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+ // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0:
+ // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
protected val timingMetricPattern = {
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
- val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+ val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
}
// Pattern of size SQLMetric value for Aggregate tests.
- // e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )"
+ // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )"
protected val aggregateMetricsPattern = {
val iters = "([0-9]+(\\.[0-9]+)?)"
- val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+ val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
}
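For reference (not part of the patch), a small self-contained check that the updated timing pattern accepts both the new "stage <id>.<attempt>: task <id>" suffix and the form without a suffix; the sample strings are made up.

object TimingPatternCheck {
  def main(args: Array[String]): Unit = {
    // Built the same way as timingMetricPattern above.
    val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
    val timingMetricPattern = s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
    assert("\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))".matches(timingMetricPattern))
    assert("\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)".matches(timingMetricPattern))
  }
}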
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}
val totalNumBytesMetric = executedNode.metrics.find(
- _.name == "written output total (min, med, max (stageId (attemptId): taskId))").get
+ _.name == "written output total (min, med, max (stageId: taskId))").get
val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
.split(" ").head.trim.toDouble
assert(totalNumBytes > 0)