diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4fc2d82ac4af..0f0e7cb3d897 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -550,7 +550,7 @@ private[spark] class Executor( // Collect latest accumulator values to report back to the driver val accums: Seq[AccumulatorV2[_, _]] = Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty) - val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) + val accUpdates = accums.map(acc => acc.toInfoUpdate) setTaskFinishedAndClearInterruptStatus() (accums, accUpdates) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ec678256a708..8e3cb1379339 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -875,7 +875,7 @@ private[spark] class TaskSchedulerImpl( executorRunTime = acc.value.asInstanceOf[Long] } } - acc.toInfo(Some(acc.value), None) + acc.toInfoUpdate } val taskProcessRate = if (efficientTaskCalcualtionEnabled) { getTaskProcessRate(recordsRead, executorRunTime) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 3237a321f1c3..383a89d40ece 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { metadata.countFailedValues } + private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) + /** * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided * values. */ private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) AccumulableInfo(id, name, internOption(update), internOption(value), isInternal, countFailedValues) } + /** + * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update. + */ + private[spark] def toInfoUpdate: AccumulableInfo = { + AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues) + } + final private[spark] def isAtDriverSide: Boolean = atDriverSide /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d2e21dbe3cbc..19cefbc0479a 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1385,7 +1385,7 @@ private[spark] object JsonProtocol extends JsonUtils { val accumUpdates = jsonOption(json.get("Accumulator Updates")) .map(_.extractElements.map(accumulableInfoFromJson).toArray.toImmutableArraySeq) .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => { - acc.toInfo(Some(acc.value), None) + acc.toInfoUpdate }).toArray.toImmutableArraySeq) ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates) case `taskResultLost` => TaskResultLost diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 9b70ccdf07e1..a9d7e8a0f2ed 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -147,7 +147,7 @@ private[spark] object AccumulatorSuite { * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the * info as an accumulator update. */ - def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None) + def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate /** * Run one or more Spark jobs and verify that in at least one job the peak execution memory diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index a246b47fe655..f0c1c0900c7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -105,6 +105,13 @@ class SQLMetric( AccumulableInfo(id, name, internOption(update), internOption(value), true, true, SQLMetrics.cachedSQLAccumIdentifier) } + + // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly + // filter out the invalid -1 values. + override def toInfoUpdate: AccumulableInfo = { + AccumulableInfo(id, name, internOption(Some(_value)), None, true, true, + SQLMetrics.cachedSQLAccumIdentifier) + } } object SQLMetrics { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index fdd38595306a..3c8c7edfeb06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -182,7 +182,7 @@ class SQLAppStatusListener( event.taskMetrics.withExternalAccums(_.flatMap { a => // This call may fail if the accumulator is gc'ed, so account for that. try { - Some(a.toInfo(Some(a.value), None)) + Some(a.toInfoUpdate) } catch { case _: IllegalAccessError => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 45c775e6c463..0dd90925d3c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -959,6 +959,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5) } } + + test("SQLMetric#toInfoUpdate") { + assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1)) + assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0)) + } } case class CustomFileCommitProtocol( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index 2c5750962237..1df5f522b4b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -312,7 +312,7 @@ object InputOutputMetricsHelper { var maxOutputRows = 0L taskEnd.taskMetrics.withExternalAccums(_.foreach { accum => - val info = accum.toInfo(Some(accum.value), None) + val info = accum.toInfoUpdate if (info.name.toString.contains("number of output rows")) { info.update match { case Some(n: Number) =>