@@ -23,6 +23,7 @@ import org.scalatest.Matchers
2323import org .apache .spark .ShuffleSuite .NonJavaSerializableClass
2424import org .apache .spark .rdd .{CoGroupedRDD , OrderedRDDFunctions , RDD , ShuffledRDD , SubtractedRDD }
2525import org .apache .spark .serializer .KryoSerializer
26+ import org .apache .spark .storage .{ShuffleDataBlockId , ShuffleBlockId }
2627import org .apache .spark .util .MutablePair
2728
2829abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
@@ -263,6 +264,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
263264 }
264265 }
265266 }
267+
268+ test(" [SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file" ) {
269+ val myConf = conf.clone().set(" spark.test.noStageRetry" , " false" )
270+ sc = new SparkContext (" local" , " test" , myConf)
271+ val rdd = sc.parallelize(1 to 10 , 2 ).map((_, 1 )).reduceByKey(_ + _)
272+ rdd.count()
273+
274+ // Delete one of the local shuffle blocks.
275+ val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId (0 , 0 , 0 ))
276+ val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId (0 , 0 , 0 ))
277+ assert(hashFile.exists() || sortFile.exists())
278+
279+ if (hashFile.exists()) {
280+ hashFile.delete()
281+ }
282+ if (sortFile.exists()) {
283+ sortFile.delete()
284+ }
285+
286+ // This count should retry the execution of the previous stage and rerun shuffle.
287+ rdd.count()
288+ }
266289}
267290
268291object ShuffleSuite {
0 commit comments