core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -34,7 +34,11 @@ import org.apache.spark.util.Utils
 * Delivery will only begin when the `start()` method is called. The `stop()` method should be
 * called when no more events need to be delivered.
 */
-private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
+private class AsyncEventQueue(
+    val name: String,
+    conf: SparkConf,
+    metrics: LiveListenerBusMetrics,
+    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

@@ -81,23 +85,18 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
  }

  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
-    try {
-      var next: SparkListenerEvent = eventQueue.take()
-      while (next != POISON_PILL) {
-        val ctx = processingTime.time()
-        try {
-          super.postToAll(next)
-        } finally {
-          ctx.stop()
-        }
-        eventCount.decrementAndGet()
-        next = eventQueue.take()
-      }
-      eventCount.decrementAndGet()
-    } catch {
-      case ie: InterruptedException =>
-        logInfo(s"Stopping listener queue $name.", ie)
-    }
+    var next: SparkListenerEvent = eventQueue.take()
+    while (next != POISON_PILL) {
+      val ctx = processingTime.time()
+      try {
+        super.postToAll(next)
+      } finally {
+        ctx.stop()
+      }
+      eventCount.decrementAndGet()
+      next = eventQueue.take()
+    }
+    eventCount.decrementAndGet()
  }

  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
@@ -130,7 +129,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
      eventCount.incrementAndGet()
      eventQueue.put(POISON_PILL)
    }
-    dispatchThread.join()
+    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // in that case.
+    if (Thread.currentThread() != dispatchThread) {
Contributor:
it's ok to leave this, but this doesn't happen anymore, right?

Contributor (author):
It does still happen, we need this. We see the interrupt in postToAll, which is in the queue thread. If it fails, we call removeListenerOnError. If that results in the queue being empty, we stop the queue.

"spark-listener-group-eventLog" #20 daemon prio=5 os_prio=31 tid=0x00007f831379e800 nid=0x6303 in Object.wait() [0x0000000129226000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
        at java.lang.Thread.join(Thread.java:1245)
        - locked <0x000000078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
        at java.lang.Thread.join(Thread.java:1319)
        at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:135)
        at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:123)
        at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:121)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.LiveListenerBus.removeListener(LiveListenerBus.scala:121)
        - locked <0x0000000780475fe8> (a org.apache.spark.scheduler.LiveListenerBus)
        at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:196)
        at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:37)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:101)
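
For illustration, a minimal standalone sketch (hypothetical names, not Spark code) of the hazard in the stack trace above: a thread that reaches join() on itself waits for its own termination and hangs forever, which is exactly what the guard added in stop() prevents.

object SelfJoinSketch {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => println("worker running"))
    worker.start()
    // Safe: the main thread joins a *different* thread.
    if (Thread.currentThread() != worker) {
      worker.join()
    }
    // Unsafe: Thread.currentThread().join() here would block forever,
    // since the current thread would be waiting for its own death.
    println("done")
  }
}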

+      dispatchThread.join()
+    }
  }

  def post(event: SparkListenerEvent): Unit = {
@@ -187,6 +190,12 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
    true
  }

+  override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
+    // the listener failed in an unrecoverable way, we want to remove it from the entire
+    // LiveListenerBus (potentially stopping a queue if it is empty)
+    bus.removeListener(listener)
+  }

}

private object AsyncEventQueue {
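The override above relies on a default removeListenerOnError hook in ListenerBus, added later in this diff. A toy sketch (hypothetical names, not the Spark classes) of the shape of that dispatch:

trait ToyListenerBus[L] {
  def removeListener(listener: L): Unit
  // Default: a listener that fails fatally is only dropped locally.
  def removeListenerOnError(listener: L): Unit = removeListener(listener)
}

// Each queue knows its owning bus, so a fatal listener error can be
// escalated to remove the listener everywhere (and stop empty queues).
class ToyQueue[L](owner: ToyListenerBus[L]) extends ToyListenerBus[L] {
  override def removeListener(listener: L): Unit =
    println(s"dropping $listener from this queue only")
  override def removeListenerOnError(listener: L): Unit =
    owner.removeListener(listener)
}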
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -102,7 +102,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
        queue.addListener(listener)

      case None =>
-        val newQueue = new AsyncEventQueue(queue, conf, metrics)
+        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -60,6 +60,15 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    }
  }

+  /**
+   * This can be overridden by subclasses if there is any extra cleanup to do when removing a
+   * listener. In particular, AsyncEventQueues can clean up queues in the LiveListenerBus.
+   */
+  def removeListenerOnError(listener: L): Unit = {
+    removeListener(listener)
+  }
+

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
@@ -80,7 +89,16 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
      }
      try {
        doPostEvent(listener, event)
+        if (Thread.interrupted()) {
Contributor:
This is ok right now since Spark code never explicitly interrupts these threads. If we ever need to do that, though, this might become a problem... but in that case I don't know how you'd handle this issue without just giving up and stopping everything.

But... is this correct? Your test just calls Thread.interrupt(), which just sets the interrupt flag. But what if the listener calls an operation that would actually turn that into an InterruptedException? You're not catching that here anywhere.

scala> Thread.currentThread().interrupt()

scala> Thread.sleep(10)
java.lang.InterruptedException: sleep interrupted
  at java.lang.Thread.sleep(Native Method)
  ... 33 elided

Contributor (author):
yeah agree, I will handle this too -- but I'll wait to update till there is agreement on the right overall approach

Contributor (author):
> This is ok right now since Spark code never explicitly interrupts these threads. If we ever need to do that, though, this might become a problem... but in that case I don't know how you'd handle this issue without just giving up and stopping everything.

If Spark were to explicitly interrupt, then I think we'd also set some other flag indicating a reason, e.g. val requestedQueueStop: AtomicBoolean, so it shouldn't be hard to distinguish.

I've pushed an update to handle InterruptedException as well.
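
As a sketch of that idea (requestedQueueStop and the surrounding class are hypothetical, not part of this change): the flag would be set before interrupting, so the dispatch thread can tell a deliberate stop apart from a misbehaving listener.

import java.util.concurrent.atomic.AtomicBoolean

class StopAwareQueue(dispatchThread: Thread) {
  private val requestedQueueStop = new AtomicBoolean(false)

  def requestStop(): Unit = {
    requestedQueueStop.set(true) // record the reason first...
    dispatchThread.interrupt()   // ...then interrupt the dispatch thread
  }

  // Called from the dispatch thread when it observes an interrupt.
  def onInterrupt(removeListener: () => Unit): Unit = {
    if (!requestedQueueStop.get()) {
      // No stop was requested, so the interrupt came from a misbehaving
      // listener: remove it and keep the queue alive.
      removeListener()
    }
  }
}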

+          // We want to throw the InterruptedException right away so we can associate the interrupt
+          // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
+          throw new InterruptedException()
+        }
      } catch {
+        case ie: InterruptedException =>
+          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
+            s"Removing that listener.", ie)
+          removeListenerOnError(listener)
        case NonFatal(e) if !isIgnorableException(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
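One JDK detail the new check relies on (a standalone sketch of standard java.lang.Thread behavior, not code from this PR): Thread.interrupted() tests and clears the current thread's interrupt status, while isInterrupted() only tests it. Clearing is why the code above rethrows InterruptedException immediately instead of leaving the flag to be noticed later.

object InterruptFlagSketch {
  def main(args: Array[String]): Unit = {
    Thread.currentThread().interrupt()
    // isInterrupted only observes the flag:
    println(Thread.currentThread().isInterrupted) // true
    // Thread.interrupted() observes AND clears it:
    println(Thread.interrupted()) // true
    println(Thread.interrupted()) // false -- the flag is gone
  }
}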
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -489,6 +489,48 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
    assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
  }

+  Seq(true, false).foreach { throwInterruptedException =>
+    val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
+    test(s"interrupt within listener is handled correctly: $suffix") {
+      val conf = new SparkConf(false)
+        .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+      val bus = new LiveListenerBus(conf)
+      val counter1 = new BasicJobCounter()
+      val counter2 = new BasicJobCounter()
+      val interruptingListener1 = new InterruptingListener(throwInterruptedException)
+      val interruptingListener2 = new InterruptingListener(throwInterruptedException)
+      bus.addToSharedQueue(counter1)
+      bus.addToSharedQueue(interruptingListener1)
+      bus.addToStatusQueue(counter2)
+      bus.addToEventLogQueue(interruptingListener2)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[InterruptingListener]().size === 2)
+
+      bus.start(mockSparkContext, mockMetricsSystem)
+
+      // after we post one event, both interrupting listeners should get removed, and the
+      // event log queue should be removed
+      bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+      bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[InterruptingListener]().size === 0)
+      assert(counter1.count === 1)
+      assert(counter2.count === 1)
+
+      // posting more events should be fine, they'll just get processed from the OK queue.
+      (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+      bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      assert(counter1.count === 6)
+      assert(counter2.count === 6)
+
+      // Make sure stopping works -- this requires putting a poison pill in all active queues, which
+      // would fail if our interrupted queue was still active, as its queue would be full.
+      bus.stop()
+    }
+  }

  /**
   * Assert that the given list of numbers has an average that is greater than zero.
   */
@@ -547,6 +589,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { throw new Exception }
  }

+  /**
+   * A simple listener that interrupts on job end.
+   */
+  private class InterruptingListener(val throwInterruptedException: Boolean) extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      if (throwInterruptedException) {
+        throw new InterruptedException("got interrupted")
+      } else {
+        Thread.currentThread().interrupt()
+      }
+    }
+  }
}

// These classes can't be declared inside of the SparkListenerSuite class because we don't want