49 changes: 0 additions & 49 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -340,55 +340,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

/**
* Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
* `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
* "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
* the map phase destined to each vertex.
*
* This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
*
* @tparam A the type of "message" to be sent to each vertex
*
* @param mapFunc the user defined map function which returns 0 or
* more messages to neighboring vertices
*
* @param reduceFunc the user defined reduce function which should
* be commutative and associative and is used to combine the output
* of the map phase
*
* @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
* desired. This is done by specifying a set of "active" vertices and an edge direction. The
* `sendMsg` function will then run only on edges connected to active vertices by edges in the
* specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
* destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
* originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
* run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
* will be run on edges with *both* vertices in the active set. The active set must have the
* same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
* val inDeg: RDD[(VertexId, Int)] =
* mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
* }}}
*
* @note By expressing computation at the edge level we achieve
* maximum parallelism. This is one of the core functions in the
* Graph API in that it enables neighborhood-level computation. For
* example, this function can be used to count neighbors satisfying a
* predicate or to implement PageRank.
*
*/
@deprecated("use aggregateMessages", "1.2.0")
def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]

/**
* Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
* `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
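The removed Scaladoc example carries over directly to the replacement API. A minimal sketch of the same in-degree computation via aggregateMessages, with GraphLoader.edgeListFile standing in for the stale Graph.textFile from the removed comment; the SparkContext and the path are assumptions:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

def inDegrees(sc: SparkContext): VertexRDD[Int] = {
  // Load an edge-list file into a Graph[Int, Int]; the path is illustrative.
  val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "twittergraph")
  rawGraph.aggregateMessages[Int](
    ctx => ctx.sendToDst(1), // one message per edge, sent to the destination
    _ + _                    // sum the messages arriving at each vertex
  )
}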
27 changes: 27 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala
@@ -17,13 +17,16 @@

package org.apache.spark.graphx

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.{BitSet, OpenHashSet}

object GraphXUtils {

/**
* Registers classes that GraphX uses with Kryo.
*/
@@ -42,4 +45,28 @@ object GraphXUtils {
classOf[OpenHashSet[Int]],
classOf[OpenHashSet[Long]]))
}

/**
* A proxy method to map the obsolete API to the new one.
*/
private[graphx] def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag](
g: Graph[VD, ED],
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
def sendMsg(ctx: EdgeContext[VD, ED, A]) {
mapFunc(ctx.toEdgeTriplet).foreach { kv =>
val id = kv._1
val msg = kv._2
if (id == ctx.srcId) {
ctx.sendToSrc(msg)
} else {
assert(id == ctx.dstId)
ctx.sendToDst(msg)
}
}
}
g.aggregateMessagesWithActiveSet(
sendMsg, reduceFunc, TripletFields.All, activeSetOpt)
}
}
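Since the proxy is private[graphx], only internal callers such as Pregel (below) and the updated tests can reach it. A minimal sketch of a call, with the SparkContext `sc` as an assumption:

// Count in-neighbors through the proxy; vertex 2L has two in-edges.
val g = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (3L, 2L))), 0)
val inDeg: VertexRDD[Int] = GraphXUtils.mapReduceTriplets(
  g,
  (et: EdgeTriplet[Int, Int]) => Iterator((et.dstId, 1)),
  (a: Int, b: Int) => a + b)
// inDeg.collect() contains (2L, 2); vertices receiving no messages are absent.

Note that the proxy always passes TripletFields.All, whereas the removed GraphImpl override (below) inferred a narrower TripletFields by inspecting which vertex attributes mapFunc accessed; internal code that knows its access pattern may prefer calling aggregateMessages directly with an explicit hint.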
6 changes: 3 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -121,7 +121,7 @@ object Pregel extends Logging {
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
@@ -135,8 +135,8 @@
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = g.mapReduceTriplets(
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
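For context, the standard single-source shortest-paths use of Pregel, which exercises both proxy call sites above; a hedged sketch assuming graph: Graph[Long, Double] is in scope:

val sourceId: VertexId = 0L
val init = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = init.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),  // vprog: keep the shorter distance
  triplet =>                                       // sendMsg: relax edges that improve dst
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    },
  (a, b) => math.min(a, b)                         // mergeMsg: shortest proposal wins
)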
25 changes: 0 additions & 25 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -187,31 +187,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
// Lower level transformation methods
// ///////////////////////////////////////////////////////////////////////////////////////////////

override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {

def sendMsg(ctx: EdgeContext[VD, ED, A]) {
mapFunc(ctx.toEdgeTriplet).foreach { kv =>
val id = kv._1
val msg = kv._2
if (id == ctx.srcId) {
ctx.sendToSrc(msg)
} else {
assert(id == ctx.dstId)
ctx.sendToDst(msg)
}
}
}

val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true)

aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt)
}

override def aggregateMessagesWithActiveSet[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
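With the bytecode inspection (accessesVertexAttr) gone, callers that relied on mapReduceTriplets to avoid shipping unused vertex attributes should pass an explicit TripletFields to aggregateMessages instead. A sketch, with graph: Graph[Int, Int] as an assumption:

val srcAttrSums: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(ctx.srcAttr), // reads srcAttr only
  _ + _,
  TripletFields.Src                  // ship only source attributes, not dst
)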
graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -39,17 +39,6 @@ object SVDPlusPlus {
var gamma7: Double)
extends Serializable

/**
* This method is now replaced by the updated version of `run()` and returns exactly
* the same result.
*/
@deprecated("Call run()", "1.4.0")
def runSVDPlusPlus(edges: RDD[Edge[Double]], conf: Conf)
: (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
{
run(edges, conf)
}

/**
* Implement SVD++ based on "Factorization Meets the Neighborhood:
* a Multifaceted Collaborative Filtering Model",
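Callers of the removed wrapper migrate to run() directly, which takes the same arguments and returns the same pair. A minimal sketch; ratingEdges: RDD[Edge[Double]] and the hyperparameter values are assumptions:

val conf = new SVDPlusPlus.Conf(
  rank = 10, maxIters = 5, minVal = 0.0, maxVal = 5.0,
  gamma1 = 0.007, gamma2 = 0.007, gamma6 = 0.005, gamma7 = 0.015)
// model: the trained graph; u: the global average rating used by the model.
val (model, u) = SVDPlusPlus.run(ratingEdges, conf)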
52 changes: 6 additions & 46 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -221,7 +221,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect().toSet === Set((1L, 2)))
}
}
@@ -281,49 +282,6 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("mapReduceTriplets") {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache()
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
(a: Int, b: Int) => a + b)
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)

// activeSetOpt
val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
val numEvenNeighbors = vids.mapReduceTriplets(et => {
// Map function should only run on edges with destination in the active set
if (et.dstId % 2 != 0) {
throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)

// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0).mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
newOpt.getOrElse(old)
}
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
if (et.srcId % 2 != 1) {
throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)

}
}

test("aggregateMessages") {
withSpark { sc =>
val n = 5
Expand All @@ -347,7 +305,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
(vid, a, bOpt) => bOpt.getOrElse(0)
}
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
val neighborDegreeSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
reverseStarDegrees,
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
@@ -420,7 +379,8 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
numEdgePartitions)
val graph = Graph.fromEdgeTuples(edges, 1)
val neighborAttrSums = graph.mapReduceTriplets[Int](
val neighborAttrSums = GraphXUtils.mapReduceTriplets[Int, Int, Int](
graph,
et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {
6 changes: 6 additions & 0 deletions project/MimaExcludes.scala
@@ -242,6 +242,12 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedPythonFunction"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UserDefinedFunction$")
) ++ Seq(
// SPARK-12995 Remove deprecated APIs in graphx
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.lib.SVDPlusPlus.runSVDPlusPlus"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets")
)
case v if v.startsWith("1.6") =>
Seq(