Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
param: AccumulableParam[R, T],
val name: Option[String])
extends Serializable {

val id = Accumulators.newId
def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)

val id: Long = Accumulators.newId

@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
private var deserialized = false
Expand Down Expand Up @@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
extends Accumulable[T,T](initialValue, param)
class Accumulator[T](
    @transient initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String])
  extends Accumulable[T, T](initialValue, param, name) {

  /** Builds an accumulator with no name by passing `None` to the primary constructor. */
  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,15 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
  // Result type is spelled out so the public signature does not depend on inference.
  // Uses the two-argument Accumulator constructor, i.e. no display name.
  new Accumulator(initialValue, param)
}

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
  : Accumulator[T] = {
  // Result type is spelled out so the public signature does not depend on inference.
  // The name is wrapped in Some(...) because Accumulator takes an Option[String].
  new Accumulator(initialValue, param, Some(name))
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be (maybe you were planning to add it later since I know this is still WIP) a similar new accumulable method?

* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumulable's `value`.
Expand All @@ -766,6 +775,16 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R])
  : Accumulable[T, R] = {
  // Result type is spelled out so the public signature does not depend on inference.
  // Uses the two-argument Accumulable constructor, which passes None as the name.
  new Accumulable(initialValue, param)
}

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver
* can access the accumulable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R])
  : Accumulable[T, R] = {
  // Result type is spelled out so the public signature does not depend on inference.
  // The name is wrapped in Some(...) because Accumulable.name is an Option[String].
  new Accumulable(initialValue, param, Some(name))
}

/**
* Create an accumulator from a "mutable collection" type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,40 +414,99 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] = {
  // Create the accumulator with the Int param, then cast it to the boxed Integer type.
  val scalaAccum = sc.accumulator(initialValue)(IntAccumulatorParam)
  scalaAccum.asInstanceOf[Accumulator[java.lang.Integer]]
}

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = {
  // Create the named accumulator with the Int param, then cast it to the boxed Integer type.
  val scalaAccum = sc.accumulator(initialValue, name)(IntAccumulatorParam)
  scalaAccum.asInstanceOf[Accumulator[java.lang.Integer]]
}

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] = {
  // Create the accumulator with the Double param, then cast it to the boxed Double type.
  val scalaAccum = sc.accumulator(initialValue)(DoubleAccumulatorParam)
  scalaAccum.asInstanceOf[Accumulator[java.lang.Double]]
}

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = {
  // Create the named accumulator with the Double param, then cast it to the boxed Double type.
  val scalaAccum = sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
  scalaAccum.asInstanceOf[Accumulator[java.lang.Double]]
}

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = {
  // Int overload: forwards directly to intAccumulator.
  intAccumulator(initialValue)
}

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = {
  // Named Int overload: forwards directly to the named intAccumulator.
  intAccumulator(initialValue, name)
}

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Double): Accumulator[java.lang.Double] = {
  // Double overload: forwards directly to doubleAccumulator.
  doubleAccumulator(initialValue)
}


/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = {
  // Named Double overload: forwards directly to the named doubleAccumulator.
  doubleAccumulator(initialValue, name)
}

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] = {
  // The param is passed explicitly in the second argument list of sc.accumulator.
  sc.accumulator(initialValue)(accumulatorParam)
}

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator[T](
    initialValue: T,
    name: String,
    accumulatorParam: AccumulatorParam[T]): Accumulator[T] = {
  // The param is passed explicitly in the second argument list of sc.accumulator.
  sc.accumulator(initialValue, name)(accumulatorParam)
}

/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumulable's `value`.
*/
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] = {
  // The param is passed explicitly in the second argument list of sc.accumulable.
  sc.accumulable(initialValue)(param)
}

/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumulable's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulable[T, R](
    initialValue: T,
    name: String,
    param: AccumulableParam[T, R]): Accumulable[T, R] = {
  // The param is passed explicitly in the second argument list of sc.accumulable.
  sc.accumulable(initialValue, name)(param)
}

/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
*/
@DeveloperApi
class AccumulableInfo(
    val id: Long, // identifier of the accumulable this record describes
    val name: String, // name of the accumulable
    val update: Option[String], // represents a partial update within a task
    val value: String) // string form of the accumulable's value

/** Factory methods for [[AccumulableInfo]]. */
object AccumulableInfo {
  /** Creates a record that carries a partial `update` alongside the `value`. */
  def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo =
    new AccumulableInfo(id, name, update, value)

  /** Creates a record with no partial update (`update` is `None`). */
  def apply(id: Long, name: String, value: String): AccumulableInfo =
    new AccumulableInfo(id, name, None, value)
}
25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,14 @@ class DAGScheduler(
val task = event.task
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
Expand All @@ -809,12 +815,27 @@ class DAGScheduler(
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
runningStages -= stage
}

event.reason match {
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
// TODO: fail the stage if the accumulator update fails...
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this will make things hard to debug -- e.g., if someone's accumulator doesn't show up in the UI and they don't realize it's because the value wasn't updated as opposed to because they didn't set the show-in-ui variable correctly?

if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = "%s".format(partialValue)
val stringValue = "%s".format(acc.value)
stageToInfos(stage).accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be moved to a method on the Accumulators companion object or something? These details about AccumulableInfo, prettyPartialValues, etc. aren't things that need to appear in the DAGScheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@markhamstra yeah we can move these elsewhere, good idea.

listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
}
pendingTasks(stage) -= task
task match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable.HashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo

Expand All @@ -37,6 +39,8 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
/** Terminal values of accumulables updated during this stage. */
val accumulables = HashMap[Long, AccumulableInfo]()

def stageFailed(reason: String) {
failureReason = Some(reason)
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable.ListBuffer

import org.apache.spark.annotation.DeveloperApi

/**
Expand All @@ -41,6 +43,13 @@ class TaskInfo(
*/
var gettingResultTime: Long = 0

/**
* Intermediate updates to accumulables during this task. Note that it is valid for the same
* accumulable to be updated multiple times in a single task or for two accumulables with the
* same name but different IDs to exist in a task.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: no apostrophe in IDs

*/
val accumulables = ListBuffer[AccumulableInfo]()

/**
* The time when the task has completed successfully (including the time to remotely fetch
* results, if necessary).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.ui.jobs

import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.mutable.{HashMap, ListBuffer, Map}

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
Expand Down Expand Up @@ -48,6 +48,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {

// TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
val stageIdToAccumulables = HashMap[Int, Map[Long, AccumulableInfo]]()
val stageIdToInputBytes = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
Expand All @@ -73,6 +74,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val stageId = stage.stageId
// Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
poolToActiveStages(stageIdToPool(stageId)).remove(stageId)

val emptyMap = HashMap[Long, AccumulableInfo]()
val accumulables = stageIdToAccumulables.getOrElseUpdate(stageId, emptyMap)
for ((id, info) <- stageCompleted.stageInfo.accumulables) {
accumulables(id) = info
}

activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
Expand All @@ -89,6 +97,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTime.remove(s.stageId)
stageIdToAccumulables.remove(s.stageId)
stageIdToInputBytes.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
Expand Down Expand Up @@ -147,6 +156,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val info = taskEnd.taskInfo

if (info != null) {
val emptyMap = HashMap[Long, AccumulableInfo]()
val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, emptyMap)
for (accumulableInfo <- info.accumulables) {
accumulables(accumulableInfo.id) = accumulableInfo
}

// create executor summary map if necessary
val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
op = new HashMap[String, ExecutorSummary]())
Expand Down
Loading