Skip to content

Commit bee046a

Browse files
wangyumGitHub Enterprise
authored andcommitted
[CARMEL-7539][CARMEL-4980] Cache the RejectedExecutionException if the executor pool is shutdown (apache#189)
1 parent abda820 commit bee046a

File tree

2 files changed

+54
-37
lines changed

2 files changed

+54
-37
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package org.apache.spark.scheduler
1919

2020
import java.io.NotSerializableException
2121
import java.util.Properties
22-
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledFuture, TimeoutException, TimeUnit}
23-
import java.util.concurrent.{Future => JFutrue}
22+
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFutrue, RejectedExecutionException, ScheduledFuture, TimeoutException, TimeUnit}
2423
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
2524

2625
import scala.annotation.tailrec
@@ -1908,7 +1907,16 @@ private[spark] class DAGScheduler(
19081907
}
19091908
}
19101909
if (asyncTaskSubmission) {
1911-
broadcastPool.submit(broadCastTask)
1910+
try {
1911+
broadcastPool.submit(broadCastTask)
1912+
} catch {
1913+
case _: RejectedExecutionException
1914+
if broadcastPool.isShutdown && SparkConf.isAnalyticsCluster =>
1915+
logWarning("Failed to broadcast as executor pool was shutdown")
1916+
broadCastTask.run()
1917+
case e: Throwable =>
1918+
throw e
1919+
}
19121920
} else {
19131921
broadCastTask.run()
19141922
}

core/src/main/scala/org/apache/spark/scheduler/cluster/AnalyticsCoarseGrainedSchedulerBackend.scala

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler.cluster
1919

20-
import java.util.concurrent.{CopyOnWriteArrayList, ScheduledExecutorService, TimeUnit}
20+
import java.util.concurrent.{CopyOnWriteArrayList, RejectedExecutionException, ScheduledExecutorService, TimeUnit}
2121
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
2222
import javax.annotation.concurrent.GuardedBy
2323

@@ -413,23 +413,27 @@ class AnalyticsCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val r
413413
return
414414
}
415415

416-
val f = Future {
417-
val startTime = System.currentTimeMillis()
418-
val taskDescs = scheduler.resourceOffers(offers)
419-
val tasks = taskDescs.flatten
420-
if (!tasks.isEmpty) {
421-
launchTasks(tasks)
422-
logInfo(s"makeOffers launched tasks size: ${tasks.size}," +
423-
s" take ${System.currentTimeMillis() - startTime}ms")
424-
}
425-
}(offerExecutionContext)
426-
f.onComplete {
427-
case Failure(ex) =>
428-
// TODO handle task launch failure, either retry or abort
429-
logError(s"makeOffers error with exception: ${ex.getMessage}", ex)
430-
case _ =>
431-
}(offerExecutionContext)
432-
416+
try {
417+
val f = Future {
418+
val startTime = System.currentTimeMillis()
419+
val taskDescs = scheduler.resourceOffers(offers)
420+
val tasks = taskDescs.flatten
421+
if (!tasks.isEmpty) {
422+
launchTasks(tasks)
423+
logInfo(s"makeOffers launched tasks size: ${tasks.size}," +
424+
s" take ${System.currentTimeMillis() - startTime}ms")
425+
}
426+
}(offerExecutionContext)
427+
f.onComplete {
428+
case Failure(ex) =>
429+
// TODO handle task launch failure, either retry or abort
430+
logError(s"makeOffers error with exception: ${ex.getMessage}", ex)
431+
case _ =>
432+
}(offerExecutionContext)
433+
} catch {
434+
case _: RejectedExecutionException if offerExecutionContext.isShutdown =>
435+
logWarning("Failed to makeOffers as executor pool was shutdown")
436+
}
433437
}
434438

435439
private def buildWorkerOffer(executorId: String, executorData: ExecutorData) = {
@@ -490,22 +494,27 @@ class AnalyticsCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val r
490494
return
491495
}
492496

493-
val f = Future {
494-
val startTime = System.currentTimeMillis()
495-
val taskDescs = scheduler.resourceOffers(offers)
496-
val tasks = taskDescs.flatten
497-
if (tasks.nonEmpty) {
498-
launchTasks(tasks)
499-
logInfo(s"makeBatchOffers launched tasks size: ${tasks.size}," +
500-
s" take ${System.currentTimeMillis() - startTime}ms")
501-
}
502-
}(offerExecutionContext)
503-
f.onComplete {
504-
case Failure(ex) =>
505-
// TODO handle task launch failure, either retry or abort
506-
logError(s"makeBatchOffers error with exception: ${ex.getMessage}", ex)
507-
case _ =>
508-
}(offerExecutionContext)
497+
try {
498+
val f = Future {
499+
val startTime = System.currentTimeMillis()
500+
val taskDescs = scheduler.resourceOffers(offers)
501+
val tasks = taskDescs.flatten
502+
if (tasks.nonEmpty) {
503+
launchTasks(tasks)
504+
logInfo(s"makeBatchOffers launched tasks size: ${tasks.size}," +
505+
s" take ${System.currentTimeMillis() - startTime}ms")
506+
}
507+
}(offerExecutionContext)
508+
f.onComplete {
509+
case Failure(ex) =>
510+
// TODO handle task launch failure, either retry or abort
511+
logError(s"makeBatchOffers error with exception: ${ex.getMessage}", ex)
512+
case _ =>
513+
}(offerExecutionContext)
514+
} catch {
515+
case _: RejectedExecutionException if offerExecutionContext.isShutdown =>
516+
logWarning("Failed to makeOffers as executor pool was shutdown")
517+
}
509518
}
510519
}
511520

0 commit comments

Comments
 (0)