diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 7cd2b862216ee..11e2c475d9b45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -46,8 +46,18 @@ private class AsyncEventQueue( // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if // it's perpetually being added to more quickly than it's being drained. - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( - conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + // The capacity can be configured by spark.scheduler.listenerbus.eventqueue.${name}.capacity, + // if no such conf is specified, use the value specified in + // LISTENER_BUS_EVENT_QUEUE_CAPACITY + private[scheduler] def capacity: Int = { + val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + assert(queuesize > 0, s"capacity for event queue $name must be greater than 0, " + + s"but $queuesize is configured.") + queuesize + } + + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity) // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; // this allows that method to return only when the events in the queue have been fully diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index d135190d1e919..302ebd30da228 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -236,6 +236,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) { queues.asScala.map(_.name).toSet } + // For testing only. 
+ private[scheduler] def getQueueCapacity(name: String): Option[Int] = { + queues.asScala.find(_.name == name).map(_.capacity) + } } private[spark] object LiveListenerBus { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index a7869d3251ebc..8903e1054f53d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -532,6 +532,34 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } } + test("event queue size can be configured through spark conf") { + // configure the shared queue size to be 1, event log queue size to be 2, + // and listener bus event queue size to be 5 + val conf = new SparkConf(false) + .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) + .set(s"spark.scheduler.listenerbus.eventqueue.${SHARED_QUEUE}.capacity", "1") + .set(s"spark.scheduler.listenerbus.eventqueue.${EVENT_LOG_QUEUE}.capacity", "2") + + val bus = new LiveListenerBus(conf) + val counter1 = new BasicJobCounter() + val counter2 = new BasicJobCounter() + val counter3 = new BasicJobCounter() + + // add a new shared, status and event queue + bus.addToSharedQueue(counter1) + bus.addToStatusQueue(counter2) + bus.addToEventLogQueue(counter3) + + assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE)) + // check the size of shared queue is 1 as configured + assert(bus.getQueueCapacity(SHARED_QUEUE) == Some(1)) + // no specific size of status queue is configured, + // it should use the LISTENER_BUS_EVENT_QUEUE_CAPACITY + assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == Some(5)) + // check the size of event log queue is 2 as configured + assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == Some(2)) + } + /** * Assert that the given list of numbers has an average that is greater than zero. */