Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }}}
*
*/
def mapTriplets[ED2: ClassTag](
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
  // Delegate to the per-partition overload: the partition id is ignored and `map` is applied
  // to each triplet of the partition's iterator, with all triplet fields requested.
  mapTriplets(
    (_, triplets) => triplets.map(map),
    TripletFields.All)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.impl;

/**
* Criteria for filtering edges based on activeness. For internal use only.
*
* <p>Each constant states which endpoints of an edge must be active for the edge to be
* considered. An endpoint that a constant does not mention is not checked at all. Passed as
* the {@code activeness} argument of {@code EdgePartition.aggregateMessagesEdgeScan} and
* {@code EdgePartition.aggregateMessagesIndexScan}.
*/
public enum EdgeActiveness {
/**
* No activeness filtering: neither the source vertex nor the destination vertex need be
* active, so every edge qualifies.
*/
Neither,
/** The source vertex must be active; the destination vertex is not checked. */
SrcOnly,
/** The destination vertex must be active; the source vertex is not checked. */
DstOnly,
/** Both the source vertex and the destination vertex must be active. */
Both,
/** At least one of the source vertex and the destination vertex must be active. */
Either
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class EdgePartition[
activeSet: Option[VertexSet])
extends Serializable {

/** No-arg constructor for serialization. */
private def this() = this(null, null, null, null, null, null, null, null)

/** Return a new `EdgePartition` with the specified edge data. */
Expand Down Expand Up @@ -375,22 +376,15 @@ class EdgePartition[
* @param sendMsg generates messages to neighboring vertices of an edge
* @param mergeMsg the combiner applied to messages destined to the same vertex
* @param tripletFields which triplet fields `sendMsg` uses
* @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
* active set
* @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
* the active set
* @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
* considered
* @param activeness criteria for filtering edges based on activeness
*
* @return an iterator over the aggregated messages, keyed by the receiving vertex id
*/
def aggregateMessagesEdgeScan[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
srcMustBeActive: Boolean,
dstMustBeActive: Boolean,
maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)

Expand All @@ -401,10 +395,13 @@ class EdgePartition[
val srcId = local2global(localSrcId)
val localDstId = localDstIds(i)
val dstId = local2global(localDstId)
val srcIsActive = !srcMustBeActive || isActive(srcId)
val dstIsActive = !dstMustBeActive || isActive(dstId)
val edgeIsActive =
if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
if (activeness == EdgeActiveness.Neither) true
else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId)
else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId)
else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId)
else throw new Exception("unreachable")
if (edgeIsActive) {
val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
Expand All @@ -424,22 +421,15 @@ class EdgePartition[
* @param sendMsg generates messages to neighboring vertices of an edge
* @param mergeMsg the combiner applied to messages destined to the same vertex
* @param tripletFields which triplet fields `sendMsg` uses
* @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
* active set
* @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
* the active set
* @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
* considered
* @param activeness criteria for filtering edges based on activeness
*
* @return an iterator over the aggregated messages, keyed by the receiving vertex id
*/
def aggregateMessagesIndexScan[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
srcMustBeActive: Boolean,
dstMustBeActive: Boolean,
maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)

Expand All @@ -448,18 +438,30 @@ class EdgePartition[
val clusterSrcId = cluster._1
val clusterPos = cluster._2
val clusterLocalSrcId = localSrcIds(clusterPos)
val srcIsActive = !srcMustBeActive || isActive(clusterSrcId)
if (srcIsActive || maySatisfyEither) {

val scanCluster =
if (activeness == EdgeActiveness.Neither) true
else if (activeness == EdgeActiveness.SrcOnly) isActive(clusterSrcId)
else if (activeness == EdgeActiveness.DstOnly) true
else if (activeness == EdgeActiveness.Both) isActive(clusterSrcId)
else if (activeness == EdgeActiveness.Either) true
else throw new Exception("unreachable")

if (scanCluster) {
var pos = clusterPos
val srcAttr =
if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD]
ctx.setSrcOnly(clusterSrcId, clusterLocalSrcId, srcAttr)
while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
val localDstId = localDstIds(pos)
val dstId = local2global(localDstId)
val dstIsActive = !dstMustBeActive || isActive(dstId)
val edgeIsActive =
if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
if (activeness == EdgeActiveness.Neither) true
else if (activeness == EdgeActiveness.SrcOnly) true
else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
else if (activeness == EdgeActiveness.Both) isActive(dstId)
else if (activeness == EdgeActiveness.Either) isActive(clusterSrcId) || isActive(dstId)
else throw new Exception("unreachable")
if (edgeIsActive) {
val dstAttr =
if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,30 +218,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
true, true, false)
EdgeActiveness.Both)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
true, true, false)
EdgeActiveness.Both)
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
true, true, true)
EdgeActiveness.Either)
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
true, false, false)
EdgeActiveness.SrcOnly)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
true, false, false)
EdgeActiveness.SrcOnly)
}
case Some(EdgeDirection.In) =>
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
false, true, false)
EdgeActiveness.DstOnly)
case _ => // None
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
false, false, false)
EdgeActiveness.Neither)
}
}).setName("GraphImpl.aggregateMessages - preAgg")

Expand Down