@@ -207,8 +207,39 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
207207 * }}}
208208 *
209209 */
210- def mapTriplets [ED2 : ClassTag ](map : EdgeTriplet [VD , ED ] => ED2 ): Graph [VD , ED2 ] = {
211- mapTriplets((pid, iter) => iter.map(map))
210+ def mapTriplets [ED2 : ClassTag ](
211+ map : EdgeTriplet [VD , ED ] => ED2 ): Graph [VD , ED2 ] = {
212+ mapTriplets((pid, iter) => iter.map(map), TripletFields .All )
213+ }
214+
215+ /**
216+ * Transforms each edge attribute using the map function, passing it the adjacent vertex
217+ * attributes as well. If adjacent vertex values are not required,
218+ * consider using `mapEdges` instead.
219+ *
220+ * @note This does not change the structure of the
221+ * graph or modify the values of this graph. As a consequence
222+ * the underlying index structures can be reused.
223+ *
224+ * @param map the function from an edge triplet to a new edge value.
225+ * @param tripletFields which fields should be included in the edge triplet passed to the map
226+ * function. If not all fields are needed, specifying this can improve performance.
227+ *
228+ * @tparam ED2 the new edge data type
229+ *
230+ * @example This function might be used to initialize edge
231+ * attributes based on the attributes associated with each vertex.
232+ * {{{
233+ * val rawGraph: Graph[Int, Int] = someLoadFunction()
234+ * val graph = rawGraph.mapTriplets[Int]( edge =>
235+ * edge.srcAttr - edge.dstAttr)
236+ * }}}
237+ *
238+ */
239+ def mapTriplets [ED2 : ClassTag ](
240+ map : EdgeTriplet [VD , ED ] => ED2 ,
241+ tripletFields : TripletFields ): Graph [VD , ED2 ] = {
242+ mapTriplets((pid, iter) => iter.map(map), tripletFields)
212243 }
213244
214245 /**
@@ -223,12 +254,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
223254 * the underlying index structures can be reused.
224255 *
225256 * @param map the iterator transform
257+ * @param tripletFields which fields should be included in the edge triplet passed to the map
258+ * function. If not all fields are needed, specifying this can improve performance.
226259 *
227260 * @tparam ED2 the new edge data type
228261 *
229262 */
230- def mapTriplets [ED2 : ClassTag ](map : (PartitionID , Iterator [EdgeTriplet [VD , ED ]]) => Iterator [ED2 ])
231- : Graph [VD , ED2 ]
263+ def mapTriplets [ED2 : ClassTag ](
264+ map : (PartitionID , Iterator [EdgeTriplet [VD , ED ]]) => Iterator [ED2 ],
265+ tripletFields : TripletFields ): Graph [VD , ED2 ]
232266
233267 /**
234268 * Reverses all edges in the graph. If this graph contains an edge from a to b then the returned
@@ -287,6 +321,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
287321 * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
288322 * the map phase destined to each vertex.
289323 *
324+ * This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
325+ *
290326 * @tparam A the type of "message" to be sent to each vertex
291327 *
292328 * @param mapFunc the user defined map function which returns 0 or
@@ -296,13 +332,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
296332 * be commutative and associative and is used to combine the output
297333 * of the map phase
298334 *
299- * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
300- * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
301- * edges with destination in the active set. If the direction is `Out`,
302- * `mapFunc` will only be run on edges originating from vertices in the active set. If the
303- * direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set
304- * . If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the
305- * active set. The active set must have the same index as the graph's vertices.
335+ * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
336+ * desired. This is done by specifying a set of "active" vertices and an edge direction. The
337+ * `mapFunc` function will then run only on edges connected to active vertices in the
338+ * specified direction. If the direction is `In`, `mapFunc` will only be run on edges with
339+ * destination in the active set. If the direction is `Out`, `mapFunc` will only be run on edges
340+ * originating from vertices in the active set. If the direction is `Either`, `mapFunc` will be
341+ * run on edges with *either* vertex in the active set. If the direction is `Both`, `mapFunc`
342+ * will be run on edges with *both* vertices in the active set. The active set must have the
343+ * same index as the graph's vertices.
306344 *
307345 * @example We can use this function to compute the in-degree of each
308346 * vertex
@@ -319,15 +357,88 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
319357 * predicate or implement PageRank.
320358 *
321359 */
360+ @ deprecated(" use aggregateMessages" , " 1.2.0" )
322361 def mapReduceTriplets [A : ClassTag ](
323362 mapFunc : EdgeTriplet [VD , ED ] => Iterator [(VertexId , A )],
324363 reduceFunc : (A , A ) => A ,
325364 activeSetOpt : Option [(VertexRDD [_], EdgeDirection )] = None )
326365 : VertexRDD [A ]
327366
328367 /**
329- * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. The
330- * input table should contain at most one entry for each vertex. If no entry in `other` is
368+ * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
369+ * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
370+ * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
371+ * destined to the same vertex.
372+ *
373+ * @tparam A the type of message to be sent to each vertex
374+ *
375+ * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
376+ * [[EdgeContext ]].
377+ * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
378+ * combiner should be commutative and associative.
379+ * @param tripletFields which fields should be included in the [[EdgeContext ]] passed to the
380+ * `sendMsg` function. If not all fields are needed, specifying this can improve performance.
381+ *
382+ * @example We can use this function to compute the in-degree of each
383+ * vertex
384+ * {{{
385+ * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
386+ * val inDeg: RDD[(VertexId, Int)] =
387+ * rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
388+ * }}}
389+ *
390+ * @note By expressing computation at the edge level we achieve
391+ * maximum parallelism. This is one of the core functions in the
392+ * Graph API that enables neighborhood level computation. For
393+ * example this function can be used to count neighbors satisfying a
394+ * predicate or implement PageRank.
395+ *
396+ */
397+ def aggregateMessages [A : ClassTag ](
398+ sendMsg : EdgeContext [VD , ED , A ] => Unit ,
399+ mergeMsg : (A , A ) => A ,
400+ tripletFields : TripletFields = TripletFields .All )
401+ : VertexRDD [A ] = {
402+ aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None )
403+ }
404+
405+ /**
406+ * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
407+ * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
408+ * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
409+ * destined to the same vertex.
410+ *
411+ * This variant can take an active set to restrict the computation and is intended for internal
412+ * use only.
413+ *
414+ * @tparam A the type of message to be sent to each vertex
415+ *
416+ * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
417+ * [[EdgeContext ]].
418+ * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
419+ * combiner should be commutative and associative.
420+ * @param tripletFields which fields should be included in the [[EdgeContext ]] passed to the
421+ * `sendMsg` function. If not all fields are needed, specifying this can improve performance.
422+ * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
423+ * desired. This is done by specifying a set of "active" vertices and an edge direction. The
424+ * `sendMsg` function will then run only on edges connected to active vertices in the
425+ * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
426+ * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
427+ * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
428+ * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
429+ * will be run on edges with *both* vertices in the active set. The active set must have the
430+ * same index as the graph's vertices.
431+ */
432+ private [graphx] def aggregateMessagesWithActiveSet [A : ClassTag ](
433+ sendMsg : EdgeContext [VD , ED , A ] => Unit ,
434+ mergeMsg : (A , A ) => A ,
435+ tripletFields : TripletFields ,
436+ activeSetOpt : Option [(VertexRDD [_], EdgeDirection )])
437+ : VertexRDD [A ]
438+
439+ /**
440+ * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.
441+ * The input table should contain at most one entry for each vertex. If no entry in `other` is
331442 * provided for a particular vertex in the graph, the map function receives `None`.
332443 *
333444 * @tparam U the type of entry in the table of updates
0 commit comments