From c64da4fabbd805bf289780bbd3fe0f6aec435957 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 7 Jan 2015 16:05:30 -0500 Subject: [PATCH 1/8] Partially updated task metrics to make some vars private --- .../apache/spark/executor/TaskMetrics.scala | 60 ++++++++++++++----- .../org/apache/spark/rdd/HadoopRDD.scala | 6 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 6 +- .../apache/spark/rdd/PairRDDFunctions.scala | 6 +- .../apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala | 16 ++--- .../ui/jobs/JobProgressListenerSuite.scala | 12 ++-- .../apache/spark/util/JsonProtocolSuite.scala | 16 ++--- 8 files changed, 76 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 51b5328cb4c8..27a1313ad588 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -182,7 +182,10 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { /** * Total bytes read. */ - var bytesRead: Long = 0L + private var _bytesRead: Long = _ + def bytesRead = _bytesRead + def incBytesRead(value: Long) = _bytesRead += value + def decBytesRead(value: Long) = _bytesRead -= value } /** @@ -194,7 +197,10 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { /** * Total bytes written */ - var bytesWritten: Long = 0L + private var _bytesWritten: Long = _ + def bytesWritten = _bytesWritten + def incBytesWritten(value : Long) = _bytesWritten += value + def decBytesWritten(value : Long) = _bytesWritten -= value } /** @@ -203,32 +209,48 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { */ @DeveloperApi class ShuffleReadMetrics extends Serializable { - /** - * Number of blocks fetched in this shuffle by this task (remote or local) - */ - def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched - /** * Number of remote blocks fetched in this shuffle by this task */ - var remoteBlocksFetched: Int = _ - + private var _remoteBlocksFetched: Int = _ + def remoteBlocksFetched = _remoteBlocksFetched + def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value + def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + /** * Number of local blocks fetched in this shuffle by this task */ - var localBlocksFetched: Int = _ + private var _localBlocksFetched: Int = _ + def localBlocksFetched = _localBlocksFetched + def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value + def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value + /** * Time the task spent waiting for remote shuffle blocks. This only includes the time * blocking on shuffle input data. For instance if block B is being fetched while the task is * still not finished processing block A, it is not considered to be blocking on block B. 
*/ - var fetchWaitTime: Long = _ - + private var _fetchWaitTime: Long = _ + def fetchWaitTime = _fetchWaitTime + def incFetchWaitTime(value: Long) = _fetchWaitTime += value + def decFetchWaitTime(value: Long) = _fetchWaitTime -= value + /** * Total number of remote bytes read from the shuffle by this task */ - var remoteBytesRead: Long = _ + private var _remoteBytesRead: Long = _ + def remoteBytesRead = _remoteBytesRead + def incRemoteBytesRead(value: Long) = _remoteBytesRead += value + def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value + + /** + * Number of blocks fetched in this shuffle by this task (remote or local) + */ + private var _totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched + def totalBlocksFetched = _totalBlocksFetched + def incTotalBlocksFetched(value: Int) = _totalBlocksFetched += value + def decTotalBlocksFetched(value: Int) = _totalBlocksFetched -= value } /** @@ -240,10 +262,16 @@ class ShuffleWriteMetrics extends Serializable { /** * Number of bytes written for the shuffle by this task */ - @volatile var shuffleBytesWritten: Long = _ - + @volatile private var _shuffleBytesWritten: Long = _ + def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value + def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value + def shuffleBytesWritten = _shuffleBytesWritten /** * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds */ - @volatile var shuffleWriteTime: Long = _ + @volatile private var _shuffleWriteTime: Long = _ + def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value + def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value + def shuffleWriteTime= _shuffleWriteTime + } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a157e36e2286..7792e8c53a81 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -252,7 +252,7 @@ class HadoopRDD[K, V]( && bytesReadCallback.isDefined) { recordsSinceMetricsUpdate = 0 val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() + inputMetrics.incBytesRead(bytesReadFn()) } else { recordsSinceMetricsUpdate += 1 } @@ -264,12 +264,12 @@ class HadoopRDD[K, V]( reader.close() if (bytesReadCallback.isDefined) { val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() + inputMetrics.incBytesRead(bytesReadFn()) } else if (split.inputSplit.value.isInstanceOf[FileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. 
try { - inputMetrics.bytesRead = split.inputSplit.value.getLength + inputMetrics.incBytesRead(split.inputSplit.value.getLength) context.taskMetrics.inputMetrics = Some(inputMetrics) } catch { case e: java.io.IOException => diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e55d03d391e0..3a8ac80b005a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -159,7 +159,7 @@ class NewHadoopRDD[K, V]( && bytesReadCallback.isDefined) { recordsSinceMetricsUpdate = 0 val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() + inputMetrics.incBytesRead(bytesReadFn()) } else { recordsSinceMetricsUpdate += 1 } @@ -174,12 +174,12 @@ class NewHadoopRDD[K, V]( // Update metrics with final amount if (bytesReadCallback.isDefined) { val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() + inputMetrics.incBytesRead(bytesReadFn()) } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength + inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) context.taskMetrics.inputMetrics = Some(inputMetrics) } catch { case e: java.io.IOException => diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index c43e1f2fe135..c767ce607943 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1000,7 +1000,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close(hadoopContext) } committer.commitTask(hadoopContext) - bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) } 1 } : Int @@ -1072,7 +1072,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close() } writer.commit() - bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) } } self.context.runJob(self, writeToFile) @@ -1095,7 +1095,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0 && bytesWrittenCallback.isDefined) { - bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d7b184f8a10e..26a47e8bd0ff 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -54,7 +54,7 @@ private[spark] class BlockResult( readMethod: DataReadMethod.Value, bytes: Long) { val inputMetrics = new InputMetrics(readMethod) - inputMetrics.bytesRead = bytes + inputMetrics.incBytesRead(bytes) } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index e7b80e8774b9..27c4f9f3f46c 100644 --- 
a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -621,31 +621,31 @@ private[spark] object JsonProtocol { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { val metrics = new ShuffleReadMetrics - metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] - metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] - metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] - metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long] + metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int]) + metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) + metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) + metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) metrics } def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = { val metrics = new ShuffleWriteMetrics - metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long] - metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long] + metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) + metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) metrics } def inputMetricsFromJson(json: JValue): InputMetrics = { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) - metrics.bytesRead = (json \ "Bytes Read").extract[Long] + metrics.incBytesRead((json \ "Bytes Read").extract[Long]) metrics } def outputMetricsFromJson(json: JValue): OutputMetrics = { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) - metrics.bytesWritten = (json \ "Bytes Written").extract[Long] + metrics.incBytesWritten((json \ "Bytes Written").extract[Long]) metrics } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 12af60caf7d5..22c2a65e5106 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -138,7 +138,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(listener.stageIdToData.size === 0) // finish this task, should get updated shuffleRead - shuffleReadMetrics.remoteBytesRead = 1000 + shuffleReadMetrics.incRemoteBytesRead(1000) taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 @@ -224,18 +224,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val shuffleWriteMetrics = new ShuffleWriteMetrics() taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) - shuffleReadMetrics.remoteBytesRead = base + 1 - shuffleReadMetrics.remoteBlocksFetched = base + 2 - shuffleWriteMetrics.shuffleBytesWritten = base + 3 + shuffleReadMetrics.incRemoteBytesRead(base + 1) + shuffleReadMetrics.incRemoteBlocksFetched(base + 2) + shuffleWriteMetrics.incShuffleBytesWritten(base + 3) taskMetrics.executorRunTime = base + 4 taskMetrics.diskBytesSpilled = base + 5 taskMetrics.memoryBytesSpilled = base + 6 val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.inputMetrics = 
Some(inputMetrics) - inputMetrics.bytesRead = base + 7 + inputMetrics.incBytesRead(base + 7) val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) taskMetrics.outputMetrics = Some(outputMetrics) - outputMetrics.bytesWritten = base + 8 + outputMetrics.incBytesWritten(base + 8) taskMetrics } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 593d6dd8c379..589de887cc1c 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -633,24 +633,24 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - inputMetrics.bytesRead = d + e + f + inputMetrics.incBytesRead(d + e + f) t.inputMetrics = Some(inputMetrics) } else { val sr = new ShuffleReadMetrics - sr.remoteBytesRead = b + d - sr.localBlocksFetched = e - sr.fetchWaitTime = a + d - sr.remoteBlocksFetched = f + sr.incRemoteBytesRead(b + d) + sr.incLocalBlocksFetched(e) + sr.incFetchWaitTime(a + d) + sr.incRemoteBlocksFetched(f) t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) - outputMetrics.bytesWritten = a + b + c + outputMetrics.incBytesWritten(a + b + c) t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics - sw.shuffleBytesWritten = a + b + c - sw.shuffleWriteTime = b + c + d + sw.incShuffleBytesWritten(a + b + c) + sw.incShuffleWriteTime(b + c + d) t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks From 5525c2011d96929d6143e24cf1abdd9823a9fe26 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 7 Jan 2015 16:25:40 -0500 Subject: [PATCH 2/8] Completed refactoring to make vars in TaskMetrics class private --- .../org/apache/spark/executor/Executor.scala | 12 ++--- .../apache/spark/executor/TaskMetrics.scala | 47 ++++++++++++++----- .../org/apache/spark/scheduler/Task.scala | 2 +- .../spark/scheduler/TaskResultGetter.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala | 16 +++---- .../ui/jobs/JobProgressListenerSuite.scala | 6 +-- .../apache/spark/util/JsonProtocolSuite.scala | 14 +++--- 7 files changed, 62 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 52de6980ecbf..d21c63fa53f8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -206,10 +206,10 @@ private[spark] class Executor( val afterSerialization = System.currentTimeMillis() for (m <- task.metrics) { - m.executorDeserializeTime = taskStart - deserializeStartTime - m.executorRunTime = taskFinish - taskStart - m.jvmGCTime = gcTime - startGCTime - m.resultSerializationTime = afterSerialization - beforeSerialization + m.incExecutorDeserializeTime(taskStart - deserializeStartTime) + m.incExecutorRunTime(taskFinish - taskStart) + m.incJvmGCTime(gcTime - startGCTime) + m.incResultSerializationTime(afterSerialization - beforeSerialization) } val accumUpdates = Accumulators.values @@ -260,8 +260,8 @@ private[spark] class Executor( val serviceTime = System.currentTimeMillis() - taskStart val metrics = attemptedTask.flatMap(t => t.metrics) for (m <- metrics) { - m.executorRunTime = serviceTime - m.jvmGCTime = gcTime - startGCTime + m.incExecutorRunTime(serviceTime) + m.incJvmGCTime(gcTime - 
startGCTime) } val reason = new ExceptionFailure(t, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 27a1313ad588..5903c1091e79 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -39,42 +39,67 @@ class TaskMetrics extends Serializable { /** * Host's name the task runs on */ - var hostname: String = _ - + private var _hostname: String = _ + def hostname = _hostname + def setHostname(value : String) = _hostname = value + /** * Time taken on the executor to deserialize this task */ - var executorDeserializeTime: Long = _ - + private var _executorDeserializeTime: Long = _ + def executorDeserializeTime = _executorDeserializeTime + def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value + def decExecutorDeserializeTime(value: Long) = _executorDeserializeTime -= value + + /** * Time the executor spends actually running the task (including fetching shuffle data) */ - var executorRunTime: Long = _ - + private var _executorRunTime: Long = _ + def executorRunTime = _executorRunTime + def incExecutorRunTime(value: Long) = _executorRunTime += value + def decExecutorRunTime(value: Long) = _executorRunTime -= value + /** * The number of bytes this task transmitted back to the driver as the TaskResult */ - var resultSize: Long = _ + private var _resultSize: Long = _ + def resultSize = _resultSize + def incResultSize(value: Long) = _resultSize += value + def decResultSize(value: Long) = _resultSize -= value + /** * Amount of time the JVM spent in garbage collection while executing this task */ - var jvmGCTime: Long = _ + private var _jvmGCTime: Long = _ + def jvmGCTime = _jvmGCTime + def incJvmGCTime(value: Long) = _jvmGCTime += value + def decJvmGCTime(value: Long) = _jvmGCTime -= value /** * Amount of time spent serializing the task result */ - var resultSerializationTime: Long = _ + private var _resultSerializationTime: Long = _ + def resultSerializationTime = _resultSerializationTime + def incResultSerializationTime(value: Long) = _resultSerializationTime += value + def decResultSerializationTime(value: Long) = _resultSerializationTime -= value /** * The number of in-memory bytes spilled by this task */ - var memoryBytesSpilled: Long = _ + private var _memoryBytesSpilled: Long = _ + def memoryBytesSpilled = _memoryBytesSpilled + def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value + def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value /** * The number of on-disk bytes spilled by this task */ - var diskBytesSpilled: Long = _ + private var _diskBytesSpilled: Long = _ + def diskBytesSpilled = _diskBytesSpilled + def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value + def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value /** * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 2552d03d18d0..1eb273998252 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -47,7 +47,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex final def run(attemptId: Long): T = { 
context = new TaskContextImpl(stageId, partitionId, attemptId, false) TaskContextHelper.setTaskContext(context) - context.taskMetrics.hostname = Utils.localHostName() + context.taskMetrics.setHostname(Utils.localHostName()) taskThread = Thread.currentThread() if (_killed) { kill(interruptThread = false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 819b51e12ad8..c4f16c783a40 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -76,7 +76,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul (deserializedResult, size) } - result.metrics.resultSize = size + result.metrics.incResultSize(size) scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 27c4f9f3f46c..7a7d4efb47aa 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -592,14 +592,14 @@ private[spark] object JsonProtocol { return TaskMetrics.empty } val metrics = new TaskMetrics - metrics.hostname = (json \ "Host Name").extract[String] - metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long] - metrics.executorRunTime = (json \ "Executor Run Time").extract[Long] - metrics.resultSize = (json \ "Result Size").extract[Long] - metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long] - metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] - metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] - metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] + metrics.setHostname((json \ "Host Name").extract[String]) + metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) + metrics.incExecutorRunTime((json \ "Executor Run Time").extract[Long]) + metrics.incResultSize((json \ "Result Size").extract[Long]) + metrics.incJvmGCTime((json \ "JVM GC Time").extract[Long]) + metrics.incResultSerializationTime((json \ "Result Serialization Time").extract[Long]) + metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) + metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) metrics.setShuffleReadMetrics( Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 22c2a65e5106..6389bd13cd85 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -227,9 +227,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc shuffleReadMetrics.incRemoteBytesRead(base + 1) shuffleReadMetrics.incRemoteBlocksFetched(base + 2) shuffleWriteMetrics.incShuffleBytesWritten(base + 3) - taskMetrics.executorRunTime = base + 4 - taskMetrics.diskBytesSpilled = base + 5 - taskMetrics.memoryBytesSpilled = base + 6 + taskMetrics.incExecutorRunTime(base + 4) + taskMetrics.incDiskBytesSpilled(base + 5) + 
taskMetrics.incMemoryBytesSpilled(base + 6) val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.inputMetrics = Some(inputMetrics) inputMetrics.incBytesRead(base + 7) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 589de887cc1c..c0fe9b80a8e4 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -623,13 +623,13 @@ class JsonProtocolSuite extends FunSuite { hasHadoopInput: Boolean, hasOutput: Boolean) = { val t = new TaskMetrics - t.hostname = "localhost" - t.executorDeserializeTime = a - t.executorRunTime = b - t.resultSize = c - t.jvmGCTime = d - t.resultSerializationTime = a + b - t.memoryBytesSpilled = a + c + t.setHostname("localhost") + t.incExecutorDeserializeTime(a) + t.incExecutorRunTime(b) + t.incResultSize(c) + t.incJvmGCTime(d) + t.incResultSerializationTime(a + b) + t.incMemoryBytesSpilled(a + c) if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) From 1fd59b27a91d645b15e7c701cedb8bc56fbd987a Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Thu, 8 Jan 2015 12:08:10 -0500 Subject: [PATCH 3/8] Updated documentation for accumulators to highlight lazy evaluation issue --- docs/programming-guide.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 5e0d5c15d706..3a55d39468ef 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1316,7 +1316,15 @@ For accumulator updates performed inside actions only, Spark guarantees t will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed. +In addition, accumulators do not maintain lineage for the operations that use them. Consequently, accumulators updates are not guaranteed to be executed when made within a lazy transformation like `map()`. Unless something has triggered the evaluation of the lazy transformation that updates the value of the accumlator, subsequent operations will not themselves trigger that evaluation and the value of the accumulator will remain unchanged. The below code fragment demonstrates this issue: +
+{% highlight scala %} + val acc = sc.accumulator(0) + data.map(x => acc += x; f(x)) + // Here, acc is still 0 because no actions have cause the `map` to be computed. +{% endhighlight %} +
# Deploying to a Cluster From 33b5a2d4f0ea354d546fdefb0eaed3330709f3c2 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Thu, 8 Jan 2015 12:14:32 -0500 Subject: [PATCH 4/8] Added code examples for java and python --- docs/programming-guide.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 3a55d39468ef..f216b22362b5 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1318,6 +1318,8 @@ of that each task's update may be applied more than once if tasks or job stages In addition, accumulators do not maintain lineage for the operations that use them. Consequently, accumulators updates are not guaranteed to be executed when made within a lazy transformation like `map()`. Unless something has triggered the evaluation of the lazy transformation that updates the value of the accumlator, subsequent operations will not themselves trigger that evaluation and the value of the accumulator will remain unchanged. The below code fragment demonstrates this issue: +
+
{% highlight scala %} val acc = sc.accumulator(0) @@ -1326,6 +1328,24 @@ In addition, accumulators do not maintain lineage for the operations that use th {% endhighlight %}
+
+{% highlight java %} + Accumulator accum = sc.accumulator(0); + data.map(x -> accum.add(x); f(x);); + // Here, acc is still 0 because no actions have cause the `map` to be computed. +{% endhighlight %} +
+ +
+{% highlight python %} + accum = sc.accumulator(0) + data.map(lambda x => acc.add(x); f(x)) + # Here, acc is still 0 because no actions have cause the `map` to be computed. +{% endhighlight %} +
+ +
+ # Deploying to a Cluster The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster. From 3a38db1ea96f0b1045f78fb5aa093be302c1ab42 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Fri, 9 Jan 2015 13:30:55 -0500 Subject: [PATCH 5/8] Verified documentation update by building via jekyll --- docs/programming-guide.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index f216b22362b5..0ade46886ffd 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1316,31 +1316,31 @@ For accumulator updates performed inside actions only, Spark guarantees t will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed. -In addition, accumulators do not maintain lineage for the operations that use them. Consequently, accumulators updates are not guaranteed to be executed when made within a lazy transformation like `map()`. Unless something has triggered the evaluation of the lazy transformation that updates the value of the accumlator, subsequent operations will not themselves trigger that evaluation and the value of the accumulator will remain unchanged. The below code fragment demonstrates this issue: +In addition, accumulators do not maintain lineage for the operations that use them. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like `map()`. Unless something has triggered the evaluation of the lazy transformation that updates the value of the accumlator, subsequent operations will not themselves trigger that evaluation and the value of the accumulator will remain unchanged. The below code fragment demonstrates this issue:
{% highlight scala %} - val acc = sc.accumulator(0) - data.map(x => acc += x; f(x)) - // Here, acc is still 0 because no actions have cause the `map` to be computed. +val acc = sc.accumulator(0) +data.map { x => acc += x; f(x) } +// Here, acc is still 0 because no actions have caused the `map` to be computed. {% endhighlight %}
{% highlight java %} - Accumulator accum = sc.accumulator(0); - data.map(x -> accum.add(x); f(x);); - // Here, acc is still 0 because no actions have cause the `map` to be computed. +Accumulator<Integer> accum = sc.accumulator(0); +data.map(x -> { accum.add(x); return f(x); }); +// Here, accum is still 0 because no actions have caused the `map` to be computed. {% endhighlight %}
{% highlight python %} - accum = sc.accumulator(0) - data.map(lambda x => acc.add(x); f(x)) - # Here, acc is still 0 because no actions have cause the `map` to be computed. +accum = sc.accumulator(0) +def g(x): +    accum.add(x) +    return f(x) +data.map(g) +# Here, accum is still 0 because no actions have caused the `map` to be computed. {% endhighlight %}
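For reference, a minimal sketch of the behaviour the documentation above describes, using the Spark 1.x accumulator API from the patch; the SparkContext `sc`, an integer RDD `data`, and the function `f` are assumed, and the snippet is illustrative rather than part of the patch:

{% highlight scala %}
val acc = sc.accumulator(0)
val mapped = data.map { x => acc += x; f(x) }
// Nothing has been computed yet, so acc.value is still 0 on the driver.
mapped.count()  // any action works; count() forces the lazy map() to run
// The tasks have now executed, so acc.value holds the sum of the elements of data.
{% endhighlight %}

If `mapped` is recomputed later, for example by a second action without caching or after a task failure, the update may be applied again, which is the "applied more than once" caveat the guide already mentions for accumulator updates made inside transformations.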
From 3f6c5127b094519be0fe3d7ec99a8654d3a58728 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 13 Jan 2015 12:51:03 -0500 Subject: [PATCH 6/8] Revert "Completed refactoring to make vars in TaskMetrics class private" This reverts commit 5525c2011d96929d6143e24cf1abdd9823a9fe26. --- .../org/apache/spark/executor/Executor.scala | 12 ++--- .../apache/spark/executor/TaskMetrics.scala | 47 +++++-------------- .../org/apache/spark/scheduler/Task.scala | 2 +- .../spark/scheduler/TaskResultGetter.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala | 16 +++---- .../ui/jobs/JobProgressListenerSuite.scala | 6 +-- .../apache/spark/util/JsonProtocolSuite.scala | 14 +++--- 7 files changed, 37 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3a5d2638a32e..0f99cd9f3b08 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -194,10 +194,10 @@ private[spark] class Executor( val afterSerialization = System.currentTimeMillis() for (m <- task.metrics) { - m.incExecutorDeserializeTime(taskStart - deserializeStartTime) - m.incExecutorRunTime(taskFinish - taskStart) - m.incJvmGCTime(gcTime - startGCTime) - m.incResultSerializationTime(afterSerialization - beforeSerialization) + m.executorDeserializeTime = taskStart - deserializeStartTime + m.executorRunTime = taskFinish - taskStart + m.jvmGCTime = gcTime - startGCTime + m.resultSerializationTime = afterSerialization - beforeSerialization } val accumUpdates = Accumulators.values @@ -248,8 +248,8 @@ private[spark] class Executor( val serviceTime = System.currentTimeMillis() - taskStart val metrics = attemptedTask.flatMap(t => t.metrics) for (m <- metrics) { - m.incExecutorRunTime(serviceTime) - m.incJvmGCTime(gcTime - startGCTime) + m.executorRunTime = serviceTime + m.jvmGCTime = gcTime - startGCTime } val reason = new ExceptionFailure(t, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 5903c1091e79..27a1313ad588 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -39,67 +39,42 @@ class TaskMetrics extends Serializable { /** * Host's name the task runs on */ - private var _hostname: String = _ - def hostname = _hostname - def setHostname(value : String) = _hostname = value - + var hostname: String = _ + /** * Time taken on the executor to deserialize this task */ - private var _executorDeserializeTime: Long = _ - def executorDeserializeTime = _executorDeserializeTime - def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value - def decExecutorDeserializeTime(value: Long) = _executorDeserializeTime -= value - - + var executorDeserializeTime: Long = _ + /** * Time the executor spends actually running the task (including fetching shuffle data) */ - private var _executorRunTime: Long = _ - def executorRunTime = _executorRunTime - def incExecutorRunTime(value: Long) = _executorRunTime += value - def decExecutorRunTime(value: Long) = _executorRunTime -= value - + var executorRunTime: Long = _ + /** * The number of bytes this task transmitted back to the driver as the TaskResult */ - private var _resultSize: Long = _ - def resultSize = _resultSize - def 
incResultSize(value: Long) = _resultSize += value - def decResultSize(value: Long) = _resultSize -= value - + var resultSize: Long = _ /** * Amount of time the JVM spent in garbage collection while executing this task */ - private var _jvmGCTime: Long = _ - def jvmGCTime = _jvmGCTime - def incJvmGCTime(value: Long) = _jvmGCTime += value - def decJvmGCTime(value: Long) = _jvmGCTime -= value + var jvmGCTime: Long = _ /** * Amount of time spent serializing the task result */ - private var _resultSerializationTime: Long = _ - def resultSerializationTime = _resultSerializationTime - def incResultSerializationTime(value: Long) = _resultSerializationTime += value - def decResultSerializationTime(value: Long) = _resultSerializationTime -= value + var resultSerializationTime: Long = _ /** * The number of in-memory bytes spilled by this task */ - private var _memoryBytesSpilled: Long = _ - def memoryBytesSpilled = _memoryBytesSpilled - def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value - def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value + var memoryBytesSpilled: Long = _ /** * The number of on-disk bytes spilled by this task */ - private var _diskBytesSpilled: Long = _ - def diskBytesSpilled = _diskBytesSpilled - def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value - def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value + var diskBytesSpilled: Long = _ /** * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1727d69e3c73..d7dde4fe3843 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -47,7 +47,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex final def run(attemptId: Long): T = { context = new TaskContextImpl(stageId, partitionId, attemptId, runningLocally = false) TaskContextHelper.setTaskContext(context) - context.taskMetrics.setHostname(Utils.localHostName()) + context.taskMetrics.hostname = Utils.localHostName() taskThread = Thread.currentThread() if (_killed) { kill(interruptThread = false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 9fa5a09cc29d..4896ec845bbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul (deserializedResult, size) } - result.metrics.incResultSize(size) + result.metrics.resultSize = size scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 68a46bbcb707..63d36967201b 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -592,14 +592,14 @@ private[spark] object JsonProtocol { return TaskMetrics.empty } val metrics = new TaskMetrics - metrics.setHostname((json \ "Host Name").extract[String]) - metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) - metrics.incExecutorRunTime((json \ 
"Executor Run Time").extract[Long]) - metrics.incResultSize((json \ "Result Size").extract[Long]) - metrics.incJvmGCTime((json \ "JVM GC Time").extract[Long]) - metrics.incResultSerializationTime((json \ "Result Serialization Time").extract[Long]) - metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) - metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) + metrics.hostname = (json \ "Host Name").extract[String] + metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long] + metrics.executorRunTime = (json \ "Executor Run Time").extract[Long] + metrics.resultSize = (json \ "Result Size").extract[Long] + metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long] + metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] + metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] + metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] metrics.setShuffleReadMetrics( Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 6389bd13cd85..22c2a65e5106 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -227,9 +227,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc shuffleReadMetrics.incRemoteBytesRead(base + 1) shuffleReadMetrics.incRemoteBlocksFetched(base + 2) shuffleWriteMetrics.incShuffleBytesWritten(base + 3) - taskMetrics.incExecutorRunTime(base + 4) - taskMetrics.incDiskBytesSpilled(base + 5) - taskMetrics.incMemoryBytesSpilled(base + 6) + taskMetrics.executorRunTime = base + 4 + taskMetrics.diskBytesSpilled = base + 5 + taskMetrics.memoryBytesSpilled = base + 6 val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.inputMetrics = Some(inputMetrics) inputMetrics.incBytesRead(base + 7) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 25693ee86648..6ec08a5f564c 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -599,13 +599,13 @@ class JsonProtocolSuite extends FunSuite { hasHadoopInput: Boolean, hasOutput: Boolean) = { val t = new TaskMetrics - t.setHostname("localhost") - t.incExecutorDeserializeTime(a) - t.incExecutorRunTime(b) - t.incResultSize(c) - t.incJvmGCTime(d) - t.incResultSerializationTime(a + b) - t.incMemoryBytesSpilled(a + c) + t.hostname = "localhost" + t.executorDeserializeTime = a + t.executorRunTime = b + t.resultSize = c + t.jvmGCTime = d + t.resultSerializationTime = a + b + t.memoryBytesSpilled = a + c if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) From df3afd7895b97c5280bf28d8c24e543d60775834 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 13 Jan 2015 12:51:18 -0500 Subject: [PATCH 7/8] Revert "Partially updated task metrics to make some vars private" This reverts commit c64da4fabbd805bf289780bbd3fe0f6aec435957. 
--- .../apache/spark/executor/TaskMetrics.scala | 60 +++++-------------- .../org/apache/spark/rdd/HadoopRDD.scala | 6 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 6 +- .../apache/spark/rdd/PairRDDFunctions.scala | 6 +- .../apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala | 16 ++--- .../ui/jobs/JobProgressListenerSuite.scala | 12 ++-- .../apache/spark/util/JsonProtocolSuite.scala | 16 ++--- 8 files changed, 48 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 27a1313ad588..51b5328cb4c8 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -182,10 +182,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { /** * Total bytes read. */ - private var _bytesRead: Long = _ - def bytesRead = _bytesRead - def incBytesRead(value: Long) = _bytesRead += value - def decBytesRead(value: Long) = _bytesRead -= value + var bytesRead: Long = 0L } /** @@ -197,10 +194,7 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { /** * Total bytes written */ - private var _bytesWritten: Long = _ - def bytesWritten = _bytesWritten - def incBytesWritten(value : Long) = _bytesWritten += value - def decBytesWritten(value : Long) = _bytesWritten -= value + var bytesWritten: Long = 0L } /** @@ -209,48 +203,32 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { */ @DeveloperApi class ShuffleReadMetrics extends Serializable { + /** + * Number of blocks fetched in this shuffle by this task (remote or local) + */ + def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched + /** * Number of remote blocks fetched in this shuffle by this task */ - private var _remoteBlocksFetched: Int = _ - def remoteBlocksFetched = _remoteBlocksFetched - def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value - def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value - + var remoteBlocksFetched: Int = _ + /** * Number of local blocks fetched in this shuffle by this task */ - private var _localBlocksFetched: Int = _ - def localBlocksFetched = _localBlocksFetched - def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value - def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value - + var localBlocksFetched: Int = _ /** * Time the task spent waiting for remote shuffle blocks. This only includes the time * blocking on shuffle input data. For instance if block B is being fetched while the task is * still not finished processing block A, it is not considered to be blocking on block B. 
*/ - private var _fetchWaitTime: Long = _ - def fetchWaitTime = _fetchWaitTime - def incFetchWaitTime(value: Long) = _fetchWaitTime += value - def decFetchWaitTime(value: Long) = _fetchWaitTime -= value - - /** - * Total number of remote bytes read from the shuffle by this task - */ - private var _remoteBytesRead: Long = _ - def remoteBytesRead = _remoteBytesRead - def incRemoteBytesRead(value: Long) = _remoteBytesRead += value - def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value + var fetchWaitTime: Long = _ /** - * Number of blocks fetched in this shuffle by this task (remote or local) + * Total number of remote bytes read from the shuffle by this task */ - private var _totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched - def totalBlocksFetched = _totalBlocksFetched - def incTotalBlocksFetched(value: Int) = _totalBlocksFetched += value - def decTotalBlocksFetched(value: Int) = _totalBlocksFetched -= value + var remoteBytesRead: Long = _ } /** @@ -262,16 +240,10 @@ class ShuffleWriteMetrics extends Serializable { /** * Number of bytes written for the shuffle by this task */ - @volatile private var _shuffleBytesWritten: Long = _ - def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value - def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value - def shuffleBytesWritten = _shuffleBytesWritten + @volatile var shuffleBytesWritten: Long = _ + /** * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds */ - @volatile private var _shuffleWriteTime: Long = _ - def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value - def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value - def shuffleWriteTime= _shuffleWriteTime - + @volatile var shuffleWriteTime: Long = _ } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index fded29feb1bb..0001c2329c83 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -252,7 +252,7 @@ class HadoopRDD[K, V]( && bytesReadCallback.isDefined) { recordsSinceMetricsUpdate = 0 val bytesReadFn = bytesReadCallback.get - inputMetrics.incBytesRead(bytesReadFn()) + inputMetrics.bytesRead = bytesReadFn() } else { recordsSinceMetricsUpdate += 1 } @@ -264,12 +264,12 @@ class HadoopRDD[K, V]( reader.close() if (bytesReadCallback.isDefined) { val bytesReadFn = bytesReadCallback.get - inputMetrics.incBytesRead(bytesReadFn()) + inputMetrics.bytesRead = bytesReadFn() } else if (split.inputSplit.value.isInstanceOf[FileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. 
try { - inputMetrics.incBytesRead(split.inputSplit.value.getLength) + inputMetrics.bytesRead = split.inputSplit.value.getLength context.taskMetrics.inputMetrics = Some(inputMetrics) } catch { case e: java.io.IOException => diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 3a8ac80b005a..e55d03d391e0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -159,7 +159,7 @@ class NewHadoopRDD[K, V]( && bytesReadCallback.isDefined) { recordsSinceMetricsUpdate = 0 val bytesReadFn = bytesReadCallback.get - inputMetrics.incBytesRead(bytesReadFn()) + inputMetrics.bytesRead = bytesReadFn() } else { recordsSinceMetricsUpdate += 1 } @@ -174,12 +174,12 @@ class NewHadoopRDD[K, V]( // Update metrics with final amount if (bytesReadCallback.isDefined) { val bytesReadFn = bytesReadCallback.get - inputMetrics.incBytesRead(bytesReadFn()) + inputMetrics.bytesRead = bytesReadFn() } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) + inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength context.taskMetrics.inputMetrics = Some(inputMetrics) } catch { case e: java.io.IOException => diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index c53950b25562..38f8f36a4a4d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1010,7 +1010,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close(hadoopContext) } committer.commitTask(hadoopContext) - bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) } + bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } 1 } : Int @@ -1082,7 +1082,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close() } writer.commit() - bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) } + bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } } self.context.runJob(self, writeToFile) @@ -1105,7 +1105,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0 && bytesWrittenCallback.isDefined) { - bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) } + bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 26a47e8bd0ff..d7b184f8a10e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -54,7 +54,7 @@ private[spark] class BlockResult( readMethod: DataReadMethod.Value, bytes: Long) { val inputMetrics = new InputMetrics(readMethod) - inputMetrics.incBytesRead(bytes) + inputMetrics.bytesRead = bytes } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 63d36967201b..d94e8252650d 100644 --- 
a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -621,31 +621,31 @@ private[spark] object JsonProtocol { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { val metrics = new ShuffleReadMetrics - metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int]) - metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) - metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) - metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) + metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] + metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] + metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] + metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long] metrics } def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = { val metrics = new ShuffleWriteMetrics - metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) - metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) + metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long] + metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long] metrics } def inputMetricsFromJson(json: JValue): InputMetrics = { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) - metrics.incBytesRead((json \ "Bytes Read").extract[Long]) + metrics.bytesRead = (json \ "Bytes Read").extract[Long] metrics } def outputMetricsFromJson(json: JValue): OutputMetrics = { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) - metrics.incBytesWritten((json \ "Bytes Written").extract[Long]) + metrics.bytesWritten = (json \ "Bytes Written").extract[Long] metrics } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 22c2a65e5106..12af60caf7d5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -138,7 +138,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(listener.stageIdToData.size === 0) // finish this task, should get updated shuffleRead - shuffleReadMetrics.incRemoteBytesRead(1000) + shuffleReadMetrics.remoteBytesRead = 1000 taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 @@ -224,18 +224,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val shuffleWriteMetrics = new ShuffleWriteMetrics() taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) - shuffleReadMetrics.incRemoteBytesRead(base + 1) - shuffleReadMetrics.incRemoteBlocksFetched(base + 2) - shuffleWriteMetrics.incShuffleBytesWritten(base + 3) + shuffleReadMetrics.remoteBytesRead = base + 1 + shuffleReadMetrics.remoteBlocksFetched = base + 2 + shuffleWriteMetrics.shuffleBytesWritten = base + 3 taskMetrics.executorRunTime = base + 4 taskMetrics.diskBytesSpilled = base + 5 taskMetrics.memoryBytesSpilled = base + 6 val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.inputMetrics = 
Some(inputMetrics) - inputMetrics.incBytesRead(base + 7) + inputMetrics.bytesRead = base + 7 val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) taskMetrics.outputMetrics = Some(outputMetrics) - outputMetrics.incBytesWritten(base + 8) + outputMetrics.bytesWritten = base + 8 taskMetrics } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6ec08a5f564c..63c2559c5c5f 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -609,24 +609,24 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - inputMetrics.incBytesRead(d + e + f) + inputMetrics.bytesRead = d + e + f t.inputMetrics = Some(inputMetrics) } else { val sr = new ShuffleReadMetrics - sr.incRemoteBytesRead(b + d) - sr.incLocalBlocksFetched(e) - sr.incFetchWaitTime(a + d) - sr.incRemoteBlocksFetched(f) + sr.remoteBytesRead = b + d + sr.localBlocksFetched = e + sr.fetchWaitTime = a + d + sr.remoteBlocksFetched = f t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) - outputMetrics.incBytesWritten(a + b + c) + outputMetrics.bytesWritten = a + b + c t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics - sw.incShuffleBytesWritten(a + b + c) - sw.incShuffleWriteTime(b + c + d) + sw.shuffleBytesWritten = a + b + c + sw.shuffleWriteTime = b + c + d t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks From 587def543648c908e144027adb859f651cc9b574 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Fri, 16 Jan 2015 09:36:33 -0800 Subject: [PATCH 8/8] Updated to clarify verbage --- docs/programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 0ade46886ffd..1309c8a3e110 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1316,7 +1316,7 @@ For accumulator updates performed inside actions only, Spark guarantees t will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed. -In addition, accumulators do not maintain lineage for the operations that use them. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like `map()`. Unless something has triggered the evaluation of the lazy transformation that updates the value of the accumlator, subsequent operations will not themselves trigger that evaluation and the value of the accumulator will remain unchanged. The below code fragment demonstrates this issue: +Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like `map()`. The below code fragment demonstrates this property:
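The metrics refactoring introduced in patches 1 and 2 and reverted in patches 6 and 7 follows a single accessor pattern: replace a public `var` with a private backing field plus increment and decrement helpers. A standalone sketch of that pattern, illustrative only and not copied verbatim from the patch:

{% highlight scala %}
class ExampleMetrics extends Serializable {
  // The backing field is private, so callers cannot assign it directly.
  private var _bytesRead: Long = 0L
  def bytesRead: Long = _bytesRead
  // Mutation is funneled through explicit helpers instead of raw assignment.
  def incBytesRead(value: Long): Unit = _bytesRead += value
  def decBytesRead(value: Long): Unit = _bytesRead -= value
}
{% endhighlight %}

The same shape is used for the shuffle read and write metrics above, while `hostname` gets a plain `setHostname` setter since it is assigned rather than accumulated.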