Skip to content

Commit 8f463ba

Browse files
Marcelo Vanzin authored and William Montaz committed
[SPARK-22850][CORE] Ensure queued events are delivered to all event queues.
The code in LiveListenerBus was queueing events posted before start in the queues themselves, so in situations like the following:

    bus.post(someEvent)
    bus.addToEventLogQueue(listener)
    bus.start()

"someEvent" would not be delivered to "listener" if that was the first listener in the queue, because the queue wouldn't exist when the event was posted.

This change buffers the events posted before the bus is started in the bus itself, so that they can be delivered to all registered queues when the bus is started.

Also tweaked the unit tests to cover the behavior above.

Author: Marcelo Vanzin <[email protected]>

Closes apache#20039 from vanzin/SPARK-22850.
1 parent ec5cce3 commit 8f463ba

File tree

2 files changed

+44
-7
lines changed

2 files changed

+44
-7
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.concurrent._
2222
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2323

2424
import scala.collection.JavaConverters._
25+
import scala.collection.mutable
2526
import scala.reflect.ClassTag
2627
import scala.util.DynamicVariable
2728

@@ -55,6 +56,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
5556

5657
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
5758

59+
// Visible for testing.
60+
@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
61+
5862
/** Add a listener to queue shared by all non-internal listeners. */
5963
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
6064
addToQueue(listener, SHARED_QUEUE)
@@ -116,12 +120,37 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
116120

117121
/** Post an event to all queues. */
118122
def post(event: SparkListenerEvent): Unit = {
119-
if (!stopped.get()) {
120-
val it = queues.iterator()
121-
while (it.hasNext()) {
122-
it.next().post(event)
123+
if (stopped.get()) {
124+
return
125+
}
126+
127+
// If the event buffer is null, it means the bus has been started and we can avoid
128+
// synchronization and post events directly to the queues. This should be the most
129+
// common case during the life of the bus.
130+
if (queuedEvents == null) {
131+
postToQueues(event)
132+
return
133+
}
134+
135+
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
136+
// calling start() picks up the new event.
137+
synchronized {
138+
if (!started.get()) {
139+
queuedEvents += event
140+
return
123141
}
124142
}
143+
144+
// If the bus was already started when the check above was made, just post directly to the
145+
// queues.
146+
postToQueues(event)
147+
}
148+
149+
private def postToQueues(event: SparkListenerEvent): Unit = {
150+
val it = queues.iterator()
151+
while (it.hasNext()) {
152+
it.next().post(event)
153+
}
125154
}
126155

127156
/**
@@ -138,7 +167,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
138167
}
139168

140169
this.sparkContext = sc
141-
queues.asScala.foreach(_.start(sc))
170+
queues.asScala.foreach { q =>
171+
q.start(sc)
172+
queuedEvents.foreach(q.post)
173+
}
174+
queuedEvents = null
142175
}
143176

144177
/**

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ package org.apache.spark.scheduler
1919

2020
import java.util.concurrent.Semaphore
2121

22-
import scala.collection.mutable
2322
import scala.collection.JavaConverters._
23+
import scala.collection.mutable
2424

2525
import org.mockito.Mockito
2626
import org.scalatest.Matchers
@@ -58,20 +58,24 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
5858
sc = new SparkContext("local", "SparkListenerSuite", conf)
5959
val counter = new BasicJobCounter
6060
val bus = new LiveListenerBus(conf)
61-
bus.addToSharedQueue(counter)
6261

6362
// Post five events:
6463
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
6564

6665
// Five messages should be marked as received and queued, but no messages should be posted to
6766
// listeners yet because the listener bus hasn't been started.
67+
assert(bus.queuedEvents.size === 5)
68+
bus.addToSharedQueue(counter)
6869
assert(counter.count === 0)
6970

7071
// Starting listener bus should flush all buffered events
7172
bus.start(mockSparkContext)
7273
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
7374
assert(counter.count === 5)
7475

76+
// After the bus is started, there should be no more queued events.
77+
assert(bus.queuedEvents === null)
78+
7579
// After listener bus has stopped, posting events should not increment counter
7680
bus.stop()
7781
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

0 commit comments

Comments
 (0)