From a1fb5a8f2e58fe774aabc76e9d1a6859cfa99370 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 23 May 2017 14:49:35 -0700 Subject: [PATCH 01/10] WIP --- .../scala/org/apache/spark/SparkContext.scala | 7 +- .../spark/scheduler/LiveListenerBus.scala | 71 ++++++++++++--- .../scheduler/EventLoggingListenerSuite.scala | 4 +- .../spark/scheduler/SparkListenerSuite.scala | 89 +++++++++++++++---- .../BlockManagerReplicationSuite.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 2 +- .../spark/ui/storage/StorageTabSuite.scala | 5 +- .../streaming/ReceivedBlockHandlerSuite.scala | 3 +- 8 files changed, 143 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7dbceb9c5c1a..cc44cdf1c5bc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -195,6 +195,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _conf: SparkConf = _ private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None + private var _listenerBus: LiveListenerBus = _ private var _env: SparkEnv = _ private var _jobProgressListener: JobProgressListener = _ private var _statusTracker: SparkStatusTracker = _ @@ -247,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging { def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events - private[spark] val listenerBus = new LiveListenerBus(this) + private[spark] def listenerBus: LiveListenerBus = _listenerBus // This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( @@ -423,6 +424,8 @@ class SparkContext(config: SparkConf) extends Logging { if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") + _listenerBus = new LiveListenerBus(_conf) + // "_jobProgressListener" should be set up before creating SparkEnv because when creating // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them. _jobProgressListener = new JobProgressListener(_conf) @@ -2389,7 +2392,7 @@ class SparkContext(config: SparkConf) extends Logging { } } - listenerBus.start() + listenerBus.start(this, _env.metricsSystem) _listenerBusStarted = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 5533f7b1f236..982cadf3d659 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -22,8 +22,12 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.DynamicVariable -import org.apache.spark.{SparkContext, SparkException} +import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} + +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.config._ +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.apache.spark.util.Utils /** @@ -33,25 +37,24 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. 
*/ -private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { +private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { self => import LiveListenerBus._ + private var sparkContext: SparkContext = _ + // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() - private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - - private def validateAndGetQueueSize(): Int = { - val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) - if (queueSize <= 0) { - throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") - } - queueSize + private val eventQueue = { + val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) + require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!") + new LinkedBlockingQueue[SparkListenerEvent](capacity) } + private[spark] val metrics = new LiveListenerBusMetrics(eventQueue) + // Indicate if `start()` is called private val started = new AtomicBoolean(false) // Indicate if `stop()` is called @@ -76,6 +79,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { + val timer = metrics.eventProcessingTime while (true) { eventLock.acquire() self.synchronized { @@ -91,7 +95,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa } return } - postToAll(event) + val timerContext = timer.time() + try { + postToAll(event) + } finally { + timerContext.stop() + } } finally { self.synchronized { processingEvent = false @@ -109,9 +118,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. * + * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start(): Unit = { + def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = { if (started.compareAndSet(false, true)) { + sparkContext = sc + metricsSystem.registerSource(metrics) listenerThread.start() } else { throw new IllegalStateException(s"$name already started!") @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa logError(s"$name has already stopped! Dropping event $event") return } + metrics.numEventsReceived.inc() val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() } else { onDropEvent(event) + metrics.numDroppedEvents.inc() droppedEventsCounter.incrementAndGet() } @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. 
+ */ + val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { + metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ + override def getValue: Int = queue.size() + }) + } +} + diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4c3d0b102152..d4640e4dba66 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -155,14 +155,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus(sc) + val listenerBus = new LiveListenerBus(conf) val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start() + listenerBus.start(sc, sc.env.metricsSystem) listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 80c7e0bfee6e..cf32440bb172 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -22,10 +22,13 @@ import java.util.concurrent.Semaphore import scala.collection.mutable import scala.collection.JavaConverters._ +import org.mockito.Mockito import org.scalatest.Matchers import org.apache.spark._ import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_SIZE +import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers @@ -36,14 +39,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val jobCompletionTime = 1421191296660L + private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext]) + private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem]) + test("don't call sc.stop in listener") { sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) - val bus = new LiveListenerBus(sc) + val bus = new LiveListenerBus(sc.conf) bus.addListener(listener) // Starting listener bus should flush all buffered events - bus.start() + bus.start(sc, sc.env.metricsSystem) bus.post(SparkListenerJobEnd(0, 
jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
@@ -52,35 +58,50 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val conf = new SparkConf()
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(conf)
     bus.addListener(counter)
 
-    // Listener bus hasn't started yet, so posting events should not increment counter
+    // Metrics are initially empty.
+    assert(bus.metrics.numEventsReceived.getCount === 0)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.eventProcessingTime.getCount === 0)
+
+    // Post five events:
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+
+    // Five messages should be marked as received and queued, but no messages should be posted to
+    // listeners yet because the listener bus hasn't been started.
+    assert(bus.metrics.numEventsReceived.getCount === 5)
+    assert(bus.metrics.queueSize.getValue === 5)
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
+    Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.eventProcessingTime.getCount === 5)
 
     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)
+    assert(bus.metrics.numEventsReceived.getCount === 5)
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus(sc)
-      bus.start()
-      bus.start()
+      val bus = new LiveListenerBus(conf)
+      bus.start(mockSparkContext, mockMetricsSystem)
+      bus.start(mockSparkContext, mockMetricsSystem)
     }
 
     // ... 
or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus(sc)
+      val bus = new LiveListenerBus(conf)
       bus.stop()
     }
   }
@@ -107,12 +128,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         drained = true
       }
     }
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(new SparkConf())
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -138,6 +158,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_SIZE, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+
+    // If we post an additional message then it should remain in the queue because the listener is
+    // busy processing the first event:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    assert(bus.metrics.queueSize.getValue === 1)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+
+    // The queue is now full, so any additional events posted to the listener will be dropped:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    assert(bus.metrics.queueSize.getValue === 1)
+    assert(bus.metrics.numDroppedEvents.getCount === 1)
+
+
+    // Allow the remaining events to be processed so we can stop the listener bus:
+    listenerWait.release(2)
+    bus.stop()
+  }
+
   test("basic creation of StageInfo") {
     sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
@@ -354,14 +412,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(new SparkConf())
 
     // Propagate events to bad listener first
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index c100803279ea..dd61dcd11bcd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -100,7 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     sc = new SparkContext("local", "test", 
conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(sc))), conf, true) + new LiveListenerBus(conf))), conf, true) allStores.clear() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 1e7bcdb6740f..3abc833f8482 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -124,7 +124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE when(sc.conf).thenReturn(conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(sc))), conf, true) + new LiveListenerBus(conf))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index f6c8418ba3ac..66dda382eb65 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage._ /** * Test various functionality in the StorageListener that supports the StorageTab. */ -class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter { +class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { private var bus: LiveListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ @@ -43,8 +43,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn before { val conf = new SparkConf() - sc = new SparkContext("local", "test", conf) - bus = new LiveListenerBus(sc) + bus = new LiveListenerBus(conf) storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 3c4a2716caf9..24c4ada24014 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -50,7 +50,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) extends SparkFunSuite with BeforeAndAfter with Matchers - with LocalSparkContext with Logging { import WriteAheadLogBasedBlockHandler._ @@ -92,7 +91,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) sc = new SparkContext("local", "test", conf) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(sc))), conf, true) + new LiveListenerBus(conf))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf) From a46c24766fc2d533be82cc709948b37383e68121 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 23 May 2017 22:01:55 -0700 Subject: [PATCH 02/10] Fix test compilation --- .../org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 1 - 1 file changed, 1 deletion(-) 
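The refactoring in PATCH 01 above decouples the bus from its context: the constructor now needs
only a SparkConf, and the SparkContext and MetricsSystem arrive later through start(). Below is a
condensed illustration of the resulting bounded-queue-plus-metrics shape, not Spark code: MiniBus
and its metric names are made up, and only the Dropwizard metrics API that the patch itself
imports is assumed.

import java.util.concurrent.LinkedBlockingQueue

import com.codahale.metrics.{Gauge, MetricRegistry, Timer}

// Illustrative stand-in for the pattern: a bounded event queue whose health
// (posted, dropped, queued, processing time) is observable through metrics.
class MiniBus[E](capacity: Int, registry: MetricRegistry) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val posted = registry.counter("numEventsPosted")
  private val dropped = registry.counter("numEventsDropped")
  private val processingTime: Timer = registry.timer("eventProcessingTime")
  registry.register("queueSize", new Gauge[Int] {
    override def getValue: Int = queue.size()
  })

  // Non-blocking post: count every call, and count separately when the
  // bounded queue is full and the event has to be dropped.
  def post(event: E): Unit = {
    posted.inc()
    if (!queue.offer(event)) dropped.inc()
  }

  // Drain one event, timing how long the handler takes, mirroring how the
  // listener thread wraps postToAll(event) in timer.time() / timerContext.stop().
  def processOne(handler: E => Unit): Unit = {
    val event = queue.take()
    val timerContext = processingTime.time()
    try handler(event) finally timerContext.stop()
  }
}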
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 24c4ada24014..fe65353b9d50 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -88,7 +88,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) - sc = new SparkContext("local", "test", conf) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus(conf))), conf, true) From 378206efb9f5c9628a678ba7defb536252f5cbcb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 24 May 2017 10:25:57 -0700 Subject: [PATCH 03/10] Mock in EventLoggingListenerSuite --- .../apache/spark/scheduler/EventLoggingListenerSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index d4640e4dba66..4cae6c61118a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -25,12 +25,14 @@ import scala.io.Source import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ +import org.mockito.Mockito import org.scalatest.BeforeAndAfter import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.io._ +import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{JsonProtocol, Utils} /** @@ -162,10 +164,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start(sc, sc.env.metricsSystem) + listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) + listenerBus.stop() eventLogger.stop() // Verify file contains exactly the two events logged From 37a1a7d3a66454efa29d59d1f02d0777916807a3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 25 May 2017 11:31:02 -0700 Subject: [PATCH 04/10] Address Wenchen's review comments. --- .../spark/scheduler/LiveListenerBus.scala | 17 +++++++++-------- .../spark/scheduler/SparkListenerSuite.scala | 6 +++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 982cadf3d659..168a67aaf6d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -136,14 +136,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { logError(s"$name has already stopped! 
Dropping event $event")
       return
     }
-    metrics.numEventsReceived.inc()
+    metrics.numEventsPosted.inc()
     val eventAdded = eventQueue.offer(event)
     if (eventAdded) {
       eventLock.release()
     } else {
       onDropEvent(event)
-      metrics.numDroppedEvents.inc()
-      droppedEventsCounter.incrementAndGet()
     }
 
     val droppedEvents = droppedEventsCounter.get
@@ -223,6 +221,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
    * Note: `onDropEvent` can be called in any thread.
    */
   def onDropEvent(event: SparkListenerEvent): Unit = {
+    metrics.numDroppedEvents.inc()
+    droppedEventsCounter.incrementAndGet()
     if (logDroppedEvent.compareAndSet(false, true)) {
       // Only log the following message once to avoid duplicated annoying logs.
       logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
@@ -245,11 +245,12 @@ private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) exten
   override val metricRegistry: MetricRegistry = new MetricRegistry
 
   /**
-   * The total number of events posted to the LiveListenerBus. This counts the number of times
-   * that `post()` is called, which might be less than the total number of events processed in
-   * case events are dropped.
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
    */
-  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
 
   /**
    * The total number of events that were dropped without being delivered to listeners.
@@ -262,7 +263,7 @@ private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) exten
   val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
 
   /**
-   * The number of of messages waiting in the queue.
+   * The number of messages waiting in the queue.
    */
   val queueSize: Gauge[Int] = {
     metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index cf32440bb172..d42486ce4d1e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -64,7 +64,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     bus.addListener(counter)
 
     // Metrics are initially empty.
-    assert(bus.metrics.numEventsReceived.getCount === 0)
+    assert(bus.metrics.numEventsPosted.getCount === 0)
     assert(bus.metrics.numDroppedEvents.getCount === 0)
     assert(bus.metrics.queueSize.getValue === 0)
     assert(bus.metrics.eventProcessingTime.getCount === 0)
@@ -74,7 +74,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Five messages should be marked as received and queued, but no messages should be posted to
     // listeners yet because the listener bus hasn't been started. 
- assert(bus.metrics.numEventsReceived.getCount === 5) + assert(bus.metrics.numEventsPosted.getCount === 5) assert(bus.metrics.queueSize.getValue === 5) assert(counter.count === 0) @@ -90,7 +90,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match bus.stop() (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } assert(counter.count === 5) - assert(bus.metrics.numEventsReceived.getCount === 5) + assert(bus.metrics.numEventsPosted.getCount === 5) // Listener bus must not be started twice intercept[IllegalStateException] { From 3b713a3d45d5f92db7561ff322dc2fe5ef911295 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 25 May 2017 15:41:43 -0700 Subject: [PATCH 05/10] Add per-listener timing statistics. --- .../spark/scheduler/LiveListenerBus.scala | 21 ++++++++++++ .../org/apache/spark/util/ListenerBus.scala | 33 ++++++++++++++++--- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 168a67aaf6d9..b247de0a991d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.DynamicVariable import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.config._ @@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { } } + override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = { + if (listener.getClass.getName.startsWith("org.apache.spark")) { + metrics.perListenerTimers.size() + Some(metrics.perListenerTimers(listener.getClass.getSimpleName)) + } else { + None + } + } + /** * Start sending events to attached listeners. * @@ -270,5 +280,16 @@ private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) exten override def getValue: Int = queue.size() }) } + + /** + * Mapping from fully-qualified listener class name to a timer tracking the processing time of + * events processed by that listener. + */ + val perListenerTimers: LoadingCache[String, Timer] = + CacheBuilder.newBuilder().build[String, Timer](new CacheLoader[String, Timer] { + override def load(listenerName: String): Timer = { + metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", listenerName)) + } + }) } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index fa5ad4e8d81e..3660f21c7ae5 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -23,6 +23,8 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal +import com.codahale.metrics.Timer + import org.apache.spark.internal.Logging /** @@ -30,14 +32,22 @@ import org.apache.spark.internal.Logging */ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { + private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)] + // Marked `private[spark]` for access in tests. 
- private[spark] val listeners = new CopyOnWriteArrayList[L] + private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava + + /** + * Returns a CodaHale metrics Timer for measuring the listener's event processing time. + * This method is intended to be overridden by subclasses. + */ + protected def createTimer(listener: L): Option[Timer] = None /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ final def addListener(listener: L): Unit = { - listeners.add(listener) + listenersPlusTimers.add((listener, createTimer(listener).orNull)) } /** @@ -45,7 +55,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * in any thread. */ final def removeListener(listener: L): Unit = { - listeners.remove(listener) + listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer => + listenersPlusTimers.remove(listenerAndTimer) + } } /** @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. To avoid the wrapper cost, here we use // Java Iterator directly. - val iter = listeners.iterator + val iter = listenersPlusTimers.iterator while (iter.hasNext) { - val listener = iter.next() + val listenerAndMaybeTimer = iter.next() + val listener = listenerAndMaybeTimer._1 + val maybeTimer = listenerAndMaybeTimer._2 + var maybeTimerContext = if (maybeTimer != null) { + maybeTimer.time() + } else { + null + } try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) + } finally { + if (maybeTimerContext != null) { + maybeTimerContext.stop() + } } } } From 60c7448d2cff7dd809f9d75ff48b31e21b88a915 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 25 May 2017 18:19:40 -0700 Subject: [PATCH 06/10] Protect against registering thousands of listener classes. 
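The LoadingCache used in the previous commit creates one timer per listener class name with no
upper bound, so an application that registers many dynamically generated listener classes could
grow the metric registry indefinitely. The diff below therefore switches to a synchronized map
that stops handing out timers once a fixed cap is reached. As a condensed sketch of that
cap-and-synchronize idea (CappedTimers and timerFor are illustrative names, not the Spark code;
the cap of 128 matches the hard-coded value in the diff):

import scala.collection.mutable

import com.codahale.metrics.{MetricRegistry, Timer}

class CappedTimers(registry: MetricRegistry, maxTimed: Int = 128) {
  // Guarded by `synchronized`; once the cap is hit, new classes go untimed.
  private val timers = mutable.Map[String, Timer]()

  def timerFor(className: String): Option[Timer] = synchronized {
    timers.get(className).orElse {
      if (timers.size >= maxTimed) {
        None // over the cap: refuse new timers rather than grow without bound
      } else {
        val timer = registry.timer(MetricRegistry.name("listenerProcessingTime", className))
        timers(className) = timer
        Some(timer)
      }
    }
  }
}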
---
 .../spark/scheduler/LiveListenerBus.scala     | 43 ++++++++++++-------
 .../org/apache/spark/util/ListenerBus.scala   |  4 +-
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index b247de0a991d..89df7613a7f7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -20,12 +20,13 @@ package org.apache.spark.scheduler
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import scala.collection.mutable
 import scala.util.DynamicVariable
 
 import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
@@ -112,13 +113,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     }
   }
 
-  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
-    if (listener.getClass.getName.startsWith("org.apache.spark")) {
-      metrics.perListenerTimers.size()
-      Some(metrics.perListenerTimers(listener.getClass.getSimpleName))
-    } else {
-      None
-    }
+  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
+    metrics.getTimerForListener(listener)
   }
 
   /**
@@ -250,7 +246,8 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
-private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
   override val sourceName: String = "LiveListenerBus"
   override val metricRegistry: MetricRegistry = new MetricRegistry
 
@@ -281,15 +278,29 @@ private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) exten
     })
   }
 
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
   /**
-   * Mapping from fully-qualified listener class name to a timer tracking the processing time of
-   * events processed by that listener.
+   * Returns a timer tracking the processing time of events handled by listeners of the given
+   * class. This method is thread-safe. 
*/ - val perListenerTimers: LoadingCache[String, Timer] = - CacheBuilder.newBuilder().build[String, Timer](new CacheLoader[String, Timer] { - override def load(listenerName: String): Timer = { - metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", listenerName)) + def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = { + synchronized { + val className = listener.getClass.getName + val maxTimed = 128 + perListenerClassTimers.get(className).orElse { + if (perListenerClassTimers.size == maxTimed) { + logError(s"Not measuring processing time for listener class $className because a " + + s"maximum of $maxTimed listener classes are already timed.") + None + } else { + perListenerClassTimers(className) = + metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className)) + perListenerClassTimers.get(className) + } } - }) + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 3660f21c7ae5..d435173fcb3c 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -41,13 +41,13 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * Returns a CodaHale metrics Timer for measuring the listener's event processing time. * This method is intended to be overridden by subclasses. */ - protected def createTimer(listener: L): Option[Timer] = None + protected def getTimer(listener: L): Option[Timer] = None /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ final def addListener(listener: L): Unit = { - listenersPlusTimers.add((listener, createTimer(listener).orNull)) + listenersPlusTimers.add((listener, getTimer(listener).orNull)) } /** From 4a083decb7e817fab49f25f4f0fe119352525aa7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jun 2017 13:12:10 -0700 Subject: [PATCH 07/10] Minor cleanups. --- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 7 ++----- .../src/main/scala/org/apache/spark/util/ListenerBus.scala | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index ce89629878d0..fa4562f2b005 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -49,11 +49,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. 
-  private val eventQueue = {
-    val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
-    require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_CAPACITY.key} must be > 0!")
-    new LinkedBlockingQueue[SparkListenerEvent](capacity)
-  }
+  private val eventQueue =
+    new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
 
   private[spark] val metrics = new LiveListenerBusMetrics(eventQueue)
 
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index d435173fcb3c..8cc6bc09524f 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -73,7 +73,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       val listenerAndMaybeTimer = iter.next()
       val listener = listenerAndMaybeTimer._1
       val maybeTimer = listenerAndMaybeTimer._2
-      var maybeTimerContext = if (maybeTimer != null) {
+      val maybeTimerContext = if (maybeTimer != null) {
         maybeTimer.time()
       } else {
         null

From d1a5e991fb7fc3e7f93090c23d8088be8b650f61 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 5 Jun 2017 13:20:37 -0700
Subject: [PATCH 08/10] Add test for per-listener-class timer; rename method.

---
 .../scala/org/apache/spark/scheduler/LiveListenerBus.scala  | 6 +++---
 .../org/apache/spark/scheduler/SparkListenerSuite.scala     | 4 ++++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index fa4562f2b005..5c43374fe5bb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -111,7 +111,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
   }
 
   override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
-    metrics.getTimerForListener(listener)
+    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
   }
 
   /**
@@ -282,9 +282,9 @@ class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with
    * Returns a timer tracking the processing time of events handled by listeners of the given
    * class. This method is thread-safe. 
*/ - def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = { + def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = { synchronized { - val className = listener.getClass.getName + val className = cls.getName val maxTimed = 128 perListenerClassTimers.get(className).orElse { if (perListenerClassTimers.size == maxTimed) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 661b202af0bc..f3d0bc19675f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -92,6 +92,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(counter.count === 5) assert(bus.metrics.numEventsPosted.getCount === 5) + // Make sure per-listener-class timers were created: + assert(bus.metrics.getTimerForListenerClass( + classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5) + // Listener bus must not be started twice intercept[IllegalStateException] { val bus = new LiveListenerBus(conf) From f36fbaaa4d0a69de163e22f9a7514adc02e837b5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Jun 2017 14:28:38 -0700 Subject: [PATCH 09/10] Use Option. --- .../main/scala/org/apache/spark/util/ListenerBus.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index 8cc6bc09524f..76a56298aaeb 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging */ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { - private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)] + private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])] // Marked `private[spark]` for access in tests. private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava @@ -47,7 +47,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * Add a listener to listen events. This method is thread-safe and can be called in any thread. */ final def addListener(listener: L): Unit = { - listenersPlusTimers.add((listener, getTimer(listener).orNull)) + listenersPlusTimers.add((listener, getTimer(listener))) } /** @@ -73,8 +73,8 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { val listenerAndMaybeTimer = iter.next() val listener = listenerAndMaybeTimer._1 val maybeTimer = listenerAndMaybeTimer._2 - val maybeTimerContext = if (maybeTimer != null) { - maybeTimer.time() + val maybeTimerContext = if (maybeTimer.isDefined) { + maybeTimer.get.time() } else { null } From 76b669ca6eb35a0cce4291702baa5d1f60adb467 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Jun 2017 14:50:58 -0700 Subject: [PATCH 10/10] Add configuration. 
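The cap on timed listener classes was hard-coded to 128 in PATCH 06; the diff below lifts it into
an internal configuration key, spark.scheduler.listenerbus.metrics.maxListenerClassesTimed, and
threads the SparkConf through LiveListenerBusMetrics. Because the key is marked internal(), it is
not documented for end users; as a purely hypothetical tuning example, a test harness with an
unusually large number of distinct listener classes could raise it when building its
configuration:

import org.apache.spark.SparkConf

// Hypothetical example only: raises the internal cap introduced below from
// its default of 128 to 256 for an application with many listener classes.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("listener-metrics-demo")
  .set("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed", "256")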
--- .../org/apache/spark/internal/config/package.scala | 6 ++++++ .../org/apache/spark/scheduler/LiveListenerBus.scala | 11 +++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4ad04b04c312..7827e6760f35 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -158,6 +158,12 @@ package object config { .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative") .createWithDefault(10000) + private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED = + ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed") + .internal() + .intConf + .createWithDefault(128) + // This property sets the root namespace for metrics reporting private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace") .stringConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 5c43374fe5bb..f0887e090b95 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -52,7 +52,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) - private[spark] val metrics = new LiveListenerBusMetrics(eventQueue) + private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue) // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -243,8 +243,11 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } -private[spark] -class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging { +private[spark] class LiveListenerBusMetrics( + conf: SparkConf, + queue: LinkedBlockingQueue[_]) + extends Source with Logging { + override val sourceName: String = "LiveListenerBus" override val metricRegistry: MetricRegistry = new MetricRegistry @@ -285,7 +288,7 @@ class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = { synchronized { val className = cls.getName - val maxTimed = 128 + val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) perListenerClassTimers.get(className).orElse { if (perListenerClassTimers.size == maxTimed) { logError(s"Not measuring processing time for listener class $className because a " +