From 38349819b96cda6fca0134e30ba1ec178148fff5 Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 16 Sep 2016 13:51:58 -0400 Subject: [PATCH 01/18] test --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d0eca1ddea283..6c2507b9491b6 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, -and Spark Streaming for stream processing. +and Spark Streaming for stream processing . From 166fd6db95fd19b12e55ae73fb81495b4b9a11ed Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 16 Sep 2016 14:13:22 -0400 Subject: [PATCH 02/18] period do checkpoint in pregel --- .../spark/util}/PeriodicCheckpointer.scala | 5 +++-- .../spark/util}/PeriodicRDDCheckpointer.scala | 2 +- .../org/apache/spark/graphx/GraphOps.scala | 4 +++- .../org/apache/spark/graphx/Pregel.scala | 20 +++++++++++++++++-- .../util}/PeriodicGraphCheckpointer.scala | 6 ++++-- .../org/apache/spark/ml/clustering/LDA.scala | 2 +- .../ml/tree/impl/GradientBoostedTrees.scala | 2 +- .../spark/mllib/clustering/LDAOptimizer.scala | 2 +- .../impl/PeriodicGraphCheckpointerSuite.scala | 1 + .../impl/PeriodicRDDCheckpointerSuite.scala | 2 +- 10 files changed, 34 insertions(+), 12 deletions(-) rename {mllib/src/main/scala/org/apache/spark/mllib/impl => core/src/main/scala/org/apache/spark/util}/PeriodicCheckpointer.scala (98%) rename {mllib/src/main/scala/org/apache/spark/mllib/impl => core/src/main/scala/org/apache/spark/util}/PeriodicRDDCheckpointer.scala (98%) rename {mllib/src/main/scala/org/apache/spark/mllib/impl => graphx/src/main/scala/org/apache/spark/graphx/util}/PeriodicGraphCheckpointer.scala (95%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala 
b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala similarity index 98% rename from mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala rename to core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index 4dd498cd91b4e..aab84348d9b90 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -15,7 +15,8 @@ * limitations under the License. */ -package org.apache.spark.mllib.impl +// package org.apache.spark.mllib.impl +package org.apache.spark.util import scala.collection.mutable @@ -58,7 +59,7 @@ import org.apache.spark.storage.StorageLevel * @param sc SparkContext for the Datasets given to this checkpointer * @tparam T Dataset type, such as RDD[Double] */ -private[mllib] abstract class PeriodicCheckpointer[T]( +private[spark] abstract class PeriodicCheckpointer[T]( val checkpointInterval: Int, val sc: SparkContext) extends Logging { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicRDDCheckpointer.scala similarity index 98% rename from mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala rename to core/src/main/scala/org/apache/spark/util/PeriodicRDDCheckpointer.scala index 145dc22b7428e..d6fff2f51439b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicRDDCheckpointer.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.mllib.impl +package org.apache.spark.util import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 475bccf9bfc76..e78597c823557 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -362,12 +362,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, + checkpointInterval: Int = 25, activeDirection: EdgeDirection = EdgeDirection.Either)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { - Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) + Pregel(graph, initialMsg, maxIterations, activeDirection, + checkpointInterval)(vprog, sendMsg, mergeMsg) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 646462b4a8350..f22728ee19094 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -19,7 +19,10 @@ package org.apache.spark.graphx import scala.reflect.ClassTag +import org.apache.spark.graphx.util.PeriodicGraphCheckpointer import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.util.PeriodicRDDCheckpointer /** * Implements a Pregel-like bulk-synchronous message-passing API. 
@@ -113,7 +116,8 @@ object Pregel extends Logging { (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Either) + activeDirection: EdgeDirection = EdgeDirection.Either, + checkpointInterval: Int = 25) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) @@ -123,9 +127,17 @@ object Pregel extends Logging { s" but got ${maxIterations}") var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() + val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( + checkpointInterval, graph.vertices.sparkContext) + graphCheckpointer.update(g) + // compute the messages - var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) + var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg).cache() + val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)]( + checkpointInterval, graph.vertices.sparkContext) + messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) var activeMessages = messages.count() + // Loop var prevG: Graph[VD, ED] = null var i = 0 @@ -133,6 +145,7 @@ object Pregel extends Logging { // Receive the messages and update the vertices. prevG = g g = g.joinVertices(messages)(vprog).cache() + graphCheckpointer.update(g) val oldMessages = messages // Send new messages, skipping edges where neither side received a message. We must cache @@ -143,6 +156,7 @@ object Pregel extends Logging { // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages // and the vertices of g). 
+ messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) activeMessages = messages.count() logInfo("Pregel finished iteration " + i) @@ -155,6 +169,8 @@ object Pregel extends Logging { i += 1 } messages.unpersist(blocking = false) + graphCheckpointer.deleteAllCheckpoints() + messageCheckpointer.deleteAllCheckpoints() g } // end of apply diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala similarity index 95% rename from mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala rename to graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index 80074897567eb..9408d09f30ab0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.spark.mllib.impl +// package org.apache.spark.mllib.impl +package org.apache.spark.graphx.util import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.PeriodicCheckpointer /** @@ -76,7 +78,7 @@ import org.apache.spark.storage.StorageLevel * * TODO: Move this out of MLlib? 
*/ -private[mllib] class PeriodicGraphCheckpointer[VD, ED]( +private[spark] class PeriodicGraphCheckpointer[VD, ED]( checkpointInterval: Int, sc: SparkContext) extends PeriodicCheckpointer[Graph[VD, ED]](checkpointInterval, sc) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 03f4ac5b28e90..825b83144fff8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -34,7 +34,6 @@ import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedL EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, OnlineLDAOptimizer => OldOnlineLDAOptimizer} -import org.apache.spark.mllib.impl.PeriodicCheckpointer import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.MatrixImplicits._ import org.apache.spark.mllib.linalg.VectorImplicits._ @@ -44,6 +43,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} import org.apache.spark.sql.types.StructType import org.apache.spark.util.VersionUtils +import org.apache.spark.util.PeriodicCheckpointer private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala index f3bace8181570..d9c3d7ae34c75 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala @@ -21,13 +21,13 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.LabeledPoint import 
org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor} -import org.apache.spark.mllib.impl.PeriodicRDDCheckpointer import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} import org.apache.spark.mllib.tree.configuration.{BoostingStrategy => OldBoostingStrategy} import org.apache.spark.mllib.tree.impurity.{Variance => OldVariance} import org.apache.spark.mllib.tree.loss.{Loss => OldLoss} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.PeriodicRDDCheckpointer private[spark] object GradientBoostedTrees extends Logging { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index 48bae4276c480..3697a9b46dd84 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -25,7 +25,7 @@ import breeze.stats.distributions.{Gamma, RandBasis} import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.graphx._ -import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer +import org.apache.spark.graphx.util.PeriodicGraphCheckpointer import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel diff --git a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala index a13e7f63a9296..9114e27c9cca4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkContext, 
SparkFunSuite} import org.apache.spark.graphx.{Edge, Graph} +import org.apache.spark.graphx.util.PeriodicGraphCheckpointer import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils diff --git a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala index 14adf8c29fc6b..352b71cf92905 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils +import org.apache.spark.util.{PeriodicRDDCheckpointer, Utils} class PeriodicRDDCheckpointerSuite extends SparkFunSuite with MLlibTestSparkContext { From b119e4a024b626088f45ea720a749dbeecc657f5 Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 16 Sep 2016 14:37:50 -0400 Subject: [PATCH 03/18] remove unused code --- README.md | 2 +- .../scala/org/apache/spark/util/PeriodicCheckpointer.scala | 1 - .../spark/graphx/util/PeriodicGraphCheckpointer.scala | 6 +----- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 6c2507b9491b6..d0eca1ddea283 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, -and Spark Streaming for stream processing . +and Spark Streaming for stream processing. 
diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index aab84348d9b90..f86a6d9f489e8 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -15,7 +15,6 @@ * limitations under the License. */ -// package org.apache.spark.mllib.impl package org.apache.spark.util import scala.collection.mutable diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index 9408d09f30ab0..b1316ddee81c9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -15,7 +15,6 @@ * limitations under the License. */ -// package org.apache.spark.mllib.impl package org.apache.spark.graphx.util import org.apache.spark.SparkContext @@ -89,10 +88,7 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED]( override protected def persist(data: Graph[VD, ED]): Unit = { if (data.vertices.getStorageLevel == StorageLevel.NONE) { - data.vertices.persist() - } - if (data.edges.getStorageLevel == StorageLevel.NONE) { - data.edges.persist() + data.persist() } } From d183a7c6bd89b151da3ff82cbd2fdc45c56f7e6e Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 16 Sep 2016 17:15:47 -0400 Subject: [PATCH 04/18] fix mima --- project/MimaExcludes.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index e0ee00e6826ab..33d22b07e404b 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -932,7 +932,10 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.numTrees"), 
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") + ) ++ Seq( + // SPARK-5484 Periodically do checkpoint in Pregel + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply") ) } From 352dcb28c5f053aaa9702b9e7ab10a702d82e6e5 Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 16 Sep 2016 17:39:41 -0400 Subject: [PATCH 05/18] fix mima --- project/MimaExcludes.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 33d22b07e404b..395c3dd140942 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -936,6 +936,7 @@ object MimaExcludes { ) ++ Seq( // SPARK-5484 Periodically do checkpoint in Pregel ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.GraphOps.pregel") ) } From ad82e45a892158cbc538f89e32fab4997ffb7f72 Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 16 Sep 2016 17:53:29 -0400 Subject: [PATCH 06/18] fix scala style fail --- project/MimaExcludes.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 395c3dd140942..36af4a263dc6e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -935,7 +935,7 @@ object MimaExcludes { 
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") ) ++ Seq( // SPARK-5484 Periodically do checkpoint in Pregel - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.GraphOps.pregel") ) } From e786838af3912953d61787210213c269b4a5cdba Mon Sep 17 00:00:00 2001 From: ding Date: Tue, 17 Jan 2017 14:19:28 -0500 Subject: [PATCH 07/18] rebase upstream --- .../scala/org/apache/spark/ml/clustering/LDA.scala | 3 +-- project/MimaExcludes.scala | 13 +++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 825b83144fff8..ddde6dddeeaad 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -42,9 +42,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} import org.apache.spark.sql.types.StructType -import org.apache.spark.util.VersionUtils import org.apache.spark.util.PeriodicCheckpointer - +import org.apache.spark.util.VersionUtils private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasMaxIter with HasSeed with HasCheckpointInterval { diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 36af4a263dc6e..08f08e5c03f91 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -46,7 +46,12 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"), // [SPARK-19148][SQL] 
do not expose the external table concept in Catalog - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable"), + + // SPARK-5484 Periodically do checkpoint in Pregel + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.GraphOps.pregel"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.graphx.GraphOps.pregel$default$3") ) // Exclude rules for 2.1.x @@ -932,11 +937,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.numTrees"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") - ) ++ Seq( - // SPARK-5484 Periodically do checkpoint in Pregel - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.GraphOps.pregel") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") ) } From 38e6238c2fc17063de60a3b8b529731997eff398 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Mon, 13 Feb 2017 18:39:13 -0800 Subject: [PATCH 08/18] Update a couple of test suite package names, and enhance the PeriodicGraphCheckpointerSuite to test specific storage levels --- .../spark/util/PeriodicRDDCheckpointerSuite.scala | 3 +-- 
.../graphx/util/PeriodicGraphCheckpointerSuite.scala | 11 ++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index a3612fa9b93d8..db0c1324e8a5f 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.spark.mllib.impl +package org.apache.spark.util import org.apache.hadoop.fs.Path import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{PeriodicRDDCheckpointer, Utils} class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointerSuite.scala index b92a2a287dc5d..e0c65e6940f66 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointerSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointerSuite.scala @@ -15,13 +15,12 @@ * limitations under the License. 
*/ -package org.apache.spark.mllib.impl +package org.apache.spark.graphx.util import org.apache.hadoop.fs.Path import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.graphx.{Edge, Graph, LocalSparkContext} -import org.apache.spark.graphx.util.PeriodicGraphCheckpointer import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -90,6 +89,7 @@ class PeriodicGraphCheckpointerSuite extends SparkFunSuite with LocalSparkContex } private object PeriodicGraphCheckpointerSuite { + private val defaultStorageLevel = StorageLevel.MEMORY_ONLY_SER case class GraphToCheck(graph: Graph[Double, Double], gIndex: Int) @@ -100,7 +100,8 @@ private object PeriodicGraphCheckpointerSuite { Edge[Double](3, 4, 0)) def createGraph(sc: SparkContext): Graph[Double, Double] = { - Graph.fromEdges[Double, Double](sc.parallelize(edges), 0) + Graph.fromEdges[Double, Double]( + sc.parallelize(edges), 0, defaultStorageLevel, defaultStorageLevel) } def checkPersistence(graphs: Seq[GraphToCheck], iteration: Int): Unit = { @@ -120,8 +121,8 @@ private object PeriodicGraphCheckpointerSuite { assert(graph.vertices.getStorageLevel == StorageLevel.NONE) assert(graph.edges.getStorageLevel == StorageLevel.NONE) } else { - assert(graph.vertices.getStorageLevel != StorageLevel.NONE) - assert(graph.edges.getStorageLevel != StorageLevel.NONE) + assert(graph.vertices.getStorageLevel == defaultStorageLevel) + assert(graph.edges.getStorageLevel == defaultStorageLevel) } } catch { case _: AssertionError => From 194dc27317202a5232ba60764b64821e764fb601 Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 17 Feb 2017 06:47:11 -0500 Subject: [PATCH 09/18] update checkpoint interval and some comments --- .../src/main/scala/org/apache/spark/graphx/GraphOps.scala | 4 +++- .../src/main/scala/org/apache/spark/graphx/Pregel.scala | 8 ++++---- project/MimaExcludes.scala | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index e78597c823557..7072da2f1db47 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -336,6 +336,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @param maxIterations the maximum number of iterations to run for * + * @param checkpointInterval the checkpoint interval + * * @param activeDirection the direction of edges incident to a vertex that received a message in * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only * out-edges of vertices that received a message in the previous round will run. @@ -362,7 +364,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, - checkpointInterval: Int = 25, + checkpointInterval: Int = 10, activeDirection: EdgeDirection = EdgeDirection.Either)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 03f508c1de951..f07643cb4f3e3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -117,7 +117,7 @@ object Pregel extends Logging { initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either, - checkpointInterval: Int = 25) + checkpointInterval: Int = 10) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) @@ -149,8 +149,8 @@ object Pregel extends Logging { val oldMessages = messages // Send new messages, skipping edges where neither side received a message. 
We must cache - // messages so it can be materialized on the next line, allowing us to uncache the previous - // iteration. + // and periodic checkpoint messages so it can be materialized on the next line, and avoid + // to have a long lineage chain. messages = GraphXUtils.mapReduceTriplets( g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))) // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages @@ -168,7 +168,7 @@ object Pregel extends Logging { // count the iteration i += 1 } - messages.unpersist(blocking = false) + messageCheckpointer.unpersist(messages) graphCheckpointer.deleteAllCheckpoints() messageCheckpointer.deleteAllCheckpoints() g diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index f136ee2c18440..63d22dc06265e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -948,7 +948,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.numTrees"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") ) } From 9d7e796a8f3939746c765c4ca2ae71cf486207cb Mon Sep 17 00:00:00 2001 From: ding Date: Fri, 17 Feb 2017 08:06:05 -0500 Subject: [PATCH 10/18] add config value to controll checkpoint interval in pergel --- .../org/apache/spark/util/PeriodicCheckpointer.scala | 10 ++++++++++ .../main/scala/org/apache/spark/graphx/GraphOps.scala | 6 +----- 
.../main/scala/org/apache/spark/graphx/Pregel.scala | 7 ++++--- project/MimaExcludes.scala | 7 +------ 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala index f86a6d9f489e8..ce06e18879a49 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala @@ -127,6 +127,16 @@ private[spark] abstract class PeriodicCheckpointer[T]( /** Get list of checkpoint files for this given Dataset */ protected def getCheckpointFiles(data: T): Iterable[String] + /** + * Call this to unpersist the Dataset. + */ + def unpersistDataSet(): Unit = { + while (persistedQueue.nonEmpty) { + val dataToUnpersist = persistedQueue.dequeue() + unpersist(dataToUnpersist) + } + } + /** * Call this at the end to delete any remaining checkpoint files. */ diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 7072da2f1db47..475bccf9bfc76 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -336,8 +336,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @param maxIterations the maximum number of iterations to run for * - * @param checkpointInterval the checkpoint interval - * * @param activeDirection the direction of edges incident to a vertex that received a message in * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only * out-edges of vertices that received a message in the previous round will run. 
@@ -364,14 +362,12 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, - checkpointInterval: Int = 10, activeDirection: EdgeDirection = EdgeDirection.Either)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { - Pregel(graph, initialMsg, maxIterations, activeDirection, - checkpointInterval)(vprog, sendMsg, mergeMsg) + Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index f07643cb4f3e3..57f0922558b24 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -116,8 +116,7 @@ object Pregel extends Logging { (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Either, - checkpointInterval: Int = 10) + activeDirection: EdgeDirection = EdgeDirection.Either) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) @@ -126,6 +125,8 @@ object Pregel extends Logging { require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," + s" but got ${maxIterations}") + val checkpointInterval = graph.vertices.sparkContext.getConf + .getInt("spark.graphx.pregel.checkpointInterval", 10) var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)) val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( checkpointInterval, graph.vertices.sparkContext) @@ -168,7 +169,7 @@ object Pregel extends Logging { // count the iteration i += 1 } - messageCheckpointer.unpersist(messages) + messageCheckpointer.unpersistDataSet() graphCheckpointer.deleteAllCheckpoints() 
messageCheckpointer.deleteAllCheckpoints() g diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 63d22dc06265e..9d359427f27a6 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -57,12 +57,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.$default$11"), // [SPARK-17161] Removing Python-friendly constructors not needed - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"), - - // SPARK-5484 Periodically do checkpoint in Pregel - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.Pregel.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.graphx.GraphOps.pregel"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.graphx.GraphOps.pregel$default$3") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this") ) // Exclude rules for 2.1.x From dae94aa1c216b390ad2fcc0b435b98e9fc2436b4 Mon Sep 17 00:00:00 2001 From: ding Date: Sun, 19 Feb 2017 14:10:30 -0500 Subject: [PATCH 11/18] document spark.graphx.pregel.checkpointInterval into graphx-programming-guide.md --- docs/graphx-programming-guide.md | 50 +++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index e271b28fb4f28..6887097aa0268 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -708,7 +708,9 @@ messages remaining. > messaging function. These constraints allow additional optimization within GraphX. 
The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* -of its implementation (note calls to graph.cache have been removed): +of its implementation (note: to avoid stackOverflowError due to long lineage chains, graph and +messages are periodically checkpoint and the checkpoint interval is set by +"spark.graphx.pregel.checkpointInterval"): {% highlight scala %} class GraphOps[VD, ED] { @@ -720,25 +722,53 @@ class GraphOps[VD, ED] { sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { - // Receive the initial message at each vertex - var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() + val checkpointInterval = graph.vertices.sparkContext.getConf + .getInt("spark.graphx.pregel.checkpointInterval", 10) + var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)) + val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( + checkpointInterval, graph.vertices.sparkContext) + graphCheckpointer.update(g) + // compute the messages - var messages = g.mapReduceTriplets(sendMsg, mergeMsg) + var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) + val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)]( + checkpointInterval, graph.vertices.sparkContext) + messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) var activeMessages = messages.count() - // Loop until no messages remain or maxIterations is achieved + + // Loop + var prevG: Graph[VD, ED] = null var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages and update the vertices. - g = g.joinVertices(messages)(vprog).cache() + prevG = g + g = g.joinVertices(messages)(vprog) + graphCheckpointer.update(g) + val oldMessages = messages // Send new messages, skipping edges where neither side received a message. 
We must cache - // messages so it can be materialized on the next line, allowing us to uncache the previous - // iteration. - messages = g.mapReduceTriplets( - sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() + // and periodic checkpoint messages so it can be materialized on the next line, and avoid + // to have a long lineage chain. + messages = GraphXUtils.mapReduceTriplets( + g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))) + // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages + // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages + // and the vertices of g). + messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) activeMessages = messages.count() + + logInfo("Pregel finished iteration " + i) + + // Unpersist the RDDs hidden by newly-materialized RDDs + oldMessages.unpersist(blocking = false) + prevG.unpersistVertices(blocking = false) + prevG.edges.unpersist(blocking = false) + // count the iteration i += 1 } + messageCheckpointer.unpersistDataSet() + graphCheckpointer.deleteAllCheckpoints() + messageCheckpointer.deleteAllCheckpoints() g } } From dd6c366f504833f064b126a7fe85ea9cdc42fde1 Mon Sep 17 00:00:00 2001 From: ding Date: Sun, 19 Feb 2017 18:16:16 -0500 Subject: [PATCH 12/18] remove details of checkpointer in pregel implementation sketch --- docs/graphx-programming-guide.md | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 6887097aa0268..fa2f6f03c6965 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -722,8 +722,6 @@ class GraphOps[VD, ED] { sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { - val checkpointInterval = graph.vertices.sparkContext.getConf - .getInt("spark.graphx.pregel.checkpointInterval", 10) var g = graph.mapVertices((vid, 
vdata) => vprog(vid, vdata, initialMsg)) val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( checkpointInterval, graph.vertices.sparkContext) @@ -736,12 +734,11 @@ class GraphOps[VD, ED] { messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) var activeMessages = messages.count() - // Loop + // Loop until no messages remain or maxIterations is achieved var prevG: Graph[VD, ED] = null var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages and update the vertices. - prevG = g g = g.joinVertices(messages)(vprog) graphCheckpointer.update(g) @@ -751,24 +748,10 @@ class GraphOps[VD, ED] { // to have a long lineage chain. messages = GraphXUtils.mapReduceTriplets( g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))) - // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages - // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages - // and the vertices of g). messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) activeMessages = messages.count() - - logInfo("Pregel finished iteration " + i) - - // Unpersist the RDDs hidden by newly-materialized RDDs - oldMessages.unpersist(blocking = false) - prevG.unpersistVertices(blocking = false) - prevG.edges.unpersist(blocking = false) - // count the iteration i += 1 } - messageCheckpointer.unpersistDataSet() - graphCheckpointer.deleteAllCheckpoints() - messageCheckpointer.deleteAllCheckpoints() g } } From 2639eb10f516a1c11f94cf2918cf2635f3b459bc Mon Sep 17 00:00:00 2001 From: ding Date: Sun, 19 Feb 2017 18:22:45 -0500 Subject: [PATCH 13/18] remove unused variable in pregel sketch implementation --- docs/graphx-programming-guide.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index fa2f6f03c6965..65ca0a58c4dd6 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md 
@@ -722,6 +722,7 @@ class GraphOps[VD, ED] { sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { + // Receive the initial message at each vertex var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)) val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( checkpointInterval, graph.vertices.sparkContext) @@ -733,15 +734,12 @@ class GraphOps[VD, ED] { checkpointInterval, graph.vertices.sparkContext) messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) var activeMessages = messages.count() - // Loop until no messages remain or maxIterations is achieved - var prevG: Graph[VD, ED] = null var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages and update the vertices. g = g.joinVertices(messages)(vprog) graphCheckpointer.update(g) - val oldMessages = messages // Send new messages, skipping edges where neither side received a message. We must cache // and periodic checkpoint messages so it can be materialized on the next line, and avoid From 11bc349e55eaa5f687d376d1a05f3509459dbecd Mon Sep 17 00:00:00 2001 From: ding Date: Thu, 23 Feb 2017 14:35:16 -0500 Subject: [PATCH 14/18] small changes for documentation --- docs/configuration.md | 14 ++++++++++++++ docs/graphx-programming-guide.md | 18 +++++------------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 2fcb3a096aea5..5df25b88171b4 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2115,6 +2115,20 @@ showDF(properties, numRows = 200, truncate = FALSE) +### GraphX + + + + + + + + +
Property NameDefaultMeaning
spark.graphx.pregel.checkpointInterval10 + Checkpoint interval for graph and message in Pregel. It used to avoid stackOverflowError due to long lineage chains + after lots of iterations. The checkpoint can be disabled by set as -1. +
+ ### Deploy diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 65ca0a58c4dd6..185f558faf632 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -710,7 +710,7 @@ messages remaining. The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* of its implementation (note: to avoid stackOverflowError due to long lineage chains, graph and messages are periodically checkpoint and the checkpoint interval is set by -"spark.graphx.pregel.checkpointInterval"): +"spark.graphx.pregel.checkpointInterval", it can be disable by set as -1): {% highlight scala %} class GraphOps[VD, ED] { @@ -723,30 +723,22 @@ class GraphOps[VD, ED] { mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Receive the initial message at each vertex - var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)) - val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( - checkpointInterval, graph.vertices.sparkContext) - graphCheckpointer.update(g) + var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() // compute the messages - var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) - val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)]( - checkpointInterval, graph.vertices.sparkContext) - messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) + var messages = g.mapReduceTriplets(sendMsg, mergeMsg) var activeMessages = messages.count() // Loop until no messages remain or maxIterations is achieved var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages and update the vertices. - g = g.joinVertices(messages)(vprog) - graphCheckpointer.update(g) + g = g.joinVertices(messages)(vprog).cache() val oldMessages = messages // Send new messages, skipping edges where neither side received a message. 
We must cache // and periodic checkpoint messages so it can be materialized on the next line, and avoid // to have a long lineage chain. messages = GraphXUtils.mapReduceTriplets( - g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))) - messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) + g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() activeMessages = messages.count() i += 1 } From 9a6fd1ffb4c73e9b12c70ba5b1ea952a89c374b6 Mon Sep 17 00:00:00 2001 From: ding Date: Wed, 19 Apr 2017 18:11:33 -0400 Subject: [PATCH 15/18] mv PeriodicRDDCheckpointer to rdd.util and explain why use cache in PeriodicGraphCheckpointer --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- .../spark/{ => rdd}/util/PeriodicRDDCheckpointer.scala | 3 ++- core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala | 2 +- .../org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala | 5 +++-- graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala | 2 +- .../apache/spark/graphx/util/PeriodicGraphCheckpointer.scala | 3 +++ .../org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala | 2 +- 7 files changed, 13 insertions(+), 8 deletions(-) rename core/src/main/scala/org/apache/spark/{ => rdd}/util/PeriodicRDDCheckpointer.scala (97%) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0359508c00395..bc053a9bac89f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -41,7 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} -import org.apache.spark.util.collection.OpenHashMap +import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, 
PoissonSampler, SamplingUtils} @@ -1419,7 +1419,7 @@ abstract class RDD[T: ClassTag]( val mapRDDs = mapPartitions { items => // Priority keeps the largest elements, so let's reverse the ordering. val queue = new BoundedPriorityQueue[T](num)(ord.reverse) - queue ++= util.collection.Utils.takeOrdered(items, num)(ord) + queue ++= collectionUtils.takeOrdered(items, num)(ord) Iterator.single(queue) } if (mapRDDs.partitions.length == 0) { diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicRDDCheckpointer.scala b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/util/PeriodicRDDCheckpointer.scala rename to core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala index d6fff2f51439b..ab72addb2466b 100644 --- a/core/src/main/scala/org/apache/spark/util/PeriodicRDDCheckpointer.scala +++ b/core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala @@ -15,11 +15,12 @@ * limitations under the License. 
*/ -package org.apache.spark.util +package org.apache.spark.rdd.util import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.PeriodicCheckpointer /** diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala index f9a7f151823a2..7f20206202cb9 100644 --- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala @@ -135,7 +135,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers w } test("get a range of elements in an array not partitioned by a range partitioner") { - val pairArr = util.Random.shuffle((1 to 1000).toList).map(x => (x, x)) + val pairArr = scala.util.Random.shuffle((1 to 1000).toList).map(x => (x, x)) val pairs = sc.parallelize(pairArr, 10) val range = pairs.filterByRange(200, 800).collect() assert((800 to 200 by -1).toArray.sorted === range.map(_._1).sorted) diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index db0c1324e8a5f..dc04afd161012 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -15,13 +15,14 @@ * limitations under the License. 
*/ -package org.apache.spark.util +package org.apache.spark.utils import org.apache.hadoop.fs.Path - import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 57f0922558b24..ca0a63a2e4b39 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.graphx.util.PeriodicGraphCheckpointer import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.util.PeriodicRDDCheckpointer +import org.apache.spark.rdd.util.PeriodicRDDCheckpointer /** * Implements a Pregel-like bulk-synchronous message-passing API. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala index f23d574aab07b..fda501aa757d6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala @@ -87,6 +87,9 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED]( override protected def persist(data: Graph[VD, ED]): Unit = { if (data.vertices.getStorageLevel == StorageLevel.NONE) { + /* We need to use cache because persist does not honor the default storage level requested + * when constructing the graph. Only cache does that. 
+ */ data.vertices.cache() } if (data.edges.getStorageLevel == StorageLevel.NONE) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala index d9c3d7ae34c75..1ce7c87dbd15d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala @@ -26,8 +26,8 @@ import org.apache.spark.mllib.tree.configuration.{BoostingStrategy => OldBoostin import org.apache.spark.mllib.tree.impurity.{Variance => OldVariance} import org.apache.spark.mllib.tree.loss.{Loss => OldLoss} import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.util.PeriodicRDDCheckpointer import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.PeriodicRDDCheckpointer private[spark] object GradientBoostedTrees extends Logging { From 5015b44a388c79adbb8a4039c2745388b8a8f51d Mon Sep 17 00:00:00 2001 From: ding Date: Thu, 20 Apr 2017 07:33:38 -0400 Subject: [PATCH 16/18] fix scalastyle error --- .../org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala index dc04afd161012..f9e1b791c86ea 100644 --- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.utils import org.apache.hadoop.fs.Path + import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.util.PeriodicRDDCheckpointer From 24d4ad6fd5b05e1d024a42ee656058e77237ffb9 Mon Sep 17 00:00:00 2001 From: ding Date: Sat, 22 Apr 2017 07:17:23 -0400 Subject: [PATCH 17/18] turn off checkpoint 
graph and messages in pregel by default --- docs/configuration.md | 4 ++-- docs/graphx-programming-guide.md | 11 +++++------ .../main/scala/org/apache/spark/graphx/Pregel.scala | 6 +++--- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 5df25b88171b4..1823fb9b1f53a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2121,10 +2121,10 @@ showDF(properties, numRows = 200, truncate = FALSE) - +
Property NameDefaultMeaning
spark.graphx.pregel.checkpointInterval10-1 Checkpoint interval for graph and message in Pregel. It used to avoid stackOverflowError due to long lineage chains - after lots of iterations. The checkpoint can be disabled by set as -1. + after lots of iterations. The checkpoint is disabled by default.
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 185f558faf632..1b7fe4274358d 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -708,9 +708,8 @@ messages remaining. > messaging function. These constraints allow additional optimization within GraphX. The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* -of its implementation (note: to avoid stackOverflowError due to long lineage chains, graph and -messages are periodically checkpoint and the checkpoint interval is set by -"spark.graphx.pregel.checkpointInterval", it can be disable by set as -1): +of its implementation (note: to avoid stackOverflowError due to long lineage chains, pregel support periodcally +checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval"): {% highlight scala %} class GraphOps[VD, ED] { @@ -726,7 +725,7 @@ class GraphOps[VD, ED] { var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() // compute the messages - var messages = g.mapReduceTriplets(sendMsg, mergeMsg) + var messages = g.mapReduceTriplets(sendMsg, mergeMsg) var activeMessages = messages.count() // Loop until no messages remain or maxIterations is achieved var i = 0 @@ -735,8 +734,8 @@ class GraphOps[VD, ED] { g = g.joinVertices(messages)(vprog).cache() val oldMessages = messages // Send new messages, skipping edges where neither side received a message. We must cache - // and periodic checkpoint messages so it can be materialized on the next line, and avoid - // to have a long lineage chain. + // messages so it can be materialized on the next line, allowing us to uncache the previous + // iteration. 
messages = GraphXUtils.mapReduceTriplets( g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() activeMessages = messages.count() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index ca0a63a2e4b39..755c6febc48e6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -126,7 +126,7 @@ object Pregel extends Logging { s" but got ${maxIterations}") val checkpointInterval = graph.vertices.sparkContext.getConf - .getInt("spark.graphx.pregel.checkpointInterval", 10) + .getInt("spark.graphx.pregel.checkpointInterval", -1) var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)) val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( checkpointInterval, graph.vertices.sparkContext) @@ -150,8 +150,8 @@ object Pregel extends Logging { val oldMessages = messages // Send new messages, skipping edges where neither side received a message. We must cache - // and periodic checkpoint messages so it can be materialized on the next line, and avoid - // to have a long lineage chain. + // messages so it can be materialized on the next line, allowing us to uncache the previous + // iteration. messages = GraphXUtils.mapReduceTriplets( g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))) // The call to count() materializes `messages` and the vertices of `g`. 
This hides oldMessages From ec6265986cb91585c0a6fdbc0c9675ec9fbba613 Mon Sep 17 00:00:00 2001 From: ding Date: Sun, 23 Apr 2017 13:18:04 -0400 Subject: [PATCH 18/18] recommend a good value for the checkpoint interval in Pregel --- docs/graphx-programming-guide.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 1b7fe4274358d..76aa7b405e18c 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -709,7 +709,8 @@ messages remaining. The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* of its implementation (note: to avoid stackOverflowError due to long lineage chains, pregel support periodcally -checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval"): +checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval" to a positive number, +say 10. Also set the checkpoint directory using SparkContext.setCheckpointDir(directory: String)): {% highlight scala %} class GraphOps[VD, ED] {