This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit c24bc5b

Add comments
1 parent bd7dcf1 commit c24bc5b

4 files changed (+26, -3 lines)


core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 6 additions & 2 deletions
@@ -40,6 +40,9 @@ import org.apache.spark.util.Utils
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
  * @param name human-readable name for use in Spark's web UI
+ * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
+ *                 to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
+ *                 thread safe so that they can be reported correctly.
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
@@ -70,8 +73,9 @@ class Accumulable[R, T] private[spark] (
   Accumulators.register(this)
 
   /**
-   * Internal accumulators will be reported via heartbeats. For internal accumulators, `R` must be
-   * thread safe so that they can be reported correctly.
+   * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
+   * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
+   * reported correctly.
   */
   private[spark] def isInternal: Boolean = internal
 
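
A note on the thread-safety requirement called out in the new doc comment: internal accumulators are read by the heartbeat reporter while the task thread is still adding to them, so `R` needs a representation that tolerates concurrent access. Below is a minimal, self-contained sketch with hypothetical names; `AccumulableParamLike` mirrors the shape of Spark's `AccumulableParam`, but `ThreadSafeLongParam` is illustrative, not part of the codebase.

import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for Spark's AccumulableParam[R, T] merge contract.
trait AccumulableParamLike[R, T] {
  def addAccumulator(r: R, t: T): R   // fold one partial value of type T into R
  def addInPlace(r1: R, r2: R): R     // merge two R values
  def zero(initialValue: R): R        // fresh zero value for R
}

// R = AtomicLong: the heartbeat thread can safely read the current value
// while the task thread keeps calling addAccumulator.
object ThreadSafeLongParam extends AccumulableParamLike[AtomicLong, Long] {
  def addAccumulator(r: AtomicLong, t: Long): AtomicLong = { r.addAndGet(t); r }
  def addInPlace(r1: AtomicLong, r2: AtomicLong): AtomicLong = { r1.addAndGet(r2.get()); r1 }
  def zero(initialValue: AtomicLong): AtomicLong = new AtomicLong(0L)
}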

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 12 additions & 0 deletions
@@ -153,9 +153,21 @@ abstract class TaskContext extends Serializable {
    */
   private[spark] def taskMemoryManager(): TaskMemoryManager
 
+  /**
+   * Register an accumulator that belongs to this task. Accumulators must call this method when
+   * deserializing in executors.
+   */
   private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
 
+  /**
+   * Return the local values of internal accumulators that belong to this task. The key of the Map
+   * is the accumulator id and the value of the Map is the latest accumulator local value.
+   */
   private[spark] def collectInternalAccumulators(): Map[Long, Any]
 
+  /**
+   * Return the local values of accumulators that belong to this task. The key of the Map is the
+   * accumulator id and the value of the Map is the latest accumulator local value.
+   */
   private[spark] def collectAccumulators(): Map[Long, Any]
 }
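
The registration contract described above ("Accumulators must call this method when deserializing in executors") would typically hook into Java deserialization. A minimal sketch, assuming a hypothetical `MyAccumulable` class; the commented-out line marks where `registerAccumulator` would fit (it is `private[spark]`, so user code cannot call it directly, and this class is not Spark's `Accumulable`):

import java.io.{IOException, ObjectInputStream}

class MyAccumulable(@transient var localValue: Long) extends Serializable {

  // Runs on the executor when the task binary is deserialized; per the new
  // doc comment, this is the point where the accumulator re-registers itself
  // with the running task so its local value can be collected later.
  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    localValue = 0L  // start from a fresh local value on the executor
    // TaskContext.get().registerAccumulator(this)  // re-attach to this task
  }
}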

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 3 additions & 0 deletions
@@ -231,6 +231,9 @@ class TaskMetrics extends Serializable {
     _accumulatorUpdates = _accumulatorsUpdater()
   }
 
+  /**
+   * Return the latest updates of accumulators in this task.
+   */
   def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
 
   private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
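
The method documented above is the read side of an updater/snapshot pattern: a closure is registered via `setAccumulatorsUpdater`, invoked to refresh a cached `Map[Long, Any]`, and `accumulatorUpdates()` exposes the latest snapshot. A stripped-down, self-contained sketch of that pattern; `MetricsSketch` is illustrative, not the real `TaskMetrics`:

class MetricsSketch {
  private var _accumulatorsUpdater: () => Map[Long, Any] = () => Map.empty
  private var _accumulatorUpdates: Map[Long, Any] = Map.empty

  // Register the closure that knows how to read the task's accumulators.
  def setAccumulatorsUpdater(updater: () => Map[Long, Any]): Unit =
    _accumulatorsUpdater = updater

  // Invoke the closure to refresh the cached snapshot.
  def updateAccumulators(): Unit =
    _accumulatorUpdates = _accumulatorsUpdater()

  // Expose the latest snapshot read-only, as accumulatorUpdates() does.
  def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
}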

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 5 additions & 1 deletion
@@ -45,14 +45,18 @@ import org.apache.spark.util.Utils
  */
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 
+  /**
+   * The key of the Map is the accumulator id and the value of the Map is the latest accumulator
+   * local value.
+   */
   type AccumulatorUpdates = Map[Long, Any]
 
   /**
    * Called by [[Executor]] to run this task.
    *
    * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
    * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
-   * @return the result of the task
+   * @return the result of the task along with updates of Accumulators.
    */
   final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
     context = new TaskContextImpl(
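
The changed `@return` reflects that `run` now pairs the task result with the task's accumulator updates. A self-contained sketch of that shape; `TaskSketch` and its abstract members are illustrative, not Spark's `Task`:

abstract class TaskSketch[T] {
  /** Accumulator id -> latest accumulator local value, as in the diff above. */
  type AccumulatorUpdates = Map[Long, Any]

  def runTask(): T
  def collectAccumulators(): AccumulatorUpdates

  // The result travels back to the scheduler together with the task's
  // accumulator updates, instead of as a bare T.
  final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
    val result = runTask()
    (result, collectAccumulators())
  }
}

Returning the updates alongside the result keeps the two in sync: the caller gets the accumulator values captured at task completion rather than querying them through a separate, possibly stale, channel.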
