
Commit 391e593

cloud-fan authored and dongjoon-hyun committed
[SPARK-49038][SQL][3.5] SQLMetric should report the raw value in the accumulator update event
backport #47721 to 3.5 Some `SQLMetrics` set the initial value to `-1`, so that we can recognize no-update metrics (e.g. there is no input data and the metric is not updated at all) and filter them out later in the UI. However, there is a bug here. Spark turns accumulator updates into `AccumulableInfo`, using `AccumulatorV2#value`. To avoid exposing the internal `-1` value to end users, `SQLMetric#value` turns `-1` into `0` before returning the value. See more details in #39311 . UI can no longer see `-1` and filter them out. This PR fixes the bug by using the raw value of `SQLMetric` to create `AccumulableInfo`, so that UI can still see `-1` and filters it. To avoid getting the wrong min value for certain SQL metrics when some partitions have no data. Yes, if people write spark listeners to watch the `SparkListenerExecutorMetricsUpdate` event, they can see the correct value of SQL metrics. manual UI tests. We do not have an end-to-end UI test framework for SQL metrics yet. no Closes #47749 from cloud-fan/branch-3.5. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit bd2cbd6) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 758d18e commit 391e593

9 files changed: +27 -7 lines changed


core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -478,7 +478,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)
 
       setTaskFinishedAndClearInterruptStatus()
       (accums, accUpdates)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -898,7 +898,7 @@ private[spark] class TaskSchedulerImpl(
               executorRunTime = acc.value.asInstanceOf[Long]
             }
           }
-          acc.toInfo(Some(acc.value), None)
+          acc.toInfoUpdate
         }
         val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
           getTaskProcessRate(recordsRead, executorRunTime)

core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala

Lines changed: 9 additions & 1 deletion
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }
 
+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }
 
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide
 
   /**

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 1 addition & 1 deletion
@@ -1365,7 +1365,7 @@ private[spark] object JsonProtocol {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -113,7 +113,7 @@ private[spark] object AccumulatorSuite {
   * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
   * info as an accumulator update.
   */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate
 
   /**
   * Run one or more Spark jobs and verify that in at least one job the peak execution memory

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

Lines changed: 7 additions & 0 deletions
@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }
 
 object SQLMetrics {
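The comment in the override above refers to the UI-side aggregation that benefits from seeing the raw `-1`. A simplified sketch of that filtering, in the spirit of the min/med/max summary the commit message describes (this is not the actual `SQLMetrics.stringValue` implementation):

```scala
// Simplified sketch: once updates carry the raw -1 sentinel, "never updated"
// entries can be dropped before aggregating, so a partition with no data no
// longer drags the reported minimum down to 0.
def summarizeSizeMetric(updates: Seq[Long]): String = {
  val valid = updates.filter(_ >= 0).sorted
  if (valid.isEmpty) "no data"
  else s"min ${valid.head}, med ${valid(valid.length / 2)}, max ${valid.last}"
}

// summarizeSizeMetric(Seq(-1L, 40L, 120L)) returns "min 40, med 120, max 120"
```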

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 1 addition & 1 deletion
@@ -181,7 +181,7 @@ class SQLAppStatusListener(
     event.taskMetrics.withExternalAccums(_.flatMap { a =>
       // This call may fail if the accumulator is gc'ed, so account for that.
       try {
-        Some(a.toInfo(Some(a.value), None))
+        Some(a.toInfoUpdate)
       } catch {
         case _: IllegalAccessError => None
       }

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 5 additions & 0 deletions
@@ -896,6 +896,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }))))
     )
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }
 
 case class CustomFileCommitProtocol(

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -312,7 +312,7 @@ object InputOutputMetricsHelper {
 
       var maxOutputRows = 0L
       taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
-        val info = accum.toInfo(Some(accum.value), None)
+        val info = accum.toInfoUpdate
         if (info.name.toString.contains("number of output rows")) {
          info.update match {
            case Some(n: Number) =>

0 commit comments
