diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 4611a3ace219..139126024d6d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -85,6 +85,21 @@ abstract class EdgeRDD[ED](
       (other: EdgeRDD[ED2])
       (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
 
+  /**
+   * Unions this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
+   * [[PartitionStrategy]].
+   *
+   * @param other the EdgeRDD to union with
+   * @param f the union function applied to corresponding values of `this` and `other`
+   * @param map the function applied to edges that appear only in `this`
+   * @param rmap the function applied to edges that appear only in `other`
+   * @return a new EdgeRDD containing all edges that appear in either `this` or `other`,
+   * with values supplied by `f`, `map`, or `rmap`
+   */
+  def union[ED2: ClassTag, ED3: ClassTag]
+      (other: EdgeRDD[ED2]) (f: (VertexId, VertexId, ED, ED2) => ED3)
+      (map: (ED) => ED3, rmap: (ED2) => ED3): EdgeRDD[ED3]
+
   /**
    * Changes the target storage level while preserving all other properties of the
    * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index db73a8abc573..fbf0852b7cd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -340,6 +340,28 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    */
   def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
 
+  /**
+   * Merges two graphs into a single graph. For correct results, both graphs
+   * must have been partitioned using [[partitionBy]] with the same strategy.
+   *
+   * @param other the graph to merge this graph with
+   * @param mergeEdges the user-supplied commutative associative function used to
+   * merge edge attributes for duplicate edges
+   * @param mergeVertices the user-supplied commutative associative function used
+   * to merge vertex attributes for duplicate vertices
+   * @tparam VD2 the vertex type of the other graph
+   * @tparam ED2 the edge type of the other graph
+   * @tparam VD3 the vertex type of the merged graph
+   * @tparam ED3 the edge type of the merged graph
+   * @return the resulting graph, with the union of the vertices of each constituent
+   * graph and a single edge for each (source, dest) vertex pair present in either graph
+   */
+  def union[VD2: ClassTag, VD3: ClassTag, ED2: ClassTag, ED3: ClassTag]
+      (other: Graph[VD2, ED2],
+       mergeEdges: (VertexId, VertexId, ED, ED2) => ED3,
+       mergeVertices: (VertexId, VertexId, VD, VD2) => VD3): Graph[VD3, ED3]
+
   /**
    * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
    * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
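A minimal usage sketch of the proposed Graph.union (hypothetical driver code, not part of the
patch; it assumes an existing SparkContext `sc` and that both graphs are partitioned with the
same PartitionStrategy, as the API docs above require):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    def unionExample(sc: SparkContext): Graph[Int, Int] = {
      val g = Graph(
        sc.parallelize(Seq((1L, 1), (2L, 2), (3L, 3))),
        sc.parallelize(Seq(Edge(1L, 2L, 10), Edge(2L, 3L, 20)))
      ).partitionBy(PartitionStrategy.EdgePartition2D)
      val h = Graph(
        sc.parallelize(Seq((2L, 20), (3L, 30))),
        sc.parallelize(Seq(Edge(2L, 3L, 30)))
      ).partitionBy(PartitionStrategy.EdgePartition2D)
      // Vertices 2L and 3L and edge (2L, 3L) appear in both graphs and are merged;
      // vertex 1L and edge (1L, 2L) are carried over unchanged.
      g.union[Int, Int, Int, Int](h,
        (src, dst, a, b) => a + b,
        (id1, id2, a, b) => a + b)
    }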
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index a9f04b559c3d..127ec7eeef65 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -214,6 +214,27 @@ abstract class VertexRDD[VD](
   def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
       (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
 
+  /**
+   * Efficiently unions this VertexRDD with another VertexRDD sharing the same index. See
+   * [[unionJoin]] for the behavior of the union.
+   */
+  def unionZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
+      (f: (VertexId, VertexId, VD, U) => VD2): VertexRDD[VD2]
+
+  /**
+   * Unions this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
+   * backed by a VertexRDD with the same index then the efficient [[unionZipJoin]]
+   * implementation is used.
+   *
+   * @param other an RDD containing vertices to union with. If there are multiple entries
+   * for the same vertex, one is picked arbitrarily.
+   * Use [[aggregateUsingIndex]] to merge multiple entries.
+   * @param f the union function applied to corresponding values of `this` and `other`
+   * @return a VertexRDD containing all vertices that appear in either `this` or `other`;
+   * vertices present in both have values supplied by `f`
+   */
+  def unionJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (f: (VertexId, VertexId, VD, U) => VD2): VertexRDD[VD2]
+
   /**
    * Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
    * VertexRDD co-indexed with `this`.
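A small illustration of the intended unionJoin semantics (a hypothetical snippet, assuming an
existing SparkContext `sc`; the variable names are illustrative only):

    import org.apache.spark.graphx._

    val verts = VertexRDD(sc.parallelize(Seq((0L, 0), (1L, 1), (2L, 2))))
    val evens = verts.filter(v => v._2 % 2 == 0)  // ids 0L, 2L
    val odds = verts.filter(v => v._2 % 2 == 1)   // id 1L
    // Disjoint ids: `f` is never invoked and the result recovers all of `verts`,
    // i.e. Set((0L, 0), (1L, 1), (2L, 2)).
    val unioned = evens.unionJoin(odds)((id1, id2, a, b) => a + b)
    // Overlapping ids (0L and 2L) are merged with `f`: Set((0L, 0), (1L, 1), (2L, 4)).
    val merged = evens.unionJoin(verts)((id1, id2, a, b) => a + b)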
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index ab021a252eb8..86f5ef1c7eb1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.impl
 
+import scala.collection.mutable.HashMap
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
@@ -107,7 +108,7 @@ class EdgePartition[
 
   @inline private def dstIds(pos: Int): VertexId = local2global(localDstIds(pos))
 
-  @inline private def attrs(pos: Int): ED = data(pos)
+  @inline def attrs(pos: Int): ED = data(pos)
 
   /** Look up vid in activeSet, throwing an exception if it is None. */
   def isActive(vid: VertexId): Boolean = {
@@ -306,6 +307,57 @@ class EdgePartition[
     builder.toEdgePartition
   }
 
+  /**
+   * Apply `f` to edges present in both `this` and `other`, apply `map` or `rmap` to edges
+   * present only in `this` or only in `other` respectively, and return a new `EdgePartition`
+   * containing the resulting edges.
+   *
+   * If there are multiple edges with the same src and dst in either partition, only the last
+   * one is kept, so `f`, `map` and `rmap` are each invoked at most once per (src, dst) pair.
+   */
+  def union[ED2: ClassTag, ED3: ClassTag]
+      (other: EdgePartition[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3)
+      (map: (ED) => ED3, rmap: (ED2) => ED3): EdgePartition[ED3, VD] = {
+    val builder = new EdgePartitionBuilder[ED3, VD]()
+    // Index the edges of each partition by (src, dst) so matching edges can be paired up.
+    // Keying by the id pair itself (rather than a hash of it) avoids collisions between
+    // distinct edges.
+    val indexedEdgesThisPartition = new HashMap[(VertexId, VertexId), ED]
+    val indexedEdgesOtherPartition = new HashMap[(VertexId, VertexId), ED2]
+    var i = 0
+    var j = 0
+    while (i < this.size) {
+      indexedEdgesThisPartition.put((this.srcIds(i), this.dstIds(i)), this.data(i))
+      i += 1
+    }
+    while (j < other.size) {
+      indexedEdgesOtherPartition.put((other.srcIds(j), other.dstIds(j)), other.attrs(j))
+      j += 1
+    }
+
+    // Emit every edge of `thisPartitionMap`. An edge that also occurs in `otherPartitionMap`
+    // is merged with `f` and removed from the other map so it is not emitted twice.
+    def unionizePartitions[A, B](
+        thisPartitionMap: HashMap[(VertexId, VertexId), A],
+        otherPartitionMap: HashMap[(VertexId, VertexId), B])
+        (f: (VertexId, VertexId, A, B) => ED3)
+        (map: (A) => ED3): Unit = {
+      thisPartitionMap.foreach { case ((srcId, dstId), attr) =>
+        otherPartitionMap.remove((srcId, dstId)) match {
+          case Some(otherAttr) => builder.add(srcId, dstId, f(srcId, dstId, attr, otherAttr))
+          case None => builder.add(srcId, dstId, map(attr))
+        }
+      }
+    }
+
+    unionizePartitions[ED, ED2](indexedEdgesThisPartition, indexedEdgesOtherPartition)(f)(map)
+    unionizePartitions[ED2, ED](indexedEdgesOtherPartition, indexedEdgesThisPartition)(
+      (v1: VertexId, v2: VertexId, e2: ED2, e1: ED) => f(v1, v2, e1, e2))(rmap)
+
+    builder.toEdgePartition
+  }
+
   /**
    * The number of edges in this partition
    *
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index c88b2f65a86c..cc9b81d7864c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -113,6 +113,19 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     })
   }
 
+  override def union[ED2: ClassTag, ED3: ClassTag]
+      (other: EdgeRDD[ED2]) (f: (VertexId, VertexId, ED, ED2) => ED3)
+      (map: (ED) => ED3, rmap: (ED2) => ED3): EdgeRDDImpl[ED3, VD] = {
+    val ed2Tag = classTag[ED2]
+    val ed3Tag = classTag[ED3]
+    this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+      (thisIter, otherIter) =>
+        val (pid, thisEPart) = thisIter.next()
+        val (_, otherEPart) = otherIter.next()
+        Iterator(Tuple2(pid, thisEPart.union(otherEPart)(f)(map, rmap)(ed2Tag, ed3Tag)))
+    })
+  }
+
   def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
       f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDDImpl[ED2, VD2] = {
     this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
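The per-partition algorithm above reduces to a two-pass merge of two hash maps keyed by
(src, dst). A standalone model on plain tuples, for readers who want to check the pairing
logic outside of EdgePartition (illustrative only; `unionEdges` is a hypothetical name):

    import scala.collection.mutable.HashMap

    def unionEdges[A, B, C](left: Seq[(Long, Long, A)], right: Seq[(Long, Long, B)])
        (f: (Long, Long, A, B) => C)(map: A => C, rmap: B => C): Seq[(Long, Long, C)] = {
      val l = HashMap(left.map { case (s, d, a) => ((s, d), a) }: _*)
      val r = HashMap(right.map { case (s, d, b) => ((s, d), b) }: _*)
      val out = Seq.newBuilder[(Long, Long, C)]
      l.foreach { case ((s, d), a) =>
        r.remove((s, d)) match {
          case Some(b) => out += ((s, d, f(s, d, a, b)))  // edge present on both sides
          case None => out += ((s, d, map(a)))            // edge only in `left`
        }
      }
      r.foreach { case ((s, d), b) => out += ((s, d, rmap(b))) }  // edges only in `right`
      out.result()
    }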
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d23a26c..684c021e226e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -184,6 +184,21 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
+  override def union[VD2: ClassTag, VD3: ClassTag, ED2: ClassTag, ED3: ClassTag]
+      (other: Graph[VD2, ED2],
+       mergeEdges: (VertexId, VertexId, ED, ED2) => ED3,
+       mergeVertices: (VertexId, VertexId, VD, VD2) => VD3): Graph[VD3, ED3] = {
+    val newVertices: VertexRDD[VD3] = vertices.unionJoin(other.vertices)(mergeVertices)
+    val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD3, ED]]
+      .updateVertices(newVertices)
+    // Edges that occur in only one of the graphs keep their attribute, which must therefore
+    // already be a valid ED3; hence the casts below.
+    val newEdges: EdgeRDDImpl[ED3, VD3] =
+      newReplicatedVertexView.edges.union(other.edges)(mergeEdges)(
+        (edge: ED) => edge.asInstanceOf[ED3], (edge: ED2) => edge.asInstanceOf[ED3])
+    new GraphImpl[VD3, ED3](newVertices, newReplicatedVertexView.withEdges(newEdges))
+  }
+
   // ///////////////////////////////////////////////////////////////////////////////////////////////
   // Lower level transformation methods
   // ///////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
index 5ad6390a56c4..2a4a75949eeb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -51,6 +51,24 @@ private[graphx] object VertexPartitionBase {
     }
     (map.keySet, map._values, map.keySet.getBitSet)
   }
+
+  /**
+   * Construct the constituents of a VertexPartitionBase from two given iterators of vertices,
+   * merging duplicate entries using `mergeFunc`.
+   */
+  def initFrom[VD: ClassTag](
+      iter1: Iterator[(VertexId, VD)],
+      iter2: Iterator[(VertexId, VD)],
+      mergeFunc: (VD, VD) => VD): (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+    iter1.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    iter2.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    (map.keySet, map._values, map.keySet.getBitSet)
+  }
 }
 
 /**
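For reference, the merge behavior initFrom relies on can be modeled with a plain mutable map
(a standalone sketch; GraphXPrimitiveKeyOpenHashMap.setMerge is the real primitive, and
`setMergeModel` below is a hypothetical stand-in):

    import scala.collection.mutable

    // Keep the value for a new key; combine with `merge` on a duplicate key.
    def setMergeModel[K, V](m: mutable.HashMap[K, V], k: K, v: V, merge: (V, V) => V): Unit =
      m.update(k, m.get(k).map(old => merge(old, v)).getOrElse(v))

    val m = mutable.HashMap[Long, Int]()
    setMergeModel(m, 1L, 1, (a: Int, b: Int) => a + b)
    setMergeModel(m, 1L, 2, (a: Int, b: Int) => a + b)  // duplicate id 1L: 1 + 2 = 3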
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index b90f9fa32705..17e51d89b504 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -179,6 +179,45 @@ private[graphx] abstract class VertexPartitionBaseOps
     innerJoin(createUsingIndex(iter))(f)
   }
 
+  /** Union another VertexPartition, merging vertices present in both with `f`. */
+  def union[U: ClassTag, VD2: ClassTag]
+      (other: Self[U])
+      (f: (VertexId, VertexId, VD, U) => VD2): Self[VD2] = {
+    if (self.index != other.index) {
+      logWarning("Unioning two VertexPartitions with different indexes is slow.")
+      union(createUsingIndex(other.iterator))(f)
+    } else {
+      // Both partitions share the same index, so positions line up: merge values where both
+      // masks are set and copy the surviving side's value through otherwise. Values copied
+      // from only one side are cast, so they must already be valid VD2s.
+      val newMask = self.mask | other.mask
+      val newValues = new Array[VD2](self.capacity)
+      var i = newMask.nextSetBit(0)
+      while (i >= 0) {
+        if (self.mask.get(i) && other.mask.get(i)) {
+          newValues(i) = f(self.index.getValue(i), other.index.getValue(i),
+            self.values(i), other.values(i))
+        } else if (self.mask.get(i)) {
+          newValues(i) = self.values(i).asInstanceOf[VD2]
+        } else {
+          newValues(i) = other.values(i).asInstanceOf[VD2]
+        }
+        i = newMask.nextSetBit(i + 1)
+      }
+      this.withValues(newValues).withMask(newMask)
+    }
+  }
+
+  /**
+   * Union an iterator of messages. Note that vertices in `iter` that are not already in
+   * `self.index` are dropped by createUsingIndex.
+   */
+  def union[U: ClassTag, VD2: ClassTag]
+      (iter: Iterator[Product2[VertexId, U]])
+      (f: (VertexId, VertexId, VD, U) => VD2): Self[VD2] = {
+    union(createUsingIndex(iter))(f)
+  }
+
   /**
    * Similar effect as aggregateUsingIndex((a, b) => a)
    */
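The same-index fast path above can be pictured on plain arrays: a boolean mask per side marks
which positions hold a vertex, positions set on both sides are merged with `f`, and positions
set on only one side are copied through. A standalone model (hypothetical helper, not part of
the patch):

    def maskUnion[V](maskA: Array[Boolean], a: Array[V],
        maskB: Array[Boolean], b: Array[V])(f: (V, V) => V): (Array[Boolean], Array[V]) = {
      val mask = Array.tabulate(maskA.length)(i => maskA(i) || maskB(i))
      val out = a.clone()  // positions set only in B are overwritten below; unset slots are masked off
      for (i <- mask.indices if mask(i)) {
        out(i) =
          if (maskA(i) && maskB(i)) f(a(i), b(i))
          else if (maskA(i)) a(i)
          else b(i)
      }
      (mask, out)
    }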
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 33ac7b0ed609..06557be93f74 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -209,6 +209,35 @@ class VertexRDDImpl[VD] private[graphx] (
     }
   }
 
+  override def unionZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
+      (f: (VertexId, VertexId, VD, U) => VD2): VertexRDD[VD2] = {
+    val newPartitionsRDD = partitionsRDD.zipPartitions(
+      other.partitionsRDD, preservesPartitioning = true
+    ) { (thisIter, otherIter) =>
+      val thisPart = thisIter.next()
+      val otherPart = otherIter.next()
+      Iterator(thisPart.union(otherPart)(f))
+    }
+    this.withPartitionsRDD(newPartitionsRDD)
+  }
+
+  override def unionJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+      (f: (VertexId, VertexId, VD, U) => VD2): VertexRDD[VD2] = {
+    // Test if the other set of vertices is a VertexRDD to choose the optimal join strategy.
+    // If the other set is a VertexRDD then we use the much more efficient unionZipJoin.
+    other match {
+      case other: VertexRDD[U] if this.partitioner == other.partitioner =>
+        unionZipJoin(other)(f)
+      case _ =>
+        this.withPartitionsRDD(
+          partitionsRDD.zipPartitions(
+            other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.union(msgs)(f))
+          }
+        )
+    }
+  }
+
   override def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
     val shuffled = messages.partitionBy(this.partitioner.get)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 1f5e27d5508b..8867e3f8d9a5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -267,6 +267,41 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("union") {
+    withSpark { sc =>
+      val n = 8
+      val verticesG = sc.parallelize((1 to n).map(x => (x: VertexId, x)))
+      val edgesG = sc.parallelize(Seq(
+        Edge(1, 2, 1), Edge(1, 3, 1), Edge(1, 4, 1), Edge(2, 3, 2), Edge(2, 4, 2), Edge(3, 4, 3),
+        Edge(4, 5, 4), Edge(4, 6, 4), Edge(5, 6, 5)))
+      val graphG: Graph[Int, Int] = Graph(verticesG, edgesG)
+        .partitionBy(PartitionStrategy.EdgePartition2D).cache()
+
+      val verticesH = sc.parallelize((5 to 8).map(x => (x: VertexId, x)))
+      val edgesH = sc.parallelize(Seq(
+        Edge(5, 6, 5), Edge(5, 7, 5), Edge(5, 8, 5), Edge(6, 7, 6), Edge(6, 8, 6), Edge(7, 8, 7)))
+      val graphH: Graph[Int, Int] = Graph(verticesH, edgesH)
+        .partitionBy(PartitionStrategy.EdgePartition2D).cache()
+
+      val unionGraph = graphG.union[Int, Int, Int, Int](graphH,
+        (src, dst, a, b) => a - b,
+        (id1, id2, a, b) => a + b)
+
+      // Vertices 5 to 8 occur in both graphs, so their attributes are summed.
+      val v = unionGraph.vertices.collect().toSet
+      assert(v === (1 to 8).map(e => (if (e < 5) (e, e) else (e, 2 * e))).toSet)
+
+      // The map is necessary because of object-reuse in the edge iterator.
+      // Edge (5, 6) occurs in both graphs, so its attribute is 5 - 5 = 0.
+      val e = unionGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect()
+      assert(e.toSet === Set(Edge(1, 3, 1), Edge(6, 8, 6), Edge(3, 4, 3),
+        Edge(4, 6, 4), Edge(2, 3, 2), Edge(2, 4, 2),
+        Edge(7, 8, 7), Edge(5, 7, 5), Edge(1, 2, 1),
+        Edge(6, 7, 6), Edge(5, 8, 5), Edge(4, 5, 4),
+        Edge(1, 4, 1), Edge(5, 6, 0)))
+    }
+  }
+
   test("groupEdges") {
     withSpark { sc =>
       val n = 5
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index f1aa685a79c9..c54b5674a9a1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -163,6 +163,37 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("union") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n).cache()
+      val evens = verts.filter(q => q._2 % 2 == 0).cache()
+      val odds: VertexRDD[Int] = verts.filter(q => q._2 % 2 == 1).cache()
+      // union with another VertexRDD; the two sets are disjoint, so `f` is never invoked
+      assert(odds.unionJoin(evens) { (id1, id2, a, b) => a + b }.collect().toSet ===
+        (0 to n).map(x => (x.toLong, x)).toSet)
+      // union with a plain RDD
+      val evensRDD = evens.map(identity)
+      assert(odds.unionJoin[Int, Int](evensRDD) { (id1, id2, a, b) => a + b }.collect().toSet ===
+        (0 to n).map(x => (x.toLong, x)).toSet)
+    }
+  }
+
+  test("union vertices with non-equal number of partitions") {
+    withSpark { sc =>
+      val n = 10
+      val vertexA = VertexRDD(sc.parallelize(1 to n).map(i => (i.toLong, i)))
+      val vertexB = VertexRDD(
+        vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
+      assert(vertexA.partitions.size != vertexB.partitions.size)
+      val vertexC = vertexA.unionJoin(vertexB) { (vid1, vid2, old, newVal) =>
+        old + newVal
+      }
+      assert(vertexC.collect().toSet ===
+        (1 to n).map(x => if (x % 2 == 0) (x.toLong, 2 * x) else (x.toLong, x)).toSet)
+    }
+  }
+
   test("aggregateUsingIndex") {
     withSpark { sc =>
       val n = 100
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 7435647c6d9e..3639d0ec77d1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -90,6 +90,19 @@ class EdgePartitionSuite extends SparkFunSuite {
       List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
   }
 
+  test("union") {
+    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+    val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
+    val a = makeEdgePartition(aList)
+    val b = makeEdgePartition(bList)
+
+    // Take the attribute from `a` for shared edges; pass the rest through unchanged.
+    assert(a.union(b)((src, dst, aAttr, bAttr) => aAttr)(e => e, e => e)
+      .iterator.map(_.copy()).toList ===
+      List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(1, 1, 0),
+        Edge(1, 2, 0), Edge(3, 4, 0), Edge(5, 4, 0), Edge(5, 5, 0)))
+  }
+
   test("isActive, numActives, replaceActives") {
     val ep = new EdgePartitionBuilder[Nothing, Nothing].toEdgePartition
       .withActiveSet(Iterator(0L, 2L, 0L))