From a689f527e6b9bf626cef0e318a37e7aa3c008bc3 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 17 May 2018 15:45:52 -0500
Subject: [PATCH 1/6] [SPARK-24309][CORE] AsyncEventQueue should stop on interrupt.

EventListeners can interrupt the event queue thread. In particular, when the
EventLoggingListener writes to HDFS, HDFS can interrupt the thread. When there
is an interrupt, the queue should be removed and stop accepting any more
events. Before this change, the queue would continue to take more events
(until it was full), and then would not stop when the application was complete
because the PoisonPill couldn't be added.

Added a unit test which failed before this change.
---
 .../spark/scheduler/AsyncEventQueue.scala     |  8 +++-
 .../spark/scheduler/LiveListenerBus.scala     |  8 +++-
 .../spark/scheduler/SparkListenerSuite.scala  | 40 +++++++++++++++++++
 3 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index c1fedd63f6a9..51cd9302cc8b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -34,7 +34,8 @@ import org.apache.spark.util.Utils
  * Delivery will only begin when the `start()` method is called. The `stop()` method should be
  * called when no more events need to be delivered.
  */
-private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
+private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics,
+    bus: LiveListenerBus)
   extends SparkListenerBus
   with Logging {
 
@@ -97,6 +98,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
     } catch {
       case ie: InterruptedException =>
         logInfo(s"Stopping listener queue $name.", ie)
+        stopped.set(true)
+        bus.removeQueue(name)
+        // we're not going to process any more events in this queue, so might as well clear it
+        eventQueue.clear()
+        eventCount.set(0)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index ba6387a8f08a..cd897a5917ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -102,7 +102,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
         queue.addListener(listener)
 
       case None =>
-        val newQueue = new AsyncEventQueue(queue, conf, metrics)
+        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
         newQueue.addListener(listener)
         if (started.get()) {
           newQueue.start(sparkContext)
@@ -111,6 +111,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
   }
 
+  private[scheduler] def removeQueue(queue: String): Unit = synchronized {
+    queues.asScala.find(_.name == queue).foreach { q =>
+      queues.remove(q)
+    }
+  }
+
   def removeListener(listener: SparkListenerInterface): Unit = synchronized {
     // Remove listener from all queues it was added to, and stop queues that have become empty.
     queues.asScala
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index fa47a52bbbc4..7046abb87d57 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -489,6 +489,38 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
   }
 
+  test("interrupt within listener is handled correctly") {
+    val conf = new SparkConf(false)
+      .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+    val bus = new LiveListenerBus(conf)
+    val counter1 = new BasicJobCounter()
+    val counter2 = new BasicJobCounter()
+    val interruptingListener = new InterruptingListener
+    bus.addToSharedQueue(counter1)
+    bus.addToSharedQueue(interruptingListener)
+    bus.addToStatusQueue(counter2)
+    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // after we post one event, the shared queue should get stopped because of the interrupt
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 1)
+    assert(counter2.count === 1)
+
+    // posting more events should be fine, they'll just get processed from the OK queue.
+    (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(counter2.count === 6)
+
+    // Make sure stopping works -- this requires putting a poison pill in all active queues, which
+    // would fail if our interrupted queue was still active, as its queue would be full.
+    bus.stop()
+  }
+
   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */
@@ -547,6 +579,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { throw new Exception }
   }
 
+  /**
+   * A simple listener that interrupts on job end.
+   */
+  private class InterruptingListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      Thread.currentThread().interrupt()
+    }
+  }
 }
 
 // These classes can't be declared inside of the SparkListenerSuite class because we don't want
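The failure mode patch 1 addresses is easiest to see outside of Spark. Below is a minimal,
self-contained sketch of the same dispatch-thread/poison-pill pattern (all names here are
hypothetical stand-ins, not Spark's real classes): a bounded queue feeds a single dispatch
thread, and stop() shuts it down by enqueuing a poison pill. If an interrupt kills the dispatch
loop but the queue stays registered and keeps accepting events, the bounded queue eventually
fills and the put(POISON_PILL) in stop() blocks forever.

```scala
import java.util.concurrent.LinkedBlockingQueue

object PoisonPillSketch {
  private final class Event(val payload: String)
  private val POISON_PILL = new Event("poison")
  // small bound, like the LISTENER_BUS_EVENT_QUEUE_CAPACITY of 5 used in the unit test
  private val eventQueue = new LinkedBlockingQueue[Event](5)

  private val dispatchThread = new Thread("dispatcher") {
    override def run(): Unit = {
      try {
        var next = eventQueue.take()
        while (next ne POISON_PILL) {            // reference equality, mirroring a sentinel object
          println(s"processing ${next.payload}") // stand-in for postToAll(next)
          next = eventQueue.take()
        }
      } catch {
        case _: InterruptedException =>
          // Without the fix: returning here leaves the queue registered and still filling up,
          // so a later put(POISON_PILL) can block forever. The fix marks the queue stopped
          // and deregisters it from the bus instead.
          println("dispatcher interrupted")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    dispatchThread.start()
    (1 to 3).foreach(i => eventQueue.put(new Event(s"event$i")))
    eventQueue.put(POISON_PILL) // stop(): only safe while something is draining the queue
    dispatchThread.join()
  }
}
```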
From 0a44c06c2ab94bf3d46e6e676d6295d4b0adb933 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 17 May 2018 21:26:41 -0500
Subject: [PATCH 2/6] review feedback

---
 .../scala/org/apache/spark/scheduler/AsyncEventQueue.scala | 5 ++++-
 .../org/apache/spark/scheduler/SparkListenerSuite.scala    | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 51cd9302cc8b..4857ca53b500 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -34,7 +34,10 @@ import org.apache.spark.util.Utils
  * Delivery will only begin when the `start()` method is called. The `stop()` method should be
  * called when no more events need to be delivered.
  */
-private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics,
+private class AsyncEventQueue(
+    val name: String,
+    conf: SparkConf,
+    metrics: LiveListenerBusMetrics,
     bus: LiveListenerBus)
   extends SparkListenerBus
   with Logging {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7046abb87d57..cb2e0e266a65 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -504,7 +504,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     bus.start(mockSparkContext, mockMetricsSystem)
 
-    // after we post one event, the shared queue should get stopped because of the interrupt
+    // after we post one event, the shared queue should stop because of the interrupt
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
From 4a1f65750a26ed097517f80f52c72304819eb58a Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 18 May 2018 10:06:49 -0500
Subject: [PATCH 3/6] only remove the active listener for an interrupt

---
 .../spark/scheduler/AsyncEventQueue.scala     | 40 +++++++++----------
 .../org/apache/spark/util/ListenerBus.scala   | 14 +++++++
 .../spark/scheduler/SparkListenerSuite.scala  | 19 ++++++---
 3 files changed, 47 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 4857ca53b500..17b26d037be9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -85,28 +85,18 @@ private class AsyncEventQueue(
   }
 
   private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
-    try {
-      var next: SparkListenerEvent = eventQueue.take()
-      while (next != POISON_PILL) {
-        val ctx = processingTime.time()
-        try {
-          super.postToAll(next)
-        } finally {
-          ctx.stop()
-        }
-        eventCount.decrementAndGet()
-        next = eventQueue.take()
+    var next: SparkListenerEvent = eventQueue.take()
+    while (next != POISON_PILL) {
+      val ctx = processingTime.time()
+      try {
+        super.postToAll(next)
+      } finally {
+        ctx.stop()
       }
       eventCount.decrementAndGet()
-    } catch {
-      case ie: InterruptedException =>
-        logInfo(s"Stopping listener queue $name.", ie)
-        stopped.set(true)
-        bus.removeQueue(name)
-        // we're not going to process any more events in this queue, so might as well clear it
-        eventQueue.clear()
-        eventCount.set(0)
+      next = eventQueue.take()
     }
+    eventCount.decrementAndGet()
   }
 
   override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
@@ -139,7 +129,11 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    dispatchThread.join()
+    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // in that case.
+    if (Thread.currentThread() != dispatchThread) {
+      dispatchThread.join()
+    }
   }
 
   def post(event: SparkListenerEvent): Unit = {
@@ -196,6 +190,12 @@ private class AsyncEventQueue(
     true
   }
 
+  override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
+    // the listener failed in an unrecoverable way, we want to remove it from the entire
+    // LiveListenerBus (potentially stopping a queue if its empty)
+    bus.removeListener(listener)
+  }
+
 }
 
 private object AsyncEventQueue {
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index b25a731401f2..577c4103d382 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -60,6 +60,15 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     }
   }
 
+  /**
+   * This can be overridden by subclasses if there is any extra cleanup to do when removing a
+   * listener. In particular AsyncEventQueues can clean up queues in the LiveListenerBus.
+   */
+  def removeListenerOnError(listener: L): Unit = {
+    removeListener(listener)
+  }
+
+
   /**
    * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
    * `postToAll` in the same thread for all events.
@@ -80,6 +89,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       }
       try {
         doPostEvent(listener, event)
+        if (Thread.interrupted()) {
+          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
+            s"Removing that listener.")
+          removeListenerOnError(listener)
+        }
       } catch {
         case NonFatal(e) if !isIgnorableException(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index cb2e0e266a65..9723e7d9379e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -495,25 +495,32 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val bus = new LiveListenerBus(conf)
     val counter1 = new BasicJobCounter()
     val counter2 = new BasicJobCounter()
-    val interruptingListener = new InterruptingListener
+    val interruptingListener1 = new InterruptingListener
+    val interruptingListener2 = new InterruptingListener
     bus.addToSharedQueue(counter1)
-    bus.addToSharedQueue(interruptingListener)
+    bus.addToSharedQueue(interruptingListener1)
     bus.addToStatusQueue(counter2)
-    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+    bus.addToEventLogQueue(interruptingListener2)
+    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
     assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+    assert(bus.findListenersByClass[InterruptingListener]().size === 2)
 
     bus.start(mockSparkContext, mockMetricsSystem)
 
-    // after we post one event, the shared queue should stop because of the interrupt
+    // after we post one event, both interrupting listeners should get removed, and the
+    // event log queue should be removed
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
-    assert(bus.findListenersByClass[BasicJobCounter]().size === 1)
+    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+    assert(bus.findListenersByClass[InterruptingListener]().size === 0)
+    assert(counter1.count === 1)
     assert(counter2.count === 1)
 
     // posting more events should be fine, they'll just get processed from the OK queue.
     (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(counter1.count === 6)
     assert(counter2.count === 6)
 
     // Make sure stopping works -- this requires putting a poison pill in all active queues, which
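Patch 3's flag check hinges on a subtlety of the java.lang.Thread API worth spelling out:
Thread.interrupted() is a static method that reads and *clears* the current thread's interrupt
status, while isInterrupted only reads it. Clearing matters here: once the bus has removed the
offending listener, the dispatch thread must not see a stale flag on its next eventQueue.take().
A minimal demonstration of the semantics (plain JVM behavior, not Spark code; the object name
is hypothetical):

```scala
object InterruptFlagSketch {
  def main(args: Array[String]): Unit = {
    // A listener "misbehaves" by setting the flag, as InterruptingListener does in the test.
    Thread.currentThread().interrupt()

    println(Thread.currentThread().isInterrupted) // true: read-only check, flag still set
    println(Thread.interrupted())                 // true: reads AND clears the flag
    println(Thread.interrupted())                 // false: the previous call cleared it

    // With the flag cleared, a subsequent blocking call such as queue.take()
    // will not immediately throw InterruptedException.
  }
}
```

The same patch also guards stop() with `if (Thread.currentThread() != dispatchThread)`: since
removal can now happen from inside the dispatch thread's own error handling, the dispatch thread
may end up calling stop() on itself, and a thread that joins itself waits forever.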
From 09d55afa4167460e732b2f4acb3cdde6029cf952 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Sun, 20 May 2018 21:24:03 -0500
Subject: [PATCH 4/6] handle InterruptedException as well

---
 .../org/apache/spark/util/ListenerBus.scala   |  4 +
 .../spark/scheduler/SparkListenerSuite.scala  | 81 ++++++++++---------
 2 files changed, 48 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 577c4103d382..95c7a858944f 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -95,6 +95,10 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
           removeListenerOnError(listener)
         }
       } catch {
+        case ie: InterruptedException =>
+          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
+            s"Removing that listener.", ie)
+          removeListenerOnError(listener)
         case NonFatal(e) if !isIgnorableException(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
       } finally {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 9723e7d9379e..6ffd1e84f7ad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -489,43 +489,46 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
   }
 
-  test("interrupt within listener is handled correctly") {
-    val conf = new SparkConf(false)
-      .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
-    val bus = new LiveListenerBus(conf)
-    val counter1 = new BasicJobCounter()
-    val counter2 = new BasicJobCounter()
-    val interruptingListener1 = new InterruptingListener
-    val interruptingListener2 = new InterruptingListener
-    bus.addToSharedQueue(counter1)
-    bus.addToSharedQueue(interruptingListener1)
-    bus.addToStatusQueue(counter2)
-    bus.addToEventLogQueue(interruptingListener2)
-    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
-    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
-    assert(bus.findListenersByClass[InterruptingListener]().size === 2)
-
-    bus.start(mockSparkContext, mockMetricsSystem)
-
-    // after we post one event, both interrupting listeners should get removed, and the
-    // event log queue should be removed
-    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
-    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
-    assert(bus.findListenersByClass[InterruptingListener]().size === 0)
-    assert(counter1.count === 1)
-    assert(counter2.count === 1)
-
-    // posting more events should be fine, they'll just get processed from the OK queue.
-    (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(counter1.count === 6)
-    assert(counter2.count === 6)
-
-    // Make sure stopping works -- this requires putting a poison pill in all active queues, which
-    // would fail if our interrupted queue was still active, as its queue would be full.
-    bus.stop()
+  Seq(true, false).foreach { throwInterruptedException =>
+    val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
+    test(s"interrupt within listener is handled correctly: $suffix") {
+      val conf = new SparkConf(false)
+        .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+      val bus = new LiveListenerBus(conf)
+      val counter1 = new BasicJobCounter()
+      val counter2 = new BasicJobCounter()
+      val interruptingListener1 = new InterruptingListener(throwInterruptedException)
+      val interruptingListener2 = new InterruptingListener(throwInterruptedException)
+      bus.addToSharedQueue(counter1)
+      bus.addToSharedQueue(interruptingListener1)
+      bus.addToStatusQueue(counter2)
+      bus.addToEventLogQueue(interruptingListener2)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[InterruptingListener]().size === 2)
+
+      bus.start(mockSparkContext, mockMetricsSystem)
+
+      // after we post one event, both interrupting listeners should get removed, and the
+      // event log queue should be removed
+      bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+      bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[InterruptingListener]().size === 0)
+      assert(counter1.count === 1)
+      assert(counter2.count === 1)
+
+      // posting more events should be fine, they'll just get processed from the OK queue.
+      (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+      bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      assert(counter1.count === 6)
+      assert(counter2.count === 6)
+
+      // Make sure stopping works -- this requires putting a poison pill in all active queues,
+      // which would fail if our interrupted queue was still active, as its queue would be full.
+      bus.stop()
+    }
   }
 
   /**
@@ -589,9 +592,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   /**
    * A simple listener that interrupts on job end.
    */
-  private class InterruptingListener extends SparkListener {
+  private class InterruptingListener(val throwInterruptedException: Boolean) extends SparkListener {
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-      Thread.currentThread().interrupt()
+      if (throwInterruptedException) {
+        throw new InterruptedException("got interrupted")
+      } else {
+        Thread.currentThread().interrupt()
+      }
     }
   }
 }
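Patch 4 makes the bus tolerate both ways a listener can surface an interrupt: throwing
InterruptedException itself, or merely setting the thread's interrupt flag. The sketch below
(a hypothetical Listener trait, not Spark's SparkListenerInterface) shows how a flag check right
after the callback funnels both styles into a single removal path, which is the shape the
ListenerBus code settles into once patch 5 lands:

```scala
object TwoInterruptStylesSketch {
  trait Listener {
    def onEvent(msg: String): Unit
  }

  // Style 1: the listener throws InterruptedException directly.
  val throwing = new Listener {
    override def onEvent(msg: String): Unit = throw new InterruptedException("got interrupted")
  }

  // Style 2: the listener only sets the interrupt flag, e.g. because an I/O library it
  // called caught the exception internally and re-set the flag before returning.
  val flagSetting = new Listener {
    override def onEvent(msg: String): Unit = Thread.currentThread().interrupt()
  }

  def postTo(listener: Listener, msg: String): Unit = {
    try {
      listener.onEvent(msg)
      if (Thread.interrupted()) {
        // Convert the flag-style interrupt into the exception-style path below,
        // so both styles are attributed to this listener.
        throw new InterruptedException()
      }
    } catch {
      case _: InterruptedException =>
        println(s"interrupt from $listener, removing it from the bus")
    }
  }

  def main(args: Array[String]): Unit = {
    postTo(throwing, "event1")
    postTo(flagSetting, "event2")
  }
}
```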
From fc8a1977b337afd29ebdbedcd6d199abe787b421 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 21 May 2018 13:55:19 -0500
Subject: [PATCH 5/6] review feedback

---
 .../scala/org/apache/spark/scheduler/LiveListenerBus.scala  | 6 ------
 core/src/main/scala/org/apache/spark/util/ListenerBus.scala | 6 +++---
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index cd897a5917ca..d135190d1e91 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -111,12 +111,6 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
   }
 
-  private[scheduler] def removeQueue(queue: String): Unit = synchronized {
-    queues.asScala.find(_.name == queue).foreach { q =>
-      queues.remove(q)
-    }
-  }
-
   def removeListener(listener: SparkListenerInterface): Unit = synchronized {
     // Remove listener from all queues it was added to, and stop queues that have become empty.
     queues.asScala
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 95c7a858944f..d4474a90b26f 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -90,9 +90,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       try {
         doPostEvent(listener, event)
         if (Thread.interrupted()) {
-          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
-            s"Removing that listener.")
-          removeListenerOnError(listener)
+          // We want to throw the InterruptedException right away so we can associate the interrupt
+          // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
+          throw new InterruptedException()
         }
       } catch {
         case ie: InterruptedException =>

From 008d14d2ead6b6d7ba74fa56a5002c35857435c3 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 21 May 2018 14:03:54 -0500
Subject: [PATCH 6/6] grammar

---
 .../main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 17b26d037be9..e2b6df460059 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -192,7 +192,7 @@ private class AsyncEventQueue(
 
   override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
     // the listener failed in an unrecoverable way, we want to remove it from the entire
-    // LiveListenerBus (potentially stopping a queue if its empty)
+    // LiveListenerBus (potentially stopping a queue if it is empty)
     bus.removeListener(listener)
   }
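Taken together, the series leaves the bus with per-listener interrupt attribution: an
interrupting listener is removed on its first delivery, the queue keeps draining for everyone
else, and stop()'s poison pill still goes through. A compact end-to-end sketch of that net
behavior (a hypothetical mini-bus, not Spark's classes; assumes Scala 2.12+ for the Runnable
lambda):

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.util.control.NonFatal

object MiniBusSketch {
  private val POISON_PILL: String = new String("poison") // distinct reference for `ne`
  private val queue = new LinkedBlockingQueue[String](5)
  @volatile private var listeners: List[String => Unit] = Nil

  def addListener(l: String => Unit): Unit = listeners = listeners :+ l

  private val dispatchThread = new Thread(() => {
    var next = queue.take()
    while (next ne POISON_PILL) {
      listeners.foreach { l =>
        try {
          l(next)
          if (Thread.interrupted()) throw new InterruptedException()
        } catch {
          case _: InterruptedException =>
            println("interrupt: removing the offending listener")
            listeners = listeners.filterNot(_ eq l) // removeListenerOnError in the real fix
          case NonFatal(e) =>
            println(s"listener threw $e, keeping it")
        }
      }
      next = queue.take()
    }
  })

  def main(args: Array[String]): Unit = {
    var count = 0
    addListener(_ => count += 1)                         // well-behaved counter
    addListener(_ => Thread.currentThread().interrupt()) // misbehaving listener
    dispatchThread.start()
    (1 to 6).foreach(i => queue.put(s"event$i"))
    queue.put(POISON_PILL) // still succeeds: the queue never stops draining
    dispatchThread.join()
    println(s"counter saw $count events") // 6: events keep flowing after the removal
  }
}
```

The design point the sketch illustrates is the one the series converged on after review: rather
than tearing down a whole queue on interrupt (patch 1), only the misbehaving listener is evicted,
so one bad listener cannot take down event logging or the app-status listeners sharing its queue.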