@@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel
2727import org .apache .spark .graphx .impl .RoutingTablePartition
2828import org .apache .spark .graphx .impl .ShippableVertexPartition
2929import org .apache .spark .graphx .impl .VertexAttributeBlock
30+ import org .apache .spark .graphx .impl .VertexRDDImpl
3031import org .apache .spark .graphx .impl .RoutingTableMessageRDDFunctions ._
3132import org .apache .spark .graphx .impl .VertexRDDFunctions ._
3233
@@ -55,85 +56,34 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
5556 *
5657 * @tparam VD the vertex attribute associated with each vertex in the set.
5758 */
58- class VertexRDD [@ specialized VD : ClassTag ](
59- val partitionsRDD : RDD [ShippableVertexPartition [VD ]],
60- val targetStorageLevel : StorageLevel = StorageLevel .MEMORY_ONLY )
61- extends RDD [(VertexId , VD )](partitionsRDD.context, List (new OneToOneDependency (partitionsRDD))) {
59+ trait VertexRDD [@ specialized VD ] extends RDD [(VertexId , VD )] {
6260
63- require(partitionsRDD.partitioner.isDefined)
61+ implicit protected def vdTag : ClassTag [VD ]
62+
63+ private [graphx] def partitionsRDD : RDD [ShippableVertexPartition [VD ]]
6464
6565 /**
6666 * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
6767 * VertexRDD will be based on a different index and can no longer be quickly joined with this
6868 * RDD.
6969 */
70- def reindex (): VertexRDD [VD ] = this .withPartitionsRDD(partitionsRDD.map(_.reindex()))
71-
72- override val partitioner = partitionsRDD.partitioner
73-
74- override protected def getPartitions : Array [Partition ] = partitionsRDD.partitions
75-
76- override protected def getPreferredLocations (s : Partition ): Seq [String ] =
77- partitionsRDD.preferredLocations(s)
78-
79- override def setName (_name : String ): this .type = {
80- if (partitionsRDD.name != null ) {
81- partitionsRDD.setName(partitionsRDD.name + " , " + _name)
82- } else {
83- partitionsRDD.setName(_name)
84- }
85- this
86- }
87- setName(" VertexRDD" )
88-
89- /**
90- * Persists the vertex partitions at the specified storage level, ignoring any existing target
91- * storage level.
92- */
93- override def persist (newLevel : StorageLevel ): this .type = {
94- partitionsRDD.persist(newLevel)
95- this
96- }
97-
98- override def unpersist (blocking : Boolean = true ): this .type = {
99- partitionsRDD.unpersist(blocking)
100- this
101- }
102-
103- /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
104- override def cache (): this .type = {
105- partitionsRDD.persist(targetStorageLevel)
106- this
107- }
108-
109- /** The number of vertices in the RDD. */
110- override def count (): Long = {
111- partitionsRDD.map(_.size.toLong).reduce(_ + _)
112- }
113-
114- /**
115- * Provides the `RDD[(VertexId, VD)]` equivalent output.
116- */
117- override def compute (part : Partition , context : TaskContext ): Iterator [(VertexId , VD )] = {
118- firstParent[ShippableVertexPartition [VD ]].iterator(part, context).next.iterator
119- }
70+ def reindex (): VertexRDD [VD ]
12071
12172 /**
12273 * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
12374 */
12475 private [graphx] def mapVertexPartitions [VD2 : ClassTag ](
12576 f : ShippableVertexPartition [VD ] => ShippableVertexPartition [VD2 ])
126- : VertexRDD [VD2 ] = {
127- val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true )
128- this .withPartitionsRDD(newPartitionsRDD)
129- }
130-
77+ : VertexRDD [VD2 ]
13178
13279 /**
13380 * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
13481 * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
13582 * rather than allocating new memory.
13683 *
84+ * It is declared and defined in the VertexRDD trait to allow refining the return type from
85+ * `RDD[(VertexId, VD)]` to `VertexRDD[VD]`.
86+ *
13787 * @param pred the user defined predicate, which takes a tuple to conform to the
13888 * `RDD[(VertexId, VD)]` interface
13989 */
@@ -149,8 +99,7 @@ class VertexRDD[@specialized VD: ClassTag](
14999 * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
150100 * original VertexRDD
151101 */
152- def mapValues [VD2 : ClassTag ](f : VD => VD2 ): VertexRDD [VD2 ] =
153- this .mapVertexPartitions(_.map((vid, attr) => f(attr)))
102+ def mapValues [VD2 : ClassTag ](f : VD => VD2 ): VertexRDD [VD2 ]
154103
155104 /**
156105 * Maps each vertex attribute, additionally supplying the vertex ID.
@@ -161,23 +110,13 @@ class VertexRDD[@specialized VD: ClassTag](
161110 * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
162111 * original VertexRDD. The resulting VertexRDD retains the same index.
163112 */
164- def mapValues [VD2 : ClassTag ](f : (VertexId , VD ) => VD2 ): VertexRDD [VD2 ] =
165- this .mapVertexPartitions(_.map(f))
113+ def mapValues [VD2 : ClassTag ](f : (VertexId , VD ) => VD2 ): VertexRDD [VD2 ]
166114
167115 /**
168116 * Hides vertices that are the same between `this` and `other`; for vertices that are different,
169117 * keeps the values from `other`.
170118 */
171- def diff (other : VertexRDD [VD ]): VertexRDD [VD ] = {
172- val newPartitionsRDD = partitionsRDD.zipPartitions(
173- other.partitionsRDD, preservesPartitioning = true
174- ) { (thisIter, otherIter) =>
175- val thisPart = thisIter.next()
176- val otherPart = otherIter.next()
177- Iterator (thisPart.diff(otherPart))
178- }
179- this .withPartitionsRDD(newPartitionsRDD)
180- }
119+ def diff (other : VertexRDD [VD ]): VertexRDD [VD ]
181120
182121 /**
183122 * Left joins this RDD with another VertexRDD with the same index. This function will fail if
@@ -194,16 +133,7 @@ class VertexRDD[@specialized VD: ClassTag](
194133 * @return a VertexRDD containing the results of `f`
195134 */
196135 def leftZipJoin [VD2 : ClassTag , VD3 : ClassTag ]
197- (other : VertexRDD [VD2 ])(f : (VertexId , VD , Option [VD2 ]) => VD3 ): VertexRDD [VD3 ] = {
198- val newPartitionsRDD = partitionsRDD.zipPartitions(
199- other.partitionsRDD, preservesPartitioning = true
200- ) { (thisIter, otherIter) =>
201- val thisPart = thisIter.next()
202- val otherPart = otherIter.next()
203- Iterator (thisPart.leftJoin(otherPart)(f))
204- }
205- this .withPartitionsRDD(newPartitionsRDD)
206- }
136+ (other : VertexRDD [VD2 ])(f : (VertexId , VD , Option [VD2 ]) => VD3 ): VertexRDD [VD3 ]
207137
208138 /**
209139 * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
@@ -224,37 +154,14 @@ class VertexRDD[@specialized VD: ClassTag](
224154 def leftJoin [VD2 : ClassTag , VD3 : ClassTag ]
225155 (other : RDD [(VertexId , VD2 )])
226156 (f : (VertexId , VD , Option [VD2 ]) => VD3 )
227- : VertexRDD [VD3 ] = {
228- // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
229- // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
230- other match {
231- case other : VertexRDD [_] =>
232- leftZipJoin(other)(f)
233- case _ =>
234- this .withPartitionsRDD[VD3 ](
235- partitionsRDD.zipPartitions(
236- other.copartitionWithVertices(this .partitioner.get), preservesPartitioning = true ) {
237- (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
238- }
239- )
240- }
241- }
157+ : VertexRDD [VD3 ]
242158
243159 /**
244160 * Efficiently inner joins this VertexRDD with another VertexRDD sharing the same index. See
245161 * [[innerJoin ]] for the behavior of the join.
246162 */
247163 def innerZipJoin [U : ClassTag , VD2 : ClassTag ](other : VertexRDD [U ])
248- (f : (VertexId , VD , U ) => VD2 ): VertexRDD [VD2 ] = {
249- val newPartitionsRDD = partitionsRDD.zipPartitions(
250- other.partitionsRDD, preservesPartitioning = true
251- ) { (thisIter, otherIter) =>
252- val thisPart = thisIter.next()
253- val otherPart = otherIter.next()
254- Iterator (thisPart.innerJoin(otherPart)(f))
255- }
256- this .withPartitionsRDD(newPartitionsRDD)
257- }
164+ (f : (VertexId , VD , U ) => VD2 ): VertexRDD [VD2 ]
258165
259166 /**
260167 * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
@@ -268,21 +175,7 @@ class VertexRDD[@specialized VD: ClassTag](
268175 * `this` and `other`, with values supplied by `f`
269176 */
270177 def innerJoin [U : ClassTag , VD2 : ClassTag ](other : RDD [(VertexId , U )])
271- (f : (VertexId , VD , U ) => VD2 ): VertexRDD [VD2 ] = {
272- // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
273- // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
274- other match {
275- case other : VertexRDD [_] =>
276- innerZipJoin(other)(f)
277- case _ =>
278- this .withPartitionsRDD(
279- partitionsRDD.zipPartitions(
280- other.copartitionWithVertices(this .partitioner.get), preservesPartitioning = true ) {
281- (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
282- }
283- )
284- }
285- }
178+ (f : (VertexId , VD , U ) => VD2 ): VertexRDD [VD2 ]
286179
287180 /**
288181 * Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
@@ -296,38 +189,20 @@ class VertexRDD[@specialized VD: ClassTag](
296189 * messages.
297190 */
298191 def aggregateUsingIndex [VD2 : ClassTag ](
299- messages : RDD [(VertexId , VD2 )], reduceFunc : (VD2 , VD2 ) => VD2 ): VertexRDD [VD2 ] = {
300- val shuffled = messages.copartitionWithVertices(this .partitioner.get)
301- val parts = partitionsRDD.zipPartitions(shuffled, true ) { (thisIter, msgIter) =>
302- thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
303- }
304- this .withPartitionsRDD[VD2 ](parts)
305- }
192+ messages : RDD [(VertexId , VD2 )], reduceFunc : (VD2 , VD2 ) => VD2 ): VertexRDD [VD2 ]
306193
307194 /**
308195 * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
309196 * [[EdgeRDD ]].
310197 */
311- def reverseRoutingTables (): VertexRDD [VD ] =
312- this .mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
198+ def reverseRoutingTables (): VertexRDD [VD ]
313199
314200 /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
315- def withEdges (edges : EdgeRDD [_, _]): VertexRDD [VD ] = {
316- val routingTables = VertexRDD .createRoutingTables(edges, this .partitioner.get)
317- val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true ) {
318- (partIter, routingTableIter) =>
319- val routingTable =
320- if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition .empty
321- partIter.map(_.withRoutingTable(routingTable))
322- }
323- this .withPartitionsRDD(vertexPartitions)
324- }
201+ def withEdges (edges : EdgeRDD [_, _]): VertexRDD [VD ]
325202
326203 /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
327204 private [graphx] def withPartitionsRDD [VD2 : ClassTag ](
328- partitionsRDD : RDD [ShippableVertexPartition [VD2 ]]): VertexRDD [VD2 ] = {
329- new VertexRDD (partitionsRDD, this .targetStorageLevel)
330- }
205+ partitionsRDD : RDD [ShippableVertexPartition [VD2 ]]): VertexRDD [VD2 ]
331206
332207 /**
333208 * Changes the target storage level while preserving all other properties of the
@@ -337,20 +212,14 @@ class VertexRDD[@specialized VD: ClassTag](
337212 * [[org.apache.spark.graphx.VertexRDD#cache ]] on the returned VertexRDD.
338213 */
339214 private [graphx] def withTargetStorageLevel (
340- targetStorageLevel : StorageLevel ): VertexRDD [VD ] = {
341- new VertexRDD (this .partitionsRDD, targetStorageLevel)
342- }
215+ targetStorageLevel : StorageLevel ): VertexRDD [VD ]
343216
344217 /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
345218 private [graphx] def shipVertexAttributes (
346- shipSrc : Boolean , shipDst : Boolean ): RDD [(PartitionID , VertexAttributeBlock [VD ])] = {
347- partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
348- }
219+ shipSrc : Boolean , shipDst : Boolean ): RDD [(PartitionID , VertexAttributeBlock [VD ])]
349220
350221 /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
351- private [graphx] def shipVertexIds (): RDD [(PartitionID , Array [VertexId ])] = {
352- partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
353- }
222+ private [graphx] def shipVertexIds (): RDD [(PartitionID , Array [VertexId ])]
354223
355224} // end of VertexRDD
356225
@@ -376,7 +245,7 @@ object VertexRDD {
376245 val vertexPartitions = vPartitioned.mapPartitions(
377246 iter => Iterator (ShippableVertexPartition (iter)),
378247 preservesPartitioning = true )
379- new VertexRDD (vertexPartitions)
248+ new VertexRDDImpl (vertexPartitions)
380249 }
381250
382251 /**
@@ -421,7 +290,7 @@ object VertexRDD {
421290 if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition .empty
422291 Iterator (ShippableVertexPartition (vertexIter, routingTable, defaultVal, mergeFunc))
423292 }
424- new VertexRDD (vertexPartitions)
293+ new VertexRDDImpl (vertexPartitions)
425294 }
426295
427296 /**
@@ -443,10 +312,10 @@ object VertexRDD {
443312 if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition .empty
444313 Iterator (ShippableVertexPartition (Iterator .empty, routingTable, defaultVal))
445314 }, preservesPartitioning = true )
446- new VertexRDD (vertexPartitions)
315+ new VertexRDDImpl (vertexPartitions)
447316 }
448317
449- private def createRoutingTables (
318+ private [graphx] def createRoutingTables (
450319 edges : EdgeRDD [_, _], vertexPartitioner : Partitioner ): RDD [RoutingTablePartition ] = {
451320 // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
452321 val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
0 commit comments