Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions sql/core/benchmarks/DatasetBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
================================================================================================
Dataset Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 11800 / 12042 8.5 118.0 1.0X
DataFrame 1927 / 2189 51.9 19.3 6.1X
Dataset 2483 / 2605 40.3 24.8 4.8X

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 16286 / 16301 6.1 162.9 1.0X
DataFrame 8101 / 8104 12.3 81.0 2.0X
Dataset 17445 / 17811 5.7 174.4 0.9X

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 2971 / 3184 33.7 29.7 1.0X
DataFrame 1243 / 1296 80.5 12.4 2.4X
Dataset 3062 / 3091 32.7 30.6 1.0X

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 5253 / 5269 19.0 52.5 1.0X
DataFrame 211 / 234 473.4 2.1 24.9X
Dataset 9550 / 9552 10.5 95.5 0.6X

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD sum 5086 / 5108 19.7 50.9 1.0X
DataFrame sum 65 / 73 1548.9 0.6 78.8X
Dataset sum using Aggregator 9024 / 9320 11.1 90.2 0.6X
Dataset complex Aggregator 15079 / 15171 6.6 150.8 0.3X


96 changes: 25 additions & 71 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@

package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

/**
* Benchmark that compares typed Dataset operations with their DataFrame and RDD equivalents.
* To run this benchmark:
* {{{
* 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/DatasetBenchmark-results.txt".
* }}}
*/
object DatasetBenchmark {
object DatasetBenchmark extends SqlBasedBenchmark {

case class Data(l: Long, s: String)

Expand All @@ -39,7 +46,7 @@ object DatasetBenchmark {
val df = ds.toDF("l")
val func = (l: Long) => l + 1

val benchmark = new Benchmark("back-to-back map long", numRows)
val benchmark = new Benchmark("back-to-back map long", numRows, output = output)

benchmark.addCase("RDD") { iter =>
var res = rdd
Expand Down Expand Up @@ -78,7 +85,7 @@ object DatasetBenchmark {
import spark.implicits._

val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
val benchmark = new Benchmark("back-to-back map", numRows)
val benchmark = new Benchmark("back-to-back map", numRows, output = output)
val func = (d: Data) => Data(d.l + 1, d.s)

val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
Expand Down Expand Up @@ -123,7 +130,7 @@ object DatasetBenchmark {
val df = ds.toDF("l")
val func = (l: Long) => l % 2L == 0L

val benchmark = new Benchmark("back-to-back filter Long", numRows)
val benchmark = new Benchmark("back-to-back filter Long", numRows, output = output)

benchmark.addCase("RDD") { iter =>
var res = rdd
Expand Down Expand Up @@ -162,7 +169,7 @@ object DatasetBenchmark {
import spark.implicits._

val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
val benchmark = new Benchmark("back-to-back filter", numRows)
val benchmark = new Benchmark("back-to-back filter", numRows, output = output)
val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
val funcs = 0.until(numChains).map { i =>
(d: Data) => func(d, i)
Expand Down Expand Up @@ -220,7 +227,7 @@ object DatasetBenchmark {
import spark.implicits._

val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
val benchmark = new Benchmark("aggregate", numRows)
val benchmark = new Benchmark("aggregate", numRows, output = output)

val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD sum") { iter =>
Expand All @@ -242,75 +249,22 @@ object DatasetBenchmark {
benchmark
}

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
override def getSparkSession: SparkSession = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to override the default SparkSession here, because the default SparkSession is created as:

    SparkSession.builder()
      .master("local[1]")
      .appName(this.getClass.getCanonicalName)
      .config(SQLConf.SHUFFLE_PARTITIONS.key, 1)
      .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1)
      .getOrCreate()

SparkSession.builder
.master("local[*]")
.appName("Dataset benchmark")
.getOrCreate()
}

override def runBenchmarkSuite(): Unit = {
val numRows = 100000000
val numChains = 10

val benchmark0 = backToBackMapLong(spark, numRows, numChains)
val benchmark1 = backToBackMap(spark, numRows, numChains)
val benchmark2 = backToBackFilterLong(spark, numRows, numChains)
val benchmark3 = backToBackFilter(spark, numRows, numChains)
val benchmark4 = aggregate(spark, numRows)

/*
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 1883 / 1892 53.1 18.8 1.0X
DataFrame 502 / 642 199.1 5.0 3.7X
Dataset 657 / 784 152.2 6.6 2.9X
*/
benchmark0.run()

/*
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 3448 / 3646 29.0 34.5 1.0X
DataFrame 2647 / 3116 37.8 26.5 1.3X
Dataset 4781 / 5155 20.9 47.8 0.7X
*/
benchmark1.run()

/*
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 846 / 1120 118.1 8.5 1.0X
DataFrame 270 / 329 370.9 2.7 3.1X
Dataset 545 / 789 183.5 5.4 1.6X
*/
benchmark2.run()

/*
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 1346 / 1618 74.3 13.5 1.0X
DataFrame 59 / 72 1695.4 0.6 22.8X
Dataset 2777 / 2805 36.0 27.8 0.5X
*/
benchmark3.run()

/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD sum 1913 / 1942 52.3 19.1 1.0X
DataFrame sum 46 / 61 2157.7 0.5 41.3X
Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X
Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X
*/
benchmark4.run()
runBenchmark("Dataset Benchmark") {
backToBackMapLong(spark, numRows, numChains).run()
backToBackMap(spark, numRows, numChains).run()
backToBackFilterLong(spark, numRows, numChains).run()
backToBackFilter(spark, numRows, numChains).run()
aggregate(spark, numRows).run()
}
}
}