71 changes: 44 additions & 27 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -18,7 +18,9 @@
package org.apache.spark.executor

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import scala.collection.mutable.ArrayBuffer

import com.google.common.collect.Maps

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -200,32 +202,47 @@ class TaskMetrics private[spark] () extends Serializable {


import InternalAccumulator._
@transient private[spark] lazy val nameToAccums = LinkedHashMap(
@JoshRosen (Contributor Author) commented on May 16, 2017:

It looks like the use of LinkedHashMap was added by @cloud-fan in #12612 in order to preserve ordering from the old code. As far as I can tell we don't actually rely on the ordering of the entries in this map, so I didn't preserve the use of LinkedHashMap.
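A minimal standalone sketch of the ordering behavior being discussed; the key names and values below are placeholders rather than the real accumulator entries:

import scala.collection.mutable

// Placeholder entries standing in for the (name -> accumulator) pairs.
val entries = Seq("executorRunTime" -> 1L, "jvmGCTime" -> 2L, "resultSize" -> 3L)

// LinkedHashMap iterates in insertion order, which is what the old code preserved.
val linked = mutable.LinkedHashMap(entries: _*)
println(linked.keys.mkString(", "))  // always: executorRunTime, jvmGCTime, resultSize

// A plain hash map gives no ordering guarantee; iteration order depends on hashing.
val hashed = mutable.HashMap(entries: _*)
println(hashed.keys.mkString(", "))  // some arbitrary order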

EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
EXECUTOR_RUN_TIME -> _executorRunTime,
EXECUTOR_CPU_TIME -> _executorCpuTime,
RESULT_SIZE -> _resultSize,
JVM_GC_TIME -> _jvmGCTime,
RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
input.BYTES_READ -> inputMetrics._bytesRead,
input.RECORDS_READ -> inputMetrics._recordsRead,
output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
) ++ testAccum.map(TEST_ACCUM -> _)
@transient private[spark] lazy val nameToAccums = {
// The construction of this map is a performance hotspot in the JobProgressListener, so we
// optimize this by using a pre-sized Java hashmap; see SPARK-20776 for more details.
val mapEntries = Array[(String, AccumulatorV2[_, _])](
EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
EXECUTOR_RUN_TIME -> _executorRunTime,
EXECUTOR_CPU_TIME -> _executorCpuTime,
RESULT_SIZE -> _resultSize,
JVM_GC_TIME -> _jvmGCTime,
RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
input.BYTES_READ -> inputMetrics._bytesRead,
input.RECORDS_READ -> inputMetrics._recordsRead,
output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
)
val map = Maps.newHashMapWithExpectedSize[String, AccumulatorV2[_, _]](mapEntries.length)
var i = 0
while (i < mapEntries.length) {
val e = mapEntries(i)
map.put(e._1, e._2)
i += 1
}
testAccum.foreach { accum =>
map.put(TEST_ACCUM, accum)
}
map.asScala
@JoshRosen (Contributor Author) commented:

The map + wrapper might consume a little extra memory compared to the old code, but that doesn't matter because we never have many TaskMetrics resident in the JVM at the same time: on the executor the only instances live in TaskContexts, and on the driver there is one per stage in the scheduler plus some temporary ones in the listener bus queue, which are freed as soon as the queue events are processed (and that now happens faster, outweighing the extra space usage).
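
For reference, a minimal standalone sketch of the pattern used above: pre-sizing a java.util.HashMap through Guava so it never rehashes while being populated, then exposing it to Scala callers through the thin asScala wrapper view. The entry names and value types are placeholders, not the real accumulators:

import scala.collection.JavaConverters._
import com.google.common.collect.Maps

// Placeholder entries standing in for the (name -> accumulator) pairs.
val entries = Array("bytesRead" -> 0L, "recordsRead" -> 0L, "bytesWritten" -> 0L)

// Guava sizes the backing table for entries.length insertions, so no rehashing occurs.
val map = Maps.newHashMapWithExpectedSize[String, Long](entries.length)
var i = 0
while (i < entries.length) {  // a plain while loop keeps the hot path free of closures/iterators
  map.put(entries(i)._1, entries(i)._2)
  i += 1
}

// asScala is a lightweight wrapper view over the Java map, not a copy.
val view = map.asScala
println(view("bytesRead"))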

}

@transient private[spark] lazy val internalAccums: Seq[AccumulatorV2[_, _]] =
nameToAccums.values.toIndexedSeq