Commit 1a4c9de

Author: Feynman Liang (committed)
Add accumulators to SparkPlan and initial impls
Prototype implementations are included for a LeafNode (LocalTableScan) and a UnaryNode (Project in basicOperators.scala). Will extend to other SparkPlan ops after initial review.

Parent: d4c7a7a

5 files changed (+33, −15 lines)

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 10 additions & 3 deletions
@@ -248,10 +248,17 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
-  extends Accumulable[T, T](initialValue, param, name) {
+class Accumulator[T](
+    @transient initialValue: T,
+    param: AccumulatorParam[T],
+    name: Option[String],
+    internal: Boolean)
+  extends Accumulable[T, T](initialValue, param, name, internal) {
+
+  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false)
 
-  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
+  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = this(
+    initialValue, param, name, false)
 }
 
 /**
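
A minimal sketch (not part of this diff) of how the widened constructor could be exercised. It assumes the implicit `LongAccumulatorParam` from `AccumulatorParam`'s companion object and an `Accumulable` base that already accepts the `internal` flag, as the change above presupposes:

```scala
import org.apache.spark.{Accumulator, AccumulatorParam}

// Four-argument primary constructor: the trailing `true` marks the accumulator as
// internal instrumentation rather than a user-defined accumulator.
val numTuples: Accumulator[Long] =
  new Accumulator(0L, AccumulatorParam.LongAccumulatorParam, Some("number of tuples"), true)

// The auxiliary constructors keep existing call sites source-compatible and
// default `internal` to false.
val userCounter = new Accumulator(0L, AccumulatorParam.LongAccumulatorParam)
```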

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -1233,8 +1233,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
    * driver can access the accumulator's `value`.
    */
-  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
-    : Accumulator[T] = {
+  def accumulator[T](initialValue: T, name: String, internal: Boolean = false)(
+      implicit param: AccumulatorParam[T]): Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
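
A hedged usage sketch of the new default parameter (`sc` is an existing SparkContext; this snippet is not part of the diff). One thing reviewers may want to confirm: the `internal` argument is accepted here but not yet forwarded to the `Accumulator` constructor in the method body.

```scala
// Existing two-argument call sites compile unchanged thanks to the default value.
val userMetric = sc.accumulator(0L, "records parsed")

// Instrumentation code can opt in explicitly.
val planMetric = sc.accumulator(0L, "number of tuples", internal = true)
```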

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala

Lines changed: 6 additions & 3 deletions
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.Accumulator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 
-
 /**
  * Physical plan node for scanning data from a local collection.
  */
@@ -32,7 +32,10 @@ private[sql] case class LocalTableScan(
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    accumulators("numTuples").asInstanceOf[Accumulator[Long]] += rdd.count()
+    rdd
+  }
 
 
   override def executeCollect(): Array[Row] = {
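
Worth noting for review: `rdd.count()` schedules a separate Spark job every time `doExecute()` runs, solely to populate the metric. Since `LocalTableScan` already holds the data as the local `rows` collection it parallelizes, a cheaper variant (a sketch only, not what this commit does) could take the size locally:

```scala
protected override def doExecute(): RDD[InternalRow] = {
  // rows is the local Seq backing this scan, so its size is known without launching a job.
  accumulators("numTuples").asInstanceOf[Accumulator[Long]] += rows.size.toLong
  rdd
}
```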

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 8 additions & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.Logging
+import org.apache.spark.{Accumulator, Logging}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
@@ -91,6 +91,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   def canProcessSafeRows: Boolean = true
 
+  /**
+   * Returns instrumentation metrics. The key of the Map is the metric's name and the value is the
+   * current value of the metric.
+   */
+  def accumulators: Map[String, Accumulator[_]] = Map(
+    "numTuples"->sparkContext.accumulator(0L, "number of tuples", internal = true))
+
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
    * after adding query plan information to created RDDs for visualization.
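
Two things a reviewer might flag, illustrated with a hedged sketch (not part of this diff): because `accumulators` is a `def`, every call builds a fresh Map and registers new accumulators, so the instance incremented inside `doExecute()` is not the one a later caller would read back; and concrete operators may want metrics of their own. Both could be handled by overriding the member as a `lazy val` in an operator; the `numPartitions` metric below is purely illustrative.

```scala
// Inside a concrete SparkPlan operator (hypothetical): a lazy val is computed once,
// so the same Accumulator instances are incremented during execution and read afterwards.
override lazy val accumulators: Map[String, Accumulator[_]] = Map(
  "numTuples" -> sparkContext.accumulator(0L, "number of tuples", internal = true),
  "numPartitions" -> sparkContext.accumulator(0L, "number of partitions", internal = true))
```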

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 7 additions & 6 deletions
@@ -17,20 +17,19 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
 import org.apache.spark.util.{CompletionIterator, MutablePair}
-import org.apache.spark.{HashPartitioner, SparkEnv}
+import org.apache.spark.{Accumulator, HashPartitioner, SparkEnv}
 
 /**
  * :: DeveloperApi ::
@@ -43,7 +42,9 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     val reusableProjection = buildProjection()
-    iter.map(reusableProjection)
+    val result = iter.map(reusableProjection)
+    accumulators("numTuples").asInstanceOf[Accumulator[Long]] += result.length
+    result
  }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
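
One correctness concern for review: `iter.map(reusableProjection)` returns an `Iterator`, and calling `.length` on it consumes that iterator before it is handed to the downstream operator, so the projection would yield no rows. A sketch (not what this commit does) that counts lazily as rows flow through, and resolves the accumulator on the driver before the closure is shipped:

```scala
protected override def doExecute(): RDD[InternalRow] = {
  // Resolve the accumulator on the driver so the task closure captures the
  // Accumulator itself instead of re-building the metrics map on executors.
  val numTuples = accumulators("numTuples").asInstanceOf[Accumulator[Long]]
  child.execute().mapPartitions { iter =>
    val reusableProjection = buildProjection()
    iter.map { row =>
      numTuples += 1L          // count each row as it is produced
      reusableProjection(row)
    }
  }
}
```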
