AsyncEventQueue.scala
@@ -34,7 +34,8 @@ import org.apache.spark.util.Utils
  * Delivery will only begin when the `start()` method is called. The `stop()` method should be
  * called when no more events need to be delivered.
  */
-private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
+private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics,

Contributor: one param per line now that they don't fit

+    bus: LiveListenerBus)
   extends SparkListenerBus
   with Logging {

@@ -97,6 +98,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
     } catch {
       case ie: InterruptedException =>
         logInfo(s"Stopping listener queue $name.", ie)
+        stopped.set(true)

Contributor: I wonder if this is the right thing to do. What would the old bus do here?

I remember there's code to catch exceptions from listeners and just log them, so that other listeners in the same queue are not affected. Not that the event writer queue would suffer from that, but others might.

Contributor (Author): The old bus would stop the entire SparkContext (details below).

I don't know what the right behavior is either -- I figured this was the intention given the logInfo. Alternatively we could (a) stop the entire SparkContext, (b) skip this particular event and keep going, or (c) stop the one listener which happened to be active on the interrupt, but keep the queue active (if there were more listeners).

More details on the 2.2 behavior: ListenerBus.postToAll wouldn't catch the exception, and the polling thread in LiveListenerBus wraps everything in Utils.tryOrStopSparkContext.

I did a similar test on branch-2.2: squito@72951bd

18/05/17 21:38:23.446 SparkListenerBus ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
        at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
18/05/17 21:38:23.448 SparkListenerBus ERROR Utils: throw uncaught fatal error in thread SparkListenerBus
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
        at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

+        bus.removeQueue(name)
+        // we're not going to process any more events in this queue, so might as well clear it
+        eventQueue.clear()
+        eventCount.set(0)
     }
   }

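For readers who want to see the two layers described in the thread above, here is a minimal, self-contained sketch of that pattern. The names (ToyBus, Listener, the println "logging") are hypothetical stand-ins, not the actual Spark source: per-listener exceptions are caught and logged so the other listeners keep running, while an InterruptedException thrown from the blocking queue wait escapes that handler and reaches the tryOrStopSparkContext-style guard around the polling thread, which then stops the whole "context".

import java.util.concurrent.LinkedBlockingQueue

import scala.util.control.NonFatal

// Sketch only: ToyBus and Listener stand in for LiveListenerBus and
// SparkListenerInterface; println stands in for real logging.
object OldBusSketch {

  trait Listener {
    def onEvent(event: String): Unit
  }

  final class ToyBus(listeners: Seq[Listener]) {
    private val queue = new LinkedBlockingQueue[String]()
    @volatile private var contextStopped = false

    def post(event: String): Unit = queue.put(event)

    // Layer 1: each listener is guarded individually, so a misbehaving listener
    // only produces a log line and the remaining listeners still see the event.
    private def postToAll(event: String): Unit = {
      listeners.foreach { listener =>
        try {
          listener.onEvent(event)
        } catch {
          case NonFatal(e) => println(s"listener threw, ignoring: $e")
        }
      }
    }

    // Layer 2: the polling thread is guarded as a whole. An interrupt while the
    // thread is blocked in queue.take() surfaces here, not in postToAll, and the
    // old behavior was to treat it as fatal and stop the whole context.
    private val pollingThread = new Thread("toy-listener-bus") {
      override def run(): Unit = {
        try {
          while (true) {
            postToAll(queue.take())
          }
        } catch {
          case t: Throwable =>
            println(s"uncaught error in listener thread, stopping context: $t")
            contextStopped = true
        }
      }
    }

    def start(): Unit = pollingThread.start()
    def interruptPollingThread(): Unit = pollingThread.interrupt()
    def isContextStopped: Boolean = contextStopped
  }

  def main(args: Array[String]): Unit = {
    val bus = new ToyBus(Seq(new Listener {
      def onEvent(event: String): Unit = println(s"got $event")
    }))
    bus.start()
    bus.post("job end")
    Thread.sleep(200)
    bus.interruptPollingThread()  // simulates the interrupt hitting the queue wait
    Thread.sleep(200)
    println(s"context stopped: ${bus.isContextStopped}")  // prints true
  }
}

Compare this with the patch above: the new code catches the InterruptedException inside the affected queue itself and shuts down only that queue, leaving the other queues (and the SparkContext) running.
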
LiveListenerBus.scala
@@ -102,7 +102,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
         queue.addListener(listener)

       case None =>
-        val newQueue = new AsyncEventQueue(queue, conf, metrics)
+        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
         newQueue.addListener(listener)
         if (started.get()) {
           newQueue.start(sparkContext)

@@ -111,6 +111,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
   }

+  private[scheduler] def removeQueue(queue: String): Unit = synchronized {

Contributor: Not used anymore.

Contributor: Can we remove it?

+    queues.asScala.find(_.name == queue).foreach { q =>
+      queues.remove(q)
+    }
+  }

   def removeListener(listener: SparkListenerInterface): Unit = synchronized {
     // Remove listener from all queues it was added to, and stop queues that have become empty.
     queues.asScala

SparkListenerSuite.scala
@@ -489,6 +489,38 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
   }

+  test("interrupt within listener is handled correctly") {
+    val conf = new SparkConf(false)
+      .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+    val bus = new LiveListenerBus(conf)
+    val counter1 = new BasicJobCounter()
+    val counter2 = new BasicJobCounter()
+    val interruptingListener = new InterruptingListener
+    bus.addToSharedQueue(counter1)
+    bus.addToSharedQueue(interruptingListener)
+    bus.addToStatusQueue(counter2)
+    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // after we post one event, the shared queue should get stopped because of the interrupt

Contributor: s/get stopped/stop

+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 1)
+    assert(counter2.count === 1)
+
+    // posting more events should be fine, they'll just get processed from the OK queue.
+    (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(counter2.count === 6)
+
+    // Make sure stopping works -- this requires putting a poison pill in all active queues, which
+    // would fail if our interrupted queue was still active, as its queue would be full.
+    bus.stop()
+  }
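
The comment at the end of the test refers to the poison-pill protocol used to stop the queues. Here is a simplified, self-contained illustration of why that matters; ToyQueue and POISON_PILL are hypothetical stand-ins for AsyncEventQueue and its stop sentinel, not the real code. stop() has to enqueue a sentinel into a bounded buffer, so a queue whose dispatch thread has died while its buffer was full would make stop() block forever.

import java.util.concurrent.LinkedBlockingQueue

// Sketch only: capacities and event types are arbitrary.
object PoisonPillSketch {

  private val POISON_PILL: AnyRef = new Object

  final class ToyQueue(capacity: Int) {
    private val buffer = new LinkedBlockingQueue[AnyRef](capacity)

    private val dispatcher = new Thread("toy-dispatcher") {
      override def run(): Unit = {
        // Drain events until the sentinel shows up, then exit.
        var next = buffer.take()
        while (next ne POISON_PILL) {
          println(s"dispatching $next")
          next = buffer.take()
        }
      }
    }

    def start(): Unit = dispatcher.start()

    def post(event: String): Boolean = buffer.offer(event)

    // The blocking put only succeeds once there is room in the bounded buffer.
    // If the dispatcher stopped draining while the buffer was full, this call
    // would never return -- which is why a dead queue must be dropped from the
    // set of active queues before the bus is stopped.
    def stop(): Unit = {
      buffer.put(POISON_PILL)
      dispatcher.join()
    }
  }

  def main(args: Array[String]): Unit = {
    val q = new ToyQueue(capacity = 5)
    q.start()
    (1 to 3).foreach(i => q.post(s"event $i"))
    q.stop()  // returns promptly because the dispatcher is still draining
    println("stopped cleanly")
  }
}

In the test above, the interrupted queue has already removed itself from the bus via bus.removeQueue(name), so bus.stop() only has to deal with queues whose dispatch threads are still alive.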

   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */

@@ -547,6 +579,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { throw new Exception }
   }

+  /**
+   * A simple listener that interrupts on job end.
+   */
+  private class InterruptingListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      Thread.currentThread().interrupt()
+    }
+  }
 }

 // These classes can't be declared inside of the SparkListenerSuite class because we don't want