From 0c3338cd645f5824f08fe37fd7174e25c416529b Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Thu, 28 Sep 2017 00:33:18 +0300 Subject: [PATCH 01/12] [SPARK-22150][CORE] preventing too early removal of checkpoints in case of dependant RDDs --- .../spark/util/PeriodicCheckpointer.scala | 7 +++-- .../util/PeriodicRDDCheckpointerSuite.scala | 28 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index ce06e18879a49..338a65e276f5d 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -98,8 +98,11 @@ private[spark] abstract class PeriodicCheckpointer[T]( checkpointQueue.enqueue(newData) // Remove checkpoints before the latest one. var canDelete = true - while (checkpointQueue.size > 1 && canDelete) { - // Delete the oldest checkpoint only if the next checkpoint exists. + // Do not remove previous checkpoint and its data until materializing newData. + // Early removal may lead to FileNotFoundExceptions in case of newData + // depends on materialized checkpointQueue.head + while (checkpointQueue.size > 2 && canDelete) { + // Delete the oldest checkpoint only if the next checkpoint exists and materialized. if (isCheckpointed(checkpointQueue.head)) { removeCheckpointFile() } else { diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index f9e1b791c86ea..0b16de93d4156 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.utils +package org.apache.spark.util import org.apache.hadoop.fs.Path @@ -23,7 +23,6 @@ import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext { @@ -79,6 +78,31 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } + + test("Checkpointing of dependent RDD should not fail when materializing it") { + val tempDir = Utils.createTempDir() + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + rdd2.count() + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + + Utils.deleteRecursively(tempDir) + } } private object PeriodicRDDCheckpointerSuite { From 972ef04c6402104f41c18798f83f17e84ca634b2 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 16:10:48 +0300 Subject: [PATCH 02/12] [SPARK-22150][CORE] checking whether two checkpoints have the same checkpointed RDD as their parent to prevent early removal --- .../rdd/util/PeriodicRDDCheckpointer.scala | 34 +++- .../spark/util/PeriodicCheckpointer.scala | 55 ++++-- .../util/PeriodicRDDCheckpointerSuite.scala | 169 ++++++++++++++++-- .../util/PeriodicGraphCheckpointer.scala | 8 + 4 files changed, 230 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index 
facbb830a60d8..aafc9efa02c1f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -17,6 +17,9 @@ package org.apache.spark.rdd.util +import scala.collection.mutable +import scala.collection.Set + import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -73,8 +76,6 @@ import org.apache.spark.util.PeriodicCheckpointer * * @param checkpointInterval RDDs will be checkpointed at this interval * @tparam T RDD element type - * - * TODO: Move this out of MLlib? */ private[spark] class PeriodicRDDCheckpointer[T]( checkpointInterval: Int, @@ -96,4 +97,33 @@ private[spark] class PeriodicRDDCheckpointer[T]( override protected def getCheckpointFiles(data: RDD[T]): Iterable[String] = { data.getCheckpointFile.map(x => x) } + + override protected def haveCommonCheckpoint(newData: RDD[T], oldData: RDD[T]): Boolean = { + PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(newData), Set(oldData)) + } + +} + +private[spark] object PeriodicRDDCheckpointer { + + def rddDeps(rdd: RDD[_]): Set[RDD[_]] = { + val parents = new mutable.HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + parents.add(rdd) + rdd.dependencies.foreach(dep => visit(dep.rdd)) + } + visit(rdd) + parents + } + + def haveCommonCheckpoint(rdds1: Set[_ <: RDD[_]], rdds2: Set[_ <: RDD[_]]): Boolean = { + val deps1 = rdds1.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => + set ++= rddDeps(rdd) + } + val deps2 = rdds2.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => + set ++= rddDeps(rdd) + } + deps1.intersect(deps2).exists(_.isCheckpointed) + } + } diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index 338a65e276f5d..9c416a950175f 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ 
b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -97,18 +97,7 @@ private[spark] abstract class PeriodicCheckpointer[T]( checkpoint(newData) checkpointQueue.enqueue(newData) // Remove checkpoints before the latest one. - var canDelete = true - // Do not remove previous checkpoint and its data until materializing newData. - // Early removal may lead to FileNotFoundExceptions in case of newData - // depends on materialized checkpointQueue.head - while (checkpointQueue.size > 2 && canDelete) { - // Delete the oldest checkpoint only if the next checkpoint exists and materialized. - if (isCheckpointed(checkpointQueue.head)) { - removeCheckpointFile() - } else { - canDelete = false - } - } + deleteAllCheckpointsButLast() } } @@ -130,6 +119,11 @@ private[spark] abstract class PeriodicCheckpointer[T]( /** Get list of checkpoint files for this given Dataset */ protected def getCheckpointFiles(data: T): Iterable[String] + /** + * Checks whether the two datasets depend on the same checkpointed data. + */ + protected def haveCommonCheckpoint(newData: T, oldData: T): Boolean + /** * Call this to unpersist the Dataset. */ @@ -140,22 +134,38 @@ private[spark] abstract class PeriodicCheckpointer[T]( } } + /** + * Gets last checkpoint if it is available. + */ + def getLastCheckpoint: Option[T] = { + checkpointQueue.lastOption + } + /** * Call this at the end to delete any remaining checkpoint files. */ def deleteAllCheckpoints(): Unit = { - while (checkpointQueue.nonEmpty) { - removeCheckpointFile() - } + deleteAllCheckpoints(_ => true) + } + + /** + * Deletes all the checkpoints which match the given predicate. + */ + def deleteAllCheckpoints(f: T => Boolean): Unit = { + val checkpoints = checkpointQueue.dequeueAll(f) + checkpoints.foreach(removeCheckpointFile) } /** * Call this at the end to delete any remaining checkpoint files, except for the last checkpoint. - * Note that there may not be any checkpoints at all. 
+ * Note that there may not be any checkpoints at all and in case there are more than one + * checkpoint, all the checkpoints, the last one depends on, will not be deleted. */ def deleteAllCheckpointsButLast(): Unit = { - while (checkpointQueue.size > 1) { - removeCheckpointFile() + getLastCheckpoint.foreach { last => + deleteAllCheckpoints { item => + item != last && !haveCommonCheckpoint(last, item) + } } } @@ -174,7 +184,14 @@ private[spark] abstract class PeriodicCheckpointer[T]( private def removeCheckpointFile(): Unit = { val old = checkpointQueue.dequeue() // Since the old checkpoint is not deleted by Spark, we manually delete it. - getCheckpointFiles(old).foreach( + removeCheckpointFile(old) + } + + /** + * Removes checkpoint files of the provided Dataset. + */ + private def removeCheckpointFile(item: T): Unit = { + getCheckpointFiles(item).foreach( PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration)) } } diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index 0b16de93d4156..c16f1acf417b4 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import org.apache.hadoop.fs.Path import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} +import org.apache.spark.rdd.MapPartitionsRDD import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel @@ -79,29 +80,158 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } + test("Getting RDD dependencies should return RDD itself") { + val rdd = sc.emptyRDD[Int] + assert(PeriodicRDDCheckpointer.rddDeps(rdd) == Set(rdd)) + } + + test("Getting RDD dependencies should return all 
the DAG RDDs") { + val data = 0 until 10 + val initialRdd = sc.parallelize(data) + val targetRdd = data.foldLeft(initialRdd) { (rdd, num) => + rdd.filter(_ == num) + } + + val deps = PeriodicRDDCheckpointer.rddDeps(targetRdd) + assert(deps.size == data.size + 1) + assert(deps.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) == data.size) + } + + test("Common checkpoint should be found when RDDs are related") { + val tempDir = Utils.createTempDir() + try { + sc.setCheckpointDir(tempDir.toURI.toString) + + val rdd1 = createRDD(sc) + rdd1.checkpoint() + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + + assert(PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("Common checkpoint should not be found when RDDs are unrelated") { + val tempDir = Utils.createTempDir() + try { + sc.setCheckpointDir(tempDir.toURI.toString) + + val rdd1 = createRDD(sc) + rdd1.checkpoint() + rdd1.count() + + val rdd2 = createRDD(sc) + rdd2.checkpoint() + rdd2.count() + + assert(!PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) + } finally { + Utils.deleteRecursively(tempDir) + } + } + test("Checkpointing of dependent RDD should not fail when materializing it") { val tempDir = Utils.createTempDir() - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() - val rdd2 = rdd1.filter(_ => true) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - rdd2.count() + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + 
checkpointer.update(rdd2) + rdd2.count() - checkpointer.deleteAllCheckpoints() - Seq(rdd1, rdd2).foreach { rdd => - confirmCheckpointRemoved(rdd) + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + } finally { + Utils.deleteRecursively(tempDir) } + } - Utils.deleteRecursively(tempDir) + test("deleteAllCheckpointsButLast should retain last checkpoint only when RDDs are unrelated") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = createRDD(sc) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpointsButLast() + Seq(rdd1).foreach(confirmCheckpointRemoved) + Seq(rdd2).foreach(confirmCheckpointExists) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("deleteAllCheckpointsButLast should retain last checkpoint and dependent checkpoints " + + "when RDDs are related") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpointsButLast() + Seq(rdd1, rdd2).foreach(confirmCheckpointExists) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("deleteAllCheckpoints should remove all the checkpoints") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = 
createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach(confirmCheckpointRemoved) + } finally { + Utils.deleteRecursively(tempDir) + } } } @@ -159,6 +289,15 @@ private object PeriodicRDDCheckpointerSuite { } } + def confirmCheckpointExists(rdd: RDD[_]): Unit = { + val hadoopConf = rdd.sparkContext.hadoopConfiguration + rdd.getCheckpointFile.foreach { checkpointFile => + val path = new Path(checkpointFile) + val fs = path.getFileSystem(hadoopConf) + assert(fs.exists(path), "RDD checkpoint file should not have been removed") + } + } + /** * Check checkpointed status of rdd. * @param gIndex Index of rdd in order inserted into checkpointer (from 1). diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index 539b66f747cc9..a1a15d37c16ef 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.util import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph +import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel import org.apache.spark.util.PeriodicCheckpointer @@ -103,4 +104,11 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED]( override protected def getCheckpointFiles(data: Graph[VD, ED]): Iterable[String] = { data.getCheckpointFiles } + + override protected def haveCommonCheckpoint( + newData: Graph[VD, ED], oldData: Graph[VD, ED]): Boolean = { + PeriodicRDDCheckpointer.haveCommonCheckpoint( + Set(newData.vertices, newData.edges), Set(oldData.vertices, oldData.edges)) + } + } From 
4ff1624bf3383c23000745170baa50ec56647975 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 16:22:19 +0300 Subject: [PATCH 03/12] [SPARK-22150][CORE] respecting scala style settings --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index aafc9efa02c1f..d5c771b55227e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -17,8 +17,8 @@ package org.apache.spark.rdd.util -import scala.collection.mutable import scala.collection.Set +import scala.collection.mutable import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -125,5 +125,4 @@ private[spark] object PeriodicRDDCheckpointer { } deps1.intersect(deps2).exists(_.isCheckpointed) } - } From f2386b61a47abf19b8ca6cea7e0e5c7da9baf7d6 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Thu, 28 Sep 2017 00:33:18 +0300 Subject: [PATCH 04/12] [SPARK-22150][CORE] preventing too early removal of checkpoints in case of dependant RDDs --- .../spark/util/PeriodicCheckpointer.scala | 7 +++-- .../util/PeriodicRDDCheckpointerSuite.scala | 28 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index ce06e18879a49..338a65e276f5d 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -98,8 +98,11 @@ private[spark] abstract class PeriodicCheckpointer[T]( checkpointQueue.enqueue(newData) // Remove checkpoints before the latest one. 
var canDelete = true - while (checkpointQueue.size > 1 && canDelete) { - // Delete the oldest checkpoint only if the next checkpoint exists. + // Do not remove previous checkpoint and its data until materializing newData. + // Early removal may lead to FileNotFoundExceptions in case of newData + // depends on materialized checkpointQueue.head + while (checkpointQueue.size > 2 && canDelete) { + // Delete the oldest checkpoint only if the next checkpoint exists and materialized. if (isCheckpointed(checkpointQueue.head)) { removeCheckpointFile() } else { diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index f9e1b791c86ea..0b16de93d4156 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.utils +package org.apache.spark.util import org.apache.hadoop.fs.Path @@ -23,7 +23,6 @@ import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext { @@ -79,6 +78,31 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } + + test("Checkpointing of dependent RDD should not fail when materializing it") { + val tempDir = Utils.createTempDir() + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + 
checkpointer.update(rdd2) + checkpointer.update(rdd2) + rdd2.count() + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + + Utils.deleteRecursively(tempDir) + } } private object PeriodicRDDCheckpointerSuite { From aa2bedae74999694b0a9992986e85d3f9feab5f6 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 16:10:48 +0300 Subject: [PATCH 05/12] [SPARK-22150][CORE] checking whether two checkpoints have the same checkpointed RDD as their parent to prevent early removal --- .../rdd/util/PeriodicRDDCheckpointer.scala | 34 +++- .../spark/util/PeriodicCheckpointer.scala | 55 ++++-- .../util/PeriodicRDDCheckpointerSuite.scala | 169 ++++++++++++++++-- .../util/PeriodicGraphCheckpointer.scala | 8 + 4 files changed, 230 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index facbb830a60d8..aafc9efa02c1f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -17,6 +17,9 @@ package org.apache.spark.rdd.util +import scala.collection.mutable +import scala.collection.Set + import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -73,8 +76,6 @@ import org.apache.spark.util.PeriodicCheckpointer * * @param checkpointInterval RDDs will be checkpointed at this interval * @tparam T RDD element type - * - * TODO: Move this out of MLlib? 
*/ private[spark] class PeriodicRDDCheckpointer[T]( checkpointInterval: Int, @@ -96,4 +97,33 @@ private[spark] class PeriodicRDDCheckpointer[T]( override protected def getCheckpointFiles(data: RDD[T]): Iterable[String] = { data.getCheckpointFile.map(x => x) } + + override protected def haveCommonCheckpoint(newData: RDD[T], oldData: RDD[T]): Boolean = { + PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(newData), Set(oldData)) + } + +} + +private[spark] object PeriodicRDDCheckpointer { + + def rddDeps(rdd: RDD[_]): Set[RDD[_]] = { + val parents = new mutable.HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + parents.add(rdd) + rdd.dependencies.foreach(dep => visit(dep.rdd)) + } + visit(rdd) + parents + } + + def haveCommonCheckpoint(rdds1: Set[_ <: RDD[_]], rdds2: Set[_ <: RDD[_]]): Boolean = { + val deps1 = rdds1.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => + set ++= rddDeps(rdd) + } + val deps2 = rdds2.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => + set ++= rddDeps(rdd) + } + deps1.intersect(deps2).exists(_.isCheckpointed) + } + } diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index 338a65e276f5d..9c416a950175f 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -97,18 +97,7 @@ private[spark] abstract class PeriodicCheckpointer[T]( checkpoint(newData) checkpointQueue.enqueue(newData) // Remove checkpoints before the latest one. - var canDelete = true - // Do not remove previous checkpoint and its data until materializing newData. - // Early removal may lead to FileNotFoundExceptions in case of newData - // depends on materialized checkpointQueue.head - while (checkpointQueue.size > 2 && canDelete) { - // Delete the oldest checkpoint only if the next checkpoint exists and materialized. 
- if (isCheckpointed(checkpointQueue.head)) { - removeCheckpointFile() - } else { - canDelete = false - } - } + deleteAllCheckpointsButLast() } } @@ -130,6 +119,11 @@ private[spark] abstract class PeriodicCheckpointer[T]( /** Get list of checkpoint files for this given Dataset */ protected def getCheckpointFiles(data: T): Iterable[String] + /** + * Checks whether the two datasets depend on the same checkpointed data. + */ + protected def haveCommonCheckpoint(newData: T, oldData: T): Boolean + /** * Call this to unpersist the Dataset. */ @@ -140,22 +134,38 @@ private[spark] abstract class PeriodicCheckpointer[T]( } } + /** + * Gets last checkpoint if it is available. + */ + def getLastCheckpoint: Option[T] = { + checkpointQueue.lastOption + } + /** * Call this at the end to delete any remaining checkpoint files. */ def deleteAllCheckpoints(): Unit = { - while (checkpointQueue.nonEmpty) { - removeCheckpointFile() - } + deleteAllCheckpoints(_ => true) + } + + /** + * Deletes all the checkpoints which match the given predicate. + */ + def deleteAllCheckpoints(f: T => Boolean): Unit = { + val checkpoints = checkpointQueue.dequeueAll(f) + checkpoints.foreach(removeCheckpointFile) } /** * Call this at the end to delete any remaining checkpoint files, except for the last checkpoint. - * Note that there may not be any checkpoints at all. + * Note that there may not be any checkpoints at all and in case there are more than one + * checkpoint, all the checkpoints, the last one depends on, will not be deleted. */ def deleteAllCheckpointsButLast(): Unit = { - while (checkpointQueue.size > 1) { - removeCheckpointFile() + getLastCheckpoint.foreach { last => + deleteAllCheckpoints { item => + item != last && !haveCommonCheckpoint(last, item) + } } } @@ -174,7 +184,14 @@ private[spark] abstract class PeriodicCheckpointer[T]( private def removeCheckpointFile(): Unit = { val old = checkpointQueue.dequeue() // Since the old checkpoint is not deleted by Spark, we manually delete it. 
- getCheckpointFiles(old).foreach( + removeCheckpointFile(old) + } + + /** + * Removes checkpoint files of the provided Dataset. + */ + private def removeCheckpointFile(item: T): Unit = { + getCheckpointFiles(item).foreach( PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration)) } } diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index 0b16de93d4156..c16f1acf417b4 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import org.apache.hadoop.fs.Path import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} +import org.apache.spark.rdd.MapPartitionsRDD import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel @@ -79,29 +80,158 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } + test("Getting RDD dependencies should return RDD itself") { + val rdd = sc.emptyRDD[Int] + assert(PeriodicRDDCheckpointer.rddDeps(rdd) == Set(rdd)) + } + + test("Getting RDD dependencies should return all the DAG RDDs") { + val data = 0 until 10 + val initialRdd = sc.parallelize(data) + val targetRdd = data.foldLeft(initialRdd) { (rdd, num) => + rdd.filter(_ == num) + } + + val deps = PeriodicRDDCheckpointer.rddDeps(targetRdd) + assert(deps.size == data.size + 1) + assert(deps.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) == data.size) + } + + test("Common checkpoint should be found when RDDs are related") { + val tempDir = Utils.createTempDir() + try { + sc.setCheckpointDir(tempDir.toURI.toString) + + val rdd1 = createRDD(sc) + rdd1.checkpoint() + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + + 
assert(PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("Common checkpoint should not be found when RDDs are unrelated") { + val tempDir = Utils.createTempDir() + try { + sc.setCheckpointDir(tempDir.toURI.toString) + + val rdd1 = createRDD(sc) + rdd1.checkpoint() + rdd1.count() + + val rdd2 = createRDD(sc) + rdd2.checkpoint() + rdd2.count() + + assert(!PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) + } finally { + Utils.deleteRecursively(tempDir) + } + } + test("Checkpointing of dependent RDD should not fail when materializing it") { val tempDir = Utils.createTempDir() - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() - val rdd2 = rdd1.filter(_ => true) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - rdd2.count() + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + rdd2.count() - checkpointer.deleteAllCheckpoints() - Seq(rdd1, rdd2).foreach { rdd => - confirmCheckpointRemoved(rdd) + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + } finally { + Utils.deleteRecursively(tempDir) } + } - Utils.deleteRecursively(tempDir) + test("deleteAllCheckpointsButLast should retain last checkpoint only when RDDs are unrelated") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new 
PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = createRDD(sc) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpointsButLast() + Seq(rdd1).foreach(confirmCheckpointRemoved) + Seq(rdd2).foreach(confirmCheckpointExists) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("deleteAllCheckpointsButLast should retain last checkpoint and dependent checkpoints " + + "when RDDs are related") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpointsButLast() + Seq(rdd1, rdd2).foreach(confirmCheckpointExists) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("deleteAllCheckpoints should remove all the checkpoints") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) + + val rdd1 = createRDD(sc) + checkpointer.update(rdd1) + checkpointer.update(rdd1) + rdd1.count() + + val rdd2 = rdd1.filter(_ => true) + checkpointer.update(rdd2) + checkpointer.update(rdd2) + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2).foreach(confirmCheckpointRemoved) + } finally { + Utils.deleteRecursively(tempDir) + } } } @@ -159,6 +289,15 @@ private object PeriodicRDDCheckpointerSuite { } } + def confirmCheckpointExists(rdd: RDD[_]): Unit = { + val hadoopConf = rdd.sparkContext.hadoopConfiguration + rdd.getCheckpointFile.foreach { checkpointFile => + val path = new 
Path(checkpointFile) + val fs = path.getFileSystem(hadoopConf) + assert(fs.exists(path), "RDD checkpoint file should not have been removed") + } + } + /** * Check checkpointed status of rdd. * @param gIndex Index of rdd in order inserted into checkpointer (from 1). diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index 539b66f747cc9..a1a15d37c16ef 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.util import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph +import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel import org.apache.spark.util.PeriodicCheckpointer @@ -103,4 +104,11 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED]( override protected def getCheckpointFiles(data: Graph[VD, ED]): Iterable[String] = { data.getCheckpointFiles } + + override protected def haveCommonCheckpoint( + newData: Graph[VD, ED], oldData: Graph[VD, ED]): Boolean = { + PeriodicRDDCheckpointer.haveCommonCheckpoint( + Set(newData.vertices, newData.edges), Set(oldData.vertices, oldData.edges)) + } + } From 6406aea3bc87c1f3a9460bbc2ae1af67d7c0c294 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 16:22:19 +0300 Subject: [PATCH 06/12] [SPARK-22150][CORE] respecting scala style settings --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index aafc9efa02c1f..d5c771b55227e 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -17,8 +17,8 @@ package org.apache.spark.rdd.util -import scala.collection.mutable import scala.collection.Set +import scala.collection.mutable import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -125,5 +125,4 @@ private[spark] object PeriodicRDDCheckpointer { } deps1.intersect(deps2).exists(_.isCheckpointed) } - } From 4a55cda79e61e7eec67ae9545beb0c38eca7b11b Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Mon, 2 Oct 2017 17:43:27 +0300 Subject: [PATCH 07/12] [SPARK-22184][CORE][GRAPHX] retain all the checkpoints the last one depends on --- .../util/PeriodicRDDCheckpointerSuite.scala | 163 ------------------ .../org/apache/spark/graphx/Pregel.scala | 38 +++- .../spark/graphx/LocalSparkContext.scala | 3 +- .../org/apache/spark/graphx/PregelSuite.scala | 53 ++++++ 4 files changed, 90 insertions(+), 167 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index c16f1acf417b4..074799b9a5857 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -79,160 +79,6 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } - - test("Getting RDD dependencies should return RDD itself") { - val rdd = sc.emptyRDD[Int] - assert(PeriodicRDDCheckpointer.rddDeps(rdd) == Set(rdd)) - } - - test("Getting RDD dependencies should return all the DAG RDDs") { - val data = 0 until 10 - val initialRdd = sc.parallelize(data) - val targetRdd = data.foldLeft(initialRdd) { (rdd, num) => - rdd.filter(_ == num) - } - - val deps = PeriodicRDDCheckpointer.rddDeps(targetRdd) - assert(deps.size == data.size + 
1) - assert(deps.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) == data.size) - } - - test("Common checkpoint should be found when RDDs are related") { - val tempDir = Utils.createTempDir() - try { - sc.setCheckpointDir(tempDir.toURI.toString) - - val rdd1 = createRDD(sc) - rdd1.checkpoint() - rdd1.count() - - val rdd2 = rdd1.filter(_ => true) - - assert(PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("Common checkpoint should not be found when RDDs are unrelated") { - val tempDir = Utils.createTempDir() - try { - sc.setCheckpointDir(tempDir.toURI.toString) - - val rdd1 = createRDD(sc) - rdd1.checkpoint() - rdd1.count() - - val rdd2 = createRDD(sc) - rdd2.checkpoint() - rdd2.count() - - assert(!PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(rdd1), Set(rdd2))) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("Checkpointing of dependent RDD should not fail when materializing it") { - val tempDir = Utils.createTempDir() - try { - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) - - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() - - val rdd2 = rdd1.filter(_ => true) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - rdd2.count() - - checkpointer.deleteAllCheckpoints() - Seq(rdd1, rdd2).foreach { rdd => - confirmCheckpointRemoved(rdd) - } - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("deleteAllCheckpointsButLast should retain last checkpoint only when RDDs are unrelated") { - val tempDir = Utils.createTempDir() - try { - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) - - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() - - val rdd2 = 
createRDD(sc) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - - checkpointer.deleteAllCheckpointsButLast() - Seq(rdd1).foreach(confirmCheckpointRemoved) - Seq(rdd2).foreach(confirmCheckpointExists) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("deleteAllCheckpointsButLast should retain last checkpoint and dependent checkpoints " + - "when RDDs are related") { - val tempDir = Utils.createTempDir() - try { - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) - - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() - - val rdd2 = rdd1.filter(_ => true) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - - checkpointer.deleteAllCheckpointsButLast() - Seq(rdd1, rdd2).foreach(confirmCheckpointExists) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("deleteAllCheckpoints should remove all the checkpoints") { - val tempDir = Utils.createTempDir() - try { - val checkpointInterval = 2 - sc.setCheckpointDir(tempDir.toURI.toString) - - val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, sc) - - val rdd1 = createRDD(sc) - checkpointer.update(rdd1) - checkpointer.update(rdd1) - rdd1.count() - - val rdd2 = rdd1.filter(_ => true) - checkpointer.update(rdd2) - checkpointer.update(rdd2) - - checkpointer.deleteAllCheckpoints() - Seq(rdd1, rdd2).foreach(confirmCheckpointRemoved) - } finally { - Utils.deleteRecursively(tempDir) - } - } } private object PeriodicRDDCheckpointerSuite { @@ -289,15 +135,6 @@ private object PeriodicRDDCheckpointerSuite { } } - def confirmCheckpointExists(rdd: RDD[_]): Unit = { - val hadoopConf = rdd.sparkContext.hadoopConfiguration - rdd.getCheckpointFile.foreach { checkpointFile => - val path = new Path(checkpointFile) - val fs = path.getFileSystem(hadoopConf) - assert(fs.exists(path), "RDD checkpoint file should not have been removed") 
- } - } - /** * Check checkpointed status of rdd. * @param gIndex Index of rdd in order inserted into checkpointer (from 1). diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 755c6febc48e6..42632a7c13573 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -170,8 +170,42 @@ object Pregel extends Logging { i += 1 } messageCheckpointer.unpersistDataSet() - graphCheckpointer.deleteAllCheckpoints() - messageCheckpointer.deleteAllCheckpoints() + + // in case of low system resources when cached RDDs are going to be evicted from memory + // there may be a chance that it will be necessary to read checkpointed files from disk, + // in that case all the checkpoints, the resulting graph depends on, should not be deleted + import PeriodicRDDCheckpointer._ + + val graphDeps = rddDeps(g.vertices) ++ rddDeps(g.edges) + val lastGraphCheckpoint = graphCheckpointer.getLastCheckpoint + val lastGraphCheckpointDeps = lastGraphCheckpoint match { + case Some(value) => rddDeps(value.vertices) ++ rddDeps(value.edges) + case _ => Set.empty + } + + val messagesDeps = rddDeps(messages) + val lastMessagesCheckpoint = messageCheckpointer.getLastCheckpoint + val lastMessagesCheckpointDeps = lastMessagesCheckpoint match { + case Some(value) => rddDeps(value) + case _ => Set.empty + } + + graphCheckpointer.deleteAllCheckpoints { item => + val itemDeps = rddDeps(item.vertices) ++ rddDeps(item.edges) + !lastGraphCheckpoint.exists(_ eq item) && + !haveCommonCheckpoint(itemDeps, lastGraphCheckpointDeps) && + !haveCommonCheckpoint(messagesDeps, itemDeps) && + !haveCommonCheckpoint(messagesDeps, lastGraphCheckpointDeps) + } + + messageCheckpointer.deleteAllCheckpoints { item => + val itemDeps = rddDeps(item) + !lastMessagesCheckpoint.exists(_ eq item) && + !haveCommonCheckpoint(itemDeps, lastMessagesCheckpointDeps) && + 
!haveCommonCheckpoint(graphDeps, itemDeps) && + !haveCommonCheckpoint(graphDeps, lastMessagesCheckpointDeps) + } + g } // end of apply diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 66c4747fec268..fdce727cd2a13 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -26,8 +26,7 @@ import org.apache.spark.SparkContext */ trait LocalSparkContext { /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */ - def withSpark[T](f: SparkContext => T): T = { - val conf = new SparkConf() + def withSpark[T](f: SparkContext => T)(implicit conf: SparkConf = new SparkConf()): T = { GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext("local", "test", conf) try { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala index 90a9ac613ef9d..85f78c9abd4ba 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.graphx +import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils class PregelSuite extends SparkFunSuite with LocalSparkContext { @@ -52,4 +54,55 @@ class PregelSuite extends SparkFunSuite with LocalSparkContext { chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet) } } + + test("should preserve intermediate checkpoint files when there are even amount of iterations") { + withEvictedGraph(iterations = 4) { _ => } + } + + test("should preserve intermediate checkpoint files when there are odd amount of iterations") { + withEvictedGraph(iterations = 5) { _ => } + } + + test("preserve last checkpoint files when there are even amount of iterations") { 
+ withEvictedGraph(iterations = 4) { graph => + graph.vertices.count() + graph.edges.count() + } + } + + test("preserve last checkpoint files when there are odd amount of iterations") { + withEvictedGraph(iterations = 5) { graph => + graph.vertices.count() + graph.edges.count() + } + } + + private def withEvictedGraph(iterations: Int)(f: Graph[Long, Int] => Unit): Unit = { + implicit val conf: SparkConf = new SparkConf() + .set("spark.graphx.pregel.checkpointInterval", "2") + // set testing memory to evict cached RDDs from it and force + // reading checkpointed RDDs from disk + .set("spark.testing.reservedMemory", "128") + .set("spark.testing.memory", "256") + withSpark { sc => + val dir = Utils.createTempDir().getCanonicalFile + try { + sc.setCheckpointDir(dir.toURI.toString) + val edges = (1 to iterations).map(x => (x: VertexId, x + 1: VertexId)) + val graph = Pregel(Graph.fromEdgeTuples(sc.parallelize(edges, 3), 0L), 1L)( + (vid, attr, msg) => if (vid == msg) msg else attr, + et => + if (et.dstId != et.dstAttr && et.srcId < et.dstId) { + Iterator((et.dstId, et.srcAttr + 1)) + } else { + Iterator.empty + }, + (a: Long, b: Long) => math.max(a, b)) + f(graph) + } finally { + Utils.deleteRecursively(dir) + } + } + } + } From f9156cb01c62035ff136a0d06fd464b66a1b27c8 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Tue, 27 Mar 2018 18:57:01 +0300 Subject: [PATCH 08/12] [SPARK-22150][CORE] Updating scaladocs to clarify the new behaviour of checkpoints --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index d5c771b55227e..fc490a24280a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -41,12 +41,13 @@
import org.apache.spark.util.PeriodicCheckpointer * - Unpersist RDDs from queue until there are at most 3 persisted RDDs. * - If using checkpointing and the checkpoint interval has been reached, * - Checkpoint the new RDD, and put in a queue of checkpointed RDDs. - * - Remove older checkpoints. + * - Remove older checkpoints except for the new one and all the checkpoints it depends on. * * WARNINGS: * - This class should NOT be copied (since copies may conflict on which RDDs should be * checkpointed). - * - This class removes checkpoint files once later RDDs have been checkpointed. + * - This class removes checkpoint files once later RDDs have been checkpointed and do not + * have dependencies, the files to remove have been created for. * However, references to the older RDDs will still return isCheckpointed = true. * * Example usage: From 1a0b5c1d186eee98a187625c7b112ae656cfee63 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Tue, 27 Mar 2018 19:12:34 +0300 Subject: [PATCH 09/12] [SPARK-22150][CORE] Updating scaladocs to clarify why checkpoint files may be preserved when checkpointing dependent RDDs --- .../org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index fc490a24280a6..e29a77cc0361b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -47,7 +47,9 @@ import org.apache.spark.util.PeriodicCheckpointer * - This class should NOT be copied (since copies may conflict on which RDDs should be * checkpointed). * - This class removes checkpoint files once later RDDs have been checkpointed and do not - * have dependencies, the files to remove have been created for.
+ * have dependencies, the files to remove have been created for (removing checkpoint files + * of prior RDDs, the later ones depend on, may fail with `FileNotFoundException` in case + * the later RDDs are not yet materialized). * However, references to the older RDDs will still return isCheckpointed = true. * * Example usage: From 6d11404e0bdb31cd5e0d3f7d882fc6e54cf475f9 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Tue, 27 Mar 2018 19:16:20 +0300 Subject: [PATCH 10/12] [SPARK-22150][CORE] Updating scaladocs of PeriodicGraphCheckpointer to clarify the new behaviour --- .../spark/graphx/util/PeriodicGraphCheckpointer.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index a1a15d37c16ef..4230f05c8e45d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -39,12 +39,15 @@ import org.apache.spark.util.PeriodicCheckpointer * - Unpersist graphs from queue until there are at most 3 persisted graphs. * - If using checkpointing and the checkpoint interval has been reached, * - Checkpoint the new graph, and put in a queue of checkpointed graphs. - * - Remove older checkpoints. + * - Remove older checkpoints except for the new one and all the checkpoints it depends on. * * WARNINGS: * - This class should NOT be copied (since copies may conflict on which Graphs should be * checkpointed). - * - This class removes checkpoint files once later graphs have been checkpointed.
+ * - This class removes checkpoint files once later graphs have been checkpointed and do not + * have dependencies, the files to remove have been created for (removing checkpoint files + * of prior graphs, the later ones depend on, may fail with `FileNotFoundException` in case + * the later graphs are not yet materialized). * However, references to the older graphs will still return isCheckpointed = true. * * Example usage: From e33b32b04a391f43f35d50aea5cf9f0446252255 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Thu, 29 Mar 2018 21:27:31 +0300 Subject: [PATCH 11/12] [SPARK-22150][CORE] tests to check that checkpoint files of parent checkpointed RDD are removed too --- .../rdd/util/PeriodicRDDCheckpointer.scala | 2 +- .../util/PeriodicRDDCheckpointerSuite.scala | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index e29a77cc0361b..bb3bf743d7cab 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -98,7 +98,7 @@ private[spark] class PeriodicRDDCheckpointer[T]( override protected def unpersist(data: RDD[T]): Unit = data.unpersist(blocking = false) override protected def getCheckpointFiles(data: RDD[T]): Iterable[String] = { - data.getCheckpointFile.map(x => x) + PeriodicRDDCheckpointer.rddDeps(data).flatMap(_.getCheckpointFile) } override protected def haveCommonCheckpoint(newData: RDD[T], oldData: RDD[T]): Boolean = { diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index c16f1acf417b4..9ba8b8bc4a416 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -233,6 +233,43 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext Utils.deleteRecursively(tempDir) } } + + test("deleteAllCheckpoints should remove all the checkpoint files when " + + "there is just parent checkpointed RDD") { + val tempDir = Utils.createTempDir() + try { + val checkpointInterval = 2 + sc.setCheckpointDir(tempDir.toURI.toString) + + val checkpointer = new PeriodicRDDCheckpointer[(Int, Int)](checkpointInterval, sc) + val rdd1 = sc.makeRDD((0 until 10).map(i => i -> i)).setName("rdd1") + + // rdd1 is not materialized yet, checkpointer(update=1, checkpointInterval=2) + checkpointer.update(rdd1) + // rdd2 depends on rdd1 + val rdd2 = rdd1.filter(_ => true).setName("rdd2") + + // rdd1 is materialized, checkpointer(update=2, checkpointInterval=2) + checkpointer.update(rdd1) + // rdd3 depends on rdd1 + val rdd3 = rdd1.filter(_ => true).setName("rdd3") + + // rdd3 is not materialized yet, checkpointer(update=3, checkpointInterval=2) + checkpointer.update(rdd3) + // rdd3 is materialized, rdd1 is removed, checkpointer(update=4, checkpointInterval=2) + checkpointer.update(rdd3) + + // should not fail + rdd2.count() + + checkpointer.deleteAllCheckpoints() + Seq(rdd1, rdd2, rdd3).foreach { rdd => + confirmCheckpointRemoved(rdd) + } + } finally { + Utils.deleteRecursively(tempDir) + } + } } private object PeriodicRDDCheckpointerSuite { From 8fe09de003829ff4718cd9459960b28f604cfb47 Mon Sep 17 00:00:00 2001 From: Sergey Zhemzhitsky Date: Thu, 29 Mar 2018 21:57:25 +0300 Subject: [PATCH 12/12] [SPARK-22184][CORE][GRAPHX] removing checkpoint files of parent checkpointed RDDs too --- .../rdd/util/PeriodicRDDCheckpointer.scala | 28 ------------------- .../util/PeriodicGraphCheckpointer.scala | 9 +++++- 2 files changed, 8 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala 
b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index 755bf45a9ff52..bb3bf743d7cab 100644 --- a/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -107,34 +107,6 @@ private[spark] class PeriodicRDDCheckpointer[T]( } -private[spark] object PeriodicRDDCheckpointer { - - def rddDeps(rdd: RDD[_]): Set[RDD[_]] = { - val parents = new mutable.HashSet[RDD[_]] - def visit(rdd: RDD[_]) { - parents.add(rdd) - rdd.dependencies.foreach(dep => visit(dep.rdd)) - } - visit(rdd) - parents - } - - def haveCommonCheckpoint(rdds1: Set[_ <: RDD[_]], rdds2: Set[_ <: RDD[_]]): Boolean = { - val deps1 = rdds1.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => - set ++= rddDeps(rdd) - } - val deps2 = rdds2.foldLeft(new mutable.HashSet[RDD[_]]()) { (set, rdd) => - set ++= rddDeps(rdd) - } - deps1.intersect(deps2).exists(_.isCheckpointed) - } - - override protected def haveCommonCheckpoint(newData: RDD[T], oldData: RDD[T]): Boolean = { - PeriodicRDDCheckpointer.haveCommonCheckpoint(Set(newData), Set(oldData)) - } - -} - private[spark] object PeriodicRDDCheckpointer { def rddDeps(rdd: RDD[_]): Set[RDD[_]] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index 4230f05c8e45d..a57d2226a3dc3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -105,7 +105,14 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED]( override protected def unpersist(data: Graph[VD, ED]): Unit = data.unpersist(blocking = false) override protected def getCheckpointFiles(data: Graph[VD, ED]): Iterable[String] = { - data.getCheckpointFiles + val verticesFiles = PeriodicRDDCheckpointer + 
.rddDeps(data.vertices) + .flatMap(_.getCheckpointFile) + val edgesFiles = PeriodicRDDCheckpointer + .rddDeps(data.edges) + .flatMap(_.getCheckpointFile) + + verticesFiles ++ edgesFiles } override protected def haveCommonCheckpoint(