@@ -26,6 +26,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -215,28 +219,24 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("local metrics") {
-    sc = new SparkContext("local", "SparkListenerSuite")
+    val conf = new SparkConf()
+      .setMaster("local").setAppName("SparkListenerSuite")
+      .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
+    sc = new SparkContext(conf)
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
-    // just to make sure some of the tasks take a noticeable amount of time
-    val w = { i: Int =>
-      if (i == 0) {
-        Thread.sleep(100)
-      }
-      i
-    }
 
     val numSlices = 16
-    val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
+    val d = sc.parallelize(0 to 1e3.toInt, numSlices)
     d.count()
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be (1)
 
-    val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
-    val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
+    val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
+    val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
     val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
-      w(k) -> (v1.size, v2.size)
+      k -> (v1.size, v2.size)
     }
     d4.setName("A Cogroup")
     d4.collectAsMap()
@@ -255,13 +255,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         taskInfoMetrics.map(_._2.executorDeserializeTime),
         stageInfo + " executorDeserializeTime")
 
-      /* Test is disabled (SEE SPARK-2208)
       if (stageInfo.rddInfos.exists(_.name == d4.name)) {
         checkNonZeroAvg(
           taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
           stageInfo + " fetchWaitTime")
       }
-      */
 
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0L)
@@ -337,7 +335,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         listener.wait(remainingWait)
         remainingWait = finishTime - System.currentTimeMillis
       }
-      assert(!listener.startedTasks.isEmpty)
+      assert(listener.startedTasks.nonEmpty)
     }
 
     f.cancel()
@@ -476,3 +474,15 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
+
+/** Slow ShuffleManager to simulate tasks that take a noticeable amount of time */
+private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
+
+  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
+
+    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+      Thread.sleep(10)
+      super.getBlockData(blockId)
+    }
+  }
+}
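The mechanism the patch leans on: Spark instantiates the shuffle manager reflectively from the spark.shuffle.manager setting, so any class with a single-SparkConf constructor can stand in, and a resolver that sleeps in getBlockData injects latency into every shuffle block read. Below is a minimal, self-contained sketch of the same pattern against the 1.x-era APIs used above; the DelayingShuffleManager and FetchWaitTimeDemo names are illustrative, not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.ShuffleBlockId

// Same pattern as SlowShuffleManager above: delay every shuffle block read
// so that reducers spend measurable time waiting on fetches.
class DelayingShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
      Thread.sleep(10) // simulated slow disk / network
      super.getBlockData(blockId)
    }
  }
}

object FetchWaitTimeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("FetchWaitTimeDemo")
      // The manager is looked up by class name and constructed with the
      // SparkConf, exactly as the test configures SlowShuffleManager.
      .set("spark.shuffle.manager", classOf[DelayingShuffleManager].getName)
    val sc = new SparkContext(conf)
    // Report the per-task fetch wait time whose average the re-enabled
    // assertion checks to be nonzero.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        for {
          metrics <- Option(taskEnd.taskMetrics) // null for failed tasks
          shuffle <- metrics.shuffleReadMetrics  // an Option in Spark 1.x
        } println(s"task ${taskEnd.taskInfo.id}: fetchWaitTime = ${shuffle.fetchWaitTime} ms")
      }
    })
    try {
      // Any shuffle routes its block reads through the delaying resolver.
      sc.parallelize(0 until 1000, 16).map(i => i -> i).reduceByKey(_ + _).count()
    } finally {
      sc.stop()
    }
  }
}

Driving the delay through the block resolver, rather than sleeping inside map tasks as the removed w closure did, pins the latency to the shuffle-read path, which is exactly the metric the re-enabled fetchWaitTime assertion measures.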