@@ -26,6 +26,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -215,28 +219,24 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
   }
 
   test("local metrics") {
-    sc = new SparkContext("local", "SparkListenerSuite")
+    val conf = new SparkConf()
+      .setMaster("local").setAppName("SparkListenerSuite")
+      .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
+    sc = new SparkContext(conf)
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
-    // just to make sure some of the tasks take a noticeable amount of time
-    val w = { i: Int =>
-      if (i == 0) {
-        Thread.sleep(100)
-      }
-      i
-    }
 
     val numSlices = 16
-    val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
+    val d = sc.parallelize(0 to 1e3.toInt, numSlices)
     d.count()
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be (1)
 
-    val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
-    val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
+    val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
+    val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
     val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
-      w(k) -> (v1.size, v2.size)
+      k -> (v1.size, v2.size)
     }
     d4.setName("A Cogroup")
     d4.collectAsMap()
@@ -255,13 +255,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
         taskInfoMetrics.map(_._2.executorDeserializeTime),
         stageInfo + " executorDeserializeTime")
 
-      /* Test is disabled (SEE SPARK-2208)
       if (stageInfo.rddInfos.exists(_.name == d4.name)) {
         checkNonZeroAvg(
           taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
           stageInfo + " fetchWaitTime")
       }
-      */
 
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0L)
@@ -337,7 +335,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
         listener.wait(remainingWait)
         remainingWait = finishTime - System.currentTimeMillis
       }
-      assert(!listener.startedTasks.isEmpty)
+      assert(listener.startedTasks.nonEmpty)
     }
 
     f.cancel()
@@ -476,3 +474,15 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
+
+/** Slow ShuffleManager to simulate tasks that take a noticeable amount of time */
+private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
+
+  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
+
+    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+      Thread.sleep(100)
+      super.getBlockData(blockId)
+    }
+  }
+}
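
The new SlowShuffleManager moves the artificial delay out of the map closure (the removed w wrapper) and into shuffle block serving: sleeping inside getBlockData stalls every shuffle fetch, so the reduce-side fetchWaitTime metric is reliably nonzero and the assertion disabled under SPARK-2208 can be re-enabled. The checkNonZeroAvg helper called by the re-enabled block is defined elsewhere in this suite and does not appear in the diff; a minimal sketch of what such a helper presumably looks like, assuming it only asserts a positive mean over the collected per-task metric:

  // Hypothetical sketch; the real helper lives elsewhere in SparkListenerSuite.
  private def checkNonZeroAvg(m: Traversable[Long], msg: String): Unit = {
    // The average of the collected per-task metric must be strictly positive.
    assert(m.sum / m.size.toDouble > 0.0, msg)
  }

Registering the manager through spark.shuffle.manager in the new SparkConf setup applies the delay to every shuffle in the test without touching the job code itself.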