StaticSQLConf.scala
@@ -135,6 +135,14 @@ object StaticSQLConf {
.intConf
.createWithDefault(1000)

val DISPLAY_TASK_ID_FOR_MAX_METRIC =
buildStaticConf("spark.sql.ui.displayTaskInfoForMaxMetric")
Review comment (Contributor): shall we make it an internal conf?

.doc("If turn on, Spark will display stageId-stageAttemptId-taskId of the max metrics to " +
"tell where the max value comes from. It's useful to help debug job quicker.")
.version("3.0.0")
Review comment (Member): @Ngone51 . Since SPARK-31081 is filed as an Improvement, this should be 3.1.0. If you want to consider this a bug or regression for 3.0.0, you had better change the JIRA first.

cc @rxin since he is the release manager of 3.0.0 (also cc @gatorsmile).

Review comment (Contributor): This is a UI issue to me. The SQL web UI is really hard to read now with the stage id stuff. Ideally, we should revisit #26843 and think of a better way to improve readability. But no one has proposed an idea yet.

This PR disables the stage id stuff, which seems like a good compromise for 3.0: we keep the UI unchanged, but people who really need the stage id info can still enable it.

Review comment (Member): +1 for making this 3.0. The UI changes in #26843 can be confusing to users after the 3.0 release.

.booleanConf
.createWithDefault(false)

val BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD =
buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
.internal()
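For reference, a minimal sketch (not part of this PR) of how the new static conf added above could be enabled. The conf name spark.sql.ui.displayTaskInfoForMaxMetric is taken from the diff; because it is a static SQL conf it must be set before the SparkSession is created, and the query here is an arbitrary example.

import org.apache.spark.sql.SparkSession

object DisplayTaskInfoExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("display-task-info-example")
      // Static conf: must be set at session creation time; the default is false.
      .config("spark.sql.ui.displayTaskInfoForMaxMetric", "true")
      .getOrCreate()

    // Any query will do; with the conf on, the SQL tab of the web UI labels max
    // metrics as "max (stageId (attemptId): taskId)" instead of plain "max".
    spark.range(0, 1000000).selectExpr("sum(id)").show()

    spark.stop()
  }
}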
SQLMetrics.scala
@@ -22,9 +22,10 @@ import java.util.{Arrays, Locale}

import scala.concurrent.duration._

import org.apache.spark.SparkContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}


@@ -116,8 +117,8 @@ object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
countFailedValues = false)
acc.register(sc, name = Some(s"$name total (min, med, ${attachTaskId("max", sc.conf)})"),
Review comment (Member): The new indentation looks incorrect at this line?

Review comment (Member, author): I also noted that, thanks.

countFailedValues = false)
acc
}

@@ -126,16 +127,16 @@ object SQLMetrics {
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
acc.register(sc, name = Some(s"$name total (min, med, ${attachTaskId("max", sc.conf)})"),
countFailedValues = false)
acc
}

def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
// Same with createTimingMetric, just normalize the unit of time to millisecond.
val acc = new SQLMetric(NS_TIMING_METRIC, -1)
acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
countFailedValues = false)
acc.register(sc, name = Some(s"$name total (min, med, ${attachTaskId("max", sc.conf)})"),
countFailedValues = false)
acc
}

@@ -150,7 +151,7 @@ object SQLMetrics {
// probe avg (min, med, max):
// (1.2, 2.2, 6.3)
val acc = new SQLMetric(AVERAGE_METRIC)
acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"),
acc.register(sc, name = Some(s"$name (min, med, ${attachTaskId("max", sc.conf)})"),
countFailedValues = false)
acc
}
@@ -168,13 +169,7 @@ object SQLMetrics {
* A function that defines how we aggregate the final accumulator results among all tasks,
* and represent it in string for a SQL physical operator.
*/
def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
// stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId))
val stringMetric = if (maxMetrics.isEmpty) {
"(driver)"
} else {
s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})"
}
def stringValue(metricsType: String, values: Array[Long], taskId: String = ""): String = {
if (metricsType == SUM_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
numberFormat.format(values.sum)
@@ -187,7 +182,7 @@ object SQLMetrics {
} else {
Arrays.sort(validValues)
Seq(toNumberFormat(validValues(0)), toNumberFormat(validValues(validValues.length / 2)),
s"${toNumberFormat(validValues(validValues.length - 1))} $stringMetric")
s"${toNumberFormat(validValues(validValues.length - 1))}$taskId")
}
metric
}
@@ -212,7 +207,7 @@ object SQLMetrics {
Arrays.sort(validValues)
Seq(strFormat(validValues.sum), strFormat(validValues(0)),
strFormat(validValues(validValues.length / 2)),
s"${strFormat(validValues(validValues.length - 1))} $stringMetric")
s"${strFormat(validValues(validValues.length - 1))}$taskId")
}
metric
}
@@ -233,4 +228,13 @@ object SQLMetrics {
SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
}
}

private def attachTaskId(name: String, conf: SparkConf): String = {
Review comment (Member): We are attaching stageId and attemptId, too.

Review comment (Member, author): Yeah, I know. I'm thinking about a better name; including stageId/attemptId would make the method name too long.

Review comment (Contributor): attachTaskInfo? Stage id is part of the task info, which tells which stage the task comes from.

val display = conf.get(StaticSQLConf.DISPLAY_TASK_ID_FOR_MAX_METRIC)
if (display) {
s"$name (stageId (attemptId): taskId)"
} else {
name
}
}
}
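A minimal standalone sketch of the labelling logic added above, for illustration only: the flag is passed as a plain boolean instead of being read from StaticSQLConf via SparkConf, and the example metric name "duration" is made up.

object AttachTaskIdSketch {
  // Plain-boolean variant of the private helper added in this PR.
  def attachTaskId(name: String, display: Boolean): String =
    if (display) s"$name (stageId (attemptId): taskId)" else name

  def main(args: Array[String]): Unit = {
    // Default (conf off): the plain metric name the updated tests below expect.
    println(s"duration total (min, med, ${attachTaskId("max", display = false)})")
    // -> duration total (min, med, max)

    // Conf on: the max column is annotated with where the value came from.
    println(s"duration total (min, med, ${attachTaskId("max", display = true)})")
    // -> duration total (min, med, max (stageId (attemptId): taskId))
  }
}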
SQLAppStatusListener.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.config.Status._
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
import org.apache.spark.util.collection.OpenHashMap
@@ -209,6 +210,8 @@ class SQLAppStatusListener(

val allMetrics = new mutable.HashMap[Long, Array[Long]]()

val displayTaskId = conf.get(StaticSQLConf.DISPLAY_TASK_ID_FOR_MAX_METRIC)

val maxMetricsFromAllStages = new mutable.HashMap[Long, Array[Long]]()

taskMetrics.foreach { case (id, values) =>
@@ -221,27 +224,32 @@ class SQLAppStatusListener(
allMetrics(id) = updated
}

// Find the max for each metric id between all stages.
maxMetrics.foreach { case (id, value, taskId, stageId, attemptId) =>
val updated = maxMetricsFromAllStages.getOrElse(id, Array(value, stageId, attemptId, taskId))
if (value > updated(0)) {
updated(0) = value
updated(1) = stageId
updated(2) = attemptId
updated(3) = taskId
if (displayTaskId) {
// Find the max for each metric id between all stages.
maxMetrics.foreach { case (id, value, taskId, stageId, attemptId) =>
val updated = maxMetricsFromAllStages.getOrElse(id,
Array(value, stageId, attemptId, taskId))
if (value > updated(0)) {
updated(0) = value
updated(1) = stageId
updated(2) = attemptId
updated(3) = taskId
}
maxMetricsFromAllStages(id) = updated
}
maxMetricsFromAllStages(id) = updated
}

exec.driverAccumUpdates.foreach { case (id, value) =>
if (metricTypes.contains(id)) {
val prev = allMetrics.getOrElse(id, null)
val updated = if (prev != null) {
// If the driver updates same metrics as tasks and has higher value then remove
// that entry from maxMetricsFromAllStage. This would make stringValue function default
// to "driver" that would be displayed on UI.
if (maxMetricsFromAllStages.contains(id) && value > maxMetricsFromAllStages(id)(0)) {
maxMetricsFromAllStages.remove(id)
if (displayTaskId) {
// If the driver updates same metrics as tasks and has higher value then remove
// that entry from maxMetricsFromAllStage. This would make stringValue function default
// to "driver" that would be displayed on UI.
if (maxMetricsFromAllStages.contains(id) && value > maxMetricsFromAllStages(id)(0)) {
maxMetricsFromAllStages.remove(id)
}
}
val _copy = Arrays.copyOf(prev, prev.length + 1)
_copy(prev.length) = value
@@ -254,8 +262,18 @@ class SQLAppStatusListener(
}

val aggregatedMetrics = allMetrics.map { case (id, values) =>
id -> SQLMetrics.stringValue(metricTypes(id), values, maxMetricsFromAllStages.getOrElse(id,
Array.empty[Long]))
val taskId = if (displayTaskId) {
if (maxMetricsFromAllStages.contains(id)) {
val Seq(stageId, stageAttemptId, taskId) =
maxMetricsFromAllStages(id).slice(1, 4).toSeq
s" (stage $stageId (attempt $stageAttemptId): task $taskId)"
} else {
" (driver)"
}
} else {
""
}
id -> SQLMetrics.stringValue(metricTypes(id), values, taskId)
}.toMap

// Check the execution again for whether the aggregated metrics data has been calculated.
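For illustration, a self-contained sketch of the max-tracking and suffix-building logic in this listener change. All metric ids, values, and task/stage ids below are hypothetical; the real listener reads them from task-end events and passes the resulting suffix to SQLMetrics.stringValue.

import scala.collection.mutable

object MaxMetricSuffixSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical (metric id, value, taskId, stageId, stageAttemptId) updates.
    val maxMetrics = Seq(
      (1L, 100L, 7L, 0L, 0L),
      (1L, 250L, 13L, 1L, 0L),
      (2L, 40L, 5L, 0L, 0L))

    // Mirror maxMetricsFromAllStages: per metric id, keep the largest value and
    // where it came from (stage, attempt, task).
    val maxMetricsFromAllStages = new mutable.HashMap[Long, Array[Long]]()
    maxMetrics.foreach { case (id, value, taskId, stageId, attemptId) =>
      val updated = maxMetricsFromAllStages.getOrElse(id,
        Array(value, stageId, attemptId, taskId))
      if (value > updated(0)) {
        updated(0) = value
        updated(1) = stageId
        updated(2) = attemptId
        updated(3) = taskId
      }
      maxMetricsFromAllStages(id) = updated
    }

    // Build the suffix for metric id 1, as done in the aggregatedMetrics loop above.
    val id = 1L
    val suffix = if (maxMetricsFromAllStages.contains(id)) {
      val Seq(stageId, stageAttemptId, taskId) =
        maxMetricsFromAllStages(id).slice(1, 4).toSeq
      s" (stage $stageId (attempt $stageAttemptId): task $taskId)"
    } else {
      " (driver)"
    }
    println(suffix) // -> " (stage 1 (attempt 0): task 13)"
  }
}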
SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val ds = spark.range(10).filter('id < 5)
testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
0L -> (("WholeStageCodegen (1)", Map(
"duration total (min, med, max (stageId (attemptId): taskId))" -> {
"duration total (min, med, max)" -> {
_.toString.matches(timingMetricPattern)
})))), true)
}
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max)" ->
aggregateMetricsPattern),
Map("number of output rows" -> 1L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max)" ->
aggregateMetricsPattern))
val shuffleExpected1 = Map(
"records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df2 = testData2.groupBy('a).count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max)" ->
aggregateMetricsPattern),
Map("number of output rows" -> 3L,
"avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" ->
"avg hash probe bucket list iters (min, med, max)" ->
aggregateMetricsPattern))

val shuffleExpected2 = Map(
@@ -181,12 +181,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" +
" (attemptId): taskId))")
val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max)")
// Extract min, med, max from the string and strip off everything else.
val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
probe => assert(probe.toDouble > 1.0)
probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach {
probe => assert(probe.trim.toDouble > 1.0)
}
}
}
@@ -231,13 +229,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
"sort time total (min, med, max (stageId (attemptId): taskId))" -> {
"sort time total (min, med, max)" -> {
_.toString.matches(timingMetricPattern)
},
"peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
"peak memory total (min, med, max)" -> {
_.toString.matches(sizeMetricPattern)
},
"spill size total (min, med, max (stageId (attemptId): taskId))" -> {
"spill size total (min, med, max)" -> {
_.toString.matches(sizeMetricPattern)
})))
))
SQLMetricsTestUtils.scala
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}

val totalNumBytesMetric = executedNode.metrics.find(
_.name == "written output total (min, med, max (stageId (attemptId): taskId))").get
_.name == "written output total (min, med, max)").get
val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
.split(" ").head.trim.toDouble
assert(totalNumBytes > 0)
SQLAppStatusListenerSuite.scala
@@ -528,9 +528,9 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
val driverMetric = physicalPlan.metrics("dummy")
val driverMetric2 = physicalPlan.metrics("dummy2")
val expectedValue = SQLMetrics.stringValue(driverMetric.metricType,
Array(expectedAccumValue), Array.empty[Long])
Array(expectedAccumValue))
val expectedValue2 = SQLMetrics.stringValue(driverMetric2.metricType,
Array(expectedAccumValue2), Array.empty[Long])
Array(expectedAccumValue2))

assert(metrics.contains(driverMetric.id))
assert(metrics(driverMetric.id) === expectedValue)