Skip to content

Commit 1849fe3

Browse files
author
Feynman Liang
committed
Move internal accumulator into a private SparkContext method
1 parent 9629ce0 commit 1849fe3

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,13 +1233,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
12331233
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
12341234
* driver can access the accumulator's `value`.
12351235
*/
1236-
def accumulator[T](initialValue: T, name: String, internal: Boolean = false)(
1237-
implicit param: AccumulatorParam[T]): Accumulator[T] = {
1236+
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
1237+
: Accumulator[T] = {
12381238
val acc = new Accumulator(initialValue, param, Some(name))
12391239
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
12401240
acc
12411241
}
12421242

1243+
/**
1244+
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
1245+
* in the Spark UI, and which reports its values to the driver via heartbeats. Tasks can "add"
1246+
* values to the accumulator using the `+=` method. Only the driver can access the accumulator's
1247+
* `value`.
1248+
*/
1249+
private[spark] def internalAccumulator[T](initialValue: T, name: String)(
1250+
implicit param: AccumulatorParam[T]): Accumulator[T] = {
1251+
val acc = new Accumulator(initialValue, param, Some(name), true)
1252+
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
1253+
acc
1254+
}
1255+
12431256
/**
12441257
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
12451258
* with `+=`. Only the driver can access the accumulable's `value`.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
5252
protected def sparkContext = sqlContext.sparkContext
5353

5454
protected val metricToAccumulator = Map(
55-
"numTuples"->sparkContext.accumulator(0L, "number of tuples", internal = true))
55+
"numTuples"->sparkContext.internalAccumulator(0L, "number of tuples"))
5656

5757
// sqlContext will be null when we are being deserialized on the slaves. In this instance
5858
// the value of codegenEnabled will be set by the deserializer after the constructor has run.

0 commit comments

Comments
 (0)