@@ -28,6 +28,7 @@ import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_T
2828import org .apache .spark .internal .config .UI ._
2929import org .apache .spark .io .CompressionCodec
3030import org .apache .spark .rdd ._
31+ import org .apache .spark .shuffle .FetchFailedException
3132import org .apache .spark .storage .{BlockId , StorageLevel , TestBlockId }
3233import org .apache .spark .util .Utils
3334
@@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
642643 assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton))
643644 }
644645 }
646+
647+ test(" checkpoint should not fail in retry" ) {
648+ withTempDir { checkpointDir =>
649+ val conf = new SparkConf ()
650+ .set(UI_ENABLED .key, " false" )
651+ sc = new SparkContext (" local[1]" , " test" , conf)
652+ sc.setCheckpointDir(checkpointDir.toString)
653+ val rdd = sc.makeRDD(1 to 200 , numSlices = 4 ).repartition(1 ).mapPartitions { iter =>
654+ iter.map { i =>
655+ if (i > 100 && TaskContext .get().stageAttemptNumber() == 0 ) {
656+ // throw new SparkException("Make first attemp failed.")
657+ // Throw FetchFailedException to explicitly trigger stage resubmission.
658+ // A normal exception will only trigger task resubmission in the same stage.
659+ throw new FetchFailedException (null , 0 , 0L , 0 , 0 , " Fake" )
660+ } else {
661+ i
662+ }
663+ }
664+ }
665+ rdd.checkpoint()
666+ assert(rdd.collect().toSeq === (1 to 200 ))
667+ // Verify that RDD is checkpointed
668+ assert(rdd.firstParent.isInstanceOf [ReliableCheckpointRDD [_]])
669+ }
670+ }
645671}
0 commit comments