Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -931,8 +931,12 @@ package object config {
private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED =
ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed")
.internal()
.doc("The number of listeners that have timers to track the elapsed time of" +
"processing events. If 0 is set, disables this feature. If -1 is set," +
"it sets no limit to the number.")
.version("2.3.0")
.intConf
.checkValue(_ >= -1, "The number of listeners should be larger than -1.")
.createWithDefault(128)

private[spark] val LISTENER_BUS_LOG_SLOW_EVENT_ENABLED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,14 @@ private[spark] class LiveListenerBusMetrics(conf: SparkConf)
val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)
perListenerClassTimers.get(className).orElse {
if (perListenerClassTimers.size == maxTimed) {
logError(s"Not measuring processing time for listener class $className because a " +
s"maximum of $maxTimed listener classes are already timed.")
if (maxTimed != 0) {
// Explicitly disabled.
logError(s"Not measuring processing time for listener class $className because a " +
s"maximum of $maxTimed listener classes are already timed.")
}
None
} else {
// maxTimed is either -1 (no limit), or an explicit number.
perListenerClassTimers(className) =
metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
perListenerClassTimers.get(className)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,22 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == Some(2))
}

test("SPARK-39973: Suppress error logs when the number of timers is set to 0") {
sc = new SparkContext(
"local",
"SparkListenerSuite",
new SparkConf().set(
LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED.key, 0.toString))
val testAppender = new LogAppender("Error logger for timers")
withLogAppender(testAppender) {
sc.addSparkListener(new SparkListener { })
sc.addSparkListener(new SparkListener { })
}
assert(!testAppender.loggingEvents
.exists(_.getMessage.getFormattedMessage.contains(
"Not measuring processing time for listener")))
}

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
Expand Down