diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index a65ec75cc5db..1a9a6929541a 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { * Returns the name of this accumulator, can only be called after registration. */ final def name: Option[String] = { + assertMetadataNotNull() + if (atDriverSide) { - AccumulatorContext.get(id).flatMap(_.metadata.name) + metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name)) } else { - assertMetadataNotNull() metadata.name } } @@ -165,13 +166,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { } val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy") - val isInternalAcc = - (name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) || - getClass.getSimpleName == "SQLMetric" + val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX) if (isInternalAcc) { // Do not serialize the name of internal accumulator and send it to executor. copyAcc.metadata = metadata.copy(name = None) } else { + // For non-internal accumulators, we still need to send the name because users may need to + // access the accumulator name at executor side, or they may keep the accumulators sent from + // executors and access the name when the registered accumulator is already garbage + // collected(e.g. SQLMetrics). copyAcc.metadata = metadata } copyAcc