[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221
Changes from 4 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark | ||
|
|
||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import org.apache.spark.util.{ThreadUtils, Utils} | ||
|
|
||
| /** | ||
| * Creates a heartbeat thread which will call the specified reportHeartbeat function at | ||
| * intervals of intervalMs. | ||
| * | ||
| * @param reportHeartbeat the heartbeat reporting function to call. | ||
| * @param name the thread name for the heartbeater. | ||
| * @param intervalMs the interval between heartbeats. | ||
| */ | ||
| private[spark] class Heartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) { | ||
| // Executor for the heartbeat task | ||
| private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name) | ||
|
|
||
| /** Schedules a task to report a heartbeat. */ | ||
| private[spark] def start(): Unit = { | ||
| // Wait a random interval so the heartbeats don't end up in sync | ||
| val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int] | ||
|
|
||
| val heartbeatTask = new Runnable() { | ||
| override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat()) | ||
| } | ||
| heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS) | ||
| } | ||
|
|
||
| /** Stops the heartbeat thread. */ | ||
| private[spark] def stop(): Unit = { | ||
| heartbeater.shutdown() | ||
| heartbeater.awaitTermination(10, TimeUnit.SECONDS) | ||
| } | ||
| } | ||
|
|
||
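For context, here is a minimal usage sketch of the new class. The main object, the reporting closure, and the sleep are illustrative stand-ins and not code from this PR; since Heartbeater is private[spark], such a caller would have to live inside the org.apache.spark package tree.

package org.apache.spark

// Hypothetical example driver for the Heartbeater class.
object HeartbeaterExample {
  def main(args: Array[String]): Unit = {
    val heartbeater = new Heartbeater(
      () => println("heartbeat sent"),  // stand-in for Executor.reportHeartBeat()
      "example-heartbeater",
      10000L)                           // 10s, the default spark.executor.heartbeatInterval
    heartbeater.start()                 // first beat after a randomized delay, then every 10s
    Thread.sleep(35000)                 // let a few heartbeats fire
    heartbeater.stop()                  // shutdown() plus awaitTermination of up to 10s
  }
}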
@@ -19,12 +19,13 @@ package org.apache.spark.executor

 import java.io.{File, NotSerializableException}
 import java.lang.Thread.UncaughtExceptionHandler
-import java.lang.management.ManagementFactory
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
 import java.net.{URI, URL}
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent._
 import javax.annotation.concurrent.GuardedBy
+import javax.management.ObjectName

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

@@ -36,7 +37,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryManager, SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
 import org.apache.spark.shuffle.FetchFailedException

@@ -71,6 +72,12 @@ private[spark] class Executor(
   private val conf = env.conf

+  // BufferPoolMXBean for direct memory
+  private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
+
+  // BufferPoolMXBean for mapped memory
+  private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.

@@ -148,7 +155,8 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

   // Executor for the heartbeat task.
-  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
+  private val heartbeater = new Heartbeater(reportHeartBeat, "executor-heartbeater",
+    conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =

@@ -167,7 +175,7 @@ private[spark] class Executor(
    */
   private var heartbeatFailures = 0

-  startDriverHeartbeater()
+  heartbeater.start()

   private[executor] def numRunningTasks: Int = runningTasks.size()

@@ -216,8 +224,7 @@ private[spark] class Executor(
   def stop(): Unit = {
     env.metricsSystem.report()
-    heartbeater.shutdown()
-    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    heartbeater.stop()
     threadPool.shutdown()
     if (!isLocal) {
       env.stop()

@@ -772,6 +779,10 @@ private[spark] class Executor(
     val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()

+    // get executor level memory metrics
+    val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager,
+      directBufferPool, mappedBufferPool)
+
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()

@@ -780,7 +791,8 @@ private[spark] class Executor(
       }
     }

-    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
+      executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
         message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))

@@ -800,26 +812,50 @@ private[spark] class Executor(
       }
     }
   }
-
-  /**
-   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
-   */
-  private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-
-    // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
-
-    val heartbeatTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
-    }
-    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
-  }
 }

 private[spark] object Executor {
   // This is reserved for internal use by components that need to read task properties before a
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
+
+  val DIRECT_BUFFER_POOL_NAME = "direct"
+  val MAPPED_BUFFER_POOL_NAME = "mapped"
+
+  /** Get the BufferPoolMXBean for the specified buffer pool. */
+  def getBufferPool(pool: String): BufferPoolMXBean = {
+    val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
+    ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
+      name.toString, classOf[BufferPoolMXBean])
+  }
+
+  /**
+   * Get the current executor level memory metrics.
+   *
+   * @param memoryManager the memory manager
+   * @param direct the direct memory buffer pool
+   * @param mapped the mapped memory buffer pool
+   * @return the executor memory metrics
+   */
+  def getCurrentExecutorMetrics(
+      memoryManager: MemoryManager,
+      direct: BufferPoolMXBean,
+      mapped: BufferPoolMXBean): ExecutorMetrics = {
+    val onHeapExecutionMemoryUsed = memoryManager.onHeapExecutionMemoryUsed
+    val offHeapExecutionMemoryUsed = memoryManager.offHeapExecutionMemoryUsed
+    val onHeapStorageMemoryUsed = memoryManager.onHeapStorageMemoryUsed
+    val offHeapStorageMemoryUsed = memoryManager.offHeapStorageMemoryUsed
+    new ExecutorMetrics(System.currentTimeMillis(),
+      ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
+      ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(),
+      onHeapExecutionMemoryUsed,
+      offHeapExecutionMemoryUsed,
+      onHeapStorageMemoryUsed,
+      offHeapStorageMemoryUsed,
+      onHeapExecutionMemoryUsed + onHeapStorageMemoryUsed, // on heap unified memory
+      offHeapExecutionMemoryUsed + offHeapStorageMemoryUsed, // off heap unified memory
+      direct.getMemoryUsed,
+      mapped.getMemoryUsed)
+  }
 }
@@ -0,0 +1,54 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Executor level metrics.
 *
 * This is sent to the driver periodically (on executor heartbeat), to provide
 * information about each executor's metrics.
 *
 * @param timestamp the time the metrics were collected, or -1 for Spark history
 *                  log events which are logged when a stage has completed
 * @param jvmUsedHeapMemory the amount of JVM used heap memory for the executor
 * @param jvmUsedNonHeapMemory the amount of JVM used non-heap memory for the executor
 * @param onHeapExecutionMemory the amount of on heap execution memory used
 * @param offHeapExecutionMemory the amount of off heap execution memory used
 * @param onHeapStorageMemory the amount of on heap storage memory used
 * @param offHeapStorageMemory the amount of off heap storage memory used
 * @param onHeapUnifiedMemory the amount of on heap unified region memory used
 * @param offHeapUnifiedMemory the amount of off heap unified region memory used
 * @param directMemory the amount of direct memory used
 * @param mappedMemory the amount of mapped memory used
 */
@DeveloperApi
class ExecutorMetrics private[spark] (
    val timestamp: Long,
    val jvmUsedHeapMemory: Long,
    val jvmUsedNonHeapMemory: Long,
    val onHeapExecutionMemory: Long,
    val offHeapExecutionMemory: Long,
    val onHeapStorageMemory: Long,
    val offHeapStorageMemory: Long,
    val onHeapUnifiedMemory: Long,
    val offHeapUnifiedMemory: Long,
    val directMemory: Long,
    val mappedMemory: Long) extends Serializable
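To make the field relationships concrete, here is a hypothetical instance with made-up values; the unified fields are the execution + storage sums computed in Executor.getCurrentExecutorMetrics, and because the constructor is private[spark], such code must sit inside the org.apache.spark package tree:

package org.apache.spark.executor

// Illustrative values only; real instances are built from the MemoryManager and MXBeans.
object ExecutorMetricsExample {
  val onHeapExecution = 64L << 20    // 64 MiB of on heap execution memory
  val onHeapStorage = 128L << 20     // 128 MiB of on heap storage memory

  val metrics = new ExecutorMetrics(
    System.currentTimeMillis(),        // timestamp
    512L << 20,                        // jvmUsedHeapMemory
    96L << 20,                         // jvmUsedNonHeapMemory
    onHeapExecution,                   // onHeapExecutionMemory
    0L,                                // offHeapExecutionMemory
    onHeapStorage,                     // onHeapStorageMemory
    0L,                                // offHeapStorageMemory
    onHeapExecution + onHeapStorage,   // onHeapUnifiedMemory = execution + storage
    0L,                                // offHeapUnifiedMemory
    16L << 20,                         // directMemory
    0L)                                // mappedMemory
}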
@@ -69,6 +69,11 @@ package object config {
     .bytesConf(ByteUnit.KiB)
     .createWithDefaultString("100k")

+  private[spark] val EVENT_LOG_EXECUTOR_METRICS_UPDATES =
+    ConfigBuilder("spark.eventLog.logExecutorMetricsUpdates.enabled")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
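For reference, a sketch of how internal code would read the new entry. The object name is hypothetical, and since SparkConf.get(ConfigEntry) is Spark-internal, this is written as if inside the org.apache.spark package; the override to false is purely illustrative (the entry defaults to true):

package org.apache.spark

import org.apache.spark.internal.config.EVENT_LOG_EXECUTOR_METRICS_UPDATES

// Hypothetical internal reader of the new config entry.
object EventLogConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.eventLog.logExecutorMetricsUpdates.enabled", "false")
    val logExecutorMetrics: Boolean = conf.get(EVENT_LOG_EXECUTOR_METRICS_UPDATES)
    println(s"log executor metrics updates: $logExecutorMetrics")
  }
}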
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
     onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
   }

+  /**
+   * On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed
+
+  /**
+   * Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = offHeapExecutionMemoryPool.memoryUsed
+
+  /**
+   * On heap storage memory currently in use, in bytes.
+   */
+  final def onHeapStorageMemoryUsed: Long = onHeapStorageMemoryPool.memoryUsed
+
+  /**
+   * Off heap storage memory currently in use, in bytes.
+   */
+  final def offHeapStorageMemoryUsed: Long = offHeapStorageMemoryPool.memoryUsed
+
   /**
    * Returns the execution memory consumption, in bytes, for the given task.
    */
Review comment:

nit: whole class is private[spark] so you don't need to add this to individual methods

Reply: Removed.