diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bdf2553ce6e90..43ab51482a91f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -19,13 +19,12 @@ package org.apache.spark.executor
 
 import java.io.{File, NotSerializableException}
 import java.lang.Thread.UncaughtExceptionHandler
-import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import java.lang.management.ManagementFactory
 import java.net.{URI, URL}
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent._
 import javax.annotation.concurrent.GuardedBy
-import javax.management.ObjectName
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -38,8 +37,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryManager, SparkOutOfMemoryError, TaskMemoryManager}
+import org.apache.spark.metrics.MetricGetter
 import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
+import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
@@ -72,12 +72,6 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
-  // BufferPoolMXBean for direct memory
-  private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
-
-  // BufferPoolMXBean for mapped memory
-  private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
-
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -780,8 +774,7 @@ private[spark] class Executor(
     val curGCTime = computeTotalGcTime()
 
     // get executor level memory metrics
-    val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager,
-      directBufferPool, mappedBufferPool)
+    val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager)
 
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
@@ -820,42 +813,14 @@ private[spark] object Executor {
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
 
-  val DIRECT_BUFFER_POOL_NAME = "direct"
-  val MAPPED_BUFFER_POOL_NAME = "mapped"
-
-  /** Get the BufferPoolMXBean for the specified buffer pool. */
-  def getBufferPool(pool: String): BufferPoolMXBean = {
-    val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
-    ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
-      name.toString, classOf[BufferPoolMXBean])
-  }
-
   /**
    * Get the current executor level memory metrics.
-   *
-   * @param memoryManager the memory manager
-   * @param direct the direct memory buffer pool
-   * @param mapped the mapped memory buffer pool
-   * @return the executor memory metrics
    */
-  def getCurrentExecutorMetrics(
-      memoryManager: MemoryManager,
-      direct: BufferPoolMXBean,
-      mapped: BufferPoolMXBean) : ExecutorMetrics = {
-    val onHeapExecutionMemoryUsed = memoryManager.onHeapExecutionMemoryUsed
-    val offHeapExecutionMemoryUsed = memoryManager.offHeapExecutionMemoryUsed
-    val onHeapStorageMemoryUsed = memoryManager.onHeapStorageMemoryUsed
-    val offHeapStorageMemoryUsed = memoryManager.offHeapStorageMemoryUsed
-    new ExecutorMetrics(System.currentTimeMillis(),
-      ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
-      ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(),
-      onHeapExecutionMemoryUsed,
-      offHeapExecutionMemoryUsed,
-      onHeapStorageMemoryUsed,
-      offHeapStorageMemoryUsed,
-      onHeapExecutionMemoryUsed + onHeapStorageMemoryUsed, // on heap unified memory
-      offHeapExecutionMemoryUsed + offHeapStorageMemoryUsed, // off heap unified memory
-      direct.getMemoryUsed,
-      mapped.getMemoryUsed)
+  def getCurrentExecutorMetrics(memoryManager: MemoryManager): ExecutorMetrics = {
+    val metrics = new ExecutorMetrics(System.currentTimeMillis())
+    MetricGetter.idxAndValues.foreach { case (idx, metric) =>
+      metrics.metrics(idx) = metric.getMetricValue(memoryManager)
+    }
+    metrics
   }
 }
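The heartbeat path above no longer threads BufferPoolMXBeans through explicitly: getCurrentExecutorMetrics asks each registered getter for its value and stores it by index. The following standalone sketch mirrors that polling pattern; MiniGetter, MiniManager, and the demo values are hypothetical stand-ins for Spark's MetricGetter and MemoryManager, not code from this patch:

    import java.lang.management.ManagementFactory

    trait MiniManager { def onHeapExecutionMemoryUsed: Long }

    sealed trait MiniGetter { def getMetricValue(mm: MiniManager): Long }

    case object HeapUsed extends MiniGetter {
      def getMetricValue(mm: MiniManager): Long =
        ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed
    }

    case object OnHeapExec extends MiniGetter {
      def getMetricValue(mm: MiniManager): Long = mm.onHeapExecutionMemoryUsed
    }

    object MiniGetter {
      val values: IndexedSeq[MiniGetter] = IndexedSeq(HeapUsed, OnHeapExec)
      val idxAndValues: IndexedSeq[(Int, MiniGetter)] = values.zipWithIndex.map(_.swap)
    }

    object HeartbeatDemo extends App {
      val mm = new MiniManager { def onHeapExecutionMemoryUsed: Long = 42L }
      // One flat array per snapshot; a getter's slot is its position in `values`.
      val snapshot = new Array[Long](MiniGetter.values.length)
      MiniGetter.idxAndValues.foreach { case (idx, g) => snapshot(idx) = g.getMetricValue(mm) }
      println(snapshot.mkString("[", ", ", "]"))
    }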
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
index 6d35a1c682e25..663aecfbebd3e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -18,6 +18,7 @@ package org.apache.spark.executor
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.MetricGetter
 
 /**
  * :: DeveloperApi ::
@@ -28,27 +29,8 @@ import org.apache.spark.annotation.DeveloperApi
  *
  * @param timestamp the time the metrics were collected, or -1 for Spark history
  *                  log events which are logged when a stage has completed
- * @param jvmUsedHeapMemory the amount of JVM used heap memory for the executor
- * @param jvmUsedNonHeapMemory the amount of JVM used non-heap memory for the executor
- * @param onHeapExecutionMemory the amount of on heap execution memory used
- * @param offHeapExecutionMemory the amount of off heap execution memory used
- * @param onHeapStorageMemory the amount of on heap storage memory used
- * @param offHeapStorageMemory the amount of off heap storage memory used
- * @param onHeapUnifiedMemory the amount of on heap unified region memory used
- * @param offHeapUnifiedMemory the amount of off heap unified region memory used
- * @param directMemory the amount of direct memory used
- * @param mappedMemory the amount of mapped memory used
 */
@DeveloperApi
-class ExecutorMetrics private[spark] (
-    val timestamp: Long,
-    val jvmUsedHeapMemory: Long,
-    val jvmUsedNonHeapMemory: Long,
-    val onHeapExecutionMemory: Long,
-    val offHeapExecutionMemory: Long,
-    val onHeapStorageMemory: Long,
-    val offHeapStorageMemory: Long,
-    val onHeapUnifiedMemory: Long,
-    val offHeapUnifiedMemory: Long,
-    val directMemory: Long,
-    val mappedMemory: Long) extends Serializable
+class ExecutorMetrics private[spark] (val timestamp: Long) extends Serializable {
+  val metrics = new Array[Long](MetricGetter.values.length)
+}
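ExecutorMetrics thus collapses ten named Long fields into one array whose implicit schema is the ordering of MetricGetter.values. A hypothetical accessor (not part of the patch) makes the indexing contract explicit, and shows why the ordering must stay append-only once events have been written:

    // Hypothetical sketch: a getter's array slot is its position in the ordered
    // `values` sequence, so reordering entries would silently remap old data.
    sealed trait Getter
    case object JVMHeapMemory extends Getter
    case object DirectPoolMemory extends Getter

    object Getter {
      val values: IndexedSeq[Getter] = IndexedSeq(JVMHeapMemory, DirectPoolMemory)
      def valueOf(metrics: Array[Long], g: Getter): Long = metrics(values.indexOf(g))
    }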
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala b/core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala
new file mode 100644
index 0000000000000..53c87a1261e1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+sealed trait MetricGetter {
+  def getMetricValue(memoryManager: MemoryManager): Long
+  val name = getClass().getName().stripSuffix("$")
+}
+
+abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter {
+  override def getMetricValue(memoryManager: MemoryManager): Long = {
+    f(memoryManager)
+  }
+}
+
+abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter {
+  val bean = ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
+    new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean])
+
+  override def getMetricValue(memoryManager: MemoryManager): Long = {
+    bean.getMemoryUsed
+  }
+}
+
+case object JVMHeapMemory extends MetricGetter {
+  override def getMetricValue(memoryManager: MemoryManager): Long = {
+    ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
+  }
+}
+
+case object JVMOffHeapMemory extends MetricGetter {
+  override def getMetricValue(memoryManager: MemoryManager): Long = {
+    ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
+  }
+}
+
+case object OnHeapExecution extends MemoryManagerMetricGetter(_.onHeapExecutionMemoryUsed)
+
+case object OffHeapExecution extends MemoryManagerMetricGetter(_.offHeapExecutionMemoryUsed)
+
+case object OnHeapStorage extends MemoryManagerMetricGetter(_.onHeapStorageMemoryUsed)
+
+case object OffHeapStorage extends MemoryManagerMetricGetter(_.offHeapStorageMemoryUsed)
+
+case object DirectPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=direct")
+case object MappedPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=mapped")
+
+object MetricGetter {
+  val values = IndexedSeq(
+    JVMHeapMemory,
+    JVMOffHeapMemory,
+    OnHeapExecution,
+    OffHeapExecution,
+    OnHeapStorage,
+    OffHeapStorage,
+    DirectPoolMemory,
+    MappedPoolMemory
+  )
+
+  val idxAndValues = values.zipWithIndex.map(_.swap)
+}
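Because MetricGetter is sealed, every getter must live in this file and the complete set is known at compile time; heartbeat collection, JSON serialization, and peak tracking all iterate `values`, so a new metric needs no changes elsewhere. As a purely hypothetical illustration (GCCount is invented here, not part of this patch), adding a metric would be one case object appended to the end of `values`:

    // Hypothetical addition inside MetricGetter.scala. Appending (never reordering)
    // keeps existing array indices stable for previously written event logs.
    import java.lang.management.ManagementFactory
    import scala.collection.JavaConverters._

    case object GCCount extends MetricGetter {
      override def getMetricValue(memoryManager: MemoryManager): Long = {
        // Sums collection counts across all collectors; collectors that do not
        // support counting report -1.
        ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionCount).sum
      }
    }
    // ...and in MetricGetter.values: ..., MappedPoolMemory, GCCount)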
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fd950f0c207fb..98ed6c75d7ebc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -214,12 +214,6 @@ class DAGScheduler(
   private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
     sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
 
-  /** BufferPoolMXBean for direct memory */
-  private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
-
-  /** BufferPoolMXBean for mapped memory */
-  private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
-
   /**
    * Called by the TaskSetManager to report task's starting.
    */
@@ -1772,8 +1766,7 @@ class DAGScheduler(
   /** Reports heartbeat metrics for the driver. */
   private def reportHeartBeat(): Unit = {
     // get driver memory metrics
-    val driverUpdates = Executor.getCurrentExecutorMetrics(
-      sc.env.memoryManager, directBufferPool, mappedBufferPool)
+    val driverUpdates = Executor.getCurrentExecutorMetrics(sc.env.memoryManager)
     val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
     listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
       Some(driverUpdates)))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a795186f20b2f..baf7127a8392c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -197,12 +197,9 @@ logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecut
       executorMap.foreach { executorEntry => {
         for ((executorId, peakExecutorMetrics) <- executorEntry) {
-          val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory,
-            peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory,
-            peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory,
-            peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory,
-            peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory,
-            peakExecutorMetrics.mappedMemory)
+          val executorMetrics = new ExecutorMetrics(-1)
+          System.arraycopy(peakExecutorMetrics.metrics, 0, executorMetrics.metrics, 0,
+            peakExecutorMetrics.metrics.size)
           val executorUpdate = new SparkListenerExecutorMetricsUpdate(
             executorId, accumUpdates, Some(executorMetrics))
           logEvent(executorUpdate)
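The driver now reuses the same helper for its own heartbeat, and the event logger copies peaks into a fresh ExecutorMetrics with System.arraycopy instead of reading ten named fields. The copy is a defensive snapshot: the peaks array keeps mutating on later heartbeats while the queued event must stay frozen. A minimal, self-contained sketch of that behavior (the values are illustrative):

    object SnapshotDemo extends App {
      val peaks = Array(100L, 20L)               // live peaks, still being updated
      val logged = new Array[Long](peaks.length) // frozen copy for the logged event
      System.arraycopy(peaks, 0, logged, 0, peaks.length)
      peaks(0) = 150L                            // a later peak...
      assert(logged(0) == 100L)                  // ...does not disturb the snapshot
      println(logged.mkString("[", ", ", "]"))
    }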
diff --git a/core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala
index d554c938f5e11..83bb39cf3c3e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler
 
 import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.metrics.MetricGetter
 import org.apache.spark.status.api.v1.PeakMemoryMetrics
 
 /**
@@ -25,36 +26,8 @@ import org.apache.spark.status.api.v1.PeakMemoryMetrics
  * values have been recorded yet.
 */
private[spark] class PeakExecutorMetrics {
-  private var _jvmUsedHeapMemory = -1L;
-  private var _jvmUsedNonHeapMemory = 0L;
-  private var _onHeapExecutionMemory = 0L
-  private var _offHeapExecutionMemory = 0L
-  private var _onHeapStorageMemory = 0L
-  private var _offHeapStorageMemory = 0L
-  private var _onHeapUnifiedMemory = 0L
-  private var _offHeapUnifiedMemory = 0L
-  private var _directMemory = 0L
-  private var _mappedMemory = 0L
-
-  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
-
-  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
-
-  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
-
-  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
-
-  def onHeapStorageMemory: Long = _onHeapStorageMemory
-
-  def offHeapStorageMemory: Long = _offHeapStorageMemory
-
-  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
-
-  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
-
-  def directMemory: Long = _directMemory
-
-  def mappedMemory: Long = _mappedMemory
+  val metrics = new Array[Long](MetricGetter.values.length)
+  metrics(0) = -1
 
   /**
    * Compare the specified memory values with the saved peak executor memory
@@ -66,47 +39,13 @@ private[spark] class PeakExecutorMetrics {
   def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
     var updated: Boolean = false
 
-    if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
-      _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
-      updated = true
-    }
-    if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
-      _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
-      updated = true
+    (0 until MetricGetter.values.length).foreach { metricIdx =>
+      val newVal = executorMetrics.metrics(metricIdx)
+      if ( newVal > metrics(metricIdx)) {
+        updated = true
+        metrics(metricIdx) = newVal
+      }
     }
-    if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
-      _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
-      updated = true
-    }
-    if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
-      _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
-      updated = true
-    }
-    if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
-      _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
-      updated = true
-    }
-    if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
-      _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
-      updated = true
-    }
-    if (executorMetrics.onHeapUnifiedMemory > _onHeapUnifiedMemory) {
-      _onHeapUnifiedMemory = executorMetrics.onHeapUnifiedMemory
-      updated = true
-    }
-    if (executorMetrics.offHeapUnifiedMemory > _offHeapUnifiedMemory) {
-      _offHeapUnifiedMemory = executorMetrics.offHeapUnifiedMemory
-      updated = true
-    }
-    if (executorMetrics.directMemory > _directMemory) {
-      _directMemory = executorMetrics.directMemory
-      updated = true
-    }
-    if (executorMetrics.mappedMemory > _mappedMemory) {
-      _mappedMemory = executorMetrics.mappedMemory
-      updated = true
-    }
-
     updated
   }
 
@@ -115,13 +54,18 @@
    * values set.
    */
  def getPeakMemoryMetrics: Option[PeakMemoryMetrics] = {
-    if (_jvmUsedHeapMemory < 0) {
+    if (metrics(0) < 0) {
      None
    } else {
-      Some(new PeakMemoryMetrics(_jvmUsedHeapMemory, _jvmUsedNonHeapMemory,
-        _onHeapExecutionMemory, _offHeapExecutionMemory, _onHeapStorageMemory,
-        _offHeapStorageMemory, _onHeapUnifiedMemory, _offHeapUnifiedMemory,
-        _directMemory, _mappedMemory))
+      val copy = new PeakMemoryMetrics
+      System.arraycopy(this.metrics, 0, copy.metrics, 0, this.metrics.length)
+      Some(copy)
    }
  }
+
+  /** Clears/resets the saved peak values. */
+  def reset(): Unit = {
+    (0 until metrics.length).foreach { idx => metrics(idx) = 0}
+    metrics(0) = -1
+  }
}
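compareAndUpdate is now an element-wise running maximum over the whole array, and slot 0 (JVM heap) doubles as a "nothing recorded yet" sentinel: it starts at -1, any real heap reading is non-negative, so the first heartbeat always registers, and reset() restores the sentinel. The same logic in isolation, with hypothetical standalone names:

    object PeakDemo extends App {
      val peaks = Array.fill(4)(0L)
      peaks(0) = -1L // sentinel: no reading recorded yet

      def compareAndUpdate(incoming: Array[Long]): Boolean = {
        var updated = false
        incoming.indices.foreach { i =>
          if (incoming(i) > peaks(i)) {
            peaks(i) = incoming(i)
            updated = true
          }
        }
        updated
      }

      assert(compareAndUpdate(Array(10L, 5L, 0L, 0L)))  // first reading beats the -1 sentinel
      assert(!compareAndUpdate(Array(10L, 5L, 0L, 0L))) // identical reading: no new peak
    }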
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 15f83213bb6ed..3d6360383683b 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.metrics.MetricGetter
 
 case class ApplicationInfo private[spark](
     id: String,
@@ -108,17 +108,10 @@ class MemoryMetrics private[spark](
     val totalOnHeapStorageMemory: Long,
     val totalOffHeapStorageMemory: Long)
 
-class PeakMemoryMetrics private[spark](
-    val jvmUsedHeapMemory: Long,
-    val jvmUsedNonHeapMemory: Long,
-    val onHeapExecutionMemory: Long,
-    val offHeapExecutionMemory: Long,
-    val onHeapStorageMemory: Long,
-    val offHeapStorageMemory: Long,
-    val onHeapUnifiedMemory: Long,
-    val offHeapUnifiedMemory: Long,
-    val directMemory: Long,
-    val mappedMemory: Long)
+class PeakMemoryMetrics private[spark]() {
+  // TODO special json-ification
+  val metrics = new Array[Long](MetricGetter.values.length)
+}
 
 class JobData private[spark](
     val jobId: Int,
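The TODO above flags that, as written, Jackson would render PeakMemoryMetrics in the REST API as an anonymous "metrics" array rather than the named fields the old class produced. One way the fields could get names again (a sketch under that assumption, not what this patch implements) is to zip the array with the getter names, mirroring what JsonProtocol.executorMetricsToJson does below:

    import org.json4s._
    import org.json4s.JsonDSL._

    object PeakJsonSketch {
      // `names` would be MetricGetter.values.map(_.name) in the real codebase.
      def peakMetricsToJson(names: Seq[String], metrics: Array[Long]): JValue =
        JObject(names.zip(metrics).map { case (n, v) => JField(n, v) }.toList)
    }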
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 35b6aeb8f0e77..37b8099f4eb47 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.metrics.MetricGetter
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -389,17 +390,12 @@ private[spark] object JsonProtocol {
    * @return the JSON representation
    */
  def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
-    ("Timestamp" -> executorMetrics.timestamp) ~
-    ("JVM Used Heap Memory" -> executorMetrics.jvmUsedHeapMemory) ~
-    ("JVM Used Nonheap Memory" -> executorMetrics.jvmUsedNonHeapMemory) ~
-    ("Onheap Execution Memory" -> executorMetrics.onHeapExecutionMemory) ~
-    ("Offheap Execution Memory" -> executorMetrics.offHeapExecutionMemory) ~
-    ("Onheap Storage Memory" -> executorMetrics.onHeapStorageMemory) ~
-    ("Offheap Storage Memory" -> executorMetrics.offHeapStorageMemory) ~
-    ("Onheap Unified Memory" -> executorMetrics.onHeapUnifiedMemory) ~
-    ("Offheap Unified Memory" -> executorMetrics.offHeapUnifiedMemory) ~
-    ("Direct Memory" -> executorMetrics.directMemory) ~
-    ("Mapped Memory" -> executorMetrics.mappedMemory)
+    val metrics = MetricGetter.idxAndValues.map { case (idx, metric) =>
+      JField(metric.name, executorMetrics.metrics(idx))
+    }
+    JObject(
+      (Seq(JField("Timestamp", executorMetrics.timestamp)) ++ metrics): _*
+    )
  }
 
  def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
@@ -614,20 +610,12 @@
    */
  def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
    val timeStamp = (json \ "Timestamp").extract[Long]
-    val jvmUsedHeapMemory = (json \ "JVM Used Heap Memory").extract[Long]
-    val jvmUsedNonHeapMemory = (json \ "JVM Used Nonheap Memory").extract[Long]
-    val onHeapExecutionMemory = (json \ "Onheap Execution Memory").extract[Long]
-    val offHeapExecutionMemory = (json \ "Offheap Execution Memory").extract[Long]
-    val onHeapStorageMemory = (json \ "Onheap Storage Memory").extract[Long]
-    val offHeapStorageMemory = (json \ "Offheap Storage Memory").extract[Long]
-    val onHeapUnifiedMemory = (json \ "Onheap Unified Memory").extract[Long]
-    val offHeapUnifiedMemory = (json \ "Offheap Unified Memory").extract[Long]
-    val directMemory = (json \ "Direct Memory").extract[Long]
-    val mappedMemory = (json \ "Mapped Memory").extract[Long]
-    new ExecutorMetrics(timeStamp, jvmUsedHeapMemory, jvmUsedNonHeapMemory,
-      onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory,
-      offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory,
-      mappedMemory)
+    val metrics = new ExecutorMetrics(timeStamp)
+    MetricGetter.idxAndValues.foreach { case (idx, metric) =>
+      val metricValue = (json \ metric.name).extract[Long]
+      metrics.metrics(idx) = metricValue
+    }
+    metrics
  }
 
  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6b906a88183c0..f6ad8a6f16d85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricGetter, MetricsSystem}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.{JsonProtocol, Utils}
 
@@ -412,16 +412,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     (executorMetrics1, executorMetrics2) match {
       case (Some(e1), Some(e2)) =>
         assert(e1.timestamp === e2.timestamp)
-        assert(e1.jvmUsedHeapMemory === e2.jvmUsedHeapMemory)
-        assert(e1.jvmUsedNonHeapMemory === e2.jvmUsedNonHeapMemory)
-        assert(e1.onHeapExecutionMemory === e2.onHeapExecutionMemory)
-        assert(e1.offHeapExecutionMemory === e2.offHeapExecutionMemory)
-        assert(e1.onHeapStorageMemory === e2.onHeapStorageMemory)
-        assert(e1.offHeapStorageMemory === e2.offHeapStorageMemory)
-        assert(e1.onHeapUnifiedMemory === e2.onHeapUnifiedMemory)
-        assert(e1.offHeapUnifiedMemory === e2.offHeapUnifiedMemory)
-        assert(e1.directMemory === e2.directMemory)
-        assert(e1.mappedMemory === e2.mappedMemory)
+        (0 until MetricGetter.values.length).foreach { idx =>
+          assert(e1.metrics(idx) === e2.metrics(idx))
+        }
       case (None, None) =>
       case _ =>
         assert(false)
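Note the field-name change hidden in this hunk: the JSON keys now come from MetricGetter.name, and since that is built with getClass().getName() (not getSimpleName), stripSuffix("$") leaves a fully qualified name such as org.apache.spark.metrics.JVMHeapMemory in place of the old human-readable key "JVM Used Heap Memory". A round-trip sketch of the new event-log shape, with the key hard-coded for illustration:

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object RoundTripDemo extends App {
      implicit val formats: Formats = DefaultFormats
      val key = "org.apache.spark.metrics.JVMHeapMemory"
      val json: JValue = JObject(Seq(JField("Timestamp", 1L), JField(key, 512L)): _*)
      assert((json \ key).extract[Long] == 512L) // read back as executorMetricsFromJson does
      println(compact(render(json)))
    }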
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index afe11afe3e5c8..4f3b412dbd435 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.metrics.MetricGetter
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -686,12 +687,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
     (metrics1, metrics2) match {
       case (Some(m1), Some(m2)) =>
         assert(m1.timestamp === m2.timestamp)
-        assert(m1.jvmUsedHeapMemory === m2.jvmUsedHeapMemory)
-        assert(m1.jvmUsedNonHeapMemory === m2.jvmUsedNonHeapMemory)
-        assert(m1.onHeapExecutionMemory === m2.onHeapExecutionMemory)
-        assert(m1.offHeapExecutionMemory === m2.offHeapExecutionMemory)
-        assert(m1.onHeapStorageMemory === m2.onHeapStorageMemory)
-        assert(m1.offHeapStorageMemory === m2.offHeapStorageMemory)
+        (0 until MetricGetter.values.length).foreach { idx =>
+          assert(m1.metrics(idx) === m2.metrics(idx))
+        }
       case (None, None) =>
       case _ =>
         assert(false)