4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def isStopped: Boolean = stopped.get()

// An asynchronous listener bus for Spark events
-private[spark] val listenerBus = new LiveListenerBus
+private[spark] val listenerBus = new LiveListenerBus(this)

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
@@ -2147,7 +2147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

-listenerBus.start(this)
+listenerBus.start()
_listenerBusStarted = true
}

13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal

import java.util.concurrent.TimeUnit

+import org.apache.spark.SparkException
Contributor: remove; I think this is unused now.

import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit

@@ -103,4 +104,16 @@ package object config {
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

+private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
JoshRosen (Contributor, May 25, 2017): Another post-hoc review/complaint: I think that size might be misleading in this context where we're talking about a queue, since the size of a queue refers to the number of elements currently in the queue, while its capacity refers to the maximum size that the queue can reach. This configuration name caused confusion in https://github.com/apache/spark/pull/18083/files/378206efb9f5c9628a678ba7defb536252f5cbcb#r118413115. Instead, it might have been better to call it capacity.

Contributor Author: I agree. Capacity would have been a better choice.
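For readers following the naming point: with a plain java.util.concurrent.LinkedBlockingQueue, capacity is the fixed bound passed to the constructor, while size is the number of elements currently enqueued. A self-contained Scala sketch (illustrative names, not Spark code):

import java.util.concurrent.LinkedBlockingQueue

object SizeVsCapacity extends App {
  // The constructor argument is the capacity: the most the queue can ever hold.
  val queue = new LinkedBlockingQueue[String](3)
  queue.offer("a")
  queue.offer("b")
  // size() is how many elements are in the queue right now; it changes at runtime.
  println(queue.size())              // prints 2
  println(queue.remainingCapacity()) // prints 1
}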

+  .intConf
+  .transform((x: Int) => {
Contributor: The transform wasn't really meant to be a validator. We had discussed adding validator functionality to the Config stuff, but I think that should be done separately. So for now let's pull this out and just do the check in the LiveListenerBus.

Contributor Author: Agreed. The method is intended for something else; adding validator functionality would be a good separate change. I will make the changes and fix the tests.

+    if (x <= 0) {
+      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+    } else {
+      x
+    }
+  })
+  .createWithDefault(10000)
}
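Following the review thread above, a minimal sketch of what validating at the point of use (rather than in transform) might look like; the EventQueueSettings wrapper here is hypothetical, shown only to keep the sketch self-contained, and require throws IllegalArgumentException where the PR used SparkException:

import org.apache.spark.SparkConf

class EventQueueSettings(conf: SparkConf) {
  // Read the raw value, then bounds-check it where it is consumed.
  val capacity: Int = {
    val value = conf.getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)
    require(value > 0, "spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    value
  }
}

Either way, users set the value as usual, e.g. spark-submit --conf spark.scheduler.listenerbus.eventqueue.size=20000, and a non-positive value fails fast at startup.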
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import scala.util.DynamicVariable

+import org.apache.spark.internal.config._
import org.apache.spark.SparkContext
import org.apache.spark.util.Utils

@@ -32,18 +33,16 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
Contributor: I'm modifying LiveListenerBus now and noticed that we're passing in sparkContext even though we only use it to access conf. I think it would have been better to just pass in conf here. It would make the initialization order constraints a lot clearer, too: right now it's not immediately clear why eventQueue needs to be a lazy val.

Contributor: I guess we also use this to tear down sparkContext when the listener dies. I still have a weak preference for the old code, however, since I think the lifecycle was clearer.

Contributor Author: From what I recall, and as you mentioned, a ref to sparkContext was already being used to tear down the context when the listener thread died because of an uncaught exception. The idea was to refactor the code to make sparkContext available at instantiation time and access the conf from it rather than passing it separately. This definition is cyclic, and lazy val was used to ensure that conf is accessed only after sc is initialized.
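The cyclic definition is easy to see in miniature. A self-contained sketch (hypothetical names, not Spark's classes) of why an eager val would fail where the lazy val does not:

// `this` escapes to Bus before Owner's `settings` field has been assigned.
class Owner {
  val bus = new Bus(this)
  val settings: Map[String, String] = Map("capacity" -> "10000")
}

class Bus(owner: Owner) {
  // An eager `val capacity = owner.settings("capacity").toInt` would throw a
  // NullPointerException: Bus is constructed while owner.settings is still null.
  // `lazy` defers the read until first use, after Owner's constructor completes.
  private lazy val capacity: Int = owner.settings("capacity").toInt
}

object Demo extends App {
  new Owner() // constructs cleanly; capacity is only computed on first access
}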


self =>

import LiveListenerBus._

-private var sparkContext: SparkContext = null

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-private val EVENT_QUEUE_CAPACITY = 10000
-private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
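As the comment above the queue explains, the bound exists so that a producer that persistently outruns the listener thread gets an explicit failure rather than exhausting the heap. With a LinkedBlockingQueue that signal is offer() returning false, the point at which a bus like this one can drop the event and report it. A minimal illustration (not Spark code):

import java.util.concurrent.LinkedBlockingQueue

object BoundedQueueDemo extends App {
  val queue = new LinkedBlockingQueue[Int](2)
  assert(queue.offer(1))
  assert(queue.offer(2))
  // Full: offer() fails fast instead of growing without bound, so the caller
  // can drop the event and report it explicitly instead of hitting an OOM.
  assert(!queue.offer(3))
}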
@@ -96,11 +95,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
-* @param sc Used to stop the SparkContext in case the listener thread dies.
*/
-def start(sc: SparkContext): Unit = {
+def start(): Unit = {
if (started.compareAndSet(false, true)) {
-sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
-val listenerBus = new LiveListenerBus
+val listenerBus = new LiveListenerBus(sc)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)

// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
-listenerBus.start(sc)
+listenerBus.start()
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val jobCompletionTime = 1421191296660L

test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite")
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
-val bus = new LiveListenerBus
+val bus = new LiveListenerBus(sc)
bus.addListener(listener)

// Starting listener bus should flush all buffered events
-bus.start(sc)
+bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

@@ -53,15 +53,15 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
-val bus = new LiveListenerBus
+val bus = new LiveListenerBus(sc)
bus.addListener(counter)

// Listener bus hasn't started yet, so posting events should not increment counter
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
-bus.start(sc)
+bus.start()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)

@@ -72,14 +72,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Listener bus must not be started twice
intercept[IllegalStateException] {
-val bus = new LiveListenerBus
-bus.start(sc)
-bus.start(sc)
+val bus = new LiveListenerBus(sc)
+bus.start()
+bus.start()
}

// ... or stopped before starting
intercept[IllegalStateException] {
-val bus = new LiveListenerBus
+val bus = new LiveListenerBus(sc)
bus.stop()
}
}
@@ -107,11 +107,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
}

-val bus = new LiveListenerBus
+val bus = new LiveListenerBus(sc)
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
-bus.start(sc)
+bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

listenerStarted.acquire()
@@ -353,13 +353,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
-val bus = new LiveListenerBus
+val bus = new LiveListenerBus(sc)

// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
-bus.start(sc)
+bus.start()

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -92,7 +92,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
conf.set("spark.storage.cachedPeersTtl", "10")

master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-  new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+  new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+    new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true)
allStores.clear()
}

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -108,7 +108,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.driver.port", rpcEnv.address.port.toString)

master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-  new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+  new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+    new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true)

val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage

import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
@@ -43,8 +43,9 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private val bm1 = BlockManagerId("big", "dog", 1)

before {
-bus = new LiveListenerBus
-storageStatusListener = new StorageStatusListener(new SparkConf())
+val conf = new SparkConf()
+bus = new LiveListenerBus(new SparkContext("local", "test", conf))
+storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -78,7 +78,8 @@ class ReceivedBlockHandlerSuite
conf.set("spark.driver.port", rpcEnv.address.port.toString)

blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-  new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+  new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+    new LiveListenerBus(new SparkContext("local", "test", conf)))), conf, true)

storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)