53 changes: 53 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -17,6 +17,7 @@

package org.apache.spark.graphx

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag
import scala.util.Random

@@ -25,6 +26,9 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.HashPartitioner

/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -185,6 +189,55 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
}
}

/**
* Direct all edges from the lower vertex id to the higher vertex id and aggregate
* duplicate edges.
*
* This function is relatively costly as it requires shuffling all the edges so that edges
* between the same pair of vertices end up on the same machine.
*
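* A usage sketch (`intGraph` is a hypothetical `Graph[Double, Int]`; the merge function here
* keeps the larger of two duplicate edge attributes):
* {{{
* val merged: Graph[Double, Int] = intGraph.canonicalizeEdges((a, b) => math.max(a, b))
* }}}
*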
* @param merge the function used to merge duplicate edges. The default function takes one of
* the edge values and discards the rest.
*
* @return the graph with all edges canonicalized.
*/
def canonicalizeEdges(merge: (ED, ED) => ED = (a, b) => a): Graph[VD, ED] = {
val numPartitions = graph.edges.partitions.length
val edTag = classTag[ED]
val vdTag = classTag[VD]
// Canonicalize the edge directions and then repartition
val canonicalEdges = graph.edges.withPartitionsRDD(graph.edges.map { e =>
var srcId = e.srcId
var dstId = e.dstId
if (e.srcId > e.dstId) {
srcId = e.dstId
dstId = e.srcId
}
val part = PartitionStrategy.EdgePartition2D.getPartition(srcId, dstId, numPartitions)
(part, (srcId, dstId, e.attr))
}.partitionBy(new HashPartitioner(numPartitions)).mapPartitionsWithIndex({ (pid, iter) =>
val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
iter.foreach { message =>
val data = message._2
builder.add(data._1, data._2, data._3)
}
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
// Build a new graph reusing the old vertex RDD and group the duplicate edges
GraphImpl.fromExistingRDDs(graph.vertices.withEdges(canonicalEdges), canonicalEdges)
.groupEdges(merge)
}

/**
* Remove self edges.
*
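* A usage sketch, e.g. cleaning a graph before triangle counting:
* {{{
* val noSelfLoops = graph.removeSelfEdges()
* }}}
*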
* @return a graph with all self edges removed
*/
def removeSelfEdges(): Graph[VD, ED] = {
graph.subgraph(epred = e => e.srcId != e.dstId)
}

/**
* Join the vertices with an RDD and then apply a function from the
* vertex and RDD entry to a new vertex value. The input table
graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -20,32 +20,55 @@ package org.apache.spark.graphx.lib
import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

/**
* Compute the number of triangles passing through each vertex.
*
* The algorithm is relatively straightforward and can be computed in three steps:
*
* <ul>
* <li>Compute the set of neighbors for each vertex
* <li>For each edge compute the intersection of the sets and send the count to both vertices.
* <li> Compute the set of neighbors for each vertex
* <li> For each edge compute the intersection of the sets and send the count to both vertices.
* <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.
* </ul>
*
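* For example, on a single triangle {(0, 1), (1, 2), (2, 0)}, the neighbor-set intersection
* along each edge contains exactly one vertex, so every vertex accumulates a count of 2,
* which is then halved to give one triangle per vertex.
*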
* Note that the input graph should have its edges in canonical direction
* (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
* using [[org.apache.spark.graphx.Graph#partitionBy]].
* There are two implementations. The default `TriangleCount.run` implementation first removes
* self cycles and canonicalizes the graph to ensure that the following conditions hold:
* <ul>
* <li> There are no self edges
* <li> All edges are oriented from the lower to the higher vertex id (src < dst)
* <li> There are no duplicate edges
* </ul>
* However, the canonicalization procedure is costly as it requires repartitioning the graph.
* If the input data is already in "canonical form" with self cycles removed, then
* `TriangleCount.runPreCanonicalized` should be used instead.
*
* {{{
* val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges()
* val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
* }}}
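*
* For the common case, the default entry point, exposed as `triangleCount()` on
* [[org.apache.spark.graphx.GraphOps]], performs this canonicalization internally:
* {{{
* val triCounts = graph.triangleCount().vertices
* }}}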
*
*/
object TriangleCount {

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Remove redundant edges
val g = graph.groupEdges((a, b) => a).cache()
// Transform the edge data to something cheap to shuffle and then canonicalize
val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().canonicalizeEdges()
// Get the triangle counts
val counters = runPreCanonicalized(canonicalGraph).vertices
// Join them back with the original graph
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
optCounter.getOrElse(0)
}
}


def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(4)
graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.size) {
// prevent self cycle
@@ -56,14 +79,14 @@ }
}
set
}

// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}

// The edge function computes the intersection of the smaller neighbor set with the larger one
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
assert(ctx.srcAttr != null)
assert(ctx.dstAttr != null)
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
@@ -80,15 +103,18 @@ ctx.sendToSrc(counter)
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}

// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
g.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// double count should be even (divisible by two)
assert((dblCount & 1) == 0)
dblCount / 2
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// This algorithm double counts each triangle so the final count should be even
val isEven = (dblCount & 1) == 0
if (!isEven) {
throw new Exception("Triangle count resulted in an invalid number of triangles.")
}
dblCount / 2
}
} // end of TriangleCount
}
}
30 changes: 30 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -56,6 +56,36 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
}
}

test("canonicalizeEdges") {
withSpark { sc =>
val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (3 -> 4), (2 -> 1), (3 -> 4), (9 -> 5))
.map {
case (a, b) => (a.toLong, b.toLong)
}
val correctEdges = edgeArray.map { case (a, b) => if (a < b) (a, b) else (b, a) }.toSet
val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
val canonicalizedEdges = graph.canonicalizeEdges().edges.map(e => (e.srcId, e.dstId))
.collect
assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size)
assert(canonicalizedEdges.toSet === correctEdges)
}
}

test("removeSelfEdges") {
withSpark { sc =>
val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (1 -> 1))
.map {
case (a, b) => (a.toLong, b.toLong)
}
val correctEdges = edgeArray.filter { case (a, b) => a != b }.toSet
val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
val filteredEdges = graph.removeSelfEdges().edges.map(e => (e.srcId, e.dstId))
.collect
assert(filteredEdges.toSet.size === filteredEdges.size)
assert(filteredEdges.toSet === correctEdges)
}
}

test ("filter") {
withSpark { sc =>
val n = 5
Expand Down
graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -65,9 +65,9 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
val verts = triangleCount.vertices
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 4)
} else {
assert(count === 2)
} else {
assert(count === 1)
}
}
}
@@ -76,7 +76,8 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
test("Count a single triangle with duplicate edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(1L -> 0L, 1L -> 1L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices