From 0c3338cd645f5824f08fe37fd7174e25c416529b Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Thu, 28 Sep 2017 00:33:18 +0300 Subject: [PATCH 1/7] [SPARK-22150][CORE] preventing too early removal of checkpoints in case of dependent RDDs --- .../spark/util/PeriodicCheckpointer.scala | 7 +++-- .../util/PeriodicRDDCheckpointerSuite.scala | 28 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index ce06e18879a49..338a65e276f5d 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -98,8 +98,11 @@ private[spark] abstract class PeriodicCheckpointer[T]( checkpointQueue.enqueue(newData) // Remove checkpoints before the latest one. var canDelete = true - while (checkpointQueue.size > 1 && canDelete) { - // Delete the oldest checkpoint only if the next checkpoint exists. + // Do not remove previous checkpoint and its data until materializing newData. + // Early removal may lead to FileNotFoundExceptions in case of newData + // depends on materialized checkpointQueue.head + while (checkpointQueue.size > 2 && canDelete) { + // Delete the oldest checkpoint only if the next checkpoint exists and materialized. if (isCheckpointed(checkpointQueue.head)) { removeCheckpointFile() } else { diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index f9e1b791c86ea..0b16de93d4156 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.utils +package org.apache.spark.util import org.apache.hadoop.fs.Path @@ -23,7 +23,6 @@ import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext { @@ -79,6 +78,31 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } + + test("Checkpointing of dependent RDD should not fail when materializing it") { + val tempDir = Utils.createTempDir() + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + rdd2.count() + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + + Utils.deleteRecursively(tempDir) + } } private object PeriodicRDDCheckpointerSuite { From 972ef04c6402104f41c18798f83f17e84ca634b2 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 16:10:48 +0300 Subject: [PATCH 2/7] [SPARK-22150][CORE] checking whether two checkpoints have the same checkpointed RDD as their parent to prevent early removal --- .../rdd/util/PeriodicRDDCheckpointer.scala | 34 +++- .../spark/util/PeriodicCheckpointer.scala | 55 ++++-- .../util/PeriodicRDDCheckpointerSuite.scala | 169 ++++++++++++++++-- .../util/PeriodicGraphCheckpointer.scala | 8 + 4 files changed, 230 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index 
facbb830a60d8..aafc9efa02c1f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -17,6 +17,9 @@ package org.apache.spark.rdd.util +import scala.collection.mutable +import scala.collection.Set + import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -73,8 +76,6 @@ import org.apache.spark.util.PeriodicCheckpointer * * @param checkpointInterval RDDs will be checkpointed at this interval * @tparam T RDD element type - * - * TODO: Move this out of MLlib? */ private[spark] class PeriodicRDDCheckpointer[T]( checkpointInterval: Int, @@ -96,4 +97,33 @@ private[spark] class PeriodicRDDCheckpointer[T]( override protected def getCheckpointFiles(data: RDD[T]): Iterable[String] = { data.getCheckpointFile.map(x => x) } + + override protected def haveCommonCheckpoint(newData: RDD[T], oldData: RDD[T]): Boolean = { + PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(newData), Set(oldData)) + } + +} + +private[spark] object PeriodicRDDCheckpointer { + + def rddDeps(rdd: RDD[_]): Set[RDD[_]] = { + val parents = new mutable.HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + parents.add(rdd) + rdd.dependencies.foreach(dep => visit(dep.rdd)) + } + visit(rdd) + parents + } + + def haveCommonCheckpoint(rdds1: Set[_ <: RDD[_]], rdds2: Set[_ <: RDD[_]]): Boolean = { + val deps1 = rdds1.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => + set ++= rddDeps(rdd) + } + val deps2 = rdds2.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => + set ++= rddDeps(rdd) + } + deps1.intersect(deps2).exists(_.isCheckpointed) + } + } diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index 338a65e276f5d..9c416a950175f 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ 
b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -97,18 +97,7 @@ private[spark] abstract class PeriodicCheckpointer[T]( checkpoint(newData) checkpointQueue.enqueue(newData) // Remove checkpoints before the latest one. - var canDelete = true - // Do not remove previous checkpoint and its data until materializing newData. - // Early removal may lead to FileNotFoundExceptions in case of newData - // depends on materialized checkpointQueue.head - while (checkpointQueue.size > 2 && canDelete) { - // Delete the oldest checkpoint only if the next checkpoint exists and materialized. - if (isCheckpointed(checkpointQueue.head)) { - removeCheckpointFile() - } else { - canDelete = false - } - } + deleteAllCheckpointsButLast() } } @@ -130,6 +119,11 @@ private[spark] abstract class PeriodicCheckpointer[T]( /** Get list of checkpoint files for this given Dataset */ protected def getCheckpointFiles(data: T): Iterable[String] + /** + * Checks whether the two datasets depend on the same checkpointed data. + */ + protected def haveCommonCheckpoint(newData: T, oldData: T): Boolean + /** * Call this to unpersist the Dataset. */ @@ -140,22 +134,38 @@ private[spark] abstract class PeriodicCheckpointer[T]( } } + /** + * Gets last checkpoint if it is available. + */ + def getLastCheckpoint: Option[T] = { + checkpointQueue.lastOption + } + /** * Call this at the end to delete any remaining checkpoint files. */ def deleteAllCheckpoints(): Unit = { - while (checkpointQueue.nonEmpty) { - removeCheckpointFile() - } + deleteAllCheckpoints(_ => true) + } + + /** + * Deletes all the checkpoints which match the given predicate. + */ + def deleteAllCheckpoints(f: T => Boolean): Unit = { + val checkpoints = checkpointQueue.dequeueAll(f) + checkpoints.foreach(removeCheckpointFile) } /** * Call this at the end to delete any remaining checkpoint files, except for the last checkpoint. - * Note that there may not be any checkpoints at all. 
+ * Note that there may not be any checkpoints at all and in case there are more than one + * checkpoint, all the checkpoints, the last one depends on, will not be deleted. */ def deleteAllCheckpointsButLast(): Unit = { - while (checkpointQueue.size > 1) { - removeCheckpointFile() + getLastCheckpoint.foreach { last => + deleteAllCheckpoints { item => + item != last && !haveCommonCheckpoint(last, item) + } } } @@ -174,7 +184,14 @@ private[spark] abstract class PeriodicCheckpointer[T]( private def removeCheckpointFile(): Unit = { val old = checkpointQueue.dequeue() // Since the old checkpoint is not deleted by Spark, we manually delete it. - getCheckpointFiles(old).foreach( + removeCheckpointFile(old) + } + + /** + * Removes checkpoint files of the provided Dataset. + */ + private def removeCheckpointFile(item: T): Unit = { + getCheckpointFiles(item).foreach( PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration)) } } diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index 0b16de93d4156..c16f1acf417b4 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import org.apache.hadoop.fs.Path import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} +import org.apache.spark.rdd.MapPartitionsRDD import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel @@ -79,29 +80,158 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } + test("Getting RDD dependencies should return RDD itself") { + val rdd = sc.emptyRDD[Int] + assert(PeriodicRDDCheckpointer.rddDeps(rdd) == Set(rdd)) + } + + test("Getting RDD dependencies should return all 
the DAG RDDs") { + val data = 0 until 10 + val initialRdd = sc.parallelize(data) + val targetRdd = data.foldLeft(initialRdd) { (rdd, num) => + rdd.filter(_ == num) + } + + val deps = PeriodicRDDCheckpointer.rddDeps(targetRdd) + assert(deps.size == data.size + 1) + assert(deps.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) == data.size) + } + + test("Common checkpoint should be found when RDDs are related") { + val tempDir = Utils.createTempDir() + try { + sc.setCheckpointDir(tempDir.toURI.toString) + + val rdd1 = createRDD(sc) + rdd1.checkpoint() + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + + assert(PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("Common checkpoint should not be found when RDDs are unrelated") { + val tempDir = Utils.createTempDir() + try { + sc.setCheckpointDir(tempDir.toURI.toString) + + val rdd1 = createRDD(sc) + rdd1.checkpoint() + rdd1.count() + + val rdd2 = createRDD(sc) + rdd2.checkpoint() + rdd2.count() + + assert(!PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) + } finally { + Utils.deleteRecursively(tempDir) + } + } + test("Checkpointing of dependent RDD should not fail when materializing it") { val tempDir = Utils.createTempDir() - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() - val rdd2 = rdd1.filter(_ => true) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - rdd2.count() + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + 
checkpointer.update(rdd2) + rdd2.count() - checkpointer.deleteAllCheckpoints() - Seq(rdd1, rdd2).foreach { rdd => - confirmCheckpointRemoved(rdd) + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + } finally { + Utils.deleteRecursively(tempDir) } + } - Utils.deleteRecursively(tempDir) + test("deleteAllCheckpointsButLast should retain last checkpoint only when RDDs are unrelated") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = createRDD(sc) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpointsButLast() + Seq(rdd1).foreach(confirmCheckpointRemoved) + Seq(rdd2).foreach(confirmCheckpointExists) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("deleteAllCheckpointsButLast should retain last checkpoint and dependent checkpoints " + + "when RDDs are related") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpointsButLast() + Seq(rdd1, rdd2).foreach(confirmCheckpointExists) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("deleteAllCheckpoints should remove all the checkpoints") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = 
createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach(confirmCheckpointRemoved) + } finally { + Utils.deleteRecursively(tempDir) + } } } @@ -159,6 +289,15 @@ private object PeriodicRDDCheckpointerSuite { } } + def confirmCheckpointExists(rdd: RDD[_]): Unit = { + val hadoopConf = rdd.sparkContext.hadoopConfiguration + rdd.getCheckpointFile.foreach { checkpointFile => + val path = new Path(checkpointFile) + val fs = path.getFileSystem(hadoopConf) + assert(fs.exists(path), "RDD checkpoint file should not have been removed") + } + } + /** * Check checkpointed status of rdd. * @param gIndex Index of rdd in order inserted into checkpointer (from 1). diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index 539b66f747cc9..a1a15d37c16ef 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.util import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph +import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel import org.apache.spark.util.PeriodicCheckpointer @@ -103,4 +104,11 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED]( override protected def getCheckpointFiles(data: Graph[VD, ED]): Iterable[String] = { data.getCheckpointFiles } + + override protected def haveCommonCheckpoint( + newData: Graph[VD, ED], oldData: Graph[VD, ED]): Boolean = { + PeriodicRDDCheckpointer.haveCommonCheckpoint( + Set(newData.vertices, newData.edges), Set(oldData.vertices, oldData.edges)) + } + } From 
4ff1624bf3383c23000745170baa50ec56647975 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 16:22:19 +0300 Subject: [PATCH 3/7] [SPARK-22150][CORE] respecting scala style settings --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index aafc9efa02c1f..d5c771b55227e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -17,8 +17,8 @@ package org.apache.spark.rdd.util -import scala.collection.mutable import scala.collection.Set +import scala.collection.mutable import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -125,5 +125,4 @@ private[spark] object PeriodicRDDCheckpointer { } deps1.intersect(deps2).exists(_.isCheckpointed) } - } From f9156cb01c62035ff136a0d06fd464b66a1b27c8 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Tue, 27 Mar 2018 18:57:01 +0300 Subject: [PATCH 4/7] [SPARK-22150][CORE] Updating scaladocs to clarify the new behaviour of checkpoints --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index d5c771b55227e..fc490a24280a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -41,12 +41,13 @@ import org.apache.spark.util.PeriodicCheckpointer * - Unpersist RDDs from queue until there are at most 3 persisted RDDs. 
* - If using checkpointing and the checkpoint interval has been reached, * - Checkpoint the new RDD, and put in a queue of checkpointed RDDs. - * - Remove older checkpoints. + * - Remove older checkpoints except for created one and all the checkpoints it depends on. * * WARNINGS: * - This class should NOT be copied (since copies may conflict on which RDDs should be * checkpointed). - * - This class removes checkpoint files once later RDDs have been checkpointed. + * - This class removes checkpoint files once later RDDs have been checkpointed and do not + * have dependencies, the files to remove have been created for. * However, references to the older RDDs will still return isCheckpointed = true. * * Example usage: From 1a0b5c1d186eee98a187625c7b112ae656cfee63 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Tue, 27 Mar 2018 19:12:34 +0300 Subject: [PATCH 5/7] [SPARK-22150][CORE] Updating scaladocs to clarify why its checkpoint files may be preserved in case of checkpointing dependent RDDs --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index fc490a24280a6..e29a77cc0361b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -47,7 +47,9 @@ import org.apache.spark.util.PeriodicCheckpointer * - This class should NOT be copied (since copies may conflict on which RDDs should be * checkpointed). * - This class removes checkpoint files once later RDDs have been checkpointed and do not - * have dependencies, the files to remove have been created for. 
+ * have dependencies, the files to remove have been created for (removing checkpoint files + * of prior RDDs, the later ones depend on, may fail with `FileNotFoundException` in case + * the later RDDs are not yet materialized). * However, references to the older RDDs will still return isCheckpointed = true. * * Example usage: From 6d11404e0bdb31cd5e0d3f7d882fc6e54cf475f9 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Tue, 27 Mar 2018 19:16:20 +0300 Subject: [PATCH 6/7] [SPARK-22150][CORE] Updating scaladocs of PeriodicGraphCheckpointer to clarify the new behaviour --- .../spark/graphx/util/PeriodicGraphCheckpointer.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index a1a15d37c16ef..4230f05c8e45d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -39,12 +39,15 @@ import org.apache.spark.util.PeriodicCheckpointer * - Unpersist graphs from queue until there are at most 3 persisted graphs. * - If using checkpointing and the checkpoint interval has been reached, * - Checkpoint the new graph, and put in a queue of checkpointed graphs. - * - Remove older checkpoints. + * - Remove older checkpoints except for created one and all the checkpoints it depends on. * * WARNINGS: * - This class should NOT be copied (since copies may conflict on which Graphs should be * checkpointed). - * - This class removes checkpoint files once later graphs have been checkpointed. 
+ * - This class removes checkpoint files once later graphs have been checkpointed and do not + * have dependencies, the files to remove have been created for (removing checkpoint files + * of prior graphs, the later ones depend on, may fail with `FileNotFoundException` in case + * the later graphs are not yet materialized). * However, references to the older graphs will still return isCheckpointed = true. * * Example usage: From e33b32b04a391f43f35d50aea5cf9f0446252255 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Thu, 29 Mar 2018 21:27:31 +0300 Subject: [PATCH 7/7] [SPARK-22150][CORE] tests to check that checkpoint files of parent checkpointed RDD are removed too --- .../rdd/util/PeriodicRDDCheckpointer.scala | 2 +- .../util/PeriodicRDDCheckpointerSuite.scala | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index e29a77cc0361b..bb3bf743d7cab 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -98,7 +98,7 @@ private[spark] class PeriodicRDDCheckpointer[T]( override protected def unpersist(data: RDD[T]): Unit = data.unpersist(blocking = false) override protected def getCheckpointFiles(data: RDD[T]): Iterable[String] = { - data.getCheckpointFile.map(x => x) + PeriodicRDDCheckpointer.rddDeps(data).flatMap(_.getCheckpointFile) } override protected def haveCommonCheckpoint(newData: RDD[T], oldData: RDD[T]): Boolean = { diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index c16f1acf417b4..9ba8b8bc4a416 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -233,6 +233,43 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } } + + test("deleteAllCheckpoints should remove all the checkpoint files when " + + "there is just parent checkpointed RDD") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[(Int, Int)](checkpointInterval, sc) + val rdd1 = sc.makeRDD((0 until 10).map(i => i -> i)).setName("rdd1") + + // rdd1 is not materialized yet, checkpointer(update=1, checkpointInterval=2) + checkpointer.update(rdd1) + // rdd2 depends on rdd1 + val rdd2 = rdd1.filter(_ => true).setName("rdd2") + + // rdd1 is materialized, checkpointer(update=2, checkpointInterval=2) + checkpointer.update(rdd1) + // rdd3 depends on rdd1 + val rdd3 = rdd1.filter(_ => true).setName("rdd3") + + // rdd3 is not materialized yet, checkpointer(update=3, checkpointInterval=2) + checkpointer.update(rdd3) + // rdd3 is materialized, rdd1 is removed, checkpointer(update=4, checkpointInterval=2) + checkpointer.update(rdd3) + + // should not fail + rdd2.count() + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2, rdd3).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + } finally { + Utils.deleteRecursively(tempDir) + } + } } private object PeriodicRDDCheckpointerSuite {