[SPARK-20863] Add metrics/instrumentation to LiveListenerBus #18083
Changes from 9 commits
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -20,10 +20,16 @@ package org.apache.spark.scheduler
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import scala.collection.mutable
 import scala.util.DynamicVariable
 
-import org.apache.spark.SparkContext
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.util.Utils
@@ -33,15 +39,20 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
+private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
 
   self =>
 
   import LiveListenerBus._
 
+  private var sparkContext: SparkContext = _
+
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
-    sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+  private val eventQueue =
+    new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+
+  private[spark] val metrics = new LiveListenerBusMetrics(eventQueue)
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
@@ -67,6 +78,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
     setDaemon(true)
     override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       LiveListenerBus.withinListenerThread.withValue(true) {
+        val timer = metrics.eventProcessingTime
         while (true) {
           eventLock.acquire()
           self.synchronized {
@@ -82,7 +94,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
               }
               return
             }
-            postToAll(event)
+            val timerContext = timer.time()
+            try {
+              postToAll(event)
+            } finally {
+              timerContext.stop()
+            }
           } finally {
             self.synchronized {
               processingEvent = false
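The `timer.time()` / `timerContext.stop()` pair above is the standard idiom for the Dropwizard (Codahale) `Timer` used throughout this PR: `time()` starts a stopwatch and returns a `Timer.Context`, and `stop()` records the elapsed duration into the timer's histogram even when the timed block throws. A minimal, self-contained sketch of the idiom; the metric name and the `Thread.sleep` stand-in are illustrative only:

```scala
import com.codahale.metrics.{MetricRegistry, Timer}

object TimerIdiom {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    val timer: Timer = registry.timer("eventProcessingTime")

    val timerContext = timer.time() // start the stopwatch
    try {
      Thread.sleep(5)               // stand-in for postToAll(event)
    } finally {
      timerContext.stop()           // records elapsed time even if the body throws
    }

    println(s"count=${timer.getCount} meanNanos=${timer.getSnapshot.getMean}")
  }
}
```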
@@ -93,16 +110,23 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
       }
     }
 
+  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
+    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  }
+
   /**
    * Start sending events to attached listeners.
    *
    * This first sends out all buffered events posted before this listener bus has started, then
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start(): Unit = {
+  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
+      metricsSystem.registerSource(metrics)
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")
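Since the bus no longer receives the `SparkContext` at construction time, the caller now passes it, together with the `MetricsSystem` that should expose the new source, to `start()`. A hedged sketch of what a call site could look like; the actual wiring in `SparkContext.scala` is not part of the excerpted diff, and `MySparkListener` is a placeholder:

```scala
// Hypothetical call site (illustrative, not the merged SparkContext code):
val bus = new LiveListenerBus(conf)  // conf: SparkConf
bus.addListener(new MySparkListener) // MySparkListener: some SparkListenerInterface impl
bus.start(sc, metricsSystem)         // registers LiveListenerBusMetrics with the metrics
                                     // system, then starts the dispatch thread; a second
                                     // call throws IllegalStateException
```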
@@ -115,12 +139,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
       logError(s"$name has already stopped! Dropping event $event")
       return
     }
+    metrics.numEventsPosted.inc()
     val eventAdded = eventQueue.offer(event)
     if (eventAdded) {
       eventLock.release()
     } else {
       onDropEvent(event)
-      droppedEventsCounter.incrementAndGet()
     }
 
     val droppedEvents = droppedEventsCounter.get
@@ -200,6 +224,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
    * Note: `onDropEvent` can be called in any thread.
    */
   def onDropEvent(event: SparkListenerEvent): Unit = {
+    metrics.numDroppedEvents.inc()
+    droppedEventsCounter.incrementAndGet()
     if (logDroppedEvent.compareAndSet(false, true)) {
       // Only log the following message once to avoid duplicated annoying logs.
       logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of events processed by the given listener class.
+   * This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
+    synchronized {
+      val className = cls.getName
+      val maxTimed = 128
+      perListenerClassTimers.get(className).orElse {
+        if (perListenerClassTimers.size == maxTimed) {
+          logError(s"Not measuring processing time for listener class $className because a " +
+            s"maximum of $maxTimed listener classes are already timed.")
+          None
+        } else {
+          perListenerClassTimers(className) =
+            metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
+          perListenerClassTimers.get(className)
+        }
+      }
+    }
+  }
+}
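For readers less familiar with the Dropwizard API used by `LiveListenerBusMetrics`: `MetricRegistry.name(...)` joins its arguments with dots, so the per-listener timers surface as `listenerProcessingTime.<fully.qualified.ClassName>` under the `LiveListenerBus` source. A small standalone sketch of the same registration pattern; the queue element type and the listener class name are placeholders:

```scala
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.JavaConverters._

import com.codahale.metrics.{Gauge, MetricRegistry}

object MetricsNamingSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    val queue = new LinkedBlockingQueue[String](100) // placeholder event queue

    val posted = registry.counter(MetricRegistry.name("numEventsPosted"))
    registry.register(MetricRegistry.name("queueSize"), new Gauge[Int] {
      override def getValue: Int = queue.size() // sampled lazily, on each reporter read
    })
    registry.timer(MetricRegistry.name("listenerProcessingTime", "com.example.MyListener"))

    posted.inc()
    queue.put("event")
    // Prints: listenerProcessingTime.com.example.MyListener, numEventsPosted, queueSize
    registry.getNames.asScala.foreach(println)
  }
}
```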
core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging
 
 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def getTimer(listener: L): Option[Timer] = None
 
   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, getTimer(listener).orNull))
   }
 
   /**
    * Remove a listener and it won't receive any events. This method is thread-safe and can be called
    * in any thread.
    */
   final def removeListener(listener: L): Unit = {
-    listeners.remove(listener)
+    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
+      listenersPlusTimers.remove(listenerAndTimer)
+    }
   }

Review discussion on the `removeListener` change:

Contributor: since this is a […]

Contributor (author): I think the only reason that […] Given the workload and access patterns here, I'm not sure that it's worth it to attempt to optimize this.
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      val maybeTimerContext = if (maybeTimer != null) {
+        maybeTimer.time()
+      } else {
+        null
+      }
       try {
         doPostEvent(listener, event)
       } catch {
         case NonFatal(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      } finally {
+        if (maybeTimerContext != null) {
+          maybeTimerContext.stop()
+        }
       }
     }
   }

Reviewer, on the null-checked timer context: Same. Simpler with an `Option`.
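For comparison, a sketch of the `Option`-based variant the reviewers suggest. `Option(x)` maps `null` to `None`, so untimed listeners skip both calls; the null-based version that was kept avoids allocating an `Option` per listener per event on this hot path. The names and the `Thread.sleep` stand-in are illustrative:

```scala
import com.codahale.metrics.{MetricRegistry, Timer}

object OptionTimerVariant {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    // Would be null for listeners without a timer, mirroring getTimer(listener).orNull.
    val maybeTimer: Timer = registry.timer("listenerProcessingTime.example")

    val maybeTimerContext = Option(maybeTimer).map(_.time()) // None when maybeTimer == null
    try {
      Thread.sleep(1) // stand-in for doPostEvent(listener, event)
    } finally {
      maybeTimerContext.foreach(_.stop())
    }
    println(s"timed invocations: ${maybeTimer.getCount}")
  }
}
```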
Review discussion on the `queueSize` gauge:

Reviewer: do we need this metric? Users can easily get it by looking at the `spark.scheduler.listenerbus.eventqueue.size` config.

Author: It's the queue's capacity that's fixed. In the `LinkedBlockingQueue` Javadoc (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html), size refers to the number of items currently in the queue, whereas capacity refers to the maximum number of items that the queue can hold. I think the `spark.scheduler.listenerbus.eventqueue.size` configuration is confusingly named.

Reviewer: ah, I see.
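The author's distinction is easy to demonstrate directly on `LinkedBlockingQueue`: `size()` (what the new gauge reports) grows and shrinks with the workload, while the capacity fixed by the constructor argument, which is what `spark.scheduler.listenerbus.eventqueue.size` actually configures, never changes:

```scala
import java.util.concurrent.LinkedBlockingQueue

object SizeVsCapacity {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String](10000) // capacity: fixed upper bound

    queue.put("event-1")
    queue.put("event-2")

    println(queue.size())              // 2     -- items currently waiting (the gauge)
    println(queue.remainingCapacity()) // 9998  -- capacity minus current size
  }
}
```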