
Commit 9eb2a87

create accumulators in TaskMetrics
1 parent 2f1d032 commit 9eb2a87

11 files changed: 146 additions, 411 deletions


core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ private[spark] class TaskContextImpl(
   /**
    * Metrics associated with this task.
    */
-  override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators)
+  override val taskMetrics: TaskMetrics = new TaskMetrics
 
   /** List of callback functions to execute when the task completes. */
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

core/src/main/scala/org/apache/spark/executor/InputMetrics.scala

Lines changed: 5 additions & 9 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -39,14 +39,11 @@ object DataReadMethod extends Enumeration with Serializable {
  * A collection of accumulators that represents metrics about reading data from external systems.
  */
 @DeveloperApi
-class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long])
-  extends Serializable {
+class InputMetrics private[spark] () extends Serializable {
+  import InternalAccumulator._
 
-  private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
-    this(
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ))
-  }
+  private[executor] val _bytesRead = TaskMetrics.createAccum[Long](input.BYTES_READ)
+  private[executor] val _recordsRead = TaskMetrics.createAccum[Long](input.RECORDS_READ)
 
   /**
    * Total number of bytes read.
@@ -66,5 +63,4 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
-
 }
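
The same createAccum pattern repeats in OutputMetrics, ShuffleReadMetrics, and ShuffleWriteMetrics below: instead of receiving pre-built accumulators through a constructor-injected map, each metrics class now asks TaskMetrics for a fresh accumulator keyed by an internal metric name. TaskMetrics.createAccum itself is not part of this excerpt; the following is only a minimal sketch of what such a name-keyed factory could look like, using a simplified SimpleAccum wrapper and AccumFactorySketch object invented here for illustration rather than Spark's real Accumulator class.

// Minimal sketch, not Spark's implementation: SimpleAccum and AccumFactorySketch
// are invented names; Spark's real Accumulator and TaskMetrics.createAccum differ.
class SimpleAccum[T](private var _value: T)(implicit num: Numeric[T]) extends Serializable {
  def value: T = _value
  def add(v: T): Unit = { _value = num.plus(_value, v) }   // mirrors _bytesRead.add(v)
  def setValue(v: T): Unit = { _value = v }                // mirrors _bytesRead.setValue(v)
}

object AccumFactorySketch {
  import scala.collection.mutable

  // Zero-initialized accumulators registered under their internal metric names,
  // so the owning TaskMetrics-like object can enumerate them later.
  private val registry = mutable.LinkedHashMap.empty[String, SimpleAccum[_]]

  def createAccum[T](name: String)(implicit num: Numeric[T]): SimpleAccum[T] = {
    val accum = new SimpleAccum[T](num.zero)
    registry(name) = accum
    accum
  }

  def all: Seq[(String, SimpleAccum[_])] = registry.toSeq
}

// Usage in the spirit of the diff above (the metric name string is illustrative only):
// val bytesRead = AccumFactorySketch.createAccum[Long]("internal.metrics.input.bytesRead")
// bytesRead.add(4096L)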

core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala

Lines changed: 5 additions & 8 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -38,14 +38,11 @@ object DataWriteMethod extends Enumeration with Serializable {
  * A collection of accumulators that represents metrics about writing data to external systems.
  */
 @DeveloperApi
-class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long])
-  extends Serializable {
+class OutputMetrics private[spark] () extends Serializable {
+  import InternalAccumulator._
 
-  private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
-    this(
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN))
-  }
+  private[executor] val _bytesWritten = TaskMetrics.createAccum[Long](output.BYTES_WRITTEN)
+  private[executor] val _recordsWritten = TaskMetrics.createAccum[Long](output.RECORDS_WRITTEN)
 
   /**
    * Total number of bytes written.

core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala

Lines changed: 16 additions & 33 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -27,38 +27,21 @@ import org.apache.spark.annotation.DeveloperApi
  * Operations are not thread-safe.
  */
 @DeveloperApi
-class ShuffleReadMetrics private (
-    _remoteBlocksFetched: Accumulator[Int],
-    _localBlocksFetched: Accumulator[Int],
-    _remoteBytesRead: Accumulator[Long],
-    _localBytesRead: Accumulator[Long],
-    _fetchWaitTime: Accumulator[Long],
-    _recordsRead: Accumulator[Long])
-  extends Serializable {
-
-  private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
-    this(
-      TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
-      TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
-  }
-
-  /**
-   * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
-   *
-   * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
-   * many places only to merge their values together later. In the future, we should revisit
-   * whether this is needed.
-   *
-   * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by
-   * [[TaskMetrics.mergeShuffleReadMetrics]].
-   */
-  private[spark] def this() {
-    this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
-  }
+class ShuffleReadMetrics private[spark] () extends Serializable {
+  import InternalAccumulator._
+
+  private[executor] val _remoteBlocksFetched =
+    TaskMetrics.createAccum[Int](shuffleRead.REMOTE_BLOCKS_FETCHED)
+  private[executor] val _localBlocksFetched =
+    TaskMetrics.createAccum[Int](shuffleRead.LOCAL_BLOCKS_FETCHED)
+  private[executor] val _remoteBytesRead =
+    TaskMetrics.createAccum[Long](shuffleRead.REMOTE_BYTES_READ)
+  private[executor] val _localBytesRead =
+    TaskMetrics.createAccum[Long](shuffleRead.LOCAL_BYTES_READ)
+  private[executor] val _fetchWaitTime =
+    TaskMetrics.createAccum[Long](shuffleRead.FETCH_WAIT_TIME)
+  private[executor] val _recordsRead =
+    TaskMetrics.createAccum[Long](shuffleRead.RECORDS_READ)
 
   /**
    * Number of remote blocks fetched in this shuffle by this task.
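
The scaladoc deleted above pointed callers at a temp-then-merge pattern on TaskMetrics (createTempShuffleReadMetrics followed by mergeShuffleReadMetrics). Neither method appears in this excerpt; the sketch below only illustrates that pattern under simplified, invented types (TempReadMetricsSketch, TaskReadAggregatorSketch) and is not Spark's actual API.

// Hypothetical sketch of the temp-then-merge pattern referenced in the removed scaladoc.
// All names here are invented for illustration; Spark's real TaskMetrics methods differ.
class TempReadMetricsSketch {
  var remoteBytesRead: Long = 0L
  var localBytesRead: Long = 0L
  var recordsRead: Long = 0L
}

class TaskReadAggregatorSketch {
  import scala.collection.mutable

  private val tempReadMetrics = mutable.ArrayBuffer.empty[TempReadMetricsSketch]
  var remoteBytesRead: Long = 0L
  var localBytesRead: Long = 0L
  var recordsRead: Long = 0L

  // A task may read several shuffle dependencies; each reader gets its own
  // temporary holder instead of a dummy ShuffleReadMetrics.
  def createTempReadMetrics(): TempReadMetricsSketch = {
    val temp = new TempReadMetricsSketch
    tempReadMetrics += temp
    temp
  }

  // Fold every temporary holder into the task-level totals once reading is done.
  def mergeReadMetrics(): Unit = {
    remoteBytesRead = tempReadMetrics.map(_.remoteBytesRead).sum
    localBytesRead = tempReadMetrics.map(_.localBytesRead).sum
    recordsRead = tempReadMetrics.map(_.recordsRead).sum
  }
}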

core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala

Lines changed: 9 additions & 25 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -27,31 +27,15 @@ import org.apache.spark.annotation.DeveloperApi
  * Operations are not thread-safe.
  */
 @DeveloperApi
-class ShuffleWriteMetrics private (
-    _bytesWritten: Accumulator[Long],
-    _recordsWritten: Accumulator[Long],
-    _writeTime: Accumulator[Long])
-  extends Serializable {
+class ShuffleWriteMetrics private[spark] () extends Serializable {
+  import InternalAccumulator._
 
-  private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
-    this(
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN),
-      TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME))
-  }
-
-  /**
-   * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task.
-   *
-   * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in
-   * many places only to merge their values together later. In the future, we should revisit
-   * whether this is needed.
-   *
-   * A better alternative is [[TaskMetrics.shuffleWriteMetrics]].
-   */
-  private[spark] def this() {
-    this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap)
-  }
+  private[executor] val _bytesWritten =
+    TaskMetrics.createAccum[Long](shuffleWrite.BYTES_WRITTEN)
+  private[executor] val _recordsWritten =
+    TaskMetrics.createAccum[Long](shuffleWrite.RECORDS_WRITTEN)
+  private[executor] val _writeTime =
+    TaskMetrics.createAccum[Long](shuffleWrite.WRITE_TIME)
 
   /**
    * Number of bytes written for the shuffle by this task.
