Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
import org.apache.spark.util.{MutablePair, Utils}

abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
Expand Down Expand Up @@ -277,19 +277,45 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// Delete one of the local shuffle blocks.
val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
assert(hashFile.exists() || sortFile.exists())
val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))

if (hashFile.exists()) {
hashFile.delete()
}
if (sortFile.exists()) {
sortFile.delete()
}
if (indexFile.exists()) {
indexFile.delete()
}

// This count should retry the execution of the previous stage and rerun shuffle.
rdd.count()
}

test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
  sc = new SparkContext("local", "test", conf.clone())
  val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)

  // Resolves the on-disk locations of the hash-based block, the sort-based data
  // block, and the sort-based index block for (shuffleId = 0, mapId = 0,
  // reduceId = 0). Re-evaluated on each use so we see the current disk state.
  def shuffleFiles = {
    val diskBlockManager = sc.env.blockManager.diskBlockManager
    (diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)),
      diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0)),
      diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0)))
  }

  // Before the stage has executed, none of the shuffle block files may exist.
  val (hashBefore, sortBefore, indexBefore) = shuffleFiles
  assert(!hashBefore.exists() && !sortBefore.exists() && !indexBefore.exists())

  // Run the job, which executes the shuffle stage and writes the blocks.
  rdd.count()

  // Afterwards, either the hash-based block or the sort-based data + index
  // pair must be present on disk.
  val (hashAfter, sortAfter, indexAfter) = shuffleFiles
  assert(hashAfter.exists() || (sortAfter.exists() && indexAfter.exists()))
}

test("metrics for shuffle without aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val numRecords = 10000
Expand Down