Skip to content

Commit 2e6b736

Browse files
maropuankurdave
authored andcommitted
[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
This patch just replaces a native quick sorter with Sorter(TimSort) in Spark. It could get performance gains by ~8% in my quick experiments. Author: Takeshi Yamamuro <[email protected]> Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits: 8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import 3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
1 parent e895e0c commit 2e6b736

File tree

2 files changed

+64
-5
lines changed

2 files changed

+64
-5
lines changed

graphx/src/main/scala/org/apache/spark/graphx/Edge.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.graphx
1919

20+
import org.apache.spark.util.collection.SortDataFormat
21+
2022
/**
2123
* A single directed edge consisting of a source id, target id,
2224
* and the data associated with the edge.
@@ -65,4 +67,32 @@ object Edge {
6567
else 1
6668
}
6769
}
70+
71+
private[graphx] def edgeArraySortDataFormat[ED] = new SortDataFormat[Edge[ED], Array[Edge[ED]]] {
72+
override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = {
73+
data(pos)
74+
}
75+
76+
override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = {
77+
val tmp = data(pos0)
78+
data(pos0) = data(pos1)
79+
data(pos1) = tmp
80+
}
81+
82+
override def copyElement(
83+
src: Array[Edge[ED]], srcPos: Int,
84+
dst: Array[Edge[ED]], dstPos: Int) {
85+
dst(dstPos) = src(srcPos)
86+
}
87+
88+
override def copyRange(
89+
src: Array[Edge[ED]], srcPos: Int,
90+
dst: Array[Edge[ED]], dstPos: Int, length: Int) {
91+
System.arraycopy(src, srcPos, dst, dstPos, length)
92+
}
93+
94+
override def allocate(length: Int): Array[Edge[ED]] = {
95+
new Array[Edge[ED]](length)
96+
}
97+
}
6898
}

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
package org.apache.spark.graphx.impl
1919

2020
import scala.reflect.ClassTag
21-
import scala.util.Sorting
22-
23-
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
2421

2522
import org.apache.spark.graphx._
2623
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
24+
import org.apache.spark.util.collection.{SortDataFormat, Sorter, PrimitiveVector}
2725

2826
/** Constructs an EdgePartition from scratch. */
2927
private[graphx]
@@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
3836

3937
def toEdgePartition: EdgePartition[ED, VD] = {
4038
val edgeArray = edges.trim().array
41-
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
39+
new Sorter(Edge.edgeArraySortDataFormat[ED])
40+
.sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
4241
val localSrcIds = new Array[Int](edgeArray.size)
4342
val localDstIds = new Array[Int](edgeArray.size)
4443
val data = new Array[ED](edgeArray.size)
@@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[
9796

9897
def toEdgePartition: EdgePartition[ED, VD] = {
9998
val edgeArray = edges.trim().array
100-
Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
99+
new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED])
100+
.sort(edgeArray, 0, edgeArray.length, EdgeWithLocalIds.lexicographicOrdering)
101101
val localSrcIds = new Array[Int](edgeArray.size)
102102
val localDstIds = new Array[Int](edgeArray.size)
103103
val data = new Array[ED](edgeArray.size)
@@ -140,4 +140,33 @@ private[impl] object EdgeWithLocalIds {
140140
}
141141
}
142142

143+
private[graphx] def edgeArraySortDataFormat[ED]
144+
= new SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] {
145+
override def getKey(
146+
data: Array[EdgeWithLocalIds[ED]], pos: Int): EdgeWithLocalIds[ED] = {
147+
data(pos)
148+
}
149+
150+
override def swap(data: Array[EdgeWithLocalIds[ED]], pos0: Int, pos1: Int): Unit = {
151+
val tmp = data(pos0)
152+
data(pos0) = data(pos1)
153+
data(pos1) = tmp
154+
}
155+
156+
override def copyElement(
157+
src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
158+
dst: Array[EdgeWithLocalIds[ED]], dstPos: Int) {
159+
dst(dstPos) = src(srcPos)
160+
}
161+
162+
override def copyRange(
163+
src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
164+
dst: Array[EdgeWithLocalIds[ED]], dstPos: Int, length: Int) {
165+
System.arraycopy(src, srcPos, dst, dstPos, length)
166+
}
167+
168+
override def allocate(length: Int): Array[EdgeWithLocalIds[ED]] = {
169+
new Array[EdgeWithLocalIds[ED]](length)
170+
}
171+
}
143172
}

0 commit comments

Comments
 (0)