Skip to content

Commit ad85076

Browse files
committed
Example of using named accumulators for custom RDD metrics.
1 parent 0b72660 commit ad85076

File tree

3 files changed

+12
-0
lines changed

3 files changed

+12
-0
lines changed

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
4545
case Some(blockResult) =>
4646
// Partition is already materialized, so just return its values
4747
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
48+
if (blockResult.inputMetrics.bytesRead > 0) {
49+
rdd.inputBytes += blockResult.inputMetrics.bytesRead
50+
}
4851
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
4952

5053
case None =>

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ class HadoopRDD[K, V](
119119
minPartitions)
120120
}
121121

122+
val hadoopInputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.hadoop")(SparkContext.LongAccumulatorParam)
123+
122124
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
123125

124126
protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
@@ -205,6 +207,7 @@ class HadoopRDD[K, V](
205207
* always at record boundaries, so tasks may need to read into other splits to complete
206208
* a record. */
207209
inputMetrics.bytesRead = split.inputSplit.value.getLength()
210+
hadoopInputBytes += split.inputSplit.value.getLength()
208211
} catch {
209212
case e: java.io.IOException =>
210213
logWarning("Unable to get input size to set InputMetrics for task", e)

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,4 +1270,10 @@ abstract class RDD[T: ClassTag](
12701270
def toJavaRDD() : JavaRDD[T] = {
12711271
new JavaRDD(this)(elementClassTag)
12721272
}
1273+
1274+
// =======================================================================
1275+
// Common metrics
1276+
// =======================================================================
1277+
// Input bytes read when this RDD's partitions are served from persisted (already materialized) data
1278+
val inputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.persisted")
12731279
}

0 commit comments

Comments
 (0)