
Commit 2c3f83c

CodingCat authored and aarondav committed
[SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop
https://issues.apache.org/jira/browse/SPARK-4012

This patch is a resubmission of apache#2864. What I am proposing is that ***when an exception is thrown from an infinite loop, we should stop the SparkContext instead of letting the JVM throw exceptions forever***. So, in the infinite loops that were originally wrapped with `logUncaughtExceptions`, I switched to `tryOrStopSparkContext`, so that the Spark component is stopped.

Stopping the JVM process early helps with HA scheme design. For example, a user may have a script that checks for the pid of the Spark Streaming driver to monitor availability; with the code before this patch, the JVM process stays alive but is no longer functional once these exceptions are thrown.

andrewor14, srowen, mind taking a further look at the change?

Author: CodingCat <[email protected]>

Closes apache#5004 from CodingCat/SPARK-4012-1 and squashes the following commits:

589276a [CodingCat] throw fatal error again
3c72cd8 [CodingCat] address the comments
6087864 [CodingCat] revise comments
6ad3eb0 [CodingCat] stop SparkContext instead of quit the JVM process
6322959 [CodingCat] exit JVM process when the exception is thrown from an infinite loop
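The idea is easiest to see in isolation. Below is a minimal, self-contained Scala sketch of the pattern this patch applies: a driver-side daemon loop is wrapped by a helper that, on an uncaught exception, stops the "context" (so an external health check notices) instead of only logging and leaving the thread dead. The names here (`DemoContext`, `tryOrStopContext`) are illustrative stand-ins, not Spark APIs; the real helper added by this commit is `Utils.tryOrStopSparkContext` in the diff below.

```scala
import scala.util.control.{ControlThrowable, NonFatal}

object TryOrStopDemo {

  /** Stand-in for a SparkContext: something a monitoring script can observe as "down". */
  final class DemoContext {
    @volatile var stopped = false
    def stop(): Unit = { stopped = true; println("context stopped") }
  }

  /** Run `block`; on an uncaught throwable, stop the context and rethrow only fatal errors. */
  def tryOrStopContext(ctx: DemoContext)(block: => Unit): Unit = {
    try block
    catch {
      case e: ControlThrowable => throw e
      case t: Throwable =>
        println(s"uncaught error in ${Thread.currentThread().getName}, stopping context: $t")
        if (ctx != null) ctx.stop()
        if (!NonFatal(t)) throw t
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new DemoContext
    val worker = new Thread("demo-cleaner-thread") {
      setDaemon(true)
      override def run(): Unit = tryOrStopContext(ctx) {
        while (!ctx.stopped) {
          throw new IllegalStateException("boom") // simulate the loop body failing
        }
      }
    }
    worker.start()
    worker.join()
    assert(ctx.stopped) // the failure stopped the context rather than being logged and forgotten
  }
}
```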
1 parent 645cf3f commit 2c3f83c

File tree

9 files changed: +51, -16 lines changed


core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }
 
-    listenerBus.start()
+    listenerBus.start(this)
   }
 
   /** Post the application start event */

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def getRunner(operateFun: () => Unit): Runnable = {
     new Runnable() {
-      override def run() = Utils.logUncaughtExceptions {
+      override def run() = Utils.tryOrExit {
         operateFun()
       }
     }

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        Utils.tryOrExit { checkSpeculatableTasks() }
+        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
       }
     }
   }

core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala

Lines changed: 8 additions & 2 deletions
@@ -21,6 +21,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.SparkContext
 
 /**
  * Asynchronously passes events to registered listeners.
@@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   self =>
 
+  private var sparkContext: SparkContext = null
+
   /* Cap the capacity of the event queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
   private val EVENT_QUEUE_CAPACITY = 10000
@@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   private val listenerThread = new Thread(name) {
     setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       while (true) {
         eventLock.acquire()
         self.synchronized {
@@ -89,9 +92,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
    * This first sends out all buffered events posted before this listener bus has started, then
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start() {
+  def start(sc: SparkContext) {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 28 additions & 0 deletions
@@ -1146,6 +1146,8 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
+   *
+   * NOTE: This method is to be called by the spark-started JVM process.
    */
   def tryOrExit(block: => Unit) {
     try {
@@ -1156,6 +1158,32 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
+   * exception
+   *
+   * NOTE: This method is to be called by the driver-side components to avoid stopping the
+   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+   * spark-started JVM process .
+   */
+  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
+    try {
+      block
+    } catch {
+      case e: ControlThrowable => throw e
+      case t: Throwable =>
+        val currentThreadName = Thread.currentThread().getName
+        if (sc != null) {
+          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
+          sc.stop()
+        }
+        if (!NonFatal(t)) {
+          logError(s"throw uncaught fatal error in thread $currentThreadName", t)
+          throw t
+        }
+    }
+  }
+
   /**
    * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
    * exceptions as IOException. This is used when implementing Externalizable and Serializable's
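For reference, the stop-then-maybe-rethrow behavior of the new helper hinges on Scala's `NonFatal` extractor: non-fatal throwables are swallowed after `sc.stop()`, while fatal ones are rethrown so the default uncaught-exception handler still sees them. A small, self-contained check of that classification (not Spark code, just standard library behavior):

```scala
import scala.util.control.NonFatal

object NonFatalDemo extends App {
  // true: ordinary exceptions are non-fatal and would be swallowed after sc.stop()
  println(NonFatal(new IllegalStateException("boom")))
  // false: VM errors are fatal and would be rethrown after sc.stop()
  println(NonFatal(new OutOfMemoryError("heap")))
  // false: InterruptedException is treated as fatal by NonFatal's definition
  println(NonFatal(new InterruptedException("interrupted")))
}
```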

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 5 additions & 4 deletions
@@ -25,9 +25,9 @@ import scala.io.Source
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
+import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * logging events, whether the parsing of the file names is correct, and whether the logged events
  * can be read and deserialized into actual SparkListenerEvents.
  */
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
+class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+  with Logging {
   import EventLoggingListenerSuite._
 
   private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(sc)
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc)
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(counter.count === 5)
 
@@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
       val bus = new LiveListenerBus
-      bus.start()
-      bus.start()
+      bus.start(sc)
+      bus.start(sc)
     }
 
     // ... or stopped before starting
@@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(sc)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(sc)
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       }
     }), "JobScheduler")
 
-    listenerBus.start()
+    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
     receiverTracker.start()
     jobGenerator.start()
