Skip to content

Commit e0f8ecc

Browse files
committed
Take activeSet in ExistingEdgePartitionBuilder
Also rename VertexPreservingEdgePartitionBuilder to ExistingEdgePartitionBuilder to better reflect its usage.
1 parent c85076d commit e0f8ecc

File tree

2 files changed

+19
-24
lines changed

2 files changed

+19
-24
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,6 @@ class EdgePartition[
7171
Some(activeSet))
7272
}
7373

74-
/** Return a new `EdgePartition` with the specified active set. */
75-
def withActiveSet(activeSet: Option[VertexSet]): EdgePartition[ED, VD] = {
76-
new EdgePartition(
77-
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
78-
}
79-
8074
/** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
8175
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
8276
val newVertexAttrs = new Array[VD](vertexAttrs.length)
@@ -116,8 +110,8 @@ class EdgePartition[
116110
* @return a new edge partition with all edges reversed.
117111
*/
118112
def reverse: EdgePartition[ED, VD] = {
119-
val builder = new VertexPreservingEdgePartitionBuilder(
120-
global2local, local2global, vertexAttrs, size)(classTag[ED], classTag[VD])
113+
val builder = new ExistingEdgePartitionBuilder[ED, VD](
114+
global2local, local2global, vertexAttrs, activeSet, size)
121115
var i = 0
122116
while (i < size) {
123117
val localSrcId = localSrcIds(i)
@@ -128,7 +122,7 @@ class EdgePartition[
128122
builder.add(dstId, srcId, localDstId, localSrcId, attr)
129123
i += 1
130124
}
131-
builder.toEdgePartition.withActiveSet(activeSet)
125+
builder.toEdgePartition
132126
}
133127

134128
/**
@@ -189,8 +183,8 @@ class EdgePartition[
189183
def filter(
190184
epred: EdgeTriplet[VD, ED] => Boolean,
191185
vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
192-
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
193-
global2local, local2global, vertexAttrs)
186+
val builder = new ExistingEdgePartitionBuilder[ED, VD](
187+
global2local, local2global, vertexAttrs, activeSet)
194188
var i = 0
195189
while (i < size) {
196190
// The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
@@ -207,7 +201,7 @@ class EdgePartition[
207201
}
208202
i += 1
209203
}
210-
builder.toEdgePartition.withActiveSet(activeSet)
204+
builder.toEdgePartition
211205
}
212206

213207
/**
@@ -227,8 +221,8 @@ class EdgePartition[
227221
* @return a new edge partition without duplicate edges
228222
*/
229223
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
230-
val builder = new VertexPreservingEdgePartitionBuilder[ED, VD](
231-
global2local, local2global, vertexAttrs)
224+
val builder = new ExistingEdgePartitionBuilder[ED, VD](
225+
global2local, local2global, vertexAttrs, activeSet)
232226
var currSrcId: VertexId = null.asInstanceOf[VertexId]
233227
var currDstId: VertexId = null.asInstanceOf[VertexId]
234228
var currLocalSrcId = -1
@@ -260,7 +254,7 @@ class EdgePartition[
260254
if (size > 0) {
261255
builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
262256
}
263-
builder.toEdgePartition.withActiveSet(activeSet)
257+
builder.toEdgePartition
264258
}
265259

266260
/**
@@ -276,8 +270,8 @@ class EdgePartition[
276270
def innerJoin[ED2: ClassTag, ED3: ClassTag]
277271
(other: EdgePartition[ED2, _])
278272
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
279-
val builder = new VertexPreservingEdgePartitionBuilder[ED3, VD](
280-
global2local, local2global, vertexAttrs)
273+
val builder = new ExistingEdgePartitionBuilder[ED3, VD](
274+
global2local, local2global, vertexAttrs, activeSet)
281275
var i = 0
282276
var j = 0
283277
// For i = index of each edge in `this`...
@@ -296,7 +290,7 @@ class EdgePartition[
296290
}
297291
i += 1
298292
}
299-
builder.toEdgePartition.withActiveSet(activeSet)
293+
builder.toEdgePartition
300294
}
301295

302296
/**

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,15 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
7777

7878
/**
7979
* Constructs an EdgePartition from an existing EdgePartition with the same vertex set. This enables
80-
* reuse of the local vertex ids.
80+
* reuse of the local vertex ids. Intended for internal use in EdgePartition only.
8181
*/
82-
private[graphx]
83-
class VertexPreservingEdgePartitionBuilder[
82+
private[impl]
83+
class ExistingEdgePartitionBuilder[
8484
@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
8585
global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
8686
local2global: Array[VertexId],
8787
vertexAttrs: Array[VD],
88+
activeSet: Option[VertexSet],
8889
size: Int = 64) {
8990
var edges = new PrimitiveVector[EdgeWithLocalIds[ED]](size)
9091

@@ -119,14 +120,14 @@ class VertexPreservingEdgePartitionBuilder[
119120
}
120121

121122
new EdgePartition(
122-
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs)
123+
localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
123124
}
124125
}
125126

126-
private[graphx] case class EdgeWithLocalIds[@specialized ED](
127+
private[impl] case class EdgeWithLocalIds[@specialized ED](
127128
srcId: VertexId, dstId: VertexId, localSrcId: Int, localDstId: Int, attr: ED)
128129

129-
private[graphx] object EdgeWithLocalIds {
130+
private[impl] object EdgeWithLocalIds {
130131
implicit def lexicographicOrdering[ED] = new Ordering[EdgeWithLocalIds[ED]] {
131132
override def compare(a: EdgeWithLocalIds[ED], b: EdgeWithLocalIds[ED]): Int = {
132133
if (a.srcId == b.srcId) {

0 commit comments

Comments
 (0)