Commit 3892ec5

andrewor14 authored and pwendell committed
[SPARK-1816] LiveListenerBus dies if a listener throws an exception
The solution is to wrap a try / catch / log around the posting of each event to each listener.

Author: Andrew Or <[email protected]>

Closes #759 from andrewor14/listener-die and squashes the following commits:

aee5107 [Andrew Or] Merge branch 'master' of github.com:apache/spark into listener-die
370939f [Andrew Or] Remove two layers of indirection
422d278 [Andrew Or] Explicitly throw an exception instead of 1 / 0
0df0e2a [Andrew Or] Try/catch and log exceptions when posting events

(cherry picked from commit 5c0dafc)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent d6994f4 commit 3892ec5
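The heart of the fix is a single pattern: invoke each listener callback inside its own try / catch, log the exception, and move on, so that one misbehaving listener can neither kill the bus thread nor starve the remaining listeners. Below is a minimal, self-contained Scala sketch of that pattern; Listener, RobustBus, and the String event type are simplified stand-ins for illustration, not the Spark classes changed in this diff.

import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins: `Listener` plays the role of SparkListener and a
// plain String stands in for SparkListenerEvent.
trait Listener {
  def onEvent(event: String): Unit
}

object RobustBus {
  private val listeners = new ArrayBuffer[Listener]

  def addListener(listener: Listener): Unit = { listeners += listener }

  // Post an event to every listener. A throwing listener is logged and
  // skipped, so later listeners still receive the event.
  def postToAll(event: String): Unit = {
    listeners.foreach { listener =>
      try {
        listener.onEvent(event)
      } catch {
        case e: Exception =>
          Console.err.println(s"Listener ${listener.getClass.getName} threw an exception: $e")
      }
    }
  }
}

object Demo extends App {
  RobustBus.addListener(new Listener { def onEvent(e: String): Unit = throw new Exception("boom") })
  RobustBus.addListener(new Listener { def onEvent(e: String): Unit = println(s"got $e") })
  RobustBus.postToAll("job end")  // the failure is logged to stderr; "got job end" still prints
}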

File tree: 4 files changed (+109 −29 lines)

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 28 additions & 8 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -42,7 +43,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
 
   private val listenerThread = new Thread("SparkListenerBus") {
     setDaemon(true)
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       while (true) {
         eventLock.acquire()
         // Atomically remove and process this event
@@ -77,11 +78,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     val eventAdded = eventQueue.offer(event)
     if (eventAdded) {
       eventLock.release()
-    } else if (!queueFullErrorMessageLogged) {
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
-        "rate at which tasks are being started by the scheduler.")
-      queueFullErrorMessageLogged = true
+    } else {
+      logQueueFullErrorMessage()
     }
   }
 
@@ -96,13 +94,18 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
-      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
-       * add overhead in the general case. */
+      /* Sleep rather than using wait/notify, because this is used only for testing and
+       * wait/notify add overhead in the general case. */
       Thread.sleep(10)
     }
     true
   }
 
+  /**
+   * For testing only. Return whether the listener daemon thread is still alive.
+   */
+  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
+
   /**
    * Return whether the event queue is empty.
    *
@@ -111,6 +114,23 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
    */
  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
 
+  /**
+   * Log an error message to indicate that the event queue is full. Do this only once.
+   */
+  private def logQueueFullErrorMessage(): Unit = {
+    if (!queueFullErrorMessageLogged) {
+      if (listenerThread.isAlive) {
+        logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+          "This likely means one of the SparkListeners is too slow and cannot keep up with " +
+          "the rate at which tasks are being started by the scheduler.")
+      } else {
+        logError("SparkListenerBus thread is dead! This means SparkListenerEvents have not " +
+          "been (and will no longer be) propagated to listeners for some time.")
+      }
+      queueFullErrorMessageLogged = true
+    }
+  }
+
   def stop() {
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 34 additions & 16 deletions

@@ -20,10 +20,13 @@ package org.apache.spark.scheduler
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
 /**
  * A SparkListenerEvent bus that relays events to its listeners
  */
-private[spark] trait SparkListenerBus {
+private[spark] trait SparkListenerBus extends Logging {
 
   // SparkListeners attached to this event bus
   protected val sparkListeners = new ArrayBuffer[SparkListener]
@@ -34,38 +37,53 @@ private[spark] trait SparkListenerBus {
   }
 
   /**
-   * Post an event to all attached listeners. This does nothing if the event is
-   * SparkListenerShutdown.
+   * Post an event to all attached listeners.
+   * This does nothing if the event is SparkListenerShutdown.
    */
   def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
+        foreachListener(_.onStageSubmitted(stageSubmitted))
       case stageCompleted: SparkListenerStageCompleted =>
-        sparkListeners.foreach(_.onStageCompleted(stageCompleted))
+        foreachListener(_.onStageCompleted(stageCompleted))
       case jobStart: SparkListenerJobStart =>
-        sparkListeners.foreach(_.onJobStart(jobStart))
+        foreachListener(_.onJobStart(jobStart))
       case jobEnd: SparkListenerJobEnd =>
-        sparkListeners.foreach(_.onJobEnd(jobEnd))
+        foreachListener(_.onJobEnd(jobEnd))
       case taskStart: SparkListenerTaskStart =>
-        sparkListeners.foreach(_.onTaskStart(taskStart))
+        foreachListener(_.onTaskStart(taskStart))
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
+        foreachListener(_.onTaskGettingResult(taskGettingResult))
       case taskEnd: SparkListenerTaskEnd =>
-        sparkListeners.foreach(_.onTaskEnd(taskEnd))
+        foreachListener(_.onTaskEnd(taskEnd))
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
+        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
-        sparkListeners.foreach(_.onBlockManagerAdded(blockManagerAdded))
+        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
-        sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
+        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
+        foreachListener(_.onUnpersistRDD(unpersistRDD))
       case applicationStart: SparkListenerApplicationStart =>
-        sparkListeners.foreach(_.onApplicationStart(applicationStart))
+        foreachListener(_.onApplicationStart(applicationStart))
       case applicationEnd: SparkListenerApplicationEnd =>
-        sparkListeners.foreach(_.onApplicationEnd(applicationEnd))
+        foreachListener(_.onApplicationEnd(applicationEnd))
       case SparkListenerShutdown =>
     }
   }
+
+  /**
+   * Apply the given function to all attached listeners, catching and logging any exception.
+   */
+  private def foreachListener(f: SparkListener => Unit): Unit = {
+    sparkListeners.foreach { listener =>
+      try {
+        f(listener)
+      } catch {
+        case e: Exception =>
+          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      }
+    }
+  }
+
 }

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion

@@ -1128,7 +1128,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Executes the given block, printing and re-throwing any uncaught exceptions.
+   * Execute the given block, logging and re-throwing any uncaught exception.
    * This is particularly useful for wrapping code that runs in a thread, to ensure
    * that exceptions are printed, and to avoid having to catch Throwable.
    */
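The hunk above touches only the doc comment of Utils.logUncaughtExceptions; the method body is not shown. As a point of reference, a wrapper matching that comment could look like the sketch below. This is an assumption drawn from the comment, not a copy of the Spark source, and it uses stderr in place of the Logging trait's logError to stay self-contained.

object LogUncaughtSketch {
  // Execute the given block, logging and re-throwing any uncaught exception.
  // Catching Throwable here is what spares callers from catching it themselves.
  def logUncaughtExceptions[T](f: => T): T = {
    try {
      f
    } catch {
      case t: Throwable =>
        Console.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
        throw t
    }
  }
}

This is the helper that the LiveListenerBus change above relies on when it rewrites the listener thread as `override def run(): Unit = Utils.logUncaughtExceptions { ... }`: if the event loop ever dies, the exception is at least logged before the thread exits.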

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 46 additions & 4 deletions

@@ -331,16 +331,47 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     }
   }
 
-  def checkNonZeroAvg(m: Traversable[Long], msg: String) {
+  test("SparkListener moves on if a listener throws an exception") {
+    val badListener = new BadListener
+    val jobCounter1 = new BasicJobCounter
+    val jobCounter2 = new BasicJobCounter
+    val bus = new LiveListenerBus
+
+    // Propagate events to bad listener first
+    bus.addListener(badListener)
+    bus.addListener(jobCounter1)
+    bus.addListener(jobCounter2)
+    bus.start()
+
+    // Post events to all listeners, and wait until the queue is drained
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
+    // The exception should be caught, and the event should be propagated to other listeners
+    assert(bus.listenerThreadIsAlive)
+    assert(jobCounter1.count === 5)
+    assert(jobCounter2.count === 5)
+  }
+
+  /**
+   * Assert that the given list of numbers has an average that is greater than zero.
+   */
+  private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
     assert(m.sum / m.size.toDouble > 0.0, msg)
   }
 
-  class BasicJobCounter extends SparkListener {
+  /**
+   * A simple listener that counts the number of jobs observed.
+   */
+  private class BasicJobCounter extends SparkListener {
     var count = 0
     override def onJobEnd(job: SparkListenerJobEnd) = count += 1
   }
 
-  class SaveStageAndTaskInfo extends SparkListener {
+  /**
+   * A simple listener that saves all task infos and task metrics.
+   */
+  private class SaveStageAndTaskInfo extends SparkListener {
     val stageInfos = mutable.Map[StageInfo, Seq[(TaskInfo, TaskMetrics)]]()
     var taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
 
@@ -358,7 +389,10 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     }
   }
 
-  class SaveTaskEvents extends SparkListener {
+  /**
+   * A simple listener that saves the task indices for all task events.
+   */
+  private class SaveTaskEvents extends SparkListener {
     val startedTasks = new mutable.HashSet[Int]()
     val startedGettingResultTasks = new mutable.HashSet[Int]()
     val endedTasks = new mutable.HashSet[Int]()
@@ -377,4 +411,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
       startedGettingResultTasks += taskGettingResult.taskInfo.index
     }
   }
+
+  /**
+   * A simple listener that throws an exception on job end.
+   */
+  private class BadListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd) = { throw new Exception }
+  }
+
 }
