diff --git a/sql/core/benchmarks/DatasetBenchmark-results.txt b/sql/core/benchmarks/DatasetBenchmark-results.txt new file mode 100644 index 000000000000..dcc190eb45c0 --- /dev/null +++ b/sql/core/benchmarks/DatasetBenchmark-results.txt @@ -0,0 +1,46 @@ +================================================================================================ +Dataset Benchmark +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +RDD 11800 / 12042 8.5 118.0 1.0X +DataFrame 1927 / 2189 51.9 19.3 6.1X +Dataset 2483 / 2605 40.3 24.8 4.8X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +RDD 16286 / 16301 6.1 162.9 1.0X +DataFrame 8101 / 8104 12.3 81.0 2.0X +Dataset 17445 / 17811 5.7 174.4 0.9X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +RDD 2971 / 3184 33.7 29.7 1.0X +DataFrame 1243 / 1296 80.5 12.4 2.4X +Dataset 3062 / 3091 32.7 30.6 1.0X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +RDD 5253 / 5269 19.0 52.5 1.0X +DataFrame 211 / 234 473.4 2.1 24.9X +Dataset 9550 / 9552 10.5 95.5 0.6X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +RDD sum 5086 / 5108 19.7 50.9 1.0X +DataFrame sum 65 / 73 1548.9 0.6 78.8X +Dataset sum using Aggregator 9024 / 9320 11.1 90.2 0.6X +Dataset complex Aggregator 15079 / 15171 6.6 150.8 0.3X + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala index fa2f0b6ba61d..e3df449b41f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ @@ -26,8 +26,15 @@ import org.apache.spark.sql.types.StringType /** * Benchmark for Dataset typed operations comparing with DataFrame and RDD versions. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/DatasetBenchmark-results.txt". + * }}} */ -object DatasetBenchmark { +object DatasetBenchmark extends SqlBasedBenchmark { case class Data(l: Long, s: String) @@ -39,7 +46,7 @@ object DatasetBenchmark { val df = ds.toDF("l") val func = (l: Long) => l + 1 - val benchmark = new Benchmark("back-to-back map long", numRows) + val benchmark = new Benchmark("back-to-back map long", numRows, output = output) benchmark.addCase("RDD") { iter => var res = rdd @@ -78,7 +85,7 @@ object DatasetBenchmark { import spark.implicits._ val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s")) - val benchmark = new Benchmark("back-to-back map", numRows) + val benchmark = new Benchmark("back-to-back map", numRows, output = output) val func = (d: Data) => Data(d.l + 1, d.s) val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString)) @@ -123,7 +130,7 @@ object DatasetBenchmark { val df = ds.toDF("l") val func = (l: Long) => l % 2L == 0L - val benchmark = new Benchmark("back-to-back filter Long", numRows) + val benchmark = new Benchmark("back-to-back filter Long", numRows, output = output) benchmark.addCase("RDD") { iter => var res = rdd @@ -162,7 +169,7 @@ object DatasetBenchmark { import spark.implicits._ val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s")) - val benchmark = new Benchmark("back-to-back filter", numRows) + val benchmark = new Benchmark("back-to-back filter", numRows, output = output) val func = (d: Data, i: Int) => d.l % (100L + i) == 0L val funcs = 0.until(numChains).map { i => (d: Data) => func(d, i) @@ -220,7 +227,7 @@ object DatasetBenchmark { import spark.implicits._ val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s")) - val benchmark = new Benchmark("aggregate", numRows) + val benchmark = new Benchmark("aggregate", numRows, output = output) val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString)) benchmark.addCase("RDD sum") { iter => @@ -242,75 +249,22 @@ object DatasetBenchmark { benchmark } - def main(args: Array[String]): Unit = { - val spark = SparkSession.builder + override def getSparkSession: SparkSession = { + SparkSession.builder .master("local[*]") .appName("Dataset benchmark") .getOrCreate() + } + override def runBenchmarkSuite(): Unit = { val numRows = 100000000 val numChains = 10 - - val benchmark0 = backToBackMapLong(spark, numRows, numChains) - val benchmark1 = backToBackMap(spark, numRows, numChains) - val benchmark2 = backToBackFilterLong(spark, numRows, numChains) - val benchmark3 = backToBackFilter(spark, numRows, numChains) - val benchmark4 = aggregate(spark, numRows) - - /* - OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic - Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz - back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - RDD 1883 / 1892 53.1 18.8 1.0X - DataFrame 502 / 642 199.1 5.0 3.7X - Dataset 657 / 784 152.2 6.6 2.9X - */ - benchmark0.run() - - /* - OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 - Intel Xeon E3-12xx v2 (Ivy Bridge) - back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - RDD 3448 / 3646 29.0 34.5 1.0X - DataFrame 2647 / 3116 37.8 26.5 1.3X - Dataset 4781 / 5155 20.9 47.8 0.7X - */ - benchmark1.run() - - /* - OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic - Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz - back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - RDD 846 / 1120 118.1 8.5 1.0X - DataFrame 270 / 329 370.9 2.7 3.1X - Dataset 545 / 789 183.5 5.4 1.6X - */ - benchmark2.run() - - /* - OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 - Intel Xeon E3-12xx v2 (Ivy Bridge) - back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - RDD 1346 / 1618 74.3 13.5 1.0X - DataFrame 59 / 72 1695.4 0.6 22.8X - Dataset 2777 / 2805 36.0 27.8 0.5X - */ - benchmark3.run() - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - RDD sum 1913 / 1942 52.3 19.1 1.0X - DataFrame sum 46 / 61 2157.7 0.5 41.3X - Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X - Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X - */ - benchmark4.run() + runBenchmark("Dataset Benchmark") { + backToBackMapLong(spark, numRows, numChains).run() + backToBackMap(spark, numRows, numChains).run() + backToBackFilterLong(spark, numRows, numChains).run() + backToBackFilter(spark, numRows, numChains).run() + aggregate(spark, numRows).run() + } } }