Skip to content

Commit 95ae209

Browse files
[SPARK-25479][TEST] Refactor DatasetBenchmark to use main method
## What changes were proposed in this pull request?

Refactor `DatasetBenchmark` to use main method.

Generate benchmark result:
```sh
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.DatasetBenchmark"
```

## How was this patch tested?

manual tests

Closes #22488 from wangyum/SPARK-25479.

Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 71c24aa commit 95ae209

File tree

2 files changed

+71
-71
lines changed

2 files changed

+71
-71
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
================================================================================================
2+
Dataset Benchmark
3+
================================================================================================
4+
5+
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
6+
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
7+
back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
8+
------------------------------------------------------------------------------------------------
9+
RDD 11800 / 12042 8.5 118.0 1.0X
10+
DataFrame 1927 / 2189 51.9 19.3 6.1X
11+
Dataset 2483 / 2605 40.3 24.8 4.8X
12+
13+
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
14+
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
15+
back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
16+
------------------------------------------------------------------------------------------------
17+
RDD 16286 / 16301 6.1 162.9 1.0X
18+
DataFrame 8101 / 8104 12.3 81.0 2.0X
19+
Dataset 17445 / 17811 5.7 174.4 0.9X
20+
21+
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
22+
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
23+
back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
24+
------------------------------------------------------------------------------------------------
25+
RDD 2971 / 3184 33.7 29.7 1.0X
26+
DataFrame 1243 / 1296 80.5 12.4 2.4X
27+
Dataset 3062 / 3091 32.7 30.6 1.0X
28+
29+
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
30+
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
31+
back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
32+
------------------------------------------------------------------------------------------------
33+
RDD 5253 / 5269 19.0 52.5 1.0X
34+
DataFrame 211 / 234 473.4 2.1 24.9X
35+
Dataset 9550 / 9552 10.5 95.5 0.6X
36+
37+
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
38+
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
39+
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
40+
------------------------------------------------------------------------------------------------
41+
RDD sum 5086 / 5108 19.7 50.9 1.0X
42+
DataFrame sum 65 / 73 1548.9 0.6 78.8X
43+
Dataset sum using Aggregator 9024 / 9320 11.1 90.2 0.6X
44+
Dataset complex Aggregator 15079 / 15171 6.6 150.8 0.3X
45+
46+

sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala

Lines changed: 25 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,24 @@
1717

1818
package org.apache.spark.sql
1919

20-
import org.apache.spark.{SparkConf, SparkContext}
2120
import org.apache.spark.benchmark.Benchmark
21+
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
2222
import org.apache.spark.sql.expressions.Aggregator
2323
import org.apache.spark.sql.expressions.scalalang.typed
2424
import org.apache.spark.sql.functions._
2525
import org.apache.spark.sql.types.StringType
2626

2727
/**
2828
* Benchmark for Dataset typed operations comparing with DataFrame and RDD versions.
29+
* To run this benchmark:
30+
* {{{
31+
* 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
32+
* 2. build/sbt "sql/test:runMain <this class>"
33+
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
34+
* Results will be written to "benchmarks/DatasetBenchmark-results.txt".
35+
* }}}
2936
*/
30-
object DatasetBenchmark {
37+
object DatasetBenchmark extends SqlBasedBenchmark {
3138

3239
case class Data(l: Long, s: String)
3340

@@ -39,7 +46,7 @@ object DatasetBenchmark {
3946
val df = ds.toDF("l")
4047
val func = (l: Long) => l + 1
4148

42-
val benchmark = new Benchmark("back-to-back map long", numRows)
49+
val benchmark = new Benchmark("back-to-back map long", numRows, output = output)
4350

4451
benchmark.addCase("RDD") { iter =>
4552
var res = rdd
@@ -78,7 +85,7 @@ object DatasetBenchmark {
7885
import spark.implicits._
7986

8087
val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
81-
val benchmark = new Benchmark("back-to-back map", numRows)
88+
val benchmark = new Benchmark("back-to-back map", numRows, output = output)
8289
val func = (d: Data) => Data(d.l + 1, d.s)
8390

8491
val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
@@ -123,7 +130,7 @@ object DatasetBenchmark {
123130
val df = ds.toDF("l")
124131
val func = (l: Long) => l % 2L == 0L
125132

126-
val benchmark = new Benchmark("back-to-back filter Long", numRows)
133+
val benchmark = new Benchmark("back-to-back filter Long", numRows, output = output)
127134

128135
benchmark.addCase("RDD") { iter =>
129136
var res = rdd
@@ -162,7 +169,7 @@ object DatasetBenchmark {
162169
import spark.implicits._
163170

164171
val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
165-
val benchmark = new Benchmark("back-to-back filter", numRows)
172+
val benchmark = new Benchmark("back-to-back filter", numRows, output = output)
166173
val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
167174
val funcs = 0.until(numChains).map { i =>
168175
(d: Data) => func(d, i)
@@ -220,7 +227,7 @@ object DatasetBenchmark {
220227
import spark.implicits._
221228

222229
val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
223-
val benchmark = new Benchmark("aggregate", numRows)
230+
val benchmark = new Benchmark("aggregate", numRows, output = output)
224231

225232
val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
226233
benchmark.addCase("RDD sum") { iter =>
@@ -242,75 +249,22 @@ object DatasetBenchmark {
242249
benchmark
243250
}
244251

245-
def main(args: Array[String]): Unit = {
246-
val spark = SparkSession.builder
252+
override def getSparkSession: SparkSession = {
253+
SparkSession.builder
247254
.master("local[*]")
248255
.appName("Dataset benchmark")
249256
.getOrCreate()
257+
}
250258

259+
override def runBenchmarkSuite(): Unit = {
251260
val numRows = 100000000
252261
val numChains = 10
253-
254-
val benchmark0 = backToBackMapLong(spark, numRows, numChains)
255-
val benchmark1 = backToBackMap(spark, numRows, numChains)
256-
val benchmark2 = backToBackFilterLong(spark, numRows, numChains)
257-
val benchmark3 = backToBackFilter(spark, numRows, numChains)
258-
val benchmark4 = aggregate(spark, numRows)
259-
260-
/*
261-
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
262-
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
263-
back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
264-
------------------------------------------------------------------------------------------------
265-
RDD 1883 / 1892 53.1 18.8 1.0X
266-
DataFrame 502 / 642 199.1 5.0 3.7X
267-
Dataset 657 / 784 152.2 6.6 2.9X
268-
*/
269-
benchmark0.run()
270-
271-
/*
272-
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
273-
Intel Xeon E3-12xx v2 (Ivy Bridge)
274-
back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
275-
------------------------------------------------------------------------------------------------
276-
RDD 3448 / 3646 29.0 34.5 1.0X
277-
DataFrame 2647 / 3116 37.8 26.5 1.3X
278-
Dataset 4781 / 5155 20.9 47.8 0.7X
279-
*/
280-
benchmark1.run()
281-
282-
/*
283-
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
284-
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
285-
back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
286-
------------------------------------------------------------------------------------------------
287-
RDD 846 / 1120 118.1 8.5 1.0X
288-
DataFrame 270 / 329 370.9 2.7 3.1X
289-
Dataset 545 / 789 183.5 5.4 1.6X
290-
*/
291-
benchmark2.run()
292-
293-
/*
294-
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
295-
Intel Xeon E3-12xx v2 (Ivy Bridge)
296-
back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
297-
------------------------------------------------------------------------------------------------
298-
RDD 1346 / 1618 74.3 13.5 1.0X
299-
DataFrame 59 / 72 1695.4 0.6 22.8X
300-
Dataset 2777 / 2805 36.0 27.8 0.5X
301-
*/
302-
benchmark3.run()
303-
304-
/*
305-
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
306-
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
307-
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
308-
------------------------------------------------------------------------------------------------
309-
RDD sum 1913 / 1942 52.3 19.1 1.0X
310-
DataFrame sum 46 / 61 2157.7 0.5 41.3X
311-
Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X
312-
Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X
313-
*/
314-
benchmark4.run()
262+
runBenchmark("Dataset Benchmark") {
263+
backToBackMapLong(spark, numRows, numChains).run()
264+
backToBackMap(spark, numRows, numChains).run()
265+
backToBackFilterLong(spark, numRows, numChains).run()
266+
backToBackFilter(spark, numRows, numChains).run()
267+
aggregate(spark, numRows).run()
268+
}
315269
}
316270
}

0 commit comments

Comments
 (0)