
Commit 4df6518

sameeragarwal authored and yhuai committed
[SPARK-14620][SQL] Use/benchmark a better hash in VectorizedHashMap
## What changes were proposed in this pull request?

This PR uses a better hashing algorithm while probing the AggregateHashMap:

```java
long h = 0;
h = (h ^ (0x9e3779b9)) + key_1 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_2 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_3 + (h << 6) + (h >>> 2);
...
h = (h ^ (0x9e3779b9)) + key_n + (h << 6) + (h >>> 2);
return h;
```

Depends on: #12345

## How was this patch tested?

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
codegen = F                              2417 / 2457          8.7         115.2       1.0X
codegen = T hashmap = F                  1554 / 1581         13.5          74.1       1.6X
codegen = T hashmap = T                   877 /  929         23.9          41.8       2.8X
```

Author: Sameer Agarwal <[email protected]>

Closes #12379 from sameeragarwal/hash.
1 parent 8028a28 commit 4df6518
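For context, each grouping key is folded into a running 64-bit hash with the mixing step above; the constant 0x9e3779b9 is the 32-bit golden-ratio constant familiar from boost-style hash_combine. A minimal standalone Java sketch of the function the commit describes (the class and method names here are invented for illustration, not part of the patch):

```java
public class HashSketch {
  // Same per-key mixing step the PR injects into the generated map; note the
  // int literal 0x9e3779b9 sign-extends to a long, as in the generated code.
  static long hash(long... keys) {
    long h = 0;
    for (long key : keys) {
      h = (h ^ (0x9e3779b9)) + key + (h << 6) + (h >>> 2);
    }
    return h;
  }

  public static void main(String[] args) {
    // The old hash simply OR-ed the keys together, so key sets with the same
    // OR (e.g. reordered keys) all collided. The new mix is order-sensitive.
    System.out.println(hash(1L, 2L)); // differs from...
    System.out.println(hash(2L, 1L)); // ...this value
  }
}
```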

File tree

2 files changed: +66, -28 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala

Lines changed: 24 additions & 24 deletions
```diff
@@ -86,28 +86,21 @@ class VectorizedHashMapGenerator(
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
        |  private int[] buckets;
-       |  private int numBuckets;
-       |  private int maxSteps;
+       |  private int capacity = 1 << 16;
+       |  private double loadFactor = 0.5;
+       |  private int numBuckets = (int) (capacity / loadFactor);
+       |  private int maxSteps = 2;
        |  private int numRows = 0;
        |  private org.apache.spark.sql.types.StructType schema = $generatedSchema
        |  private org.apache.spark.sql.types.StructType aggregateBufferSchema =
        |    $generatedAggBufferSchema
        |
        |  public $generatedClassName() {
-       |    // TODO: These should be generated based on the schema
-       |    int DEFAULT_CAPACITY = 1 << 16;
-       |    double DEFAULT_LOAD_FACTOR = 0.25;
-       |    int DEFAULT_MAX_STEPS = 2;
-       |    assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
-       |    this.maxSteps = DEFAULT_MAX_STEPS;
-       |    numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
-       |
        |    batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-       |      org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
-       |
+       |      org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
        |    // TODO: Possibly generate this projection in TungstenAggregate directly
        |    aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-       |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
+       |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
        |    for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
        |      aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
        |    }
```
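Raising the load factor from 0.25 to 0.5 halves the bucket array while the row capacity stays at 2^16. A throwaway sanity check of the arithmetic (values copied from the new fields above; the class name is invented):

```java
public class SizingCheck {
  public static void main(String[] args) {
    int capacity = 1 << 16;                          // 65536 rows of batch storage
    double loadFactor = 0.5;                         // raised from the old 0.25
    int numBuckets = (int) (capacity / loadFactor);  // 131072 probe slots, down from 262144
    // The old constructor asserted a power-of-two capacity; 1 << 16 still satisfies it.
    System.out.println((capacity & (capacity - 1)) == 0); // true
    System.out.println(numBuckets);                       // 131072
  }
}
```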
```diff
@@ -130,9 +123,11 @@ class VectorizedHashMapGenerator(
     */
   private def generateHashFunction(): String = {
     s"""
-       |// TODO: Improve this hash function
        |private long hash($groupingKeySignature) {
-       |  return ${groupingKeys.map(_._2).mkString(" | ")};
+       |  long h = 0;
+       |  ${groupingKeys.map(key => s"h = (h ^ (0x9e3779b9)) + ${key._2} + (h << 6) + (h >>> 2);")
+         .mkString("\n")}
+       |  return h;
        |}
     """.stripMargin
   }
```
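Concretely, for a schema with two long grouping keys the template above would emit Java along these lines (the parameter names k1 and k2 are hypothetical stand-ins for the generated key names):

```java
// Roughly what generateHashFunction() produces for two long grouping keys.
private long hash(long k1, long k2) {
  long h = 0;
  h = (h ^ (0x9e3779b9)) + k1 + (h << 6) + (h >>> 2);
  h = (h ^ (0x9e3779b9)) + k2 + (h << 6) + (h >>> 2);
  return h;
}
```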
```diff
@@ -201,15 +196,20 @@ class VectorizedHashMapGenerator(
        |    while (step < maxSteps) {
        |      // Return bucket index if it's either an empty slot or already contains the key
        |      if (buckets[idx] == -1) {
-       |        ${groupingKeys.zipWithIndex.map(k =>
-                  s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
-       |        ${bufferValues.zipWithIndex.map(k =>
-                  s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
-                .mkString("\n")}
-       |        buckets[idx] = numRows++;
-       |        batch.setNumRows(numRows);
-       |        aggregateBufferBatch.setNumRows(numRows);
-       |        return aggregateBufferBatch.getRow(buckets[idx]);
+       |        if (numRows < capacity) {
+       |          ${groupingKeys.zipWithIndex.map(k =>
+                    s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
+       |          ${bufferValues.zipWithIndex.map(k =>
+                    s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
+                  .mkString("\n")}
+       |          buckets[idx] = numRows++;
+       |          batch.setNumRows(numRows);
+       |          aggregateBufferBatch.setNumRows(numRows);
+       |          return aggregateBufferBatch.getRow(buckets[idx]);
+       |        } else {
+       |          // No more space
+       |          return null;
+       |        }
        |      } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) {
        |        return aggregateBufferBatch.getRow(buckets[idx]);
        |      }
```
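The hunk above shows only the empty-slot branch; for orientation, here is a simplified, self-contained sketch of the whole probe-and-insert flow, reduced to a single long key and plain arrays in place of ColumnarBatch columns. The index computation and linear-probing step are assumptions for illustration (the diff does not show them), and none of the names below are Spark APIs:

```java
import java.util.Arrays;

public class ProbeSketch {
  static final int CAPACITY = 1 << 16;
  static final int NUM_BUCKETS = (int) (CAPACITY / 0.5);
  static final int MAX_STEPS = 2;

  long[] keys = new long[CAPACITY];      // stands in for batch.column(0)
  int[] buckets = new int[NUM_BUCKETS];  // -1 marks an empty slot
  int numRows = 0;

  ProbeSketch() {
    Arrays.fill(buckets, -1);
  }

  static long hash(long k) {
    long h = 0;
    h = (h ^ (0x9e3779b9)) + k + (h << 6) + (h >>> 2);
    return h;
  }

  /** Returns the row id for key k, inserting it if absent; -1 means "caller must fall back". */
  int findOrInsert(long k) {
    int idx = (int) (hash(k) & (NUM_BUCKETS - 1)); // assumes a power-of-two bucket count
    int step = 0;
    while (step < MAX_STEPS) {
      if (buckets[idx] == -1) {
        if (numRows < CAPACITY) {        // the bound this patch adds
          keys[numRows] = k;
          buckets[idx] = numRows++;
          return buckets[idx];
        } else {
          return -1;                     // no more space in the batch
        }
      } else if (keys[buckets[idx]] == k) {
        return buckets[idx];             // key already present
      }
      idx = (idx + 1) & (NUM_BUCKETS - 1); // linear probe to the next slot
      step++;
    }
    return -1;                           // give up after maxSteps probes
  }
}
```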

sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala

Lines changed: 42 additions & 4 deletions
```diff
@@ -150,7 +150,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     */
   }
 
-  ignore("aggregate with keys") {
+  ignore("aggregate with linear keys") {
     val N = 20 << 20
 
     val benchmark = new Benchmark("Aggregate w keys", N)
```
```diff
@@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
     Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    codegen = F                              2219 / 2392          9.4         105.8       1.0X
-    codegen = T hashmap = F                  1330 / 1466         15.8          63.4       1.7X
-    codegen = T hashmap = T                   384 /  518         54.7          18.3       5.8X
+    codegen = F                              2067 / 2166         10.1          98.6       1.0X
+    codegen = T hashmap = F                  1149 / 1321         18.3          54.8       1.8X
+    codegen = T hashmap = T                   388 /  475         54.0          18.5       5.3X
+    */
+  }
+
+  ignore("aggregate with randomized keys") {
+    val N = 20 << 20
+
+    val benchmark = new Benchmark("Aggregate w keys", N)
+    sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")
+
+    def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
+
+    benchmark.addCase(s"codegen = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f()
+    }
+
+    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      f()
+    }
+
+    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      f()
+    }
+
+    benchmark.run()
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    codegen = F                              2517 / 2608          8.3         120.0       1.0X
+    codegen = T hashmap = F                  1484 / 1560         14.1          70.8       1.7X
+    codegen = T hashmap = T                   794 /  908         26.4          37.9       3.2X
     */
   }
 
```
