From 2fa2cf544407af9706c3b7e6f42b9909e2f287b4 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Thu, 31 Mar 2016 14:15:34 -0700 Subject: [PATCH 1/3] [SPARK-14394] --- .../aggregate/TungstenAggregateHashMap.scala | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala new file mode 100644 index 000000000000..d08dece93e44 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.aggregate + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.StructType + +class TungstenAggregateHashMap( + ctx: CodegenContext, + generatedClassName: String, + groupingKeySchema: StructType, + bufferSchema: StructType) { + val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) + val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) + val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") + + def generateAggregateHashMap(): String = { + + s""" + |public class $generatedClassName { + |${initializeAggregateHashMap()} + | + |${generateFindOrInsert()} + | + |${generateEquals()} + | + |${generateHashFunction()} + |} + """.stripMargin + } + + def initializeAggregateHashMap(): String = { + val generatedSchema: String = + s""" + |new org.apache.spark.sql.types.StructType() + |${(groupingKeySchema ++ bufferSchema).map(key => + s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") + .mkString("\n")}; + """.stripMargin + + s""" + | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | private int[] buckets; + | private int numBuckets; + | private int maxSteps; + | private int numRows = 0; + | private org.apache.spark.sql.types.StructType schema = $generatedSchema + | + | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { + | assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); + | this.maxSteps = maxSteps; + | numBuckets = (int) (capacity / loadFactor); + | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, + | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); + | buckets = new int[numBuckets]; + | java.util.Arrays.fill(buckets, -1); + | } + | + | public $generatedClassName() { + | new 
$generatedClassName(1 << 16, 0.25, 5); + | } + """.stripMargin + } + + def generateHashFunction(): String = { + s""" + |// TODO: Improve this Hash Function + |private long hash($groupingKeySignature) { + | return ${groupingKeys.map(_._2).mkString(" & ")}; + |} + """.stripMargin + } + + def generateEquals(): String = { + s""" + |private boolean equals(int idx, $groupingKeySignature) { + | return ${groupingKeys.zipWithIndex.map(key => + s"batch.column(${key._2}).getLong(buckets[idx]) == ${key._1._2}").mkString(" && ")}; + |} + """.stripMargin + } + + def generateFindOrInsert(): String = { + s""" + |public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(${ + groupingKeySignature}) { + | int idx = find(${groupingKeys.map(_._2).mkString(", ")}); + | if (idx != -1 && buckets[idx] == -1) { + | ${groupingKeys.zipWithIndex.map(key => + s"batch.column(${key._2}).putLong(numRows, ${key._1._2});").mkString("\n")} + | ${bufferValues.zipWithIndex.map(key => + s"batch.column(${groupingKeys.length + key._2}).putLong(numRows, 0);") + .mkString("\n")} + | buckets[idx] = numRows++; + | } + | return batch.getRow(buckets[idx]); + |} + | + |private int find($groupingKeySignature) { + | long h = hash(${groupingKeys.map(_._2).mkString(", ")}); + | int step = 0; + | int idx = (int) h & (numBuckets - 1); + | while (step < maxSteps) { + | // Return bucket index if it's either an empty slot or already contains the key + | if (buckets[idx] == -1) { + | return idx; + | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { + | return idx; + | } + | idx = (idx + 1) & (numBuckets - 1); + | step++; + | } + |// Didn't find it + |return -1; + |} + """.stripMargin + } +} From 551a4db5a2790172fb204d4a847983e72ed34c6e Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 6 Apr 2016 17:37:08 -0700 Subject: [PATCH 2/3] hash function --- .../aggregate/TungstenAggregateHashMap.scala | 133 ------------------ .../VectorizedHashMapGenerator.scala | 5 +- .../BenchmarkWholeStageCodegen.scala | 46 +++++- 3 files changed, 45 insertions(+), 139 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala deleted file mode 100644 index d08dece93e44..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.aggregate - -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -import org.apache.spark.sql.types.StructType - -class TungstenAggregateHashMap( - ctx: CodegenContext, - generatedClassName: String, - groupingKeySchema: StructType, - bufferSchema: StructType) { - val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) - val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) - val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") - - def generateAggregateHashMap(): String = { - - s""" - |public class $generatedClassName { - |${initializeAggregateHashMap()} - | - |${generateFindOrInsert()} - | - |${generateEquals()} - | - |${generateHashFunction()} - |} - """.stripMargin - } - - def initializeAggregateHashMap(): String = { - val generatedSchema: String = - s""" - |new org.apache.spark.sql.types.StructType() - |${(groupingKeySchema ++ bufferSchema).map(key => - s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") - .mkString("\n")}; - """.stripMargin - - s""" - | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; - | private int[] buckets; - | private int numBuckets; - | private int maxSteps; - | private int numRows = 0; - | private org.apache.spark.sql.types.StructType schema = $generatedSchema - | - | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { - | assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); - | this.maxSteps = maxSteps; - | numBuckets = (int) (capacity / loadFactor); - | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, - | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); - | buckets = new int[numBuckets]; - | java.util.Arrays.fill(buckets, -1); - | } - | - | public $generatedClassName() { - | new $generatedClassName(1 << 16, 0.25, 5); - | } - """.stripMargin - } - - def generateHashFunction(): String = { - s""" - |// TODO: Improve this Hash Function - |private long hash($groupingKeySignature) { - | return ${groupingKeys.map(_._2).mkString(" & ")}; - |} - """.stripMargin - } - - def generateEquals(): String = { - s""" - |private boolean equals(int idx, $groupingKeySignature) { - | return ${groupingKeys.zipWithIndex.map(key => - s"batch.column(${key._2}).getLong(buckets[idx]) == ${key._1._2}").mkString(" && ")}; - |} - """.stripMargin - } - - def generateFindOrInsert(): String = { - s""" - |public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(${ - groupingKeySignature}) { - | int idx = find(${groupingKeys.map(_._2).mkString(", ")}); - | if (idx != -1 && buckets[idx] == -1) { - | ${groupingKeys.zipWithIndex.map(key => - s"batch.column(${key._2}).putLong(numRows, ${key._1._2});").mkString("\n")} - | ${bufferValues.zipWithIndex.map(key => - s"batch.column(${groupingKeys.length + key._2}).putLong(numRows, 0);") - .mkString("\n")} - | buckets[idx] = numRows++; - | } - | return batch.getRow(buckets[idx]); - |} - | - |private int find($groupingKeySignature) { - | long h = hash(${groupingKeys.map(_._2).mkString(", ")}); - | int step = 0; - | int idx = (int) h & (numBuckets - 1); - | while (step < maxSteps) { - | // Return bucket index if it's either an empty slot or already contains the key - | if (buckets[idx] == -1) { - | return idx; - | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { - | return idx; - | } - | idx = (idx + 1) & 
(numBuckets - 1); - | step++; - | } - |// Didn't find it - |return -1; - |} - """.stripMargin - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala index 395cc7ab9170..ca3c038d372f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala @@ -130,9 +130,10 @@ class VectorizedHashMapGenerator( */ private def generateHashFunction(): String = { s""" - |// TODO: Improve this hash function |private long hash($groupingKeySignature) { - | return ${groupingKeys.map(_._2).mkString(" | ")}; + | long h = 0; + | ${groupingKeys.map(key => s"h = (h << 5) - h + ${key._2};").mkString("\n")} + | return h; |} """.stripMargin } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index d23f19c48063..e0e46506e15b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -150,7 +150,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } - ignore("aggregate with keys") { + ignore("aggregate with linear keys") { val N = 20 << 20 val benchmark = new Benchmark("Aggregate w keys", N) @@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - codegen = F 2219 / 2392 9.4 105.8 1.0X - codegen = T hashmap = F 1330 / 1466 15.8 63.4 1.7X - codegen = T hashmap = T 384 / 518 54.7 18.3 5.8X + codegen = F 2323 / 2567 9.0 110.8 1.0X + codegen = T hashmap = F 1182 / 1246 17.7 56.4 2.0X + codegen = T hashmap = T 381 / 489 55.0 18.2 6.1X + */ + } + + ignore("aggregate with randomized keys") { + val N = 20 << 20 + + val benchmark = new Benchmark("Aggregate w keys", N) + sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test") + + def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect() + + benchmark.addCase(s"codegen = F") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + f() + } + + benchmark.addCase(s"codegen = T hashmap = F") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + f() + } + + benchmark.addCase(s"codegen = T hashmap = T") { iter => + sqlContext.setConf("spark.sql.codegen.wholeStage", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + f() + } + + benchmark.run() + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + codegen = F 2417 / 2457 8.7 115.2 1.0X + codegen = T hashmap = F 1554 / 1581 13.5 74.1 1.6X + codegen = T hashmap = T 877 / 929 23.9 41.8 2.8X */ } From 1562d46af39b8e78d8265f046f52f1753cb484a7 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: 
Fri, 15 Apr 2016 11:58:11 -0700 Subject: [PATCH 3/3] CR --- .../VectorizedHashMapGenerator.scala | 45 +++++++++---------- .../BenchmarkWholeStageCodegen.scala | 12 ++--- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala index ca3c038d372f..dd9b2f097e12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala @@ -86,28 +86,21 @@ class VectorizedHashMapGenerator( | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; | private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch; | private int[] buckets; - | private int numBuckets; - | private int maxSteps; + | private int capacity = 1 << 16; + | private double loadFactor = 0.5; + | private int numBuckets = (int) (capacity / loadFactor); + | private int maxSteps = 2; | private int numRows = 0; | private org.apache.spark.sql.types.StructType schema = $generatedSchema | private org.apache.spark.sql.types.StructType aggregateBufferSchema = | $generatedAggBufferSchema | | public $generatedClassName() { - | // TODO: These should be generated based on the schema - | int DEFAULT_CAPACITY = 1 << 16; - | double DEFAULT_LOAD_FACTOR = 0.25; - | int DEFAULT_MAX_STEPS = 2; - | assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0)); - | this.maxSteps = DEFAULT_MAX_STEPS; - | numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR); - | | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, - | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); - | + | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); | // TODO: Possibly generate this projection in TungstenAggregate directly | aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate( - | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); + | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); | for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) { | aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length})); | } @@ -132,7 +125,8 @@ class VectorizedHashMapGenerator( s""" |private long hash($groupingKeySignature) { | long h = 0; - | ${groupingKeys.map(key => s"h = (h << 5) - h + ${key._2};").mkString("\n")} + | ${groupingKeys.map(key => s"h = (h ^ (0x9e3779b9)) + ${key._2} + (h << 6) + (h >>> 2);") + .mkString("\n")} | return h; |} """.stripMargin @@ -202,15 +196,20 @@ class VectorizedHashMapGenerator( | while (step < maxSteps) { | // Return bucket index if it's either an empty slot or already contains the key | if (buckets[idx] == -1) { - | ${groupingKeys.zipWithIndex.map(k => - s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")} - | ${bufferValues.zipWithIndex.map(k => - s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);") - .mkString("\n")} - | buckets[idx] = numRows++; - | batch.setNumRows(numRows); - | aggregateBufferBatch.setNumRows(numRows); - | return aggregateBufferBatch.getRow(buckets[idx]); + | if (numRows < capacity) { + | ${groupingKeys.zipWithIndex.map(k => + s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")} + | ${bufferValues.zipWithIndex.map(k => + 
s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);") + .mkString("\n")} + | buckets[idx] = numRows++; + | batch.setNumRows(numRows); + | aggregateBufferBatch.setNumRows(numRows); + | return aggregateBufferBatch.getRow(buckets[idx]); + | } else { + | // No more space + | return null; + | } | } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) { | return aggregateBufferBatch.getRow(buckets[idx]); | } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index e0e46506e15b..3fb70f2eb6ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -180,9 +180,9 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - codegen = F 2323 / 2567 9.0 110.8 1.0X - codegen = T hashmap = F 1182 / 1246 17.7 56.4 2.0X - codegen = T hashmap = T 381 / 489 55.0 18.2 6.1X + codegen = F 2067 / 2166 10.1 98.6 1.0X + codegen = T hashmap = F 1149 / 1321 18.3 54.8 1.8X + codegen = T hashmap = T 388 / 475 54.0 18.5 5.3X */ } @@ -218,9 +218,9 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - codegen = F 2417 / 2457 8.7 115.2 1.0X - codegen = T hashmap = F 1554 / 1581 13.5 74.1 1.6X - codegen = T hashmap = T 877 / 929 23.9 41.8 2.8X + codegen = F 2517 / 2608 8.3 120.0 1.0X + codegen = T hashmap = F 1484 / 1560 14.1 70.8 1.7X + codegen = T hashmap = T 794 / 908 26.4 37.9 3.2X */ }