Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.scheduler

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

Expand Down Expand Up @@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L

// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
Expand Down Expand Up @@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}

val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}

Expand Down