@@ -116,26 +116,23 @@ object SQLMetrics {
    // data size total (min, med, max):
    // 100GB (100MB, 1GB, 10GB)
    val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
    acc
  }

  def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
    // The final result of this metric in the physical operator UI may look like:
-    // duration(min, med, max):
+    // duration total (min, med, max):
    // 5s (800ms, 1s, 2s)
    val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
    acc
  }

  def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
    // Same as createTimingMetric, but normalizes the unit of time to milliseconds.
    val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
    acc
  }

@@ -150,8 +147,7 @@ object SQLMetrics {
    // probe avg (min, med, max):
    // (1.2, 2.2, 6.3)
    val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
    acc
  }
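Call sites keep registering metrics the same way; only the registered name gets shorter, since the min/med/max suffix is now appended at render time by `stringValue` below. A minimal sketch of the caller side, assuming a live SparkContext and the `create*Metric` signatures shown above (the map keys are illustrative, not from this diff):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical operator metrics: each is registered under its plain name only.
// The "total (min, med, max (stageId: taskId))" text is appended later, when
// SQLMetrics.stringValue renders the accumulated value for the UI.
def operatorMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
  "sortTime" -> SQLMetrics.createTimingMetric(sc, "sort time"),
  "probeIters" -> SQLMetrics.createAverageMetric(sc, "avg hash probe bucket list iters"))
```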

@@ -164,13 +160,15 @@ object SQLMetrics {
    metricsType != SUM_METRIC
  }

+  private val METRICS_NAME_SUFFIX = "(min, med, max (stageId: taskId))"
+
  /**
   * A function that defines how we aggregate the final accumulator results among all tasks,
   * and represent it as a string for a SQL physical operator.
   */
  def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
-    // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
-    val stringMetric = if (maxMetrics.isEmpty) {
+    // taskInfo = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
+    val taskInfo = if (maxMetrics.isEmpty) {
      "(driver)"
    } else {
      s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
@@ -180,18 +178,20 @@ object SQLMetrics {
      numberFormat.format(values.sum)
    } else if (metricsType == AVERAGE_METRIC) {
      val validValues = values.filter(_ > 0)
-      val Seq(min, med, max) = {
-        val metric = if (validValues.isEmpty) {
-          val zeros = Seq.fill(3)(0L)
-          zeros.map(v => toNumberFormat(v))
-        } else {
+      // When there is only one metric value (or none), there is no need to display
+      // max/min/median. This is common for driver-side SQL metrics.
+      if (validValues.length <= 1) {
+        toNumberFormat(validValues.headOption.getOrElse(0))
+      } else {
+        val Seq(min, med, max) = {
          Arrays.sort(validValues)
-          Seq(toNumberFormat(validValues(0)), toNumberFormat(validValues(validValues.length / 2)),
-            s"${toNumberFormat(validValues(validValues.length - 1))} $stringMetric")
+          Seq(
+            toNumberFormat(validValues(0)),
+            toNumberFormat(validValues(validValues.length / 2)),
+            toNumberFormat(validValues(validValues.length - 1)))
        }
-        metric
+        s"$METRICS_NAME_SUFFIX:\n($min, $med, $max $taskInfo)"
      }
-      s"\n($min, $med, $max)"
    } else {
      val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
        Utils.bytesToString
@@ -204,19 +204,21 @@ object SQLMetrics {
      }

      val validValues = values.filter(_ >= 0)
-      val Seq(sum, min, med, max) = {
-        val metric = if (validValues.isEmpty) {
-          val zeros = Seq.fill(4)(0L)
-          zeros.map(v => strFormat(v))
-        } else {
+      // When there is only one metric value (or none), there is no need to display
+      // max/min/median. This is common for driver-side SQL metrics.
+      if (validValues.length <= 1) {
+        strFormat(validValues.headOption.getOrElse(0))
+      } else {
+        val Seq(sum, min, med, max) = {
          Arrays.sort(validValues)
-          Seq(strFormat(validValues.sum), strFormat(validValues(0)),
+          Seq(
+            strFormat(validValues.sum),
+            strFormat(validValues(0)),
            strFormat(validValues(validValues.length / 2)),
-            s"${strFormat(validValues(validValues.length - 1))} $stringMetric")
+            strFormat(validValues(validValues.length - 1)))
        }
-        metric
+        s"total $METRICS_NAME_SUFFIX\n$sum ($min, $med, $max $taskInfo)"
      }
-      s"\n$sum ($min, $med, $max)"
Review comment from @dongjoon-hyun (Member), Mar 27, 2020, on the removed line above:
Just a question: In this case, it seems that we didn't have $taskInfo previously.

    }
  }
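Concretely, the aggregation now yields two shapes of string. A hedged sketch of both, assuming sql-internal access to `SQLMetrics` and its metric-type constants (exact number formatting may differ):

```scala
import org.apache.spark.sql.execution.metric.SQLMetrics

// maxMetrics, per the code above: index 1 = stageId, 2 = stage attempt, 3 = taskId
// (index 0 is presumably the max value itself).
// Several task values produce a name-extending header line, then "sum (min, med, max taskInfo)",
// e.g. something like:
//   "total (min, med, max (stageId: taskId))\n3.8 s (800.0 ms, 1.0 s, 2.0 s (stage 3.0: task 217))"
val multiTask = SQLMetrics.stringValue(
  SQLMetrics.TIMING_METRIC, Array(800L, 1000L, 2000L), Array(2000L, 3L, 0L, 217L))

// A single (typically driver-side) value produces just the formatted number,
// e.g. "800.0 ms", with no header and no min/med/max breakdown.
val driverSide = SQLMetrics.stringValue(SQLMetrics.TIMING_METRIC, Array(800L), Array.empty)
```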

@@ -166,7 +166,12 @@ private[ui] class SparkPlanGraphNode(
      metric <- metrics
      value <- metricsValue.get(metric.accumulatorId)
    } yield {
-      metric.name + ": " + value
+      // The value may contain ":" to extend the name, like `total (min, med, max): ...`
+      if (value.contains(":")) {
+        metric.name + " " + value
+      } else {
+        metric.name + ": " + value
+      }
    }

    if (values.nonEmpty) {
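A standalone trace of the branch above, with a made-up metric value:

```scala
// A value produced by the new stringValue contains ":" (inside "(stageId: taskId)"),
// so it extends the metric name; a plain value keeps the usual "name: value" form.
val name = "duration"
val value = "total (min, med, max (stageId: taskId))\n5 s (800 ms, 1 s, 2 s (stage 3.0: task 217))"
val label = if (value.contains(":")) name + " " + value else name + ": " + value
// label starts with "duration total (min, med, max (stageId: taskId))"

val plain = "number of output rows" + ": " + "2" // unchanged for simple values
```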
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
    val ds = spark.range(10).filter('id < 5)
    testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
      0L -> (("WholeStageCodegen (1)", Map(
-        "duration total (min, med, max (stageId: taskId))" -> {
+        "duration" -> {
          _.toString.matches(timingMetricPattern)
        })))), true)
  }
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
    val df = testData2.groupBy().count() // 2 partitions
    val expected1 = Seq(
      Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
          aggregateMetricsPattern),
      Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
          aggregateMetricsPattern))
    val shuffleExpected1 = Map(
      "records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
    val df2 = testData2.groupBy('a).count()
    val expected2 = Seq(
      Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
          aggregateMetricsPattern),
      Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
          aggregateMetricsPattern))

    val shuffleExpected2 = Map(
@@ -181,12 +181,17 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
    }
    val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
    nodeIds.foreach { nodeId =>
-      val probes =
-        metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
-      // Extract min, med, max from the string and strip off everything else.
-      val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
-      probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
-        probe => assert(probe.toDouble > 1.0)
+      val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
+      if (!probes.contains("\n")) {
+        // It's a single metric value
+        assert(probes.toDouble > 1.0)
+      } else {
+        val mainValue = probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")
+        // Extract min, med, max from the string and strip off everything else.
+        val index = mainValue.indexOf(" (", 0)
+        mainValue.slice(0, index).split(", ").foreach {
+          probe => assert(probe.toDouble > 1.0)
+        }
      }
    }
  }
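The multi-line branch can be traced by hand on a made-up average-metric value:

```scala
// Hypothetical value in the new format: header line, then "(min, med, max taskInfo)".
val probes = "(min, med, max (stageId: taskId)):\n(1.2, 2.2, 6.3 (stage 1.0: task 8))"

// Second line with the outer parentheses stripped: "1.2, 2.2, 6.3 (stage 1.0: task 8)"
val mainValue = probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")

// Cutting at " (" drops the task info, leaving "1.2, 2.2, 6.3".
val index = mainValue.indexOf(" (", 0)
mainValue.slice(0, index).split(", ").foreach { probe =>
  assert(probe.toDouble > 1.0) // 1.2, 2.2 and 6.3 all exceed 1.0
}
```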
@@ -231,13 +236,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
    val df = Seq(1, 3, 2).toDF("id").sort('id)
    testSparkPlanMetricsWithPredicates(df, 2, Map(
      0L -> (("Sort", Map(
-        "sort time total (min, med, max (stageId: taskId))" -> {
+        "sort time" -> {
          _.toString.matches(timingMetricPattern)
        },
-        "peak memory total (min, med, max (stageId: taskId))" -> {
+        "peak memory" -> {
          _.toString.matches(sizeMetricPattern)
        },
-        "spill size total (min, med, max (stageId: taskId))" -> {
+        "spill size" -> {
          _.toString.matches(sizeMetricPattern)
        })))
    ))
@@ -46,23 +46,23 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
  protected val sizeMetricPattern = {
    val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
-    s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
+    s"(.*\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\))|($bytes)"
  }

  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0:
-  // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)" OR "1.0 ms"
  protected val timingMetricPattern = {
    val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
-    s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
+    s"(.*\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\))|($duration)"
  }

  // Pattern of average SQLMetric value for Aggregate tests,
-  // e.g. "\n(1, 1, 0.9 (stage 1.0: task 8))" OR "\n(1, 1, 0.9)"
+  // e.g. "\n(1, 1, 0.9 (stage 1.0: task 8))" OR "\n(1, 1, 0.9)" OR "1"
  protected val aggregateMetricsPattern = {
    val iters = "([0-9]+(\\.[0-9]+)?)"
    val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
-    s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
+    s"(.*\\n\\($iters, $iters, $iters( $maxMetrics)?\\))|($iters)"
  }
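The widening is easiest to see with sample strings; the following check mirrors the definitions above and should hold as written (sample values made up to match the comments):

```scala
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
val timingMetricPattern =
  s"(.*\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\))|($duration)"

// Multi-task value: a free-form header line, then "sum (min, med, max (stage: task))".
assert(("total (min, med, max (stageId: taskId))\n" +
  "2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))").matches(timingMetricPattern))
// Driver-side single value: a bare duration now also matches, via the new alternative.
assert("1.0 ms".matches(timingMetricPattern))
```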

@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    }

    val totalNumBytesMetric = executedNode.metrics.find(
-      _.name == "written output total (min, med, max (stageId: taskId))").get
+      _.name == "written output").get
    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
      .split(" ").head.trim.toDouble
    assert(totalNumBytes > 0)
@@ -154,11 +154,14 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
    expected.foreach { case (id, value) =>
      // The values in actual can be SQL metrics, meaning that they contain additional
      // formatting when converted to string. Verify that they start with the expected value.
      // TODO: this is brittle. There is no requirement that the actual string needs to start
      // with the accumulator value.
      assert(actual.contains(id))
      val v = actual(id).trim
-      assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
+      if (v.contains("\n")) {
+        // The actual value can be "total (max, ...)\n6 ms (5 ms, ...)".
+        assert(v.split("\n")(1).startsWith(value.toString), s"Wrong value for accumulator $id")
+      } else {
+        assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
+      }
    }
  }
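For example, with an expected accumulator value of 6 and a hypothetical rendered metric:

```scala
val value = 6L
val v = "total (min, med, max (stageId: taskId))\n6 ms (5 ms, 6 ms, 6 ms (stage 0.0: task 1))"
// The first line is only the name suffix; the aggregated value leads the second line.
assert(v.split("\n")(1).startsWith(value.toString))
```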
