Skip to content

Commit 52e4326

Browse files
cloud-fanRobert Kruszewski
authored andcommitted
[SPARK-12837][SPARK-20666][CORE][FOLLOWUP] getting name should not fail if accumulator is garbage collected
## What changes were proposed in this pull request? After apache#17596 , we do not send internal accumulator name to executor side anymore, and always look up the accumulator name in `AccumulatorContext`. This cause a regression if the accumulator is already garbage collected, this PR fixes this by still sending accumulator name for `SQLMetrics`. ## How was this patch tested? N/A Author: Wenchen Fan <[email protected]> Closes apache#17931 from cloud-fan/bug.
1 parent f8c1a39 commit 52e4326

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
8484
* Returns the name of this accumulator, can only be called after registration.
8585
*/
8686
final def name: Option[String] = {
87+
assertMetadataNotNull()
88+
8789
if (atDriverSide) {
88-
AccumulatorContext.get(id).flatMap(_.metadata.name)
90+
metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name))
8991
} else {
90-
assertMetadataNotNull()
9192
metadata.name
9293
}
9394
}
@@ -165,13 +166,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
165166
}
166167
val copyAcc = copyAndReset()
167168
assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
168-
val isInternalAcc =
169-
(name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) ||
170-
getClass.getSimpleName == "SQLMetric"
169+
val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
171170
if (isInternalAcc) {
172171
// Do not serialize the name of internal accumulator and send it to executor.
173172
copyAcc.metadata = metadata.copy(name = None)
174173
} else {
174+
// For non-internal accumulators, we still need to send the name because users may need to
175+
// access the accumulator name at executor side, or they may keep the accumulators sent from
176+
// executors and access the name when the registered accumulator is already garbage
177+
// collected(e.g. SQLMetrics).
175178
copyAcc.metadata = metadata
176179
}
177180
copyAcc

0 commit comments

Comments
 (0)