@@ -73,7 +73,7 @@ function setupTooltipForSparkPlanNode(nodeId) {

// labelSeparator should be a non-graphical character in order not to affect the width of boxes.
var labelSeparator = "\x01";
-var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$";
+var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$";

/*
* Helper function to pre-process the graph layout.
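Note: the `attempt` literal can be dropped because the suffix emitted by `SQLMetrics.stringValue` (below) now reads `(stage X.Y: task Z)`. A minimal sketch, in Scala rather than JS and with a made-up label, of what the relaxed pattern captures:

```scala
object StagePatternCheck extends App {
  // Same pattern string as the updated JS regex above.
  val stageAndTaskMetrics = "^(.*)(\\(stage.*task[^)]*\\))(.*)$".r
  // Hypothetical tooltip label in the new "(stage X.Y: task Z)" format.
  val label = "duration total (min, med, max): 5s (800ms, 1s, 2s (stage 3.0: task 217))"
  label match {
    case stageAndTaskMetrics(_, maxMetrics, _) => println(maxMetrics) // (stage 3.0: task 217)
    case _ => println("no match")
  }
}
```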
@@ -116,7 +116,7 @@ object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
-acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -126,15 +126,15 @@ object SQLMetrics {
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
-acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}

def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
// Same with createTimingMetric, just normalize the unit of time to millisecond.
val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -150,7 +150,7 @@ object SQLMetrics {
// probe avg (min, med, max):
// (1.2, 2.2, 6.3)
val acc = new SQLMetric(AVERAGE_METRIC)
-acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"),
+acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -169,11 +169,11 @@ object SQLMetrics {
* and represent it in string for a SQL physical operator.
*/
def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
// stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId))
// stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
val stringMetric = if (maxMetrics.isEmpty) {
"(driver)"
} else {
s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})"
s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
}
if (metricsType == SUM_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
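For a quick check of the format change, the suffix logic above can be lifted out on its own. A sketch, assuming (as `stringValue` does) that `maxMetrics` is laid out as `[metricValue, stageId, attemptId, taskId]`; `maxMetricLabel` is a hypothetical helper, not part of the patch:

```scala
def maxMetricLabel(maxMetrics: Array[Long]): String =
  if (maxMetrics.isEmpty) {
    "(driver)" // metric only updated on the driver
  } else {
    s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
  }

assert(maxMetricLabel(Array.empty[Long]) == "(driver)")
assert(maxMetricLabel(Array(2000L, 3L, 0L, 217L)) == "(stage 3.0: task 217)")
```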
@@ -73,7 +73,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
</div>
<div>
<input type="checkbox" id="stageId-and-taskId-checkbox"></input>
<span>Show the Stage (Stage Attempt): Task ID that corresponds to the max metric</span>
<span>Show the Stage ID and Task ID that corresponds to the max metric</span>
</div>

val metrics = sqlStore.executionMetrics(executionId)
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val ds = spark.range(10).filter('id < 5)
testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
0L -> (("WholeStageCodegen (1)", Map(
"duration total (min, med, max (stageId (attemptId): taskId))" -> {
"duration total (min, med, max (stageId: taskId))" -> {
_.toString.matches(timingMetricPattern)
})))), true)
}
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern),
Map("number of output rows" -> 1L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern))
val shuffleExpected1 = Map(
"records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df2 = testData2.groupBy('a).count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern),
Map("number of output rows" -> 3L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern))

val shuffleExpected2 = Map(
@@ -181,8 +181,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" +
" (attemptId): taskId))")
val probes =
metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
// Extract min, med, max from the string and strip off everything else.
val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
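Traced with a hypothetical rendered value in the new format, the slicing above works out as:

```scala
val probes = "\n(1.2, 2.2, 6.3 (stage 1.0: task 8))"      // made-up metric value
val stripped = probes.stripPrefix("\n(").stripSuffix(")") // "1.2, 2.2, 6.3 (stage 1.0: task 8)"
val index = stripped.indexOf(" (", 0)                     // start of the stage/task suffix
stripped.slice(0, index).split(", ").foreach { probe =>   // "1.2", "2.2", "6.3"
  assert(probe.toDouble > 1.0)
}
```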
@@ -231,13 +231,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
"sort time total (min, med, max (stageId (attemptId): taskId))" -> {
"sort time total (min, med, max (stageId: taskId))" -> {
_.toString.matches(timingMetricPattern)
},
"peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
"peak memory total (min, med, max (stageId: taskId))" -> {
_.toString.matches(sizeMetricPattern)
},
"spill size total (min, med, max (stageId (attemptId): taskId))" -> {
"spill size total (min, med, max (stageId: taskId))" -> {
_.toString.matches(sizeMetricPattern)
})))
))
@@ -41,27 +41,27 @@ trait SQLMetricsTestUtils extends SQLTestUtils {

protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore

-// Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0
-// (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+// Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0:
+// task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
protected val sizeMetricPattern = {
val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
}

-// Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3 (attempt
-// 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+// Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0:
+// task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
protected val timingMetricPattern = {
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
}

// Pattern of size SQLMetric value for Aggregate tests.
// e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )"
// e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )"
protected val aggregateMetricsPattern = {
val iters = "([0-9]+(\\.[0-9]+)?)"
val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
}
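An illustrative sanity check (not part of the suite) that the rewritten `maxMetrics` fragment accepts both renderings, with and without the stage/task suffix:

```scala
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
val timingMetricPattern = s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"

// Executor-side metric: carries the "(stage X.Y: task Z)" suffix.
assert("\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))".matches(timingMetricPattern))
// Driver-only metric: no suffix.
assert("\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)".matches(timingMetricPattern))
```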

@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}

val totalNumBytesMetric = executedNode.metrics.find(
_.name == "written output total (min, med, max (stageId (attemptId): taskId))").get
_.name == "written output total (min, med, max (stageId: taskId))").get
val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
.split(" ").head.trim.toDouble
assert(totalNumBytes > 0)
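The head of the rendered string is the aggregated total, which is what the extraction above pulls out; traced with the example value from the `sizeMetricPattern` comment:

```scala
val rendered = "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0: task 4))"
val totalNumBytes = rendered.replaceAll(",", "") // drop commas (thousands separators)
  .split(" ").head.trim.toDouble                 // "\n96.2" -> 96.2
assert(totalNumBytes > 0)
```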