@@ -22,6 +22,7 @@ import java.util.concurrent._
2222import java .util .concurrent .atomic .{AtomicBoolean , AtomicLong }
2323
2424import scala .collection .JavaConverters ._
25+ import scala .collection .mutable
2526import scala .reflect .ClassTag
2627import scala .util .DynamicVariable
2728
@@ -55,6 +56,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
5556
5657 private val queues = new CopyOnWriteArrayList [AsyncEventQueue ]()
5758
59+ // Visible for testing.
60+ @ volatile private [scheduler] var queuedEvents = new mutable.ListBuffer [SparkListenerEvent ]()
61+
5862 /** Add a listener to queue shared by all non-internal listeners. */
5963 def addToSharedQueue (listener : SparkListenerInterface ): Unit = {
6064 addToQueue(listener, SHARED_QUEUE )
@@ -116,12 +120,37 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
116120
117121 /** Post an event to all queues. */
118122 def post (event : SparkListenerEvent ): Unit = {
119- if (! stopped.get()) {
120- val it = queues.iterator()
121- while (it.hasNext()) {
122- it.next().post(event)
123+ if (stopped.get()) {
124+ return
125+ }
126+
127+ // If the event buffer is null, it means the bus has been started and we can avoid
128+ // synchronization and post events directly to the queues. This should be the most
129+ // common case during the life of the bus.
130+ if (queuedEvents == null ) {
131+ postToQueues(event)
132+ return
133+ }
134+
135+ // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
136+ // calling start() picks up the new event.
137+ synchronized {
138+ if (! started.get()) {
139+ queuedEvents += event
140+ return
123141 }
124142 }
143+
144+ // If the bus was already started when the check above was made, just post directly to the
145+ // queues.
146+ postToQueues(event)
147+ }
148+
149+ private def postToQueues (event : SparkListenerEvent ): Unit = {
150+ val it = queues.iterator()
151+ while (it.hasNext()) {
152+ it.next().post(event)
153+ }
125154 }
126155
127156 /**
@@ -138,7 +167,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
138167 }
139168
140169 this .sparkContext = sc
141- queues.asScala.foreach(_.start(sc))
170+ queues.asScala.foreach { q =>
171+ q.start(sc)
172+ queuedEvents.foreach(q.post)
173+ }
174+ queuedEvents = null
142175 }
143176
144177 /**
0 commit comments