From 3e0d9536512300d27201e1d5cc4d9b5755a47871 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Mon, 17 Sep 2018 14:55:21 -0700
Subject: [PATCH 1/9] Don't send zero accumulators for metrics in heartbeat

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 072277cb78dc1..a9235e1d7cdae 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -799,7 +799,8 @@ private[spark] class Executor(
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
+        accumUpdates +=
+          ((taskRunner.taskId, taskRunner.task.metrics.accumulators().filterNot(_.isZero)))
       }
     }
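Aside for readers of this series: the entire change in patch 1 is the filterNot(_.isZero) call above. A minimal, self-contained Scala sketch of its effect on a heartbeat payload — the Acc case class is an illustrative stand-in for Spark's AccumulatorV2, not the real API:

// Standalone sketch, not part of the patches: zero-valued accumulators are
// filtered out, so only meaningful updates ride along in the heartbeat.
object ZeroFilterSketch {
  final case class Acc(name: String, value: Long) {
    def isZero: Boolean = value == 0L
  }

  def main(args: Array[String]): Unit = {
    val accums = Seq(
      Acc("shuffle.read.recordsRead", 0L),
      Acc("executorRunTime", 1234L),
      Acc("diskBytesSpilled", 0L))
    // Only the nonzero accumulator survives, so the message is smaller.
    val toReport = accums.filterNot(_.isZero)
    assert(toReport == Seq(Acc("executorRunTime", 1234L)))
  }
}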
From 3cf88a4ab34064074d42f5daa3a448e8f9def649 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Wed, 19 Sep 2018 11:40:47 -0700
Subject: [PATCH 2/9] add tests

---
 .../apache/spark/executor/ExecutorSuite.scala | 100 ++++++++++++++++--
 1 file changed, 94 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 77a7668d3a1d1..464206899bc0d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -21,9 +21,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.ByteBuffer
 import java.util.Properties
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -39,14 +40,14 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.memory.MemoryManager
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{JVMHeapMemory, JVMOffHeapMemory, MetricsSystem}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout}
+import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.storage.{BlockManager, BlockManagerId}
+import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils}
 
 class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
 
@@ -252,11 +253,96 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     }
   }
 
+  test("Heartbeat should drop zero metrics") {
+    withHeartbeatExecutor((executor, heartbeats) => {
+      // When no tasks are running, there should be no accumulators sent in heartbeat
+      invokeReportHeartbeat(executor)
+      assert(heartbeats.length == 1)
+      assert(heartbeats(0).accumUpdates.length == 0,
+        "No updates should be sent when no tasks are running")
+
+      // When we start a task with a nonzero accumulator, that should end up in the heartbeat
+      val metrics = new TaskMetrics()
+      val nonZeroAccumulator = new LongAccumulator()
+      nonZeroAccumulator.add(1)
+      metrics.registerAccumulator(nonZeroAccumulator)
+
+      val executorClass = classOf[Executor]
+      val tasksMap = {
+        val field =
+          executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks")
+        field.setAccessible(true)
+        field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]]
+      }
+      val mockTaskRunner = mock[executor.TaskRunner]
+      val mockTask = mock[Task[Any]]
+      when(mockTask.metrics).thenReturn(metrics)
+      when(mockTaskRunner.taskId).thenReturn(6)
+      when(mockTaskRunner.task).thenReturn(mockTask)
+      when(mockTaskRunner.startGCTime).thenReturn(1)
+      tasksMap.put(6, mockTaskRunner)
+
+      invokeReportHeartbeat(executor)
+      assert(heartbeats.length == 2)
+      val updates = heartbeats(1).accumUpdates
+      assert(updates.length == 1 && updates(0)._1 == 6,
+        "Heartbeat should only send update for the one task running")
+      val accumsSent = updates(0)._2.length
+      assert(accumsSent > 0, "The nonzero accumulator we added should be sent")
+      assert(accumsSent == metrics.accumulators().count(!_.isZero),
+        "The number of accumulators sent should match the number of nonzero accumulators")
+    })
+  }
+
+  private def withHeartbeatExecutor(f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    val executor =
+      new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true)
+    val executorClass = classOf[Executor]
+
+    // Set ExecutorMetricType.values to be a minimal set to avoid get null exceptions
+    val metricClass =
+      Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName() + "$")
+    val metricTypeValues = metricClass.getDeclaredField("values")
+    metricTypeValues.setAccessible(true)
+    metricTypeValues.set(
+      org.apache.spark.metrics.ExecutorMetricType,
+      IndexedSeq(JVMHeapMemory, JVMOffHeapMemory))
+
+    // Save all heartbeats sent into an ArrayBuffer for verification
+    val heartbeats = ArrayBuffer[Heartbeat]()
+    val mockReceiver = mock[RpcEndpointRef]
+    when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+      .thenAnswer(new Answer[HeartbeatResponse] {
+        override def answer(invocation: InvocationOnMock): HeartbeatResponse = {
+          val args = invocation.getArguments()
+          heartbeats += args(0).asInstanceOf[Heartbeat]
+          HeartbeatResponse(false)
+        }
+      })
+    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+    receiverRef.setAccessible(true)
+    receiverRef.set(executor, mockReceiver)
+
+    f(executor, heartbeats)
+  }
+
+  private def invokeReportHeartbeat(executor: Executor): Unit = {
+    val method = classOf[Executor]
+      .getDeclaredMethod("org$apache$spark$executor$Executor$$reportHeartBeat")
+    method.setAccessible(true)
+    method.invoke(executor)
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
     val mockMetricsSystem = mock[MetricsSystem]
     val mockMemoryManager = mock[MemoryManager]
+    val mockBlockManager = mock[BlockManager]
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
     when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
@@ -264,6 +350,8 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
     when(mockEnv.closureSerializer).thenReturn(serializer)
+    when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", "hostA", 1234))
+    when(mockEnv.blockManager).thenReturn(mockBlockManager)
    SparkEnv.set(mockEnv)
     mockEnv
   }
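Aside: the test above reaches Executor's private runningTasks map via getDeclaredField with the long name "org$apache$spark$executor$Executor$$runningTasks" — scalac expands the names of private members that must stay reachable from inner classes into this fully-qualified form. A standalone sketch of the underlying reflection pattern (the Counter class is illustrative only; a plain private field keeps its simple name):

// Illustrative only: reading and writing a private field through Java
// reflection, the same mechanism the suite uses on Executor.
object ReflectionSketch {
  class Counter { private var count: Int = 41 }

  def main(args: Array[String]): Unit = {
    val c = new Counter
    val field = classOf[Counter].getDeclaredField("count")
    field.setAccessible(true)            // bypass private access checks
    field.setInt(c, field.getInt(c) + 1) // mutate the private state
    assert(field.getInt(c) == 42)
  }
}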
From fc982560c862037413d46cd74be7fd7f82000578 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Wed, 19 Sep 2018 14:52:33 -0700
Subject: [PATCH 3/9] Refactor heartbeat configs and gate dropping zero metrics
 behind a config

---
 .../org/apache/spark/executor/Executor.scala   | 17 +++++++++++------
 .../apache/spark/internal/config/package.scala | 11 +++++++++++
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a9235e1d7cdae..93add74a03326 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -120,7 +120,7 @@ private[spark] class Executor(
   }
 
   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
+  private val userClassPathFirst = conf.getBoolean(EXECUTOR_USER_CLASS_PATH_FIRST.key, false)
 
   // Whether to monitor killed / interrupted tasks
   private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
@@ -149,7 +149,7 @@ private[spark] class Executor(
 
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
-    "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    "executor-heartbeater", conf.getTimeAsMs(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s"))
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
@@ -160,7 +160,7 @@ private[spark] class Executor(
    * times, it should kill itself. The default value is 60. It means we will retry to send
    * heartbeats about 10 minutes because the heartbeat interval is 10s.
    */
-  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+  private val HEARTBEAT_MAX_FAILURES = conf.getInt(EXECUTOR_HEARTBEAT_MAX_FAILURES.key, 60)
 
   /**
    * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
@@ -799,8 +799,13 @@ private[spark] class Executor(
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-        accumUpdates +=
-          ((taskRunner.taskId, taskRunner.task.metrics.accumulators().filterNot(_.isZero)))
+        val accumulatorsToReport =
+          if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, true)) {
+            taskRunner.task.metrics.accumulators().filterNot(_.isZero)
+          } else {
+            taskRunner.task.metrics.accumulators()
+          }
+        accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
       }
     }
 
@@ -808,7 +813,7 @@ private[spark] class Executor(
       executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+        message, RpcTimeout(conf, EXECUTOR_HEARTBEAT_INTERVAL.key, "10s"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8d827189ebb57..1d740d5e4d1bb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -83,6 +83,17 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
+    ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true)
+
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.executor.heartbeatInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
+  private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
+    ConfigBuilder("spark.executor.heartbeat.maxFailures").intConf.createWithDefault(60)
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
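Aside: the entries above use Spark's internal ConfigBuilder DSL, which pairs a string key with a typed default. A rough standalone sketch of what such a typed config entry does — simplified, and not Spark's actual implementation (the real one adds units, fallbacks, and documentation):

// Simplified sketch of a typed config entry in the spirit of ConfigBuilder.
object ConfigSketch {
  final case class ConfigEntry[T](key: String, default: T, parse: String => T) {
    // Read the entry from a plain key/value map, falling back to the default.
    def readFrom(settings: Map[String, String]): T =
      settings.get(key).map(parse).getOrElse(default)
  }

  val DROP_ZERO = ConfigEntry("spark.executor.heartbeat.dropZeroMetrics", true, _.toBoolean)
  val MAX_FAILURES = ConfigEntry("spark.executor.heartbeat.maxFailures", 60, _.toInt)

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.executor.heartbeat.maxFailures" -> "30")
    assert(DROP_ZERO.readFrom(settings))          // unset -> default true
    assert(MAX_FAILURES.readFrom(settings) == 30) // set -> parsed value
  }
}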
From 6bb91b01571601e22566d3c4084dc16f1f3f8f61 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Wed, 19 Sep 2018 15:25:08 -0700
Subject: [PATCH 4/9] update tests

---
 .../apache/spark/executor/ExecutorSuite.scala | 92 +++++++++++--------
 1 file changed, 54 insertions(+), 38 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 464206899bc0d..560b6f8e56887 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -254,48 +254,17 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
   }
 
   test("Heartbeat should drop zero metrics") {
-    withHeartbeatExecutor((executor, heartbeats) => {
-      // When no tasks are running, there should be no accumulators sent in heartbeat
-      invokeReportHeartbeat(executor)
-      assert(heartbeats.length == 1)
-      assert(heartbeats(0).accumUpdates.length == 0,
-        "No updates should be sent when no tasks are running")
-
-      // When we start a task with a nonzero accumulator, that should end up in the heartbeat
-      val metrics = new TaskMetrics()
-      val nonZeroAccumulator = new LongAccumulator()
-      nonZeroAccumulator.add(1)
-      metrics.registerAccumulator(nonZeroAccumulator)
-
-      val executorClass = classOf[Executor]
-      val tasksMap = {
-        val field =
-          executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks")
-        field.setAccessible(true)
-        field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]]
-      }
-      val mockTaskRunner = mock[executor.TaskRunner]
-      val mockTask = mock[Task[Any]]
-      when(mockTask.metrics).thenReturn(metrics)
-      when(mockTaskRunner.taskId).thenReturn(6)
-      when(mockTaskRunner.task).thenReturn(mockTask)
-      when(mockTaskRunner.startGCTime).thenReturn(1)
-      tasksMap.put(6, mockTaskRunner)
+    heartbeatZeroMetricTest(true)
+  }
 
-      invokeReportHeartbeat(executor)
-      assert(heartbeats.length == 2)
-      val updates = heartbeats(1).accumUpdates
-      assert(updates.length == 1 && updates(0)._1 == 6,
-        "Heartbeat should only send update for the one task running")
-      val accumsSent = updates(0)._2.length
-      assert(accumsSent > 0, "The nonzero accumulator we added should be sent")
-      assert(accumsSent == metrics.accumulators().count(!_.isZero),
-        "The number of accumulators sent should match the number of nonzero accumulators")
-    })
+  test("Heartbeat should not drop zero metrics when the conf is set to false") {
+    heartbeatZeroMetricTest(false)
   }
 
-  private def withHeartbeatExecutor(f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+  private def withHeartbeatExecutor(confs: (String, String)*)
+      (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
     val conf = new SparkConf
+    confs.foreach { case (k, v) => conf.set(k, v) }
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     val executor =
@@ -337,6 +306,53 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     method.invoke(executor)
   }
 
+  private def heartbeatZeroMetricTest(dropZeroMetrics: Boolean): Unit = {
+    val c = "spark.executor.heartbeat.dropZeroMetrics" -> dropZeroMetrics.toString
+    withHeartbeatExecutor(c) { (executor, heartbeats) =>
+      // When no tasks are running, there should be no accumulators sent in heartbeat
+      invokeReportHeartbeat(executor)
+      assert(heartbeats.length == 1)
+      assert(heartbeats(0).accumUpdates.length == 0,
+        "No updates should be sent when no tasks are running")
+
+      // When we start a task with a nonzero accumulator, that should end up in the heartbeat
+      val metrics = new TaskMetrics()
+      val nonZeroAccumulator = new LongAccumulator()
+      nonZeroAccumulator.add(1)
+      metrics.registerAccumulator(nonZeroAccumulator)
+
+      val executorClass = classOf[Executor]
+      val tasksMap = {
+        val field =
+          executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks")
+        field.setAccessible(true)
+        field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]]
+      }
+      val mockTaskRunner = mock[executor.TaskRunner]
+      val mockTask = mock[Task[Any]]
+      when(mockTask.metrics).thenReturn(metrics)
+      when(mockTaskRunner.taskId).thenReturn(6)
+      when(mockTaskRunner.task).thenReturn(mockTask)
+      when(mockTaskRunner.startGCTime).thenReturn(1)
+      tasksMap.put(6, mockTaskRunner)
+
+      invokeReportHeartbeat(executor)
+      assert(heartbeats.length == 2)
+      val updates = heartbeats(1).accumUpdates
+      assert(updates.length == 1 && updates(0)._1 == 6,
+        "Heartbeat should only send update for the one task running")
+      val accumsSent = updates(0)._2.length
+      assert(accumsSent > 0, "The nonzero accumulator we added should be sent")
+      if (dropZeroMetrics) {
+        assert(accumsSent == metrics.accumulators().count(!_.isZero),
+          "The number of accumulators sent should match the number of nonzero accumulators")
+      } else {
+        assert(accumsSent == metrics.accumulators().length,
+          "The number of accumulators sent should match the number of total accumulators")
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
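Aside: withHeartbeatExecutor now follows the loan pattern with a varargs first parameter list, so each test can override configs before borrowing the fixture. A minimal standalone sketch of that shape (all names here are illustrative):

// Illustrative loan-pattern helper: the first parameter list takes optional
// key/value settings, the second "borrows" a fixture for the test body.
object LoanPatternSketch {
  def withSettings(confs: (String, String)*)(f: Map[String, String] => Unit): Unit = {
    val defaults = Map("interval" -> "10s")
    f(defaults ++ confs) // caller-supplied settings override the defaults
  }

  def main(args: Array[String]): Unit = {
    withSettings("dropZero" -> "false") { settings =>
      assert(settings("dropZero") == "false")
      assert(settings("interval") == "10s")
    }
  }
}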
From 7d0c39be53bd0f61720c2800420c9c6669924ceb Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Wed, 26 Sep 2018 15:13:38 -0700
Subject: [PATCH 5/9] refactor code per Ryan's review

---
 .../scala/org/apache/spark/SparkConf.scala     |  8 ++---
 .../scala/org/apache/spark/SparkContext.scala  |  2 +-
 .../org/apache/spark/executor/Executor.scala   | 34 +++++++++++++------
 .../spark/internal/config/package.scala        |  9 +++--
 .../apache/spark/executor/ExecutorSuite.scala  |  3 +-
 .../MesosCoarseGrainedSchedulerBackend.scala   |  3 +-
 6 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 6c4c5c94cfa28..608c933d78c86 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", "120s")
+    val executorHeartbeatInterval = get(EXECUTOR_HEARTBEAT_INTERVAL)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+      s"spark.network.timeout=${executorTimeoutThreshold}ms must be no less than the value of " +
+      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}ms.")
   }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d943087ab6b80..0a66dae94dbd0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -499,7 +499,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
-      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+      conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
 
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 93add74a03326..66d5a8d86b5c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -120,7 +121,7 @@ private[spark] class Executor(
   }
 
   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst = conf.getBoolean(EXECUTOR_USER_CLASS_PATH_FIRST.key, false)
+  private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
 
   // Whether to monitor killed / interrupted tasks
   private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
@@ -147,21 +148,32 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  /**
+   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+   * times, it should kill itself. The default value is 60. It means we will retry to send
+   * heartbeats about 10 minutes because the heartbeat interval is 10s.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES)
+
+  /**
+   * Whether to drop empty accumulators from heartbeats sent to the driver. Including the empty
+   * accumulators (that satisfy isZero) can make the size of the heartbeat message very large.
+   */
+  private val HEARTBEAT_DROP_ZEROES = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES)
+
+  /**
+   * Interval to send heartbeats, in milliseconds
+   */
+  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
+
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
-    "executor-heartbeater", conf.getTimeAsMs(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s"))
+    "executor-heartbeater", HEARTBEAT_INTERVAL_MS)
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
-  /**
-   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
-   * times, it should kill itself. The default value is 60. It means we will retry to send
-   * heartbeats about 10 minutes because the heartbeat interval is 10s.
-   */
-  private val HEARTBEAT_MAX_FAILURES = conf.getInt(EXECUTOR_HEARTBEAT_MAX_FAILURES.key, 60)
-
   /**
    * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
    * successful heartbeat will reset it to 0.
@@ -800,7 +812,7 @@ private[spark] class Executor(
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
         val accumulatorsToReport =
-          if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, true)) {
+          if (HEARTBEAT_DROP_ZEROES) {
             taskRunner.task.metrics.accumulators().filterNot(_.isZero)
           } else {
             taskRunner.task.metrics.accumulators()
@@ -813,7 +825,7 @@ private[spark] class Executor(
       executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, RpcTimeout(conf, EXECUTOR_HEARTBEAT_INTERVAL.key, "10s"))
+        message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1d740d5e4d1bb..069fb58fcdb35 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -83,8 +83,11 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
-  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
-    ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true)
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES =
+    ConfigBuilder("spark.executor.heartbeat.dropZeroAccumulatorUpdates")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
 
   private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
     ConfigBuilder("spark.executor.heartbeatInterval")
@@ -92,7 +95,7 @@ package object config {
       .createWithDefaultString("10s")
 
   private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
-    ConfigBuilder("spark.executor.heartbeat.maxFailures").intConf.createWithDefault(60)
+    ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)
 
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 560b6f8e56887..32f1731dadd94 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -39,6 +39,7 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.MemoryManager
 import org.apache.spark.metrics.{JVMHeapMemory, JVMOffHeapMemory, MetricsSystem}
 import org.apache.spark.rdd.RDD
@@ -307,7 +308,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
   }
 
   private def heartbeatZeroMetricTest(dropZeroMetrics: Boolean): Unit = {
-    val c = "spark.executor.heartbeat.dropZeroMetrics" -> dropZeroMetrics.toString
+    val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString
     withHeartbeatExecutor(c) { (executor, heartbeats) =>
       // When no tasks are running, there should be no accumulators sent in heartbeat
       invokeReportHeartbeat(executor)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 178de30f0f381..bac0246b7ddc5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           externalShufflePort,
           sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
             s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
-          sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+          sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }
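Aside: patch 5 builds the RpcTimeout from the already-parsed interval (HEARTBEAT_INTERVAL_MS.millis) instead of re-reading the conf string. A small standalone illustration of the scala.concurrent.duration conversion it relies on (the interval value below is made up):

import scala.concurrent.duration._

// Shows the Long-to-FiniteDuration enrichment used by the patch:
// a millisecond count becomes a typed duration via `.millis`.
object DurationSketch {
  def main(args: Array[String]): Unit = {
    val heartbeatIntervalMs: Long = 10000L // pretend value of the parsed conf
    val timeout: FiniteDuration = heartbeatIntervalMs.millis
    assert(timeout == 10.seconds) // durations compare by normalized length
  }
}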
From db14bd1cd644df2c7742e1fc4ce7eb2ab8b34f6e Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Wed, 26 Sep 2018 17:18:43 -0700
Subject: [PATCH 6/9] minor refactor

---
 .../org/apache/spark/executor/ExecutorSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 32f1731dadd94..44bb2f922bee0 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -254,12 +254,12 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     }
   }
 
-  test("Heartbeat should drop zero metrics") {
-    heartbeatZeroMetricTest(true)
+  test("Heartbeat should drop zero accumulator updates") {
+    heartbeatZeroAccumulatorUpdateTest(true)
   }
 
-  test("Heartbeat should not drop zero metrics when the conf is set to false") {
-    heartbeatZeroMetricTest(false)
+  test("Heartbeat should not drop zero accumulator updates when the conf is disabled") {
+    heartbeatZeroAccumulatorUpdateTest(false)
   }
 
   private def withHeartbeatExecutor(confs: (String, String)*)
@@ -307,7 +307,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     method.invoke(executor)
   }
 
-  private def heartbeatZeroMetricTest(dropZeroMetrics: Boolean): Unit = {
+  private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = {
     val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString
     withHeartbeatExecutor(c) { (executor, heartbeats) =>
       // When no tasks are running, there should be no accumulators sent in heartbeat
From e22ac78b37230a4dfe2393524d78082fdd4d4252 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Thu, 27 Sep 2018 09:28:04 -0700
Subject: [PATCH 7/9] use PrivateMethodTester

---
 .../apache/spark/executor/ExecutorSuite.scala | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 44bb2f922bee0..b1c9c3c767c52 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -34,6 +34,7 @@ import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar
 
@@ -50,7 +51,8 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockManager, BlockManagerId}
 import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils}
 
-class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
+class ExecutorSuite extends SparkFunSuite
+  with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {
 
   test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") {
     // mock some objects to make Executor.launchTask() happy
@@ -300,18 +302,14 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     f(executor, heartbeats)
   }
 
-  private def invokeReportHeartbeat(executor: Executor): Unit = {
-    val method = classOf[Executor]
-      .getDeclaredMethod("org$apache$spark$executor$Executor$$reportHeartBeat")
-    method.setAccessible(true)
-    method.invoke(executor)
-  }
-
   private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = {
     val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString
     withHeartbeatExecutor(c) { (executor, heartbeats) =>
+      val reportHeartbeat = PrivateMethod[Unit]('reportHeartBeat)
+
       // When no tasks are running, there should be no accumulators sent in heartbeat
-      invokeReportHeartbeat(executor)
+      executor.invokePrivate(reportHeartbeat())
+      // invokeReportHeartbeat(executor)
       assert(heartbeats.length == 1)
       assert(heartbeats(0).accumUpdates.length == 0,
         "No updates should be sent when no tasks are running")
@@ -337,7 +335,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
       when(mockTaskRunner.startGCTime).thenReturn(1)
       tasksMap.put(6, mockTaskRunner)
 
-      invokeReportHeartbeat(executor)
+      executor.invokePrivate(reportHeartbeat())
       assert(heartbeats.length == 2)
       val updates = heartbeats(1).accumUpdates
       assert(updates.length == 1 && updates(0)._1 == 6,
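Aside: PrivateMethodTester is ScalaTest's sanctioned alternative to the hand-rolled reflection that patch 7 deletes. A minimal, self-contained sketch of how it invokes a private method — the Greeter class is illustrative only:

import org.scalatest.PrivateMethodTester

// Minimal sketch of the utility the patch switches to: PrivateMethodTester
// locates a private method by symbol and invokes it on a target instance.
object PrivateMethodSketch extends PrivateMethodTester {
  class Greeter {
    private def greet(name: String): String = s"hello, $name"
  }

  def main(args: Array[String]): Unit = {
    val greet = PrivateMethod[String]('greet) // symbol must match the method name
    val result = new Greeter().invokePrivate(greet("spark"))
    assert(result == "hello, spark")
  }
}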
From 9cbe14ceba323459f5079077a45e6263c13be55a Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Fri, 28 Sep 2018 11:44:53 -0700
Subject: [PATCH 8/9] Use TestMemoryManager for test to not have to mock
 object field

---
 .../apache/spark/executor/ExecutorSuite.scala | 18 ++++--------------
 1 file changed, 4 insertions(+), 14 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index b1c9c3c767c52..1f8a65707b2f7 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -41,15 +41,15 @@ import org.scalatest.mockito.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.MemoryManager
-import org.apache.spark.metrics.{JVMHeapMemory, JVMOffHeapMemory, MetricsSystem}
+import org.apache.spark.memory.TestMemoryManager
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout}
 import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockManager, BlockManagerId}
-import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils}
+import org.apache.spark.util.{LongAccumulator, UninterruptibleThread}
 
 class ExecutorSuite extends SparkFunSuite
   with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {
@@ -274,15 +274,6 @@ class ExecutorSuite extends SparkFunSuite
       new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true)
     val executorClass = classOf[Executor]
 
-    // Set ExecutorMetricType.values to be a minimal set to avoid get null exceptions
-    val metricClass =
-      Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName() + "$")
-    val metricTypeValues = metricClass.getDeclaredField("values")
-    metricTypeValues.setAccessible(true)
-    metricTypeValues.set(
-      org.apache.spark.metrics.ExecutorMetricType,
-      IndexedSeq(JVMHeapMemory, JVMOffHeapMemory))
-
     // Save all heartbeats sent into an ArrayBuffer for verification
     val heartbeats = ArrayBuffer[Heartbeat]()
     val mockReceiver = mock[RpcEndpointRef]
@@ -356,14 +347,13 @@ class ExecutorSuite extends SparkFunSuite
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
     val mockMetricsSystem = mock[MetricsSystem]
-    val mockMemoryManager = mock[MemoryManager]
     val mockBlockManager = mock[BlockManager]
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
     when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
     when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
-    when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
+    when(mockEnv.memoryManager).thenReturn(new TestMemoryManager(conf))
     when(mockEnv.closureSerializer).thenReturn(serializer)
     when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", "hostA", 1234))
     when(mockEnv.blockManager).thenReturn(mockBlockManager)
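Aside: the point of patch 8 is that a real, lightweight test double (TestMemoryManager) beats reflectively rewriting a singleton's field. A generic illustration of preferring a hand-written fake over patching global state — the types here are illustrative, not Spark's:

// Illustrative: injecting a small fake instead of mutating shared state.
object FakeOverMockSketch {
  trait MemoryManager { def acquire(bytes: Long): Boolean }

  // A tiny hand-written fake with deterministic behavior; no reflection needed.
  final class FakeMemoryManager(capacity: Long) extends MemoryManager {
    private var used = 0L
    override def acquire(bytes: Long): Boolean =
      if (used + bytes > capacity) false else { used += bytes; true }
  }

  def main(args: Array[String]): Unit = {
    val mm: MemoryManager = new FakeMemoryManager(capacity = 100L)
    assert(mm.acquire(60L))
    assert(!mm.acquire(50L)) // over capacity -> rejected, predictably
  }
}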
From f6fa33790769c14d9dde6f56e07233c2887d80a6 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Fri, 28 Sep 2018 11:53:59 -0700
Subject: [PATCH 9/9] Use getTimeAsSeconds

---
 core/src/main/scala/org/apache/spark/SparkConf.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 608c933d78c86..390f8246f4fce 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -609,13 +609,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = get(EXECUTOR_HEARTBEAT_INTERVAL)
+    val executorTimeoutThresholdMs =
+      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThreshold}ms must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}ms.")
+    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
+      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
+      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
 
 /**
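Aside: the reason patch 9 parses the timeout as seconds and multiplies by 1000 is that a unitless value for spark.network.timeout is conventionally interpreted as seconds, so parsing it directly as milliseconds would shrink the threshold a thousand-fold. A minimal illustration of that pitfall — the parse helper below is a simplified stand-in for SparkConf's getTimeAs* methods, not the real implementation:

// Simplified stand-in for getTimeAsSeconds / getTimeAsMs, showing why the
// default unit matters when a user writes "120" with no suffix.
object TimeUnitSketch {
  def parse(value: String, defaultUnitMs: Long): Long =
    if (value.endsWith("s")) value.dropRight(1).toLong * 1000L // "120s" (simplified)
    else value.toLong * defaultUnitMs                          // unitless value

  def main(args: Array[String]): Unit = {
    val raw = "120" // user omitted the unit
    val asSecondsThenMs = parse(raw, defaultUnitMs = 1000L) // treated as seconds
    val asMsDirectly = parse(raw, defaultUnitMs = 1L)       // treated as millis
    assert(asSecondsThenMs == 120000L)
    assert(asMsDirectly == 120L) // 1000x too small for a network timeout
  }
}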