From 428fa26880bb32f04d0799d2c227e52defb99428 Mon Sep 17 00:00:00 2001
From: Robin East
Date: Mon, 14 Sep 2015 22:09:26 +0100
Subject: [PATCH 1/8] Change bytes to bits in RoutingTablePartition.toMessage

---
 .../org/apache/spark/graphx/impl/RoutingTablePartition.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index eb3c997e0f3c..4f1260a5a67b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -34,7 +34,7 @@ object RoutingTablePartition {
   /**
    * A message from an edge partition to a vertex specifying the position in which the edge
    * partition references the vertex (src, dst, or both). The edge partition is encoded in the lower
-   * 30 bytes of the Int, and the position is encoded in the upper 2 bytes of the Int.
+   * 30 bits of the Int, and the position is encoded in the upper 2 bits of the Int.
    */
   type RoutingTableMessage = (VertexId, Int)
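
For reference, the packing scheme the corrected comment describes puts a 2-bit position flag above a 30-bit partition id in a single Int. A standalone sketch of that layout (helper names here are illustrative, not Spark's API; the real encoder is the RoutingTablePartition.toMessage named in the subject line):

    object RoutingMessageBits {
      // Position flag in the upper 2 bits, edge partition id in the lower 30 bits
      def pack(pid: Int, position: Byte): Int =
        (position << 30) | (pid & 0x3FFFFFFF)

      def pidOf(packed: Int): Int = packed & 0x3FFFFFFF

      def positionOf(packed: Int): Byte = ((packed >> 30) & 0x3).toByte

      def main(args: Array[String]): Unit = {
        // 0x1 = referenced as src, 0x2 = as dst, 0x3 = as both
        val packed = pack(pid = 123456789, position = 0x3)
        assert(pidOf(packed) == 123456789)
        assert(positionOf(packed) == 0x3)
      }
    }
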
From 96fcc0aae84450d6cc3edf046807048b2d8c2db1 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Mon, 22 Sep 2014 14:57:28 -0700
Subject: [PATCH 2/8] Improving Triangle Count

---
 .../spark/graphx/lib/TriangleCount.scala      | 39 ++++++++++++-------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index a5d598053f9c..a1fa7f4e53ed 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
 
 /**
  * Compute the number of triangles passing through each vertex.
@@ -39,13 +40,20 @@ import org.apache.spark.graphx._
 object TriangleCount {
 
   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
-    // Remove redundant edges
-    val g = graph.groupEdges((a, b) => a).cache()
+
+    // Remove self edges and orient remaining edges from low id to high id
+    val canonicalEdges = graph.edges.filter(e => e.srcId != e.dstId).map { e =>
+      if (e.srcId < e.dstId) (e.srcId, e.dstId) else (e.dstId, e.srcId)
+    }
+
+    // Automatically group duplicate edges
+    val reducedGraph = Graph.fromEdgeTuples(canonicalEdges, defaultValue = true,
+      uniqueEdges = Some(EdgePartition2D)).cache()
 
     // Construct set representations of the neighborhoods
     val nbrSets: VertexRDD[VertexSet] =
-      g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
-        val set = new VertexSet(4)
+      reducedGraph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
+        val set = new VertexSet(nbrs.length)
         var i = 0
         while (i < nbrs.size) {
           // prevent self cycle
@@ -56,14 +64,14 @@ object TriangleCount {
         }
         set
       }
+
     // join the sets with the graph
-    val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+    val setGraph: Graph[VertexSet, Int] = reducedGraph.outerJoinVertices(nbrSets) {
       (vid, _, optSet) => optSet.getOrElse(null)
     }
+
     // Edge function computes intersection of smaller vertex with larger vertex
     def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
-      assert(ctx.srcAttr != null)
-      assert(ctx.dstAttr != null)
       val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
         (ctx.srcAttr, ctx.dstAttr)
       } else {
@@ -80,15 +88,18 @@ object TriangleCount {
       ctx.sendToSrc(counter)
       ctx.sendToDst(counter)
     }
+
     // compute the intersection along edges
     val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is counted twice
-    g.outerJoinVertices(counters) {
-      (vid, _, optCounter: Option[Int]) =>
-        val dblCount = optCounter.getOrElse(0)
-        // double count should be even (divisible by two)
-        assert((dblCount & 1) == 0)
-        dblCount / 2
+    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+      val dblCount = optCounter.getOrElse(0)
+      // This algorithm double counts each triangle so the final count should be even
+      val isEven = (dblCount & 1) == 0
+      if (!isEven) {
+        throw new Exception("Triangle count resulted in an invalid number of triangles.")
+      }
+      dblCount / 2
     }
-  } // end of TriangleCount
+  }
 }
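
The rewritten run above leans on one identity: once edges are canonical, each edge (u, v) contributes |N(u) ∩ N(v)| to both endpoints, and every triangle through a vertex is discovered by exactly two of that vertex's edges, hence the final division by two. A minimal single-machine sketch of the same logic (plain Scala collections, no Spark; all names are illustrative):

    object LocalTriangleCount {
      def run(edges: Seq[(Long, Long)]): Map[Long, Int] = {
        // Canonicalize: drop self edges, orient low id -> high id, deduplicate
        val canonical: Set[(Long, Long)] = edges
          .filter { case (s, d) => s != d }
          .map { case (s, d) => if (s < d) (s, d) else (d, s) }
          .toSet

        // Undirected neighbor sets over the canonical edges
        val nbrs: Map[Long, Set[Long]] = canonical.toSeq
          .flatMap { case (s, d) => Seq(s -> d, d -> s) }
          .groupBy { case (v, _) => v }
          .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

        // Each edge (u, v) sends |N(u) ∩ N(v)| to both u and v; a triangle
        // through a vertex is found by exactly two of its edges, hence the / 2
        val doubled: Map[Long, Int] = canonical.toSeq
          .flatMap { case (u, v) =>
            val common = (nbrs(u) & nbrs(v)).size
            Seq(u -> common, v -> common)
          }
          .groupBy { case (v, _) => v }
          .map { case (v, pairs) => v -> pairs.map(_._2).sum }

        doubled.map { case (v, c) => v -> c / 2 }
      }
    }

For example, run(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L)) yields Map(0L -> 1, 1L -> 1, 2L -> 1); vertices that appear only in self edges are simply absent from the result in this sketch.
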
From 1edc09df8e32b6717aa300fe62636a9613bcbc27 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Mon, 22 Sep 2014 15:16:46 -0700
Subject: [PATCH 3/8] fixing bug in unit tests where bi-directed edges lead to
 duplicate triangles.

---
 .../org/apache/spark/graphx/lib/TriangleCountSuite.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index 608e43cf3ff5..f19c3acdc85c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -64,9 +64,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
       val verts = triangleCount.vertices
       verts.collect().foreach { case (vid, count) =>
         if (vid == 0) {
-          assert(count === 4)
-        } else {
           assert(count === 2)
+        } else {
+          assert(count === 1)
         }
       }
     }
@@ -75,7 +75,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
   test("Count a single triangle with duplicate edges") {
     withSpark { sc =>
       val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(1L -> 0L, 1L -> 1L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
       val triangleCount = graph.triangleCount()
       val verts = triangleCount.vertices
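
The updated expectations follow directly from canonicalization: a reversed duplicate such as 1L -> 0L now merges into 0L -> 1L instead of forming a second triangle, and the self edge 1L -> 1L is discarded. Reusing the illustrative LocalTriangleCount sketch from above (an illustration, not the suite itself):

    object CanonicalizationCheck {
      def main(args: Array[String]): Unit = {
        val base  = Seq(0L -> 1L, 1L -> 2L, 2L -> 0L)
        val noisy = base ++ base ++ Seq(1L -> 0L, 1L -> 1L)
        // Duplicate, reversed, and self edges no longer change the counts
        assert(LocalTriangleCount.run(noisy) == LocalTriangleCount.run(base))
        assert(LocalTriangleCount.run(noisy) == Map(0L -> 1, 1L -> 1, 2L -> 1))
      }
    }
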
+ */ + def canonicalizeEdges(merge: (ED, ED) => ED = (a,b) => a): Graph[VD, ED] = { + val numPartitions = graph.edges.partitions.length + val edTag = classTag[ED] + val vdTag = classTag[VD] + // Canonicalize the edge directions and then repartition + val canonicalEdges = graph.edges.withPartitionsRDD(graph.edges.map { e => + var srcId = e.srcId + var dstId = e.dstId + if (e.srcId > e.dstId) { + srcId = e.dstId + dstId = e.srcId + } + val part = PartitionStrategy.EdgePartition2D.getPartition(srcId, dstId, numPartitions) + (part, (srcId, dstId, e.attr)) + }.partitionBy(new HashPartitioner(numPartitions)).mapPartitionsWithIndex( { (pid, iter) => + val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag) + iter.foreach { message => + val data = message._2 + builder.add(data._1, data._2, data._3) + } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true)).cache() + // Build a new graph reusing the old vertex rdd and group the edges + GraphImpl.fromExistingRDDs(graph.vertices.withEdges(canonicalEdges), canonicalEdges) + .groupEdges(merge) + } + + /** + * Remove self edges. + * + * @return a graph with all self edges removed + */ + def removeSelfEdges(): Graph[VD, ED] = { + graph.subgraph(epred = e => e.srcId != e.dstId) + } + /** * Join the vertices with an RDD and then apply a function from the * vertex and RDD entry to a new vertex value. The input table diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index a1fa7f4e53ed..cef665397588 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -28,31 +28,46 @@ import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D * The algorithm is relatively straightforward and can be computed in three steps: * *
    - *
  • Compute the set of neighbors for each vertex - *
  • For each edge compute the intersection of the sets and send the count to both vertices. + *
  • Compute the set of neighbors for each vertex + *
  • For each edge compute the intersection of the sets and send the count to both vertices. *
  • Compute the sum at each vertex and divide by two since each triangle is counted twice. *
* - * Note that the input graph should have its edges in canonical direction - * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned - * using [[org.apache.spark.graphx.Graph#partitionBy]]. + * There are two implementations. The default `TriangleCount.run` implementation first removes + * self cycles and canonicalizes the graph to ensure that the following conditions hold: + *
    + *
  • There are no self edges + *
  • All edges are oriented src > dst + *
  • There are no duplicate edges + *
+ * However, the canonicalization procedure is costly as it requires repartitioning the graph. + * If the input data is already in "canonical form" with self cycles removed then the + * `TriangleCount.runPreCanonicalized` should be used instead. + * + * {{{ + * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges() + * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices + * }}} + * */ object TriangleCount { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { - - // Remove self edges and orient remaining edges from low id to high id - val canonicalEdges = graph.edges.filter(e => e.srcId != e.dstId).map { e => - if (e.srcId < e.dstId) (e.srcId, e.dstId) else (e.dstId, e.srcId) + // Transform the edge data something cheap to shuffle and then canonicalize + val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().canonicalizeEdges() + // Get the triangle counts + val counters = runPreCanonicalized(canonicalGraph).vertices + // Join them bath with the original graph + graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) => + optCounter.getOrElse(0) } + } - // Automatically group duplicate edges - val reducedGraph = Graph.fromEdgeTuples(canonicalEdges, defaultValue = true, - uniqueEdges = Some(EdgePartition2D)).cache() + def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = { // Construct set representations of the neighborhoods val nbrSets: VertexRDD[VertexSet] = - reducedGraph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => + graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => val set = new VertexSet(nbrs.length) var i = 0 while (i < nbrs.size) { @@ -66,7 +81,7 @@ object TriangleCount { } // join the sets with the graph - val setGraph: Graph[VertexSet, Int] = reducedGraph.outerJoinVertices(nbrSets) { + val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) { (vid, _, optSet) => optSet.getOrElse(null) } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 57a8b95dd12e..dba09eb09262 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -55,6 +55,36 @@ class GraphOpsSuite extends SparkFunSuite with LocalSparkContext { } } + test("canonicalizeEdges") { + withSpark { sc => + val edgeArray = Array((1->2), (2->3), (3 ->3), (4 ->3), (3->4), (2->1), (3-> 4), (9->5)) + .map { + case (a,b) => (a.toLong, b.toLong) + } + val correctEdges = edgeArray.map { case (a,b) => if (a < b) (a,b) else (b, a) }.toSet + val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1) + val canonicalizedEdges = graph.canonicalizeEdges().edges.map(e => (e.srcId, e.dstId)) + .collect + assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size) + assert(canonicalizedEdges.toSet === correctEdges) + } + } + + test("removeSelfEdges") { + withSpark { sc => + val edgeArray = Array((1->2), (2->3), (3 ->3), (4 ->3), (1->1)) + .map { + case (a,b) => (a.toLong, b.toLong) + } + val correctEdges = edgeArray.filter { case (a,b) => a != b }.toSet + val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1) + val canonicalizedEdges = graph.removeSelfEdges().edges.map(e => (e.srcId, e.dstId)) + .collect + assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size) + assert(canonicalizedEdges.toSet === correctEdges) + } + } + test 
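
With canonicalization factored out, a caller can pay the repartitioning shuffle once and reuse the canonical graph across computations. A sketch of the intended call pattern against the operators this patch introduces (canonicalizeEdges is renamed in a later patch, so treat the method name as provisional):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.TriangleCount

    def countTriangles(sc: SparkContext): VertexRDD[Int] = {
      val raw = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L, 2L -> 2L, 1L -> 0L))
      // Drop the self edge 2L -> 2L, fold 1L -> 0L into 0L -> 1L, then count
      val canonical = Graph.fromEdgeTuples(raw, defaultValue = 1)
        .removeSelfEdges()
        .canonicalizeEdges()
      TriangleCount.runPreCanonicalized(canonical).vertices
    }
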
("filter") { withSpark { sc => val n = 5 From c6cd74792d4f82e562d1c792d322f17b1877d4af Mon Sep 17 00:00:00 2001 From: Robin East Date: Sat, 20 Feb 2016 21:46:49 +0000 Subject: [PATCH 5/8] SPARK-3650 updates to PR 2495 to work with current master --- R/pkg/DESCRIPTION | 5 +- .../org/apache/spark/graphx/GraphOps.scala | 48 ++----------------- .../spark/graphx/lib/TriangleCount.scala | 4 +- .../apache/spark/graphx/GraphOpsSuite.scala | 21 ++------ 4 files changed, 13 insertions(+), 65 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 465bc37788e5..0cd0d75df0f7 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -18,10 +18,10 @@ Collate: 'schema.R' 'generics.R' 'jobj.R' - 'RDD.R' - 'pairRDD.R' 'column.R' 'group.R' + 'RDD.R' + 'pairRDD.R' 'DataFrame.R' 'SQLContext.R' 'backend.R' @@ -36,3 +36,4 @@ Collate: 'stats.R' 'types.R' 'utils.R' +RoxygenNote: 5.0.1 diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 77d79bb46275..f0ab3f018a5d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -21,11 +21,13 @@ import scala.reflect.{classTag, ClassTag} import scala.reflect.ClassTag import scala.util.Random +import org.apache.spark.HashPartitioner +import org.apache.spark.SparkContext._ import org.apache.spark.SparkException -import org.apache.spark.graphx.lib._ -import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.graphx.impl.EdgePartitionBuilder -import org.apache.spark.HashPartitioner +import org.apache.spark.graphx.impl.GraphImpl +import org.apache.spark.graphx.lib._ +import org.apache.spark.rdd.RDD /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the @@ -186,46 +188,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } } - /** - * Direct all edges from the lower vertex id to the higher vertex id and aggregate - * duplicate edges. - * - * This function is relatively costly as it requires shuffling all the so that edges between the - * same pair of vertices are on the same machine. - * - * @param merge the function used to merge duplicate edges. The default function takes one of - * the edge values an discards the rest. - * - * @return the graph with all edges canonicalized. - */ - def canonicalizeEdges(merge: (ED, ED) => ED = (a,b) => a): Graph[VD, ED] = { - val numPartitions = graph.edges.partitions.length - val edTag = classTag[ED] - val vdTag = classTag[VD] - // Canonicalize the edge directions and then repartition - val canonicalEdges = graph.edges.withPartitionsRDD(graph.edges.map { e => - var srcId = e.srcId - var dstId = e.dstId - if (e.srcId > e.dstId) { - srcId = e.dstId - dstId = e.srcId - } - val part = PartitionStrategy.EdgePartition2D.getPartition(srcId, dstId, numPartitions) - (part, (srcId, dstId, e.attr)) - }.partitionBy(new HashPartitioner(numPartitions)).mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag) - iter.foreach { message => - val data = message._2 - builder.add(data._1, data._2, data._3) - } - val edgePartition = builder.toEdgePartition - Iterator((pid, edgePartition)) - }, preservesPartitioning = true)).cache() - // Build a new graph reusing the old vertex rdd and group the edges - GraphImpl.fromExistingRDDs(graph.vertices.withEdges(canonicalEdges), canonicalEdges) - .groupEdges(merge) - } - /** * Remove self edges. 
* diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index cef665397588..f97240905677 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -54,7 +54,7 @@ object TriangleCount { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { // Transform the edge data something cheap to shuffle and then canonicalize - val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().canonicalizeEdges() + val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges() // Get the triangle counts val counters = runPreCanonicalized(canonicalGraph).vertices // Join them bath with the original graph @@ -64,7 +64,7 @@ object TriangleCount { } - def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = { + def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { // Construct set representations of the neighborhoods val nbrSets: VertexRDD[VertexSet] = graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index dba09eb09262..3967f6683de7 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -55,28 +55,13 @@ class GraphOpsSuite extends SparkFunSuite with LocalSparkContext { } } - test("canonicalizeEdges") { - withSpark { sc => - val edgeArray = Array((1->2), (2->3), (3 ->3), (4 ->3), (3->4), (2->1), (3-> 4), (9->5)) - .map { - case (a,b) => (a.toLong, b.toLong) - } - val correctEdges = edgeArray.map { case (a,b) => if (a < b) (a,b) else (b, a) }.toSet - val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1) - val canonicalizedEdges = graph.canonicalizeEdges().edges.map(e => (e.srcId, e.dstId)) - .collect - assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size) - assert(canonicalizedEdges.toSet === correctEdges) - } - } - test("removeSelfEdges") { withSpark { sc => - val edgeArray = Array((1->2), (2->3), (3 ->3), (4 ->3), (1->1)) + val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (1 -> 1)) .map { - case (a,b) => (a.toLong, b.toLong) + case (a, b) => (a.toLong, b.toLong) } - val correctEdges = edgeArray.filter { case (a,b) => a != b }.toSet + val correctEdges = edgeArray.filter { case (a, b) => a != b }.toSet val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1) val canonicalizedEdges = graph.removeSelfEdges().edges.map(e => (e.srcId, e.dstId)) .collect From c8ad0bd4ed998b86a465bc36ec59ddc5dcceef5e Mon Sep 17 00:00:00 2001 From: Robin East Date: Sun, 21 Feb 2016 11:27:10 +0000 Subject: [PATCH 6/8] revert unexpected changes to R/pkg/DESCRIPTION --- R/pkg/DESCRIPTION | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 0cd0d75df0f7..465bc37788e5 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -18,10 +18,10 @@ Collate: 'schema.R' 'generics.R' 'jobj.R' - 'column.R' - 'group.R' 'RDD.R' 'pairRDD.R' + 'column.R' + 'group.R' 'DataFrame.R' 'SQLContext.R' 'backend.R' @@ -36,4 +36,3 @@ Collate: 'stats.R' 'types.R' 'utils.R' -RoxygenNote: 5.0.1 From d56ad1a4b033806c0dc1a3a97a47562f775e1e74 Mon Sep 17 00:00:00 2001 From: Robin 
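
After patch 5 the canonicalization step goes through the pre-existing GraphOps operator convertToCanonicalEdges rather than the bespoke one removed above. Spelled out caller-side, the recipe from the class comment now reads as follows (wrapper name ours; assumes the patched TriangleCount):

    import scala.reflect.ClassTag

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.TriangleCount

    // Canonicalize once up front so repeated triangle counts skip the shuffle
    def preCanonicalizedCounts[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
      val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().convertToCanonicalEdges()
      TriangleCount.runPreCanonicalized(canonicalGraph).vertices
    }
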
From d56ad1a4b033806c0dc1a3a97a47562f775e1e74 Mon Sep 17 00:00:00 2001
From: Robin East
Date: Sun, 21 Feb 2016 13:17:36 +0000
Subject: [PATCH 7/8] add closing tags

---
 .../org/apache/spark/graphx/lib/TriangleCount.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index f97240905677..a62cb967d4f0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -28,17 +28,17 @@ import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
  * The algorithm is relatively straightforward and can be computed in three steps:
  *
  * <ul>
- * <li> Compute the set of neighbors for each vertex
- * <li> For each edge compute the intersection of the sets and send the count to both vertices.
- * <li>Compute the sum at each vertex and divide by two since each triangle is counted twice.
+ * <li> Compute the set of neighbors for each vertex</li>
+ * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
+ * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
  * </ul>
 *
 * There are two implementations. The default `TriangleCount.run` implementation first removes
 * self cycles and canonicalizes the graph to ensure that the following conditions hold:
 * <ul>
- * <li> There are no self edges
- * <li> All edges are oriented src < dst
- * <li> There are no duplicate edges
+ * <li> There are no self edges</li>
+ * <li> All edges are oriented src < dst</li>
+ * <li> There are no duplicate edges</li>
 * </ul>
 *
 * However, the canonicalization procedure is costly as it requires repartitioning the graph.
 * If the input data is already in "canonical form" with self cycles removed then the

From 1e6f5d2e01ea7902a1c7ec5261e21e855ed8b073 Mon Sep 17 00:00:00 2001
From: Robin East
Date: Sun, 21 Feb 2016 15:04:47 +0000
Subject: [PATCH 8/8] code tidy up of run

---
 .../scala/org/apache/spark/graphx/lib/TriangleCount.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index a62cb967d4f0..51bcdf20dec4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -107,13 +107,10 @@ object TriangleCount {
     // compute the intersection along edges
     val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is counted twice
-    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
       val dblCount = optCounter.getOrElse(0)
       // This algorithm double counts each triangle so the final count should be even
-      val isEven = (dblCount & 1) == 0
-      if (!isEven) {
-        throw new Exception("Triangle count resulted in an invalid number of triangles.")
-      }
+      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
      dblCount / 2
    }
  }
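
The require in the final patch is a behavior-preserving contraction of the branch it replaces, with one nuance: require throws IllegalArgumentException rather than a bare Exception. In isolation (dblCount stands in for the joined counter value):

    def halveDoubleCount(dblCount: Int): Int = {
      // (dblCount & 1) == 0 and dblCount % 2 == 0 test the same evenness
      // condition; require throws IllegalArgumentException with the message
      // below when the predicate is false
      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
      dblCount / 2
    }
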