@@ -38,8 +38,9 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.ThreadUtils

-private[spark] class KubernetesClusterSchedulerBackendSuite
+class KubernetesClusterSchedulerBackendSuite
     extends SparkFunSuite with BeforeAndAfter {

   private val APP_ID = "test-spark-app"
@@ -121,6 +122,9 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   @Mock
   private var executorPodsWatch: Watch = _

+  @Mock
+  private var successFuture: Future[Boolean] = _
+
   private var sparkConf: SparkConf = _
   private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
   private var allocatorRunnable: ArgumentCaptor[Runnable] = _
@@ -169,9 +173,15 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
+
+    // Used by the CoarseGrainedSchedulerBackend when making RPC calls.
     when(driverEndpointRef.ask[Boolean]
       (any(classOf[Any]))
-      (any())).thenReturn(mock[Future[Boolean]])
+      (any())).thenReturn(successFuture)
+    when(successFuture.failed).thenReturn(Future[Throwable] {
+      // emulate behavior of the Future.failed method.
+      throw new NoSuchElementException()
+    }(ThreadUtils.sameThread))
   }

   test("Basic lifecycle expectations when starting and stopping the scheduler.") {
@@ -239,13 +249,14 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     verify(podOperations).create(SECOND_EXECUTOR_POD)
   }

-  test("Deleting executors and then running an allocator pass after finding the loss reason" +
-      " should only delete the pod once.") {
+  test("Scaled down executors should be cleaned up") {
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
     val scheduler = newSchedulerBackend()
     scheduler.start()
+
+    // The scheduler backend spins up one executor pod.
     requestExecutorRunnable.getValue.run()
     when(podOperations.create(any(classOf[Pod])))
       .thenAnswer(AdditionalAnswers.returnsFirstArg())
@@ -258,6 +269,8 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
       .apply(registerFirstExecutorMessage)
+
+    // Request that there are 0 executors and trigger deletion from driver.
     scheduler.doRequestTotalExecutors(0)
     requestExecutorRunnable.getAllValues.asScala.last.run()
     scheduler.doKillExecutors(Seq("1"))
@@ -268,6 +281,9 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     val exitedPod = exitPod(FIRST_EXECUTOR_POD, 0)
     executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
     allocatorRunnable.getValue.run()
+
+    // No more deletion attempts of the executors.
+    // This is graceful termination and should not be detected as a failure.
     verify(podOperations, times(1)).delete(FIRST_EXECUTOR_POD)
     verify(driverEndpointRef, times(1)).ask[Boolean](
       RemoveExecutor("1", ExecutorExited(
@@ -277,10 +293,11 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
         s" explicit termination request.")))
   }

-  test("Executors that disconnect from application errors are noted as exits caused by app.") {
+  test("Executors that fail should not be deleted.") {
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+
     val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -298,6 +315,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     executorPodsWatcherArgument.getValue.eventReceived(
       Action.ERROR, exitPod(FIRST_EXECUTOR_POD, 1))

+    // A replacement executor should be created but the error pod should persist.
     expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     scheduler.doRequestTotalExecutors(1)
     requestExecutorRunnable.getValue.run()
@@ -311,11 +329,11 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
   }

-  test("Executors should only try to get the loss reason a number of times before giving up and" +
-      " removing the executor.") {
+  test("Executors disconnected due to unknown reasons are deleted and replaced.") {
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+
     val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -329,11 +347,13 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
       .apply(registerFirstExecutorMessage)
+
     driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
     1 to KubernetesClusterSchedulerBackend.MAX_EXECUTOR_LOST_REASON_CHECKS foreach { _ =>
       allocatorRunnable.getValue.run()
       verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
     }
+
     expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     verify(podOperations).delete(FIRST_EXECUTOR_POD)
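
A note on the stubbed reply future added in the setup hunk above: Scala's Future.failed is the failed projection of a future, and for a future that completed successfully that projection itself fails with a NoSuchElementException. The when(successFuture.failed) stub reproduces that behavior on Spark's ThreadUtils.sameThread (an ExecutionContext that runs tasks on the calling thread), so the backend sees the same semantics as a real successful ask() reply. Below is a minimal standalone sketch of the semantics being emulated; the object and value names are illustrative only and are not part of the patch.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object FailedProjectionDemo extends App {
  // A future that has already completed successfully, like the stubbed ask() reply.
  val succeeded: Future[Boolean] = Future.successful(true)

  // The failed projection of a successful future fails with NoSuchElementException,
  // which is the behavior the mocked successFuture emulates in the test suite.
  val projection: Future[Throwable] = succeeded.failed

  Try(Await.result(projection, 1.second)) match {
    case Failure(_: NoSuchElementException) =>
      println("failed projection of a successful future fails with NoSuchElementException")
    case other =>
      println(s"unexpected result: $other")
  }
}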