Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private[spark] object TestUtils {
try {
body(listener)
} finally {
sc.listenerBus.waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
sc.listenerBus.waitUntilEmpty()
sc.listenerBus.removeListener(listener)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
metricsSystem.registerSource(metrics)
}

/**
* For testing only. Wait until there are no more events in the queue, or until the default
* wait time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
* emptied.
* Exposed for testing.
*/
@throws(classOf[TimeoutException])
def waitUntilEmpty(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make it private[spark]?

waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
}

/**
* For testing only. Wait until there are no more events in the queue, or until the specified
* time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private[spark] object AccumulatorSuite {
sc.addSparkListener(listener)
testBody
// wait until all events have been processed before proceeding to assert things
sc.listenerBus.waitUntilEmpty(10 * 1000)
sc.listenerBus.waitUntilEmpty()
val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
val isSet = accums.exists { a =>
a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {

private def post(event: SparkListenerEvent): Unit = {
listenerBus.post(event)
listenerBus.waitUntilEmpty(1000)
listenerBus.waitUntilEmpty()
}

test("initialize dynamic allocation in SparkContext") {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ object ShuffleSuite {

job

sc.listenerBus.waitUntilEmpty(500)
sc.listenerBus.waitUntilEmpty()
AggregatedShuffleMetrics(recordsWritten, recordsRead, bytesWritten, bytesRead)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(sc.getRDDStorageInfo.length === 0)
rdd.collect()
sc.listenerBus.waitUntilEmpty(10000)
sc.listenerBus.waitUntilEmpty()
eventually(timeout(10.seconds), interval(100.milliseconds)) {
assert(sc.getRDDStorageInfo.length === 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
x
}.collect()
sc.listenerBus.waitUntilEmpty(10000)
sc.listenerBus.waitUntilEmpty()
// As executors will send the metrics of running tasks via heartbeat, we can use this to check
// whether there is any running task.
eventually(timeout(10.seconds)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ import org.apache.spark.util.SparkConfWithEnv

class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {

/** Length of time to wait while draining listener events. */
private val WAIT_TIMEOUT_MILLIS = 10000

test("verify that correct log urls get propagated from workers") {
sc = new SparkContext("local-cluster[2,1,1024]", "test")

Expand All @@ -41,7 +38,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
Expand All @@ -61,7 +58,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
val listener = listeners(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
.reduceByKey(_ + _)
.saveAsTextFile(tmpFile.toURI.toString)

sc.listenerBus.waitUntilEmpty(500)
sc.listenerBus.waitUntilEmpty()
assert(inputRead == numRecords)

assert(outputWritten == numBuckets)
Expand Down Expand Up @@ -243,7 +243,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
val taskMetrics = new ArrayBuffer[Long]()

// Avoid receiving earlier taskEnd events
sc.listenerBus.waitUntilEmpty(500)
sc.listenerBus.waitUntilEmpty()

sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
Expand All @@ -253,7 +253,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

job

sc.listenerBus.waitUntilEmpty(500)
sc.listenerBus.waitUntilEmpty()
taskMetrics.sum
}

Expand Down Expand Up @@ -293,7 +293,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

try {
rdd.saveAsTextFile(outPath.toString)
sc.listenerBus.waitUntilEmpty(500)
sc.listenerBus.waitUntilEmpty()
assert(taskBytesWritten.length == 2)
val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))

sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
sc.listenerBus.waitUntilEmpty()
Copy link
Member

@dongjoon-hyun dongjoon-hyun Sep 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, is this reducing the original timeout because it was 1 minutes before (private val executorUpTimeout = 1.minute)? It seems that this file's two instances are the only place to reduce the timeout unlike the other places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right. My bad that's 60000 ms. I'll just roll them back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rolled back.

assert(executorAddedCount === 3)
}

Expand Down Expand Up @@ -251,7 +251,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
assert(execResources(GPU).assignedAddrs.isEmpty)
}
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
sc.listenerBus.waitUntilEmpty()
assert(executorAddedCount === 3)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def applicationAttemptId(): Option[String] = None
}

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

/**
* Listeners which records some information to verify in UTs. Getter-kind methods in this class
* ensures the value is returned after ensuring there's no event to process, as well as the
Expand Down Expand Up @@ -230,7 +227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
_endedTasks.toSet
}

private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty()
}

var sparkListener: EventInfoRecordingListener = null
Expand Down Expand Up @@ -839,7 +836,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
val testRdd = new MyRDD(sc, 0, Nil)
val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty,
resultHandler, properties)
sc.listenerBus.waitUntilEmpty(1000L)
sc.listenerBus.waitUntilEmpty()
assert(assertionError.get() === null)
}

Expand Down Expand Up @@ -957,7 +954,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
completeNextResultStageWithSuccess(1, 1)

// Confirm job finished successfully
sc.listenerBus.waitUntilEmpty(1000)
sc.listenerBus.waitUntilEmpty()
assert(ended)
assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
assertDataStructuresEmpty()
Expand Down Expand Up @@ -994,7 +991,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
} else {
// Stage should have been aborted and removed from running stages
assertDataStructuresEmpty()
sc.listenerBus.waitUntilEmpty(1000)
sc.listenerBus.waitUntilEmpty()
assert(ended)
jobResult match {
case JobFailed(reason) =>
Expand Down Expand Up @@ -1116,7 +1113,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
completeNextResultStageWithSuccess(2, 1)

assertDataStructuresEmpty()
sc.listenerBus.waitUntilEmpty(1000)
sc.listenerBus.waitUntilEmpty()
assert(ended)
assert(results === Map(0 -> 42))
}
Expand Down Expand Up @@ -1175,7 +1172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// Complete the result stage.
completeNextResultStageWithSuccess(1, 1)

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
assertDataStructuresEmpty()
}

Expand Down Expand Up @@ -1204,7 +1201,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// Complete the result stage.
completeNextResultStageWithSuccess(1, 0)

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
assertDataStructuresEmpty()
}

Expand All @@ -1230,7 +1227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
null))

// Assert the stage has been cancelled.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " +
"from a failed barrier ResultStage."))
}
Expand Down Expand Up @@ -2668,7 +2665,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
sc.parallelize(1 to tasks, tasks).foreach { _ =>
accum.add(1L)
}
sc.listenerBus.waitUntilEmpty(1000)
sc.listenerBus.waitUntilEmpty()
assert(foundCount.get() === tasks)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

import LiveListenerBus._

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

val jobCompletionTime = 1421191296660L

private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
Expand All @@ -65,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

sc.listenerBus.addToSharedQueue(listener)
sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
sc.stop()

assert(listener.sparkExSeen)
Expand Down Expand Up @@ -97,7 +94,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Starting listener bus should flush all buffered events
bus.start(mockSparkContext, mockMetricsSystem)
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
bus.waitUntilEmpty()
assert(counter.count === 5)
assert(sharedQueueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)
Expand Down Expand Up @@ -223,7 +220,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd2.setName("Target RDD")
rdd2.count()

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()

listener.stageInfos.size should be {1}
val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
Expand All @@ -248,7 +245,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd3.setName("Trois")

rdd1.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be {1}
val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
Expand All @@ -257,7 +254,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()

rdd2.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be {1}
val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
stageInfo2.rddInfos.size should be {3}
Expand All @@ -266,7 +263,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()

rdd3.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
stageInfo3.rddInfos.size should be {1} // ShuffledRDD
Expand All @@ -282,7 +279,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val rdd2 = rdd1.map(_.toString)
sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1))

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()

listener.stageInfos.size should be {1}
val (stageInfo, _) = listener.stageInfos.head
Expand Down Expand Up @@ -310,7 +307,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val numSlices = 16
val d = sc.parallelize(0 to 10000, numSlices).map(w)
d.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be (1)

val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
Expand All @@ -321,7 +318,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
d4.setName("A Cogroup")
d4.collectAsMap()

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be (4)
listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
/**
Expand Down Expand Up @@ -372,7 +369,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
.reduce { case (x, y) => x }
assert(result === 1.to(maxRpcMessageSize).toArray)

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
Expand All @@ -388,7 +385,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)

sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.waitUntilEmpty()
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.isEmpty)
Expand Down Expand Up @@ -443,7 +440,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
bus.waitUntilEmpty()

// The exception should be caught, and the event should be propagated to other listeners
assert(jobCounter1.count === 5)
Expand Down Expand Up @@ -513,7 +510,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// after we post one event, both interrupting listeners should get removed, and the
// event log queue should be removed
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
bus.waitUntilEmpty()
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[InterruptingListener]().size === 0)
Expand All @@ -522,7 +519,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// posting more events should be fine, they'll just get processed from the OK queue.
(0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
bus.waitUntilEmpty()
assert(counter1.count === 6)
assert(counter2.count === 6)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSparkSessi
sparkContext.addSparkListener(jobListener)
try {
val result = f
sparkContext.listenerBus.waitUntilEmpty(10000L)
sparkContext.listenerBus.waitUntilEmpty()
assert(numJobTrigered === 0)
result
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2114,7 +2114,7 @@ class DataFrameSuite extends QueryTest with SharedSparkSession {

val df = spark.read.json(path.getCanonicalPath)
assert(df.columns === Array("i", "p"))
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(numJobs == 1)
}
}
Expand Down
Loading