From 5542a7a6b596a2a09f8d134d3dd58c142f6049d5 Mon Sep 17 00:00:00 2001 From: yunzoud Date: Mon, 29 Jul 2019 10:50:31 -0700 Subject: [PATCH 1/5] configurable event queue size --- .../spark/scheduler/AsyncEventQueue.scala | 9 ++++++-- .../spark/scheduler/LiveListenerBus.scala | 7 ++++++ .../spark/scheduler/SparkListenerSuite.scala | 22 +++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 7cd2b862216ee..92c5d917b3ce7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -46,8 +46,13 @@ private class AsyncEventQueue( // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if // it's perpetually being added to more quickly than it's being drained. - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( - conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + // The capacity can be configured by spark.scheduler.listenerbus.eventqueue.${name}.capacity, + // if no such conf is specified, use the value specified in + // LISTENER_BUS_EVENT_QUEUE_CAPACITY + def capacity: Int = conf.getInt( + s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity) // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; // this allows that method to return only when the events in the queue have been fully diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index d135190d1e919..01c82597aa9d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -236,6 +236,13 @@ private[spark] class LiveListenerBus(conf: SparkConf) { queues.asScala.map(_.name).toSet } + // For testing only. + private[scheduler] def getQueueCapacity(name: String): Int = { + queues.asScala.find(_.name == name) match { + case Some(queue) => queue.capacity + case None => -1 + } + } } private[spark] object LiveListenerBus { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index a7869d3251ebc..6d6b8ef4a96b2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -532,6 +532,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } } + test("event queue size can be configued through spark conf") { + val conf = new SparkConf(false) + .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) + .set("spark.scheduler.listenerbus.eventqueue.shared.capacity", "1") + .set("spark.scheduler.listenerbus.eventqueue.eventLog.capacity", "2") + + val bus = new LiveListenerBus(conf) + val counter1 = new BasicJobCounter() + val counter2 = new BasicJobCounter() + val counter3 = new BasicJobCounter() + + bus.addToSharedQueue(counter1) + bus.addToStatusQueue(counter2) + bus.addToEventLogQueue(counter3) + + assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE)) + // check the size of each queue + assert(bus.getQueueCapacity(SHARED_QUEUE) == 1) + assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == 5) + assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == 2) + } + /** * Assert that the given list of numbers has an average that is greater than zero. 
*/ From 36995f19fefdd612167b7422dffc22dbd5807075 Mon Sep 17 00:00:00 2001 From: yunzoud Date: Tue, 30 Jul 2019 10:36:30 -0700 Subject: [PATCH 2/5] add protect field --- .../main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 92c5d917b3ce7..af46ced0eec5c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -49,7 +49,7 @@ private class AsyncEventQueue( // The capacity can be configured by spark.scheduler.listenerbus.eventqueue.${name}.capacity, // if no such conf is specified, use the value specified in // LISTENER_BUS_EVENT_QUEUE_CAPACITY - def capacity: Int = conf.getInt( + protected def capacity: Int = conf.getInt( s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity) From 4ab040cb5639d0f183b229f3e4d28d663ca52ed4 Mon Sep 17 00:00:00 2001 From: yunzoud Date: Wed, 31 Jul 2019 10:51:41 -0700 Subject: [PATCH 3/5] address feedback --- .../org/apache/spark/scheduler/AsyncEventQueue.scala | 11 ++++++++--- .../org/apache/spark/scheduler/LiveListenerBus.scala | 5 +---- .../apache/spark/scheduler/SparkListenerSuite.scala | 12 +++++++++--- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index af46ced0eec5c..702a1dcf9aa2f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -49,9 +49,14 @@ private class AsyncEventQueue( // The capacity can be configured by 
spark.scheduler.listenerbus.eventqueue.${name}.capacity, // if no such conf is specified, use the value specified in // LISTENER_BUS_EVENT_QUEUE_CAPACITY - protected def capacity: Int = conf.getInt( - s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", - conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + private[scheduler] def capacity: Int = { + val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + assert(queuesize > 0, s"capacity for event queue $name must be greater than 0," + + s"but $queuesize is configured.") + queuesize + } + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity) // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 01c82597aa9d2..fcd9766ed2f33 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -238,10 +238,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) { // For testing only. 
private[scheduler] def getQueueCapacity(name: String): Int = { - queues.asScala.find(_.name == name) match { - case Some(queue) => queue.capacity - case None => -1 - } + queues.asScala.find(_.name == name).map(_.capacity).getOrElse(-1) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 6d6b8ef4a96b2..4f3f896d26f45 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -533,24 +533,30 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("event queue size can be configued through spark conf") { + // configure the shared queue size to be 1, event log queue size to be 2, + // and listener bus event queue size to be 5 val conf = new SparkConf(false) .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) - .set("spark.scheduler.listenerbus.eventqueue.shared.capacity", "1") - .set("spark.scheduler.listenerbus.eventqueue.eventLog.capacity", "2") + .set(s"spark.scheduler.listenerbus.eventqueue.${SHARED_QUEUE}.capacity", "1") + .set(s"spark.scheduler.listenerbus.eventqueue.${EVENT_LOG_QUEUE}.capacity", "2") val bus = new LiveListenerBus(conf) val counter1 = new BasicJobCounter() val counter2 = new BasicJobCounter() val counter3 = new BasicJobCounter() + // add a new shared, status and event queue bus.addToSharedQueue(counter1) bus.addToStatusQueue(counter2) bus.addToEventLogQueue(counter3) assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE)) - // check the size of each queue + // check the size of shared queue is 1 as configured assert(bus.getQueueCapacity(SHARED_QUEUE) == 1) + // no specific size of status queue is configured, + // it shoud use the LISTENER_BUS_EVENT_QUEUE_CAPACITY assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == 5) + // check the size of event log queue is 5 as configured 
assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == 2) } From 261beb078f3a972771f8109c5a108181e6888f05 Mon Sep 17 00:00:00 2001 From: yunzoud Date: Fri, 2 Aug 2019 09:55:36 -0700 Subject: [PATCH 4/5] address feedback --- .../org/apache/spark/scheduler/AsyncEventQueue.scala | 2 +- .../org/apache/spark/scheduler/LiveListenerBus.scala | 4 ++-- .../org/apache/spark/scheduler/SparkListenerSuite.scala | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 702a1dcf9aa2f..11e2c475d9b45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -52,7 +52,7 @@ private class AsyncEventQueue( private[scheduler] def capacity: Int = { val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) - assert(queuesize > 0, s"capacity for event queue $name must be greater than 0," + + assert(queuesize > 0, s"capacity for event queue $name must be greater than 0, " + s"but $queuesize is configured.") queuesize } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index fcd9766ed2f33..302ebd30da228 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -237,8 +237,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) { } // For testing only. 
- private[scheduler] def getQueueCapacity(name: String): Int = { - queues.asScala.find(_.name == name).map(_.capacity).getOrElse(-1) + private[scheduler] def getQueueCapacity(name: String): Option[Int] = { + queues.asScala.find(_.name == name).map(_.capacity) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 4f3f896d26f45..f0b119fa68119 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -552,12 +552,15 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE)) // check the size of shared queue is 1 as configured - assert(bus.getQueueCapacity(SHARED_QUEUE) == 1) + assert(bus.getQueueCapacity(SHARED_QUEUE).isDefined) + assert(bus.getQueueCapacity(SHARED_QUEUE).get == 1) // no specific size of status queue is configured, // it shoud use the LISTENER_BUS_EVENT_QUEUE_CAPACITY - assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == 5) + assert(bus.getQueueCapacity(APP_STATUS_QUEUE).isDefined) + assert(bus.getQueueCapacity(APP_STATUS_QUEUE).get == 5) // check the size of event log queue is 5 as configured - assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == 2) + assert(bus.getQueueCapacity(EVENT_LOG_QUEUE).isDefined) + assert(bus.getQueueCapacity(EVENT_LOG_QUEUE).get == 2) } /** From 1c6f69ab9e55684178cf9a5229022eb765cab717 Mon Sep 17 00:00:00 2001 From: yunzoud Date: Fri, 2 Aug 2019 10:04:59 -0700 Subject: [PATCH 5/5] simplify code --- .../org/apache/spark/scheduler/SparkListenerSuite.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index f0b119fa68119..8903e1054f53d 
100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -552,15 +552,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE)) // check the size of shared queue is 1 as configured - assert(bus.getQueueCapacity(SHARED_QUEUE).isDefined) - assert(bus.getQueueCapacity(SHARED_QUEUE).get == 1) + assert(bus.getQueueCapacity(SHARED_QUEUE) == Some(1)) // no specific size of status queue is configured, // it shoud use the LISTENER_BUS_EVENT_QUEUE_CAPACITY - assert(bus.getQueueCapacity(APP_STATUS_QUEUE).isDefined) - assert(bus.getQueueCapacity(APP_STATUS_QUEUE).get == 5) + assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == Some(5)) // check the size of event log queue is 5 as configured - assert(bus.getQueueCapacity(EVENT_LOG_QUEUE).isDefined) - assert(bus.getQueueCapacity(EVENT_LOG_QUEUE).get == 2) + assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == Some(2)) } /**