Skip to content

Commit 842d000

Browse files
jkbradleymengxr
authored andcommitted
[SPARK-5461] [graphx] Add isCheckpointed, getCheckpointedFiles methods to Graph
Added the 2 methods to Graph and GraphImpl. Both make calls to the underlying vertex and edge RDDs. This is needed for another PR (for LDA): [#4047] Notes: * getCheckpointedFiles is plural and returns a Seq[String] instead of an Option[String]. * I attempted to test to make sure the methods returned the correct values after checkpointing. It did not work; I guess that checkpointing does not occur quickly enough? I noticed that there are not checkpointing tests for RDDs; is it just hard to test well? CC: rxin CC: mengxr (since related to LDA) Author: Joseph K. Bradley <[email protected]> Closes #4253 from jkbradley/graphx-checkpoint and squashes the following commits: b680148 [Joseph K. Bradley] added class tag to firstParent call in VertexRDDImpl.isCheckpointed, though not needed to compile 250810e [Joseph K. Bradley] In EdgeRDDImple, VertexRDDImpl, added transient back to partitionsRDD, and made isCheckpointed check firstParent instead of partitionsRDD 695b7a3 [Joseph K. Bradley] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient cc00767 [Joseph K. Bradley] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl, VertexRDDImpl. The corresponding Graph methods now work. 188665f [Joseph K. Bradley] improved documentation 235738c [Joseph K. Bradley] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl
1 parent 5a55261 commit 842d000

File tree

6 files changed

+51
-2
lines changed

6 files changed

+51
-2
lines changed

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
104104
*/
105105
def checkpoint(): Unit
106106

107+
/**
108+
* Return whether this Graph has been checkpointed or not.
109+
* This returns true iff both the vertices RDD and edges RDD have been checkpointed.
110+
*/
111+
def isCheckpointed: Boolean
112+
113+
/**
114+
* Gets the name of the files to which this Graph was checkpointed.
115+
* (The vertices RDD and edges RDD are checkpointed separately.)
116+
*/
117+
def getCheckpointFiles: Seq[String]
118+
107119
/**
108120
* Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that
109121
* build a new graph in each iteration.

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
7373
override def checkpoint() = {
7474
partitionsRDD.checkpoint()
7575
}
76-
76+
77+
override def isCheckpointed: Boolean = {
78+
firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
79+
}
80+
81+
override def getCheckpointFile: Option[String] = {
82+
partitionsRDD.getCheckpointFile
83+
}
84+
7785
/** The number of edges in the RDD. */
7886
override def count(): Long = {
7987
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
7070
replicatedVertexView.edges.checkpoint()
7171
}
7272

73+
override def isCheckpointed: Boolean = {
74+
vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed
75+
}
76+
77+
override def getCheckpointFiles: Seq[String] = {
78+
Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
79+
case Some(path) => Seq(path)
80+
case None => Seq()
81+
}
82+
}
83+
7384
override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
7485
unpersistVertices(blocking)
7586
replicatedVertexView.edges.unpersist(blocking)

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] (
7474
override def checkpoint() = {
7575
partitionsRDD.checkpoint()
7676
}
77-
77+
78+
override def isCheckpointed: Boolean = {
79+
firstParent[ShippableVertexPartition[VD]].isCheckpointed
80+
}
81+
82+
override def getCheckpointFile: Option[String] = {
83+
partitionsRDD.getCheckpointFile
84+
}
85+
7886
/** The number of vertices in the RDD. */
7987
override def count(): Long = {
8088
partitionsRDD.map(_.size).reduce(_ + _)

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
375375
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
376376
val rdd = sc.parallelize(ring)
377377
val graph = Graph.fromEdges(rdd, 1.0F)
378+
assert(!graph.isCheckpointed)
379+
assert(graph.getCheckpointFiles.size === 0)
378380
graph.checkpoint()
379381
graph.edges.map(_.attr).count()
380382
graph.vertices.map(_._2).count()
@@ -383,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
383385
val verticesDependencies = graph.vertices.partitionsRDD.dependencies
384386
assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
385387
assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
388+
assert(graph.isCheckpointed)
389+
assert(graph.getCheckpointFiles.size === 2)
386390
}
387391
}
388392

project/MimaExcludes.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ object MimaExcludes {
127127
// SPARK-5315 Spark Streaming Java API returns Scala DStream
128128
ProblemFilters.exclude[MissingMethodProblem](
129129
"org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
130+
) ++ Seq(
131+
// SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
132+
ProblemFilters.exclude[MissingMethodProblem](
133+
"org.apache.spark.graphx.Graph.getCheckpointFiles"),
134+
ProblemFilters.exclude[MissingMethodProblem](
135+
"org.apache.spark.graphx.Graph.isCheckpointed")
130136
)
131137

132138
case v if v.startsWith("1.2") =>

0 commit comments

Comments
 (0)