Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.util.DynamicVariable

import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
}
}

override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
if (listener.getClass.getName.startsWith("org.apache.spark")) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create a timer only for "spark" listeners? We may want timings for "third-party" listeners too. In my mind it is even more important for those listeners, because they can be much less optimized and so bring a huge performance penalty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is accounted for in a later commit. All listeners are now captured.

metrics.perListenerTimers.size()
Some(metrics.perListenerTimers(listener.getClass.getSimpleName))
} else {
None
}
}

/**
* Start sending events to attached listeners.
*
Expand Down Expand Up @@ -270,5 +280,16 @@ private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) exten
override def getValue: Int = queue.size()
})
}

/**
 * Cache of per-listener timers, keyed by a listener name string supplied by the caller.
 *
 * When a name is looked up for the first time, the loader asks `metricRegistry` for the timer
 * registered under `MetricRegistry.name("listenerProcessingTime", listenerName)` and the cache
 * keeps the result for subsequent lookups of the same name.
 *
 * NOTE(review): this was previously described as keyed by the fully-qualified listener class
 * name, but the `createTimer` override visible in this change looks timers up by
 * `getClass.getSimpleName` -- confirm which form of the name is intended.
 */
val perListenerTimers: LoadingCache[String, Timer] = {
  // Loader invoked by the cache on a miss for `listenerName`.
  val timerLoader = new CacheLoader[String, Timer] {
    override def load(listenerName: String): Timer = {
      val metricName = MetricRegistry.name("listenerProcessingTime", listenerName)
      metricRegistry.timer(metricName)
    }
  }
  CacheBuilder.newBuilder().build[String, Timer](timerLoader)
}
}

33 changes: 28 additions & 5 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.codahale.metrics.Timer

import org.apache.spark.internal.Logging

/**
* An event bus which posts events to its listeners.
*/
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use Option[Timer] as the value type?


// Marked `private[spark]` for access in tests.
private[spark] val listeners = new CopyOnWriteArrayList[L]
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

/**
 * Hook that lets a subclass supply a CodaHale metrics Timer used to measure how long the given
 * listener spends processing events.
 *
 * The default implementation supplies no timer; subclasses are expected to override it.
 *
 * @param listener the listener for which a timer is requested
 * @return the timer to use for `listener`, or `None` (the default) for no timing
 */
protected def createTimer(listener: L): Option[Timer] = Option.empty[Timer]

/**
* Add a listener to listen for events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listeners.add(listener)
listenersPlusTimers.add((listener, createTimer(listener).orNull))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep the Option in the collection instead of storing null?

}

/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
*/
final def removeListener(listener: L): Unit = {
listeners.remove(listener)
listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
listenersPlusTimers.remove(listenerAndTimer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is a CopyOnWriteArrayList, shall we just do a filter and create a new array?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only reason that CopyOnWriteArrayList was used was for thread-safety and fast performance for readers interleaved with very rare mutations / writes. If we were to replace the array list then we'd need to add a synchronized to guard the listenersPlusTimers field itself.

Given the workload and access patterns here, I'm not sure that it's worth it to attempt to optimize this removeListener() method any further.

}
}

/**
Expand All @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listener = iter.next()
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
var maybeTimerContext = if (maybeTimer != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With an Option (instead of a null value) this would be much simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is just premature optimization. I'll undo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, there is a cost here: allocating a new Option on every postToAll is going to create more allocations and method calls. Thus I'm going to leave this unchanged.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! But you can store the Option in the listenersPlusTimers collection (instead of calling orNull when you create the timer), so you can use it in the postToAll method without having to recreate one each time.

maybeTimer.time()
} else {
null
}
try {
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: this would be simpler with an Option.

maybeTimerContext.stop()
}
}
}
}
Expand Down