Skip to content

Commit bde85f8

Browse files
committed
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus
## What changes were proposed in this pull request? Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes apache#15220 from zsxwing/SPARK-17649.
1 parent f234b7c commit bde85f8

File tree

1 file changed

+25
-1
lines changed

1 file changed

+25
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.scheduler
1919

2020
import java.util.concurrent._
21-
import java.util.concurrent.atomic.AtomicBoolean
21+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
2222

2323
import scala.util.DynamicVariable
2424

@@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
5757
// Indicate if `stop()` is called
5858
private val stopped = new AtomicBoolean(false)
5959

60+
/** A counter for dropped events. It will be reset every time we log it. */
61+
private val droppedEventsCounter = new AtomicLong(0L)
62+
63+
/** When `droppedEventsCounter` was logged last time in milliseconds. */
64+
@volatile private var lastReportTimestamp = 0L
65+
6066
// Indicate if we are processing some event
6167
// Guarded by `self`
6268
private var processingEvent = false
@@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
123129
eventLock.release()
124130
} else {
125131
onDropEvent(event)
132+
droppedEventsCounter.incrementAndGet()
133+
}
134+
135+
val droppedEvents = droppedEventsCounter.get
136+
if (droppedEvents > 0) {
137+
// Don't log too frequently
138+
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
139+
// There may be multiple threads trying to decrease droppedEventsCounter.
140+
// Use "compareAndSet" to make sure only one thread can win.
141+
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
142+
// then that thread will update it.
143+
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
144+
val prevLastReportTimestamp = lastReportTimestamp
145+
lastReportTimestamp = System.currentTimeMillis()
146+
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
147+
new java.util.Date(prevLastReportTimestamp))
148+
}
149+
}
126150
}
127151
}
128152

0 commit comments

Comments
 (0)