
Commit f135e0c

[SPARK-49038][SQL] SQLMetric should report the raw value in the accumulator update event
Some `SQLMetrics` set the initial value to `-1` so that no-update metrics (e.g. a metric that is never updated because there is no input data) can be recognized and filtered out later in the UI. However, there is a bug here: Spark turns accumulator updates into `AccumulableInfo` using `AccumulatorV2#value`, and to avoid exposing the internal `-1` value to end users, `SQLMetric#value` turns `-1` into `0` before returning it (see apache#39311 for more details). The UI can therefore no longer see `-1` and filter it out.

This PR fixes the bug by using the raw value of the `SQLMetric` to create the `AccumulableInfo`, so that the UI can still see `-1` and filter it out. This is needed to avoid reporting a wrong min value for certain SQL metrics when some partitions have no data.

This is a user-facing change: people who write Spark listeners to watch the `SparkListenerExecutorMetricsUpdate` event now see the correct value of SQL metrics.

Tested with manual UI tests; we do not have an end-to-end UI test framework for SQL metrics yet. No generative AI tooling was used.

Closes apache#47721 from cloud-fan/metrics.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 5463bfc commit f135e0c
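
To see why the raw `-1` matters, here is a minimal, self-contained sketch (illustrative only, with made-up numbers, not code from this commit) of how a "min" aggregation over per-task metric updates goes wrong once the no-update marker `-1` has been rewritten to `0` upstream:

    object MinMetricSketch {
      def main(args: Array[String]): Unit = {
        // -1 marks a task that never updated the metric (e.g. an empty partition).
        val perTaskUpdates = Seq(-1L, 42L, 7L)

        // What the UI intends to do: drop the -1 markers, then aggregate.
        val correctMin = perTaskUpdates.filter(_ >= 0).min                   // 7

        // What happened with the bug: -1 was already turned into 0 upstream,
        // so the empty partition shows up as a real 0 and wins the min.
        val buggyMin = perTaskUpdates.map(v => if (v == -1L) 0L else v).min  // 0

        println(s"correct min = $correctMin, buggy min = $buggyMin")
      }
    }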

9 files changed, +27 -7 lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion

@@ -530,7 +530,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)

      setTaskFinishedAndClearInterruptStatus()
      (accums, accUpdates)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -901,7 +901,7 @@ private[spark] class TaskSchedulerImpl(
            executorRunTime = acc.value.asInstanceOf[Long]
          }
        }
-       acc.toInfo(Some(acc.value), None)
+       acc.toInfoUpdate
      }
      val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
        getTaskProcessRate(recordsRead, executorRunTime)

core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala

Lines changed: 9 additions & 1 deletion

@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }

+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }

+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide

   /**
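
For comparison, a small usage sketch of the two methods on a plain `LongAccumulator` (a sketch only: both methods are `private[spark]`, so this compiles only inside Spark's own `org.apache.spark` packages, and `sc` stands for an assumed, already-created `SparkContext`):

    val acc = sc.longAccumulator("rows")
    acc.add(3L)

    val asUpdate = acc.toInfoUpdate                    // update = Some(3), value = None
    val asFinal  = acc.toInfo(None, Some(acc.value))   // update = None,   value = Some(3)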

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 1 addition & 1 deletion

@@ -1350,7 +1350,7 @@ private[spark] object JsonProtocol extends JsonUtils {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -147,7 +147,7 @@ private[spark] object AccumulatorSuite {
   * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
   * info as an accumulator update.
   */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate

  /**
   * Run one or more Spark jobs and verify that in at least one job the peak execution memory

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

Lines changed: 7 additions & 0 deletions

@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }

 object SQLMetrics {

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 1 addition & 1 deletion

@@ -181,7 +181,7 @@ class SQLAppStatusListener(
     event.taskMetrics.withExternalAccums(_.flatMap { a =>
       // This call may fail if the accumulator is gc'ed, so account for that.
       try {
-        Some(a.toInfo(Some(a.value), None))
+        Some(a.toInfoUpdate)
       } catch {
         case _: IllegalAccessError => None
       }
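
As the commit message notes, listeners watching accumulator updates now receive the raw values. A minimal sketch of such a listener (illustrative; it uses only the public `SparkListener`, `SparkListenerExecutorMetricsUpdate`, and `AccumulableInfo` APIs, and the class name is made up):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

    // Prints each accumulator update reported by executors. After this fix, a SQL
    // metric that was never updated arrives as -1 instead of 0.
    class RawMetricUpdateListener extends SparkListener {
      override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
        event.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, updates) =>
          updates.foreach { info =>
            println(s"task=$taskId stage=$stageId.$stageAttemptId " +
              s"metric=${info.name.getOrElse("?")} update=${info.update.getOrElse("?")}")
          }
        }
      }
    }

    // Register it from application code, e.g.: sc.addSparkListener(new RawMetricUpdateListener)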

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 5 additions & 0 deletions

@@ -960,6 +960,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isZero())
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).isZero())
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }

 case class CustomFileCommitProtocol(

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -312,7 +312,7 @@ object InputOutputMetricsHelper {

     var maxOutputRows = 0L
     taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
-      val info = accum.toInfo(Some(accum.value), None)
+      val info = accum.toInfoUpdate
       if (info.name.toString.contains("number of output rows")) {
         info.update match {
           case Some(n: Number) =>
