From 5b224a7a248dfdb1eede40e76b48b7438bab1cc7 Mon Sep 17 00:00:00 2001 From: Sumeet Gajjar Date: Fri, 2 Apr 2021 23:01:49 -0700 Subject: [PATCH 1/3] Do not reregister BlockManager when Executor is shutting down --- .../org/apache/spark/executor/Executor.scala | 2 +- .../apache/spark/executor/ExecutorSuite.scala | 68 ++++++++++++++----- 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3865c9c987b1..8fc1c80cb7f6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -996,7 +996,7 @@ private[spark] class Executor( try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key)) - if (response.reregisterBlockManager) { + if (!executorShutdown.get && response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() } diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 97ffb36062db..9a6c33515d81 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -270,29 +270,36 @@ class ExecutorSuite extends SparkFunSuite heartbeatZeroAccumulatorUpdateTest(false) } + private def withMockHeartbeatReceiverRef(executor: Executor) + (func: RpcEndpointRef => Unit): Unit = { + val executorClass = classOf[Executor] + val mockReceiverRef = mock[RpcEndpointRef] + val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") + receiverRef.setAccessible(true) + receiverRef.set(executor, mockReceiverRef) + + func(mockReceiverRef) + } + private def withHeartbeatExecutor(confs: (String, String)*) - (f: (Executor, 
ArrayBuffer[Heartbeat]) => Unit): Unit = { + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { val conf = new SparkConf confs.foreach { case (k, v) => conf.set(k, v) } val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) withExecutor("id", "localhost", SparkEnv.get) { executor => - val executorClass = classOf[Executor] - - // Save all heartbeats sent into an ArrayBuffer for verification - val heartbeats = ArrayBuffer[Heartbeat]() - val mockReceiver = mock[RpcEndpointRef] - when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any)) - .thenAnswer((invocation: InvocationOnMock) => { - val args = invocation.getArguments() - heartbeats += args(0).asInstanceOf[Heartbeat] - HeartbeatResponse(false) - }) - val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") - receiverRef.setAccessible(true) - receiverRef.set(executor, mockReceiver) + withMockHeartbeatReceiverRef(executor) { mockReceiverRef => + // Save all heartbeats sent into an ArrayBuffer for verification + val heartbeats = ArrayBuffer[Heartbeat]() + when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)) + .thenAnswer((invocation: InvocationOnMock) => { + val args = invocation.getArguments() + heartbeats += args(0).asInstanceOf[Heartbeat] + HeartbeatResponse(false) + }) - f(executor, heartbeats) + f(executor, heartbeats) + } } } @@ -416,6 +423,35 @@ class ExecutorSuite extends SparkFunSuite assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0) } + test("SPARK-34949: do not re-register BlockManager when executor is shutting down") { + val reregisterInvoked = new AtomicBoolean(false) + val mockBlockManager = mock[BlockManager] + when(mockBlockManager.reregister()).thenAnswer { (_: InvocationOnMock) => + reregisterInvoked.getAndSet(true) + } + val conf = new SparkConf(false).setAppName("test").setMaster("local[2]") + val mockEnv = createMockEnv(conf, new JavaSerializer(conf)) + when(mockEnv.blockManager).thenReturn(mockBlockManager) + + 
withExecutor("id", "localhost", mockEnv) { executor => + withMockHeartbeatReceiverRef(executor) { mockReceiverRef => + when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)).thenAnswer { + (_: InvocationOnMock) => HeartbeatResponse(reregisterBlockManager = true) + } + val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat")) + executor.invokePrivate(reportHeartbeat()) + assert(reregisterInvoked.get(), + "BlockManager.reregister not invoked when reregisterBlockManager was true") + + reregisterInvoked.getAndSet(false) + executor.stop() + executor.invokePrivate(reportHeartbeat()) + assert(!reregisterInvoked.get(), + "BlockManager.reregister should not be invoked when executor is stopping") + } + } + } + test("SPARK-33587: isFatalError") { def errorInThreadPool(e: => Throwable): Throwable = { intercept[Throwable] { From c941914374d41829f8c540308014e12252e7f20f Mon Sep 17 00:00:00 2001 From: Sumeet Gajjar Date: Sat, 3 Apr 2021 00:25:21 -0700 Subject: [PATCH 2/3] Fix indentation --- .../test/scala/org/apache/spark/executor/ExecutorSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 9a6c33515d81..ba49ce2a2318 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -271,7 +271,7 @@ class ExecutorSuite extends SparkFunSuite } private def withMockHeartbeatReceiverRef(executor: Executor) - (func: RpcEndpointRef => Unit): Unit = { + (func: RpcEndpointRef => Unit): Unit = { val executorClass = classOf[Executor] val mockReceiverRef = mock[RpcEndpointRef] val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") @@ -282,7 +282,7 @@ class ExecutorSuite extends SparkFunSuite } private def withHeartbeatExecutor(confs: (String, String)*) - (f: (Executor, ArrayBuffer[Heartbeat]) => 
Unit): Unit = { + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { val conf = new SparkConf confs.foreach { case (k, v) => conf.set(k, v) } val serializer = new JavaSerializer(conf) From 69a784a1d3f6b92dea704661100160df137f1078 Mon Sep 17 00:00:00 2001 From: Sumeet Gajjar Date: Mon, 5 Apr 2021 11:11:45 -0700 Subject: [PATCH 3/3] Fix assertion clue --- .../scala/org/apache/spark/executor/ExecutorSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index ba49ce2a2318..a237447b0fa2 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -440,14 +440,14 @@ class ExecutorSuite extends SparkFunSuite } val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat")) executor.invokePrivate(reportHeartbeat()) - assert(reregisterInvoked.get(), - "BlockManager.reregister not invoked when reregisterBlockManager was true") + assert(reregisterInvoked.get(), "BlockManager.reregister should be invoked " + + "on HeartbeatResponse(reregisterBlockManager = true) when executor is not shutting down") reregisterInvoked.getAndSet(false) executor.stop() executor.invokePrivate(reportHeartbeat()) assert(!reregisterInvoked.get(), - "BlockManager.reregister should not be invoked when executor is stopping") + "BlockManager.reregister should not be invoked when executor is shutting down") } } }