diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d5150382d599..81e6dc4e6f34 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -17,6 +17,7 @@
package org.apache.spark.graphx
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import scala.util.Random
@@ -25,6 +26,9 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.impl.GraphImpl
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.HashPartitioner
/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -185,6 +189,55 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
}
+ /**
+ * Direct all edges from the lower vertex id to the higher vertex id and aggregate
+ * duplicate edges.
+ *
+ * This function is relatively costly as it requires shuffling all the edges so that edges
+ * between the same pair of vertices end up on the same machine.
+ *
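+ * For example, a minimal sketch (assumes an active `SparkContext` named `sc`):
+ *
+ * {{{
+ * // 1 -> 2 and 2 -> 1 collapse into the single edge 1 -> 2, and the duplicate
+ * // attributes are combined with the supplied merge function
+ * val graph = Graph.fromEdgeTuples(sc.parallelize(Array(1L -> 2L, 2L -> 1L, 2L -> 3L)), 1)
+ * val canonical = graph.canonicalizeEdges((a, b) => a + b)
+ * }}}
+ *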
+ * @param merge the function used to merge duplicate edges. The default function takes one of
+ * the edge values and discards the rest.
+ *
+ * @return the graph with all edges canonicalized.
+ */
+ def canonicalizeEdges(merge: (ED, ED) => ED = (a, b) => a): Graph[VD, ED] = {
+ val numPartitions = graph.edges.partitions.length
+ val edTag = classTag[ED]
+ val vdTag = classTag[VD]
+ // Canonicalize the edge directions and then repartition
+ val canonicalEdges = graph.edges.withPartitionsRDD(graph.edges.map { e =>
+ var srcId = e.srcId
+ var dstId = e.dstId
+ if (e.srcId > e.dstId) {
+ srcId = e.dstId
+ dstId = e.srcId
+ }
+ val part = PartitionStrategy.EdgePartition2D.getPartition(srcId, dstId, numPartitions)
+ (part, (srcId, dstId, e.attr))
+ }.partitionBy(new HashPartitioner(numPartitions)).mapPartitionsWithIndex({ (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+ // Discard the partition key and rebuild each partition as a columnar EdgePartition
+ iter.foreach { case (_, (srcId, dstId, attr)) =>
+ builder.add(srcId, dstId, attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true)).cache()
+ // Build a new graph reusing the old vertex RDD and group the duplicate edges
+ GraphImpl.fromExistingRDDs(graph.vertices.withEdges(canonicalEdges), canonicalEdges)
+ .groupEdges(merge)
+ }
+
+ /**
+ * Remove self edges.
+ *
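+ * For example, a minimal sketch (assumes an active `SparkContext` named `sc`):
+ *
+ * {{{
+ * val graph = Graph.fromEdgeTuples(sc.parallelize(Array(1L -> 1L, 1L -> 2L)), 1)
+ * graph.removeSelfEdges().edges.count() // yields 1: the self edge 1 -> 1 is dropped
+ * }}}
+ *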
+ * @return a graph with all self edges removed
+ */
+ def removeSelfEdges(): Graph[VD, ED] = {
+ graph.subgraph(epred = e => e.srcId != e.dstId)
+ }
+
/**
* Join the vertices with an RDD and then apply a function from the
* vertex and RDD entry to a new vertex value. The input table
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index daf162085e3e..e4a163b69ed0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib
import scala.reflect.ClassTag
import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
/**
* Compute the number of triangles passing through each vertex.
@@ -27,25 +28,47 @@ import org.apache.spark.graphx._
* The algorithm is relatively straightforward and can be computed in three steps:
*
* - Compute the set of neighbors for each vertex
* - For each edge compute the intersection of the sets and send the count to both vertices.
* - Compute the sum at each vertex and divide by two since each triangle is counted twice.
*
- * Note that the input graph should have its edges in canonical direction
- * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
- * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ * There are two implementations. The default `TriangleCount.run` implementation first removes
+ * self cycles and canonicalizes the graph to ensure that the following conditions hold:
+ *
+ * - There are no self edges
+ * - All edges are oriented src < dst
+ * - There are no duplicate edges
+ *
+ * However, the canonicalization procedure is costly as it requires repartitioning the graph.
+ * If the input data is already in "canonical form" with self cycles removed then the
+ * `TriangleCount.runPreCanonicalized` method should be used instead.
+ *
+ * {{{
+ * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges()
+ * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
+ * }}}
+ *
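+ * By contrast, the default implementation is invoked through the `triangleCount` method on
+ * [[org.apache.spark.graphx.GraphOps]], which performs the canonicalization internally:
+ *
+ * {{{
+ * val counts = graph.triangleCount().vertices
+ * }}}
+ *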
*/
object TriangleCount {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
- // Remove redundant edges
- val g = graph.groupEdges((a, b) => a).cache()
+ // Transform the edge data to something cheap to shuffle and then canonicalize
+ val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().canonicalizeEdges()
+ // Get the triangle counts
+ val counters = runPreCanonicalized(canonicalGraph).vertices
+ // Join the counts back with the original graph
+ graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+ optCounter.getOrElse(0)
+ }
+ }
+
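+ /**
+ * Compute the number of triangles passing through each vertex of a graph that is assumed to
+ * be in canonical form: no self edges, no duplicate edges, and all edges oriented in a
+ * consistent direction (see `GraphOps.canonicalizeEdges`).
+ */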
+ def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
- g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
- val set = new VertexSet(4)
+ graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
+ val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.size) {
// prevent self cycle
@@ -56,14 +79,14 @@ object TriangleCount {
}
set
}
+
// join the sets with the graph
- val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+ val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}
+
// Edge function computes intersection of smaller vertex with larger vertex
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
- assert(ctx.srcAttr != null)
- assert(ctx.dstAttr != null)
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
@@ -80,15 +103,18 @@ object TriangleCount {
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}
+
// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
- g.outerJoinVertices(counters) {
- (vid, _, optCounter: Option[Int]) =>
- val dblCount = optCounter.getOrElse(0)
- // double count should be even (divisible by two)
- assert((dblCount & 1) == 0)
- dblCount / 2
+ graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+ val dblCount = optCounter.getOrElse(0)
+ // This algorithm double counts each triangle so the final count should be even
+ val isEven = (dblCount & 1) == 0
+ if (!isEven) {
+ throw new Exception("Triangle count resulted in an invalid number of triangles.")
+ }
+ dblCount / 2
}
- } // end of TriangleCount
+ }
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index ea94d4accb63..ea7036cf489b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -56,6 +56,36 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
}
}
+ test("canonicalizeEdges") {
+ withSpark { sc =>
+ val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (3 -> 4), (2 -> 1), (3 -> 4), (9 -> 5))
+ .map { case (a, b) => (a.toLong, b.toLong) }
+ val correctEdges = edgeArray.map { case (a, b) => if (a < b) (a, b) else (b, a) }.toSet
+ val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
+ val canonicalizedEdges = graph.canonicalizeEdges().edges.map(e => (e.srcId, e.dstId))
+ .collect
+ assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size)
+ assert(canonicalizedEdges.toSet === correctEdges)
+ }
+ }
+
+ test("removeSelfEdges") {
+ withSpark { sc =>
+ val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (1 -> 1))
+ .map { case (a, b) => (a.toLong, b.toLong) }
+ val correctEdges = edgeArray.filter { case (a, b) => a != b }.toSet
+ val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
+ val remainingEdges = graph.removeSelfEdges().edges.map(e => (e.srcId, e.dstId))
+ .collect
+ assert(remainingEdges.toSet.size === remainingEdges.size)
+ assert(remainingEdges.toSet === correctEdges)
+ }
+ }
+
test ("filter") {
withSpark { sc =>
val n = 5
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index 293c7f3ba4c2..dc1649f644a6 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -65,9 +65,9 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
val verts = triangleCount.vertices
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
- assert(count === 4)
- } else {
assert(count === 2)
+ } else {
+ assert(count === 1)
}
}
}
@@ -76,7 +76,8 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
test("Count a single triangle with duplicate edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
- Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(1L -> 0L, 1L -> 1L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices