Skip to content
14 changes: 14 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package org.apache.spark.graphx

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.SparkException
import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.graphx.lib._
import org.apache.spark.rdd.RDD

Expand Down Expand Up @@ -183,6 +188,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
}

/**
 * Drop every edge whose source vertex and destination vertex are the same.
 *
 * @return a graph in which no edge has identical source and destination ids
 */
def removeSelfEdges(): Graph[VD, ED] = {
  // Keep an edge only when its two endpoints differ.
  graph.subgraph(epred = triplet => !(triplet.srcId == triplet.dstId))
}

/**
* Join the vertices with an RDD and then apply a function from the
* vertex and RDD entry to a new vertex value. The input table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,55 @@ package org.apache.spark.graphx.lib
import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

/**
* Compute the number of triangles passing through each vertex.
*
* The algorithm is relatively straightforward and can be computed in three steps:
*
* <ul>
* <li>Compute the set of neighbors for each vertex
* <li>For each edge compute the intersection of the sets and send the count to both vertices.
* <li> Compute the set of neighbors for each vertex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the <li> should be closed if you're writing HTML tags

* <li> For each edge compute the intersection of the sets and send the count to both vertices.
* <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.
* </ul>
*
* Note that the input graph should have its edges in canonical direction
* (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
* using [[org.apache.spark.graphx.Graph#partitionBy]].
* There are two implementations. The default `TriangleCount.run` implementation first removes
* self cycles and canonicalizes the graph to ensure that the following conditions hold:
* <ul>
* <li> There are no self edges</li>
* <li> All edges are oriented so that the source id is less than the destination id</li>
* <li> There are no duplicate edges</li>
* </ul>
* However, the canonicalization procedure is costly as it requires repartitioning the graph.
* If the input data is already in "canonical form" with self cycles removed, then
* `TriangleCount.runPreCanonicalized` should be used instead.
*
* {{{
* val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().convertToCanonicalEdges()
* val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
* }}}
*
*/
object TriangleCount {

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Remove redundant edges
val g = graph.groupEdges((a, b) => a).cache()
// Transform the edge data into something cheap to shuffle and then canonicalize
val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
// Get the triangle counts
val counters = runPreCanonicalized(canonicalGraph).vertices
// Join them back with the original graph
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: just { (_, _, optCounter) => right?

optCounter.getOrElse(0)
}
}


def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(4)
graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.size) {
// prevent self cycle
Expand All @@ -56,14 +79,14 @@ object TriangleCount {
}
set
}

// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}

// Edge function computes intersection of smaller vertex with larger vertex
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
assert(ctx.srcAttr != null)
assert(ctx.dstAttr != null)
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
Expand All @@ -80,15 +103,18 @@ object TriangleCount {
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}

// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
g.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// double count should be even (divisible by two)
assert((dblCount & 1) == 0)
dblCount / 2
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// This algorithm double counts each triangle so the final count should be even
val isEven = (dblCount & 1) == 0
if (!isEven) {
throw new Exception("Triangle count resulted in an invalid number of triangles.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just require(...) this condition. (dblCount % 2) == 0 would be a little more canonical

}
dblCount / 2
}
} // end of TriangleCount
}
}
15 changes: 15 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ class GraphOpsSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("removeSelfEdges") {
  withSpark { sc =>
    // The input deliberately contains two self loops: (3, 3) and (1, 1).
    val inputEdges: Array[(Long, Long)] =
      Array(1 -> 2, 2 -> 3, 3 -> 3, 4 -> 3, 1 -> 1).map { case (src, dst) =>
        (src.toLong, dst.toLong)
      }
    // Expected result: every input edge except the self loops.
    val expectedEdges = inputEdges.filterNot { case (src, dst) => src == dst }.toSet
    val inputGraph = Graph.fromEdgeTuples(sc.parallelize(inputEdges), 1)
    val remainingEdges = inputGraph
      .removeSelfEdges()
      .edges
      .map(edge => (edge.srcId, edge.dstId))
      .collect()
    // The surviving edges must contain no duplicates ...
    assert(remainingEdges.toSet.size === remainingEdges.size)
    // ... and must be exactly the non-self edges of the input.
    assert(remainingEdges.toSet === expectedEdges)
  }
}

test ("filter") {
withSpark { sc =>
val n = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
val verts = triangleCount.vertices
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 4)
} else {
assert(count === 2)
} else {
assert(count === 1)
}
}
}
Expand All @@ -75,7 +75,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count a single triangle with duplicate edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(1L -> 0L, 1L -> 1L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
Expand Down