Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 187 additions & 75 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.reflect.{classTag, ClassTag}

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.util.collection.BitSet

/**
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
Expand All @@ -30,54 +31,76 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
* @tparam ED the edge attribute type
* @tparam VD the vertex attribute type
*
* @param srcIds the source vertex id of each edge
* @param dstIds the destination vertex id of each edge
* @param localSrcIds the local source vertex id of each edge as an index into `local2global` and
* `vertexAttrs`
* @param localDstIds the local destination vertex id of each edge as an index into `local2global`
* and `vertexAttrs`
* @param data the attribute associated with each edge
* @param index a clustered index on source vertex id
* @param vertices a map from referenced vertex ids to their corresponding attributes. Must
* contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
* those vertex ids. The mask is not used.
* @param index a clustered index on source vertex id as a map from each global source vertex id to
* the offset in the edge arrays where the cluster for that vertex id begins
* @param global2local a map from referenced vertex ids to local ids which index into vertexAttrs
* @param local2global an array of global vertex ids where the offsets are local vertex ids
* @param vertexAttrs an array of vertex attributes where the offsets are local vertex ids
* @param activeSet an optional active vertex set for filtering computation on the edges
*/
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
val srcIds: Array[VertexId] = null,
val dstIds: Array[VertexId] = null,
val localSrcIds: Array[Int] = null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we create an explicit empty ctor instead of having null value for everything? and in that ctor say it is only needed for serialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also try to make all of these private rather than val's. (just remove the val)

val localDstIds: Array[Int] = null,
val data: Array[ED] = null,
val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
val vertices: VertexPartition[VD] = null,
val global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
val local2global: Array[VertexId] = null,
val vertexAttrs: Array[VD] = null,
val activeSet: Option[VertexSet] = None
) extends Serializable {

/** Return a new `EdgePartition` with the specified edge data. */
def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
}

/** Return a new `EdgePartition` with the specified vertex partition. */
def withVertices[VD2: ClassTag](
vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
def withData[ED2: ClassTag](data: Array[ED2]): EdgePartition[ED2, VD] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u explain data is indexed by local vid

new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
}

/** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
val activeSet = new VertexSet
iter.foreach(activeSet.add(_))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while

new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs,
Some(activeSet))
}

/** Return a new `EdgePartition` with the specified active set. */
def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
def withActiveSet(activeSet: Option[VertexSet]): EdgePartition[ED, VD] = {
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
}

/** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
this.withVertices(vertices.innerJoinKeepLeft(iter))
val newVertexAttrs = new Array[VD](vertexAttrs.length)
System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
iter.foreach { kv =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rewrite this with while loop

newVertexAttrs(global2local(kv._1)) = kv._2
}
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
activeSet)
}

/** Return a new `EdgePartition` without any locally cached vertex attributes. */
def clearVertices[VD2: ClassTag](): EdgePartition[ED, VD2] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be great to pick a new name that implies immutability.

val newVertexAttrs = new Array[VD2](vertexAttrs.length)
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
activeSet)
}

def srcIds(i: Int): VertexId = local2global(localSrcIds(i))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i => localId?


def dstIds(i: Int): VertexId = local2global(localDstIds(i))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too - i => localId


/** Look up vid in activeSet, throwing an exception if it is None. */
def isActive(vid: VertexId): Boolean = {
activeSet.get.contains(vid)
Expand All @@ -92,11 +115,19 @@ class EdgePartition[
* @return a new edge partition with all edges reversed.
*/
def reverse: EdgePartition[ED, VD] = {
val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
for (e <- iterator) {
builder.add(e.dstId, e.srcId, e.attr)
val builder = new VertexPreservingEdgePartitionBuilder(
global2local, local2global, vertexAttrs, size)(classTag[ED], classTag[VD])
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
val localDstId = localDstIds(i)
val srcId = local2global(localSrcId)
val dstId = local2global(localDstId)
val attr = data(i)
builder.add(dstId, srcId, localDstId, localSrcId, attr)
i += 1
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
Expand Down Expand Up @@ -157,13 +188,25 @@ class EdgePartition[
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
val filtered = tripletIterator().filter(et =>
vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
val builder = new EdgePartitionBuilder[ED, VD]
for (e <- filtered) {
builder.add(e.srcId, e.dstId, e.attr)
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs)
var i = 0
while (i < size) {
// The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
val localSrcId = localSrcIds(i)
val localDstId = localDstIds(i)
val et = new EdgeTriplet[VD, ED]
et.srcId = local2global(localSrcId)
et.dstId = local2global(localDstId)
et.srcAttr = vertexAttrs(localSrcId)
et.dstAttr = vertexAttrs(localDstId)
et.attr = data(i)
if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
}
i += 1
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
Expand All @@ -183,7 +226,8 @@ class EdgePartition[
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
val builder = new EdgePartitionBuilder[ED, VD]
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs)
var currSrcId: VertexId = null.asInstanceOf[VertexId]
var currDstId: VertexId = null.asInstanceOf[VertexId]
var currAttr: ED = null.asInstanceOf[ED]
Expand All @@ -193,7 +237,7 @@ class EdgePartition[
currAttr = merge(currAttr, data(i))
} else {
if (i > 0) {
builder.add(currSrcId, currDstId, currAttr)
builder.add(currSrcId, currDstId, localSrcIds(i - 1), localDstIds(i - 1), currAttr)
}
currSrcId = srcIds(i)
currDstId = dstIds(i)
Expand All @@ -202,9 +246,9 @@ class EdgePartition[
i += 1
}
if (size > 0) {
builder.add(currSrcId, currDstId, currAttr)
builder.add(currSrcId, currDstId, localSrcIds(i - 1), localDstIds(i - 1), currAttr)
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
Expand All @@ -220,7 +264,8 @@ class EdgePartition[
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
val builder = new EdgePartitionBuilder[ED3, VD]
val builder = new VertexPreservingEdgePartitionBuilder[ED3, VD](
global2local, local2global, vertexAttrs)
var i = 0
var j = 0
// For i = index of each edge in `this`...
Expand All @@ -233,20 +278,21 @@ class EdgePartition[
while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 }
if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) {
// ... run `f` on the matching edge
builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j)))
builder.add(srcId, dstId, localSrcIds(i), localDstIds(i),
f(srcId, dstId, this.data(i), other.data(j)))
}
}
i += 1
}
builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
builder.toEdgePartition.withActiveSet(activeSet)
}

/**
* The number of edges in this partition
*
* @return size of the partition
*/
val size: Int = srcIds.size
val size: Int = localSrcIds.size

/** The number of unique source vertices in the partition. */
def indexSize: Int = index.size
Expand Down Expand Up @@ -285,50 +331,116 @@ class EdgePartition[
}

/**
* Upgrade the given edge iterator into a triplet iterator.
* Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
* all edges sequentially and filtering them with `idPred`.
*
* @param mapFunc the edge map function which generates messages to neighboring vertices
* @param reduceFunc the combiner applied to messages destined to the same vertex
* @param mapUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
* @param mapUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
* @param idPred a predicate to filter edges based on their source and destination vertex ids
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
* @return iterator aggregated messages keyed by the receiving vertex id
*/
def upgradeIterator(
edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
: Iterator[EdgeTriplet[VD, ED]] = {
new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
def mapReduceTriplets[A: ClassTag](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be better to rename this mapRedueTripletsEdgeScan, to contrast with the other index scan one.

mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
mapUsesSrcAttr: Boolean,
mapUsesDstAttr: Boolean,
idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be great to do away with the closure here since this requires caller to understand the internals of EdgePartition. we can just have two boolean fields

val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)

var edge = new EdgeTriplet[VD, ED]
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
val srcId = local2global(localSrcId)
val localDstId = localDstIds(i)
val dstId = local2global(localDstId)
if (idPred(srcId, dstId)) {
edge.srcId = srcId
edge.dstId = dstId
edge.attr = data(i)
if (mapUsesSrcAttr) { edge.srcAttr = vertexAttrs(localSrcId) }
if (mapUsesDstAttr) { edge.dstAttr = vertexAttrs(localDstId) }

mapFunc(edge).foreach { kv =>
val globalId = kv._1
val msg = kv._2
val localId = if (globalId == srcId) localSrcId else localDstId
if (bitset.get(localId)) {
aggregates(localId) = reduceFunc(aggregates(localId), msg)
} else {
aggregates(localId) = msg
bitset.set(localId)
}
}
}
i += 1
}

bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
}

/**
* Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
* Send messages along edges and aggregate them at the receiving vertices. Implemented by
* filtering the source vertex index with `srcIdPred`, then scanning edge clusters and filtering
* with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an edge to run.
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
*/
def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))

/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
* @param mapFunc the edge map function which generates messages to neighboring vertices
* @param reduceFunc the combiner applied to messages destined to the same vertex
* @param mapUsesSrcAttr whether or not `mapFunc` uses the edge's source vertex attribute
* @param mapUsesDstAttr whether or not `mapFunc` uses the edge's destination vertex attribute
* @param srcIdPred a predicate to filter edges based on their source vertex id
* @param dstIdPred a predicate to filter edges based on their destination vertex id
*
* Be careful not to keep references to the objects from this iterator. To improve GC performance
* the same object is re-used in `next()`.
* @return iterator aggregated messages keyed by the receiving vertex id
*/
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = index
def mapReduceTripletsWithIndex[A: ClassTag](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mapReduceTripletsIndexScan

mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
mapUsesSrcAttr: Boolean,
mapUsesDstAttr: Boolean,
srcIdPred: VertexId => Boolean,
dstIdPred: VertexId => Boolean): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)

override def hasNext: Boolean = {
pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId
}
var edge = new EdgeTriplet[VD, ED]
index.iterator.foreach { cluster =>
val clusterSrcId = cluster._1
val clusterPos = cluster._2
val clusterLocalSrcId = localSrcIds(clusterPos)
if (srcIdPred(clusterSrcId)) {
var pos = clusterPos
edge.srcId = clusterSrcId
if (mapUsesSrcAttr) { edge.srcAttr = vertexAttrs(clusterLocalSrcId) }
while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
val localDstId = localDstIds(pos)
val dstId = local2global(localDstId)
if (dstIdPred(dstId)) {
edge.dstId = dstId
edge.attr = data(pos)
if (mapUsesDstAttr) { edge.dstAttr = vertexAttrs(localDstId) }

override def next(): Edge[ED] = {
assert(srcIds(pos) == srcId)
edge.srcId = srcIds(pos)
edge.dstId = dstIds(pos)
edge.attr = data(pos)
pos += 1
edge
mapFunc(edge).foreach { kv =>
val globalId = kv._1
val msg = kv._2
val localId = if (globalId == clusterSrcId) clusterLocalSrcId else localDstId
if (bitset.get(localId)) {
aggregates(localId) = reduceFunc(aggregates(localId), msg)
} else {
aggregates(localId) = msg
bitset.set(localId)
}
}
}
pos += 1
}
}
}

bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
}
}
Loading