13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -248,10 +248,17 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
  extends Accumulable[T, T](initialValue, param, name) {
class Accumulator[T] private[spark] (
    @transient initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {

  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false)

  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = this(
    initialValue, param, name, false)
}

/**
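A rough sketch (not part of the diff, spark-shell style) of what the constructor change means for callers: the two pre-existing public constructors still compile and now delegate to the new primary constructor with `internal = false`, while the four-argument primary constructor itself is `private[spark]`. The `LongParam` helper below is made up purely for illustration.

```scala
import org.apache.spark.{Accumulator, AccumulatorParam}

// Illustrative AccumulatorParam for Long so the sketch is self-contained.
object LongParam extends AccumulatorParam[Long] {
  def zero(initialValue: Long): Long = 0L
  def addInPlace(a: Long, b: Long): Long = a + b
}

val plain = new Accumulator[Long](0L, LongParam)                // delegates with internal = false
val named = new Accumulator[Long](0L, LongParam, Some("rows"))  // delegates with internal = false
// new Accumulator[Long](0L, LongParam, Some("rows"), true)     // only compiles inside org.apache.spark
```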
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1240,6 +1240,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
acc
}

/**
* Create an internal [[org.apache.spark.Accumulator]] variable of a given type, with a name for
 * display in the Spark UI. Unlike a non-internal accumulator, this accumulator reports its
* values to the driver via heartbeats. Tasks can "add" values to the accumulator using the `+=`
* method. Only the driver can access the accumulator's `value`.
*
* @tparam T type that can be added to the accumulator, must be thread safe
*/
  private[spark] def internalAccumulator[T](initialValue: T, name: String)(
      implicit param: AccumulatorParam[T]): Accumulator[T] = {
    val acc = new Accumulator(initialValue, param, Some(name), true)
    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
    acc
  }

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
 * with `+=`. Only the driver can access the accumulable's `value`.
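Because `internalAccumulator` is `private[spark]`, only code under the `org.apache.spark` package can call it. A hedged sketch of what a call site might look like, with a made-up metric name and `sc` assumed to be an existing `SparkContext`:

```scala
// Inside Spark code only (the method is private[spark]); the metric name is illustrative.
val numRecords = sc.internalAccumulator(0L, "number of records")

sc.parallelize(1 to 1000, numSlices = 4).foreach { _ =>
  // Tasks add to the accumulator; since it is internal, partial values are also
  // reported back to the driver via executor heartbeats.
  numRecords += 1L
}

println(numRecords.value)  // only the driver can read the merged value
```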
@@ -17,12 +17,12 @@

package org.apache.spark.sql.execution

import org.apache.spark.Accumulator
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.Attribute


/**
* Physical plan node for scanning data from a local collection.
*/
@@ -32,7 +32,10 @@ private[sql] case class LocalTableScan(

private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

  protected override def doExecute(): RDD[InternalRow] = rdd
  protected override def doExecute(): RDD[InternalRow] = {
    accumulators("numTuples").asInstanceOf[Accumulator[Long]] += rdd.count()
    rdd
  }

override def executeCollect(): Array[Row] = {
val converter = CatalystTypeConverters.createToScalaConverter(schema)
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.{Accumulator, Logging}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
@@ -51,6 +51,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

protected def sparkContext = sqlContext.sparkContext

  protected val metricToAccumulator = Map(
    "numTuples" -> sparkContext.internalAccumulator(0L, "number of tuples"))

// sqlContext will be null when we are being deserialized on the slaves. In this instance
// the value of codegenEnabled will be set by the deserializer after the constructor has run.
val codegenEnabled: Boolean = if (sqlContext != null) {
@@ -91,6 +94,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
def canProcessSafeRows: Boolean = true

  /**
   * Returns accumulators for metrics. The key of the Map is the metric's name and the value is
   * the accumulator holding the metric's current value.
   */
  def accumulators: Map[String, Accumulator[_]] = metricToAccumulator

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
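With `accumulators` exposed on `SparkPlan`, a caller can in principle read a metric off a physical plan once the query has run. A hedged sketch, assuming `df` is a `DataFrame` whose executed plan is rooted at one of the operators that increments `numTuples` (e.g. `Project`):

```scala
import org.apache.spark.Accumulator

val plan = df.queryExecution.executedPlan
df.collect()  // run the query so tasks actually bump the accumulator

val numTuples = plan.accumulators("numTuples").asInstanceOf[Accumulator[Long]]
println(s"${plan.nodeName} produced ${numTuples.value} tuples")
```

Note that `metricToAccumulator` is a `protected val` on the abstract class, so each plan node gets its own map and the count is per operator, not per query.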
@@ -22,15 +22,15 @@ import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.{HashPartitioner, SparkEnv}
import org.apache.spark.{Accumulator, HashPartitioner, SparkEnv}

/**
* :: DeveloperApi ::
@@ -43,7 +43,11 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends

  protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
    val reusableProjection = buildProjection()
    iter.map(reusableProjection)
    val numTuplesAccumulator = accumulators("numTuples").asInstanceOf[Accumulator[Long]]
    iter.map { row =>
      numTuplesAccumulator += 1
      reusableProjection(row)
    }
  }

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -309,7 +309,7 @@ class CachedTableSuite extends QueryTest {
val accsSize = Accumulators.originals.size
ctx.cacheTable("t1")
ctx.cacheTable("t2")
assert((accsSize + 2) == Accumulators.originals.size)
assert((accsSize + 2*4) === Accumulators.originals.size)
}

sql("SELECT * FROM t1").count()
Expand All @@ -321,7 +321,7 @@ class CachedTableSuite extends QueryTest {
val accsSize = Accumulators.originals.size
ctx.uncacheTable("t1")
ctx.uncacheTable("t2")
assert((accsSize - 2) == Accumulators.originals.size)
assert((accsSize - 2) === Accumulators.originals.size)
}
}
}