Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 10 additions & 45 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ package org.apache.spark.executor

import java.io.{File, NotSerializableException}
import java.lang.Thread.UncaughtExceptionHandler
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent._
import javax.annotation.concurrent.GuardedBy
import javax.management.ObjectName

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
Expand All @@ -38,8 +37,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.MetricGetter
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
Expand Down Expand Up @@ -72,12 +72,6 @@ private[spark] class Executor(

private val conf = env.conf

// BufferPoolMXBean for direct memory
private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)

// BufferPoolMXBean for mapped memory
private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)

// No ip or host:port - just hostname
Utils.checkHost(executorHostname)
// must not have port specified.
Expand Down Expand Up @@ -780,8 +774,7 @@ private[spark] class Executor(
val curGCTime = computeTotalGcTime()

// get executor level memory metrics
val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager,
directBufferPool, mappedBufferPool)
val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager)

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
Expand Down Expand Up @@ -820,42 +813,14 @@ private[spark] object Executor {
// used instead.
val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]

val DIRECT_BUFFER_POOL_NAME = "direct"
val MAPPED_BUFFER_POOL_NAME = "mapped"

/** Get the BufferPoolMXBean for the specified buffer pool. */
def getBufferPool(pool: String): BufferPoolMXBean = {
val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
name.toString, classOf[BufferPoolMXBean])
}

/**
* Get the current executor level memory metrics.
*
* @param memoryManager the memory manager
* @param direct the direct memory buffer pool
* @param mapped the mapped memory buffer pool
* @return the executor memory metrics
*/
def getCurrentExecutorMetrics(
memoryManager: MemoryManager,
direct: BufferPoolMXBean,
mapped: BufferPoolMXBean) : ExecutorMetrics = {
val onHeapExecutionMemoryUsed = memoryManager.onHeapExecutionMemoryUsed
val offHeapExecutionMemoryUsed = memoryManager.offHeapExecutionMemoryUsed
val onHeapStorageMemoryUsed = memoryManager.onHeapStorageMemoryUsed
val offHeapStorageMemoryUsed = memoryManager.offHeapStorageMemoryUsed
new ExecutorMetrics(System.currentTimeMillis(),
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(),
onHeapExecutionMemoryUsed,
offHeapExecutionMemoryUsed,
onHeapStorageMemoryUsed,
offHeapStorageMemoryUsed,
onHeapExecutionMemoryUsed + onHeapStorageMemoryUsed, // on heap unified memory
offHeapExecutionMemoryUsed + offHeapStorageMemoryUsed, // off heap unified memory
direct.getMemoryUsed,
mapped.getMemoryUsed)
/**
 * Get the current executor-level memory metrics.
 *
 * @param memoryManager the memory manager to sample execution/storage memory usage from
 * @return an ExecutorMetrics snapshot stamped with the current wall-clock time
 */
def getCurrentExecutorMetrics(memoryManager: MemoryManager): ExecutorMetrics = {
  val snapshot = new ExecutorMetrics(System.currentTimeMillis())
  // Fill each slot of the metrics array with the value sampled by the getter
  // registered at that index.
  for ((idx, getter) <- MetricGetter.idxAndValues) {
    snapshot.metrics(idx) = getter.getMetricValue(memoryManager)
  }
  snapshot
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.metrics.MetricGetter

/**
* :: DeveloperApi ::
Expand All @@ -28,27 +29,8 @@ import org.apache.spark.annotation.DeveloperApi
*
* @param timestamp the time the metrics were collected, or -1 for Spark history
* log events which are logged when a stage has completed
* @param jvmUsedHeapMemory the amount of JVM used heap memory for the executor
* @param jvmUsedNonHeapMemory the amount of JVM used non-heap memory for the executor
* @param onHeapExecutionMemory the amount of on heap execution memory used
* @param offHeapExecutionMemory the amount of off heap execution memory used
* @param onHeapStorageMemory the amount of on heap storage memory used
* @param offHeapStorageMemory the amount of off heap storage memory used
* @param onHeapUnifiedMemory the amount of on heap unified region memory used
* @param offHeapUnifiedMemory the amount of off heap unified region memory used
* @param directMemory the amount of direct memory used
* @param mappedMemory the amount of mapped memory used
*/
@DeveloperApi
class ExecutorMetrics private[spark] (
val timestamp: Long,
val jvmUsedHeapMemory: Long,
val jvmUsedNonHeapMemory: Long,
val onHeapExecutionMemory: Long,
val offHeapExecutionMemory: Long,
val onHeapStorageMemory: Long,
val offHeapStorageMemory: Long,
val onHeapUnifiedMemory: Long,
val offHeapUnifiedMemory: Long,
val directMemory: Long,
val mappedMemory: Long) extends Serializable
class ExecutorMetrics private[spark] (val timestamp: Long) extends Serializable {
  // One slot per metric, indexed in the same order as MetricGetter.values.
  // Slots are zero until filled in (e.g. by Executor.getCurrentExecutorMetrics).
  val metrics = new Array[Long](MetricGetter.values.length)
}
80 changes: 80 additions & 0 deletions core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics

import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import javax.management.ObjectName

import org.apache.spark.memory.MemoryManager

/**
 * An executor-level metric whose current value can be sampled on demand.
 * All concrete getters live in this file (the trait is sealed) and are
 * registered, in a fixed order, in MetricGetter.values.
 */
sealed trait MetricGetter {
  /** Sample and return the current value of this metric, in bytes. */
  def getMetricValue(memoryManager: MemoryManager): Long

  // Human-readable metric name, e.g. "JVMHeapMemory". getClass().getName()
  // returns the fully-qualified name with a trailing '$' for Scala objects
  // (e.g. "org.apache.spark.metrics.JVMHeapMemory$"), so strip both the
  // object suffix and the package prefix.
  val name = getClass().getName().stripSuffix("$").split("""\.""").last
}

/**
 * A metric getter backed by a single accessor on the MemoryManager.
 *
 * @param f extractor that reads the metric's current value from the manager
 */
abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter {
  // Delegate straight to the supplied extractor.
  override def getMetricValue(memoryManager: MemoryManager): Long = f(memoryManager)
}

/**
 * A metric getter backed by a JVM platform BufferPoolMXBean.
 *
 * @param mBeanName JMX object name of the buffer pool to sample
 */
abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter {
  // Proxy to the platform MXBean registered under mBeanName. Constructing an
  // ObjectName first validates the name before the proxy lookup.
  val bean = {
    val server = ManagementFactory.getPlatformMBeanServer
    val validatedName = new ObjectName(mBeanName).toString
    ManagementFactory.newPlatformMXBeanProxy(server, validatedName, classOf[BufferPoolMXBean])
  }

  // The memory manager is unused here; the value comes from the MXBean.
  override def getMetricValue(memoryManager: MemoryManager): Long = bean.getMemoryUsed
}

/** Used bytes in the JVM heap, as reported by the platform MemoryMXBean. */
case object JVMHeapMemory extends MetricGetter {
  // The memory manager argument is not consulted for JVM-level metrics.
  override def getMetricValue(memoryManager: MemoryManager): Long =
    ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed
}

/** Used bytes of JVM non-heap memory, as reported by the platform MemoryMXBean. */
case object JVMOffHeapMemory extends MetricGetter {
  // The memory manager argument is not consulted for JVM-level metrics.
  override def getMetricValue(memoryManager: MemoryManager): Long =
    ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage.getUsed
}

// Execution and storage memory usage, sampled from the MemoryManager.
case object OnHeapExecution extends MemoryManagerMetricGetter(_.onHeapExecutionMemoryUsed)

case object OffHeapExecution extends MemoryManagerMetricGetter(_.offHeapExecutionMemoryUsed)

case object OnHeapStorage extends MemoryManagerMetricGetter(_.onHeapStorageMemoryUsed)

case object OffHeapStorage extends MemoryManagerMetricGetter(_.offHeapStorageMemoryUsed)

// NIO buffer-pool usage, sampled from the JVM's "direct" and "mapped" BufferPoolMXBeans.
case object DirectPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=direct")
case object MappedPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=mapped")

object MetricGetter {
  /**
   * Every metric getter, in the canonical order used to index the
   * ExecutorMetrics.metrics array. Do not reorder: positions are part of
   * the (implicit) wire/serialization contract.
   */
  val values: IndexedSeq[MetricGetter] = IndexedSeq(
    JVMHeapMemory,
    JVMOffHeapMemory,
    OnHeapExecution,
    OffHeapExecution,
    OnHeapStorage,
    OffHeapStorage,
    DirectPoolMemory,
    MappedPoolMemory
  )

  /** (index, getter) pairs, convenient for filling a metrics array positionally. */
  val idxAndValues: IndexedSeq[(Int, MetricGetter)] = values.indices.zip(values)
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,6 @@ class DAGScheduler(
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

/** BufferPoolMXBean for direct memory */
private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)

/** BufferPoolMXBean for mapped memory */
private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)

/**
* Called by the TaskSetManager to report task's starting.
*/
Expand Down Expand Up @@ -1772,8 +1766,7 @@ class DAGScheduler(
/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
// get driver memory metrics
val driverUpdates = Executor.getCurrentExecutorMetrics(
sc.env.memoryManager, directBufferPool, mappedBufferPool)
val driverUpdates = Executor.getCurrentExecutorMetrics(sc.env.memoryManager)
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,9 @@ logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecut
executorMap.foreach {
executorEntry => {
for ((executorId, peakExecutorMetrics) <- executorEntry) {
val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory,
peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory,
peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory,
peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory,
peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory,
peakExecutorMetrics.mappedMemory)
val executorMetrics = new ExecutorMetrics(-1)
System.arraycopy(peakExecutorMetrics.metrics, 0, executorMetrics.metrics, 0,
peakExecutorMetrics.metrics.size)
val executorUpdate = new SparkListenerExecutorMetricsUpdate(
executorId, accumUpdates, Some(executorMetrics))
logEvent(executorUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,16 @@
package org.apache.spark.scheduler

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.MetricGetter
import org.apache.spark.status.api.v1.PeakMemoryMetrics

/**
* Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no
* values have been recorded yet.
*/
private[spark] class PeakExecutorMetrics {
private var _jvmUsedHeapMemory = -1L;
private var _jvmUsedNonHeapMemory = 0L;
private var _onHeapExecutionMemory = 0L
private var _offHeapExecutionMemory = 0L
private var _onHeapStorageMemory = 0L
private var _offHeapStorageMemory = 0L
private var _onHeapUnifiedMemory = 0L
private var _offHeapUnifiedMemory = 0L
private var _directMemory = 0L
private var _mappedMemory = 0L

def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory

def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory

def onHeapExecutionMemory: Long = _onHeapExecutionMemory

def offHeapExecutionMemory: Long = _offHeapExecutionMemory

def onHeapStorageMemory: Long = _onHeapStorageMemory

def offHeapStorageMemory: Long = _offHeapStorageMemory

def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory

def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory

def directMemory: Long = _directMemory

def mappedMemory: Long = _mappedMemory
val metrics = new Array[Long](MetricGetter.values.length)
metrics(0) = -1

/**
* Compare the specified memory values with the saved peak executor memory
Expand All @@ -66,47 +39,13 @@ private[spark] class PeakExecutorMetrics {
def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
var updated: Boolean = false

if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
_jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
updated = true
}
if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
_jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
updated = true
(0 until MetricGetter.values.length).foreach { metricIdx =>
val newVal = executorMetrics.metrics(metricIdx)
if ( newVal > metrics(metricIdx)) {
updated = true
metrics(metricIdx) = newVal
}
}
if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
_onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
updated = true
}
if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
_offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
updated = true
}
if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
_onHeapStorageMemory = executorMetrics.onHeapStorageMemory
updated = true
}
if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
_offHeapStorageMemory = executorMetrics.offHeapStorageMemory
updated = true
}
if (executorMetrics.onHeapUnifiedMemory > _onHeapUnifiedMemory) {
_onHeapUnifiedMemory = executorMetrics.onHeapUnifiedMemory
updated = true
}
if (executorMetrics.offHeapUnifiedMemory > _offHeapUnifiedMemory) {
_offHeapUnifiedMemory = executorMetrics.offHeapUnifiedMemory
updated = true
}
if (executorMetrics.directMemory > _directMemory) {
_directMemory = executorMetrics.directMemory
updated = true
}
if (executorMetrics.mappedMemory > _mappedMemory) {
_mappedMemory = executorMetrics.mappedMemory
updated = true
}

updated
}

Expand All @@ -115,13 +54,18 @@ private[spark] class PeakExecutorMetrics {
* values set.
*/
def getPeakMemoryMetrics: Option[PeakMemoryMetrics] = {
if (_jvmUsedHeapMemory < 0) {
if (metrics(0) < 0) {
None
} else {
Some(new PeakMemoryMetrics(_jvmUsedHeapMemory, _jvmUsedNonHeapMemory,
_onHeapExecutionMemory, _offHeapExecutionMemory, _onHeapStorageMemory,
_offHeapStorageMemory, _onHeapUnifiedMemory, _offHeapUnifiedMemory,
_directMemory, _mappedMemory))
val copy = new PeakMemoryMetrics
System.arraycopy(this.metrics, 0, copy.metrics, 0, this.metrics.length)
Some(copy)
}
}

/** Clears/resets the saved peak values. */
def reset(): Unit = {
  // Zero every slot, then restore the sentinel: -1 in slot 0 means
  // "no values recorded yet".
  metrics.indices.foreach(metrics(_) = 0)
  metrics(0) = -1
}
}
Loading