13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -248,10 +248,17 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T, T](initialValue, param, name) {
class Accumulator[T](
Contributor: should we make this constructor private[spark] so users can't set internal = true?

Contributor: +1

Contributor (author): OK.

(A sketch of the suggested private[spark] constructor follows this hunk.)

@transient initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false)

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = this(
initialValue, param, name, false)
}
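
The thread above asks for a private[spark] primary constructor so that user code cannot pass internal = true. A minimal sketch of what that could look like, assuming the Accumulable base constructor already accepts the internal flag as shown in this hunk; it illustrates the suggestion rather than the final code:

class Accumulator[T] private[spark] (
    @transient initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {

  // Public auxiliary constructors remain callable from user code,
  // but they always create non-internal accumulators.
  def this(initialValue: T, param: AccumulatorParam[T]) =
    this(initialValue, param, None, false)

  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) =
    this(initialValue, param, name, false)
}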

/**
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1234,12 +1234,25 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
: Accumulator[T] = {
: Accumulator[T] = {
val acc = new Accumulator(initialValue, param, Some(name))
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
Contributor: Should probably give this a different Scaladoc in order to differentiate it from the non-internal Accumulator and explain what's different.

Contributor (author): OK. Will also document that T needs to be thread-safe since #7448.

* in the Spark UI, and which reports its values to the driver via heartbeats. Tasks can "add"
* values to the accumulator using the `+=` method. Only the driver can access the accumulator's
* `value`.
*/
private[spark] def internalAccumulator[T](initialValue: T, name: String)(
implicit param: AccumulatorParam[T]): Accumulator[T] = {
val acc = new Accumulator(initialValue, param, Some(name), true)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
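
A brief usage sketch of internalAccumulator, assuming code running inside Spark itself (the method is private[spark]) and a SparkContext named sc; per the Scaladoc above, tasks add with += and only the driver reads value:

// Hedged sketch, not part of this diff: an internal, named accumulator whose
// per-task updates are reported back to the driver.
val numRecords = sc.internalAccumulator(0L, "number of records")

sc.parallelize(1 to 1000, 4).foreach { _ =>
  numRecords += 1L  // tasks can only add to the accumulator
}

// Only the driver may read the accumulated value.
println(numRecords.value)  // 1000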

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
@@ -17,12 +17,12 @@

package org.apache.spark.sql.execution

import org.apache.spark.Accumulator
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.Attribute


/**
* Physical plan node for scanning data from a local collection.
*/
@@ -32,7 +32,10 @@ private[sql] case class LocalTableScan(

private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

protected override def doExecute(): RDD[InternalRow] = rdd
protected override def doExecute(): RDD[InternalRow] = {
accumulators("numTuples").asInstanceOf[Accumulator[Long]] += rdd.count()
rdd
}


override def executeCollect(): Array[Row] = {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.{Accumulator, Logging}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
@@ -51,6 +51,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

protected def sparkContext = sqlContext.sparkContext

protected val metricToAccumulator = Map(
"numTuples"->sparkContext.internalAccumulator(0L, "number of tuples"))
Contributor: style: space around ->

Contributor (author): OK

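For reference, the spacing the reviewer asks for would look like this (same content as the line above, only with spaces around ->):

protected val metricToAccumulator = Map(
  "numTuples" -> sparkContext.internalAccumulator(0L, "number of tuples"))
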

// sqlContext will be null when we are being deserialized on the slaves. In this instance
// the value of codegenEnabled will be set by the desserializer after the constructor has run.
val codegenEnabled: Boolean = if (sqlContext != null) {
@@ -91,6 +94,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
def canProcessSafeRows: Boolean = true

/**
* Returns accumulators for metrics. The key of the Map is the metric's name and the value is the
* current value of the metric.
*/
def accumulators: Map[String, Accumulator[_]] = metricToAccumulator
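
A hedged sketch of how driver-side code might read one of these metrics after a plan has run; the DataFrame df and the forced execution are illustrative assumptions, not part of this diff:

// Hypothetical driver-side read of the "numTuples" metric.
val plan: SparkPlan = df.queryExecution.executedPlan
plan.execute().count()  // run the plan so tasks update the accumulators

val numTuples = plan.accumulators("numTuples").asInstanceOf[Accumulator[Long]].value
println(s"numTuples = $numTuples")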

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
@@ -17,20 +17,19 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.types.StructType
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.{HashPartitioner, SparkEnv}
import org.apache.spark.{Accumulator, HashPartitioner, SparkEnv}

/**
* :: DeveloperApi ::
@@ -43,7 +42,9 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
val reusableProjection = buildProjection()
iter.map(reusableProjection)
val result = iter.map(reusableProjection)
accumulators("numTuples").asInstanceOf[Accumulator[Long]] += result.length
Contributor: doesn't this drain the iterator?

Contributor: Also, it is not safe to call multiple methods on the same one-shot iterator.

Contributor (author): Thanks for catching, fixed.

(A non-draining counting sketch follows this hunk.)

result
}
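
The thread above notes that result.length would drain the one-shot iterator, leaving nothing for downstream operators. A sketch of a non-draining alternative that counts rows as they flow through the same map pass; this illustrates the reviewers' point under the assumptions of this hunk, not necessarily the exact fix:

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
  val numTuples = accumulators("numTuples").asInstanceOf[Accumulator[Long]]
  val reusableProjection = buildProjection()
  iter.map { row =>
    numTuples += 1L          // counted lazily, as each row is produced
    reusableProjection(row)
  }
}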

override def outputOrdering: Seq[SortOrder] = child.outputOrdering