
Commit b82e90e

address comments
1 parent 6226058 commit b82e90e

20 files changed, +85 -312 lines changed

core/src/main/scala/org/apache/spark/InternalAccumulator.scala

Lines changed: 0 additions & 127 deletions

@@ -17,17 +17,11 @@

 package org.apache.spark

-import org.apache.spark.storage.{BlockId, BlockStatus}
-
-
 /**
  * A collection of fields and methods concerned with internal accumulators that represent
  * task level metrics.
  */
 private[spark] object InternalAccumulator {
-
-  import AccumulatorParam._
-
   // Prefixes used in names of internal task level metrics
   val METRICS_PREFIX = "internal.metrics."
   val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."

@@ -79,125 +73,4 @@ private[spark] object InternalAccumulator {
   }

   // scalastyle:on
-
-  /**
-   * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
-   */
-  def create(name: String): Accumulator[_] = {
-    require(name.startsWith(METRICS_PREFIX),
-      s"internal accumulator name must start with '$METRICS_PREFIX': $name")
-    getParam(name) match {
-      case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
-      case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
-      case p @ StringAccumulatorParam => newMetric[String]("", name, p)
-      case p @ UpdatedBlockStatusesAccumulatorParam =>
-        newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
-      case p => throw new IllegalArgumentException(
-        s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.")
-    }
-  }
-
-  /**
-   * Get the [[AccumulatorParam]] associated with the internal metric name,
-   * which must begin with [[METRICS_PREFIX]].
-   */
-  def getParam(name: String): AccumulatorParam[_] = {
-    require(name.startsWith(METRICS_PREFIX),
-      s"internal accumulator name must start with '$METRICS_PREFIX': $name")
-    name match {
-      case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
-      case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
-      case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
-      case _ => LongAccumulatorParam
-    }
-  }
-
-  /**
-   * Accumulators for tracking internal metrics.
-   */
-  def createAll(): Seq[Accumulator[_]] = {
-    Seq[String](
-      EXECUTOR_DESERIALIZE_TIME,
-      EXECUTOR_RUN_TIME,
-      RESULT_SIZE,
-      JVM_GC_TIME,
-      RESULT_SERIALIZATION_TIME,
-      MEMORY_BYTES_SPILLED,
-      DISK_BYTES_SPILLED,
-      PEAK_EXECUTION_MEMORY,
-      UPDATED_BLOCK_STATUSES).map(create) ++
-      createShuffleReadAccums() ++
-      createShuffleWriteAccums() ++
-      createInputAccums() ++
-      createOutputAccums() ++
-      sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
-  }
-
-  /**
-   * Accumulators for tracking shuffle read metrics.
-   */
-  def createShuffleReadAccums(): Seq[Accumulator[_]] = {
-    Seq[String](
-      shuffleRead.REMOTE_BLOCKS_FETCHED,
-      shuffleRead.LOCAL_BLOCKS_FETCHED,
-      shuffleRead.REMOTE_BYTES_READ,
-      shuffleRead.LOCAL_BYTES_READ,
-      shuffleRead.FETCH_WAIT_TIME,
-      shuffleRead.RECORDS_READ).map(create)
-  }
-
-  /**
-   * Accumulators for tracking shuffle write metrics.
-   */
-  def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
-    Seq[String](
-      shuffleWrite.BYTES_WRITTEN,
-      shuffleWrite.RECORDS_WRITTEN,
-      shuffleWrite.WRITE_TIME).map(create)
-  }
-
-  /**
-   * Accumulators for tracking input metrics.
-   */
-  def createInputAccums(): Seq[Accumulator[_]] = {
-    Seq[String](
-      input.BYTES_READ,
-      input.RECORDS_READ).map(create)
-  }
-
-  /**
-   * Accumulators for tracking output metrics.
-   */
-  def createOutputAccums(): Seq[Accumulator[_]] = {
-    Seq[String](
-      output.BYTES_WRITTEN,
-      output.RECORDS_WRITTEN).map(create)
-  }
-
-  /**
-   * Accumulators for tracking internal metrics.
-   *
-   * These accumulators are created with the stage such that all tasks in the stage will
-   * add to the same set of accumulators. We do this to report the distribution of accumulator
-   * values across all tasks within each stage.
-   */
-  def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
-    val accums = createAll()
-    accums.foreach { accum =>
-      Accumulators.register(accum)
-      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
-    }
-    accums
-  }
-
-  /**
-   * Create a new accumulator representing an internal task metric.
-   */
-  private def newMetric[T](
-      initialValue: T,
-      name: String,
-      param: AccumulatorParam[T]): Accumulator[T] = {
-    new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
-  }
-
 }
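Note on the deletion above: the name-based factories (`create`, `getParam`, the `createAll` variants) go away because each metrics class now builds its accumulators through typed helpers on the `TaskMetrics` companion object (see the TaskMetrics.scala diff further down). A minimal sketch of the call-site change, mirroring the InputMetrics.scala hunk; `ExampleMetrics` is a hypothetical class used for illustration only and is not part of this commit:

// Hypothetical call-site comparison (illustration only).
package org.apache.spark.executor

import org.apache.spark.{Accumulator, InternalAccumulator}

class ExampleMetrics private[spark] () extends Serializable {
  import InternalAccumulator._

  // Before this commit, the element type came from string dispatch plus an unchecked cast:
  //   private[executor] val _bytesRead = TaskMetrics.createAccum[Long](input.BYTES_READ)
  // After, the typed factory fixes the element type statically, so no name lookup or cast is needed:
  private[executor] val _bytesRead: Accumulator[Long] =
    TaskMetrics.createLongAccum(input.BYTES_READ)
}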

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 1 addition & 2 deletions

@@ -62,12 +62,11 @@ object TaskContext {
   protected[spark] def unset(): Unit = taskContext.remove()

   /**
-   * An empty task context that does not represent an actual task.
+   * An empty task context that does not represent an actual task. This is only used in tests.
    */
   private[spark] def empty(): TaskContextImpl = {
     new TaskContextImpl(0, 0, 0, 0, null, new Properties, null, new TaskMetrics)
   }
-
 }


core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -36,7 +36,7 @@ private[spark] class TaskContextImpl(
     override val taskMemoryManager: TaskMemoryManager,
     localProperties: Properties,
     @transient private val metricsSystem: MetricsSystem,
-    val taskMetrics: TaskMetrics)
+    override val taskMetrics: TaskMetrics)
   extends TaskContext
   with Logging {

core/src/main/scala/org/apache/spark/executor/InputMetrics.scala

Lines changed: 2 additions & 2 deletions

@@ -42,8 +42,8 @@ object DataReadMethod extends Enumeration with Serializable {
 class InputMetrics private[spark] () extends Serializable {
   import InternalAccumulator._

-  private[executor] val _bytesRead = TaskMetrics.createAccum[Long](input.BYTES_READ)
-  private[executor] val _recordsRead = TaskMetrics.createAccum[Long](input.RECORDS_READ)
+  private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ)
+  private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ)

   /**
    * Total number of bytes read.

core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala

Lines changed: 2 additions & 2 deletions

@@ -41,8 +41,8 @@ object DataWriteMethod extends Enumeration with Serializable {
 class OutputMetrics private[spark] () extends Serializable {
   import InternalAccumulator._

-  private[executor] val _bytesWritten = TaskMetrics.createAccum[Long](output.BYTES_WRITTEN)
-  private[executor] val _recordsWritten = TaskMetrics.createAccum[Long](output.RECORDS_WRITTEN)
+  private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
+  private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)

   /**
    * Total number of bytes written.

core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala

Lines changed: 6 additions & 6 deletions

@@ -31,17 +31,17 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
   import InternalAccumulator._

   private[executor] val _remoteBlocksFetched =
-    TaskMetrics.createAccum[Int](shuffleRead.REMOTE_BLOCKS_FETCHED)
+    TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED)
   private[executor] val _localBlocksFetched =
-    TaskMetrics.createAccum[Int](shuffleRead.LOCAL_BLOCKS_FETCHED)
+    TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED)
   private[executor] val _remoteBytesRead =
-    TaskMetrics.createAccum[Long](shuffleRead.REMOTE_BYTES_READ)
+    TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ)
   private[executor] val _localBytesRead =
-    TaskMetrics.createAccum[Long](shuffleRead.LOCAL_BYTES_READ)
+    TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ)
   private[executor] val _fetchWaitTime =
-    TaskMetrics.createAccum[Long](shuffleRead.FETCH_WAIT_TIME)
+    TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME)
   private[executor] val _recordsRead =
-    TaskMetrics.createAccum[Long](shuffleRead.RECORDS_READ)
+    TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ)

   /**
    * Number of remote blocks fetched in this shuffle by this task.

core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala

Lines changed: 3 additions & 3 deletions

@@ -31,11 +31,11 @@ class ShuffleWriteMetrics private[spark] () extends Serializable {
   import InternalAccumulator._

   private[executor] val _bytesWritten =
-    TaskMetrics.createAccum[Long](shuffleWrite.BYTES_WRITTEN)
+    TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN)
   private[executor] val _recordsWritten =
-    TaskMetrics.createAccum[Long](shuffleWrite.RECORDS_WRITTEN)
+    TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN)
   private[executor] val _writeTime =
-    TaskMetrics.createAccum[Long](shuffleWrite.WRITE_TIME)
+    TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME)

   /**
    * Number of bytes written for the shuffle by this task.

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 32 additions & 13 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.executor
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark._
+import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
@@ -44,16 +45,15 @@ class TaskMetrics private[spark] () extends Serializable {
   import InternalAccumulator._

   // Each metric is internally represented as an accumulator
-  private val _executorDeserializeTime = TaskMetrics.createAccum[Long](EXECUTOR_DESERIALIZE_TIME)
-  private val _executorRunTime = TaskMetrics.createAccum[Long](EXECUTOR_RUN_TIME)
-  private val _resultSize = TaskMetrics.createAccum[Long](RESULT_SIZE)
-  private val _jvmGCTime = TaskMetrics.createAccum[Long](JVM_GC_TIME)
-  private val _resultSerializationTime = TaskMetrics.createAccum[Long](RESULT_SERIALIZATION_TIME)
-  private val _memoryBytesSpilled = TaskMetrics.createAccum[Long](MEMORY_BYTES_SPILLED)
-  private val _diskBytesSpilled = TaskMetrics.createAccum[Long](DISK_BYTES_SPILLED)
-  private val _peakExecutionMemory = TaskMetrics.createAccum[Long](PEAK_EXECUTION_MEMORY)
-  private val _updatedBlockStatuses =
-    TaskMetrics.createAccum[Seq[(BlockId, BlockStatus)]](UPDATED_BLOCK_STATUSES)
+  private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME)
+  private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME)
+  private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE)
+  private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME)
+  private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME)
+  private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED)
+  private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED)
+  private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY)
+  private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES)

   /**
    * Time taken on the executor to deserialize this task.
@@ -176,7 +176,7 @@ class TaskMetrics private[spark] () extends Serializable {

   // Only used for test
   private[spark] val testAccum =
-    sys.props.get("spark.testing").map(_ => TaskMetrics.createAccum[Long](TEST_ACCUM))
+    sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM))

   @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = {
     val in = inputMetrics
@@ -243,11 +243,30 @@ private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) ext
 }

 private[spark] object TaskMetrics extends Logging {
-  import InternalAccumulator._

   def empty: TaskMetrics = new TaskMetrics

-  def createAccum[T](name: String): Accumulator[T] = create(name).asInstanceOf[Accumulator[T]]
+  /**
+   * Create a new accumulator representing an internal task metric.
+   */
+  private def newMetric[T](
+      initialValue: T,
+      name: String,
+      param: AccumulatorParam[T]): Accumulator[T] = {
+    new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
+  }
+
+  def createLongAccum(name: String): Accumulator[Long] = {
+    newMetric(0L, name, LongAccumulatorParam)
+  }
+
+  def createIntAccum(name: String): Accumulator[Int] = {
+    newMetric(0, name, IntAccumulatorParam)
+  }
+
+  def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = {
+    newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam)
+  }

   /**
    * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
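The three new helpers (`createLongAccum`, `createIntAccum`, `createBlocksAccum`) all funnel through `newMetric`, so every internal metric is created with `internal = true` and `countFailedValues = true`. A minimal usage sketch follows, under the assumptions that the calling code lives inside the `org.apache.spark` package (these are `private[spark]` APIs), that `Accumulator` still exposes the usual `+=` and `value` members from `Accumulable`, and with a hypothetical wrapper object name; it is not a test shipped with this commit:

package org.apache.spark   // the factories and InternalAccumulator are private[spark]

import org.apache.spark.InternalAccumulator._
import org.apache.spark.executor.TaskMetrics

object AccumFactorySketch {
  // Typed factories: the return type encodes each metric's element type.
  val runTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME)                  // Accumulator[Long], starts at 0L
  val localBlocks = TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED) // Accumulator[Int], starts at 0
  val blockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES)      // Accumulator[Seq[(BlockId, BlockStatus)]], starts at Nil

  def demo(): Unit = {
    runTime += 42L      // accumulate, e.g. executor run time in ms
    localBlocks += 1    // count one locally fetched block
    assert(runTime.value == 42L)
  }
}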

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 2 additions & 2 deletions

@@ -36,7 +36,7 @@ class StageInfo(
     val rddInfos: Seq[RDDInfo],
     val parentIds: Seq[Int],
     val details: String,
-    val taskMetrics: TaskMetrics = null,
+    val taskMetrics: TaskMetrics = new TaskMetrics,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
@@ -81,7 +81,7 @@ private[spark] object StageInfo {
       stage: Stage,
       attemptId: Int,
       numTasks: Option[Int] = None,
-      taskMetrics: TaskMetrics = null,
+      taskMetrics: TaskMetrics = new TaskMetrics,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
     ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
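Changing the `taskMetrics` default from `null` to `new TaskMetrics` means a `StageInfo` built with the default now carries an empty-but-valid metrics object, so readers no longer need a null guard. A hedged before/after sketch of consumer code, assuming `TaskMetrics` exposes its `peakExecutionMemory` getter as in the surrounding codebase; this function is illustrative only and is not part of the commit:

// Before: a StageInfo created with the default could carry taskMetrics == null, forcing
//   Option(stageInfo.taskMetrics).map(_.peakExecutionMemory).getOrElse(0L)
// After: the direct read is safe even when the caller relied on the default.
def peakMemoryOf(stageInfo: org.apache.spark.scheduler.StageInfo): Long =
  stageInfo.taskMetrics.peakExecutionMemory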

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 4 additions & 2 deletions

@@ -46,13 +46,15 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
  * @param partitionId index of the number in the RDD
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The default values for `metrics` and `localProperties` are used by tests only.
  */
 private[spark] abstract class Task[T](
     val stageId: Int,
     val stageAttemptId: Int,
     val partitionId: Int,
-    val metrics: TaskMetrics,
-    @transient var localProperties: Properties) extends Serializable {
+    val metrics: TaskMetrics = new TaskMetrics,
+    @transient var localProperties: Properties = new Properties) extends Serializable {

   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
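As the updated scaladoc says, the new default arguments exist so test code can construct lightweight `Task` subclasses without wiring up metrics or thread-local properties. A rough sketch of such a helper, assuming `runTask` is the only member that must be implemented at this commit; `NoopTask` is hypothetical and not a file in this diff:

package org.apache.spark.scheduler   // Task is private[spark]

import org.apache.spark.TaskContext

// Minimal no-op task: `metrics` falls back to `new TaskMetrics` and
// `localProperties` to `new Properties`, thanks to the new defaults.
private[spark] class NoopTask(stageId: Int, partitionId: Int)
  extends Task[Unit](stageId, 0, partitionId) {   // 0 = stageAttemptId

  override def runTask(context: TaskContext): Unit = ()
}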
