File tree Expand file tree Collapse file tree 2 files changed +21
-0
lines changed
graphx/src/main/scala/org/apache/spark/graphx Expand file tree Collapse file tree 2 files changed +21
-0
lines changed Original file line number Diff line number Diff line change @@ -129,6 +129,16 @@ abstract class VertexRDD[VD](
129129 */
130130 def diff (other : RDD [(VertexId , VD )]): VertexRDD [VD ]
131131
132+ /**
133+ * Hides vertices that are the same between `this` and `other`; for vertices that are different,
134+ * keeps the values from `other`.
135+ *
136+ * SPARK-5922: Deprecates this method call in place of diff(other: RDD[(VertexID, VD)])
137+ *
138+ * @param other the other VertexRDD with which to diff.
139+ */
140+ def diff (other : VertexRDD [VD ]): VertexRDD [VD ]
141+
132142 /**
133143 * Left joins this RDD with another VertexRDD with the same index. This function will fail if
134144 * both VertexRDDs do not share the same index. The resulting vertex set contains an entry for
Original file line number Diff line number Diff line change @@ -114,6 +114,17 @@ class VertexRDDImpl[VD] private[graphx] (
114114 this .withPartitionsRDD(newPartitionsRDD)
115115 }
116116
117+ override def diff (other : VertexRDD [VD ]): VertexRDD [VD ] = {
118+ val newPartitionsRDD = partitionsRDD.zipPartitions(
119+ VertexRDD (other).partitionsRDD, preservesPartitioning = true
120+ ) { (thisIter, otherIter) =>
121+ val thisPart = thisIter.next()
122+ val otherPart = otherIter.next()
123+ Iterator (thisPart.diff(otherPart))
124+ }
125+ this .withPartitionsRDD(newPartitionsRDD)
126+ }
127+
117128 override def leftZipJoin [VD2 : ClassTag , VD3 : ClassTag ]
118129 (other : VertexRDD [VD2 ])(f : (VertexId , VD , Option [VD2 ]) => VD3 ): VertexRDD [VD3 ] = {
119130 val newPartitionsRDD = partitionsRDD.zipPartitions(
You can’t perform that action at this time.
0 commit comments