diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt
index 99937309a414..477429430cdd 100644
--- a/sql/core/benchmarks/JSONBenchmark-results.txt
+++ b/sql/core/benchmarks/JSONBenchmark-results.txt
@@ -7,31 +7,42 @@
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                  62946 / 63310          1.6         629.5       1.0X
-UTF-8 is set                               112814 / 112866          0.9        1128.1       0.6X
+No encoding                                  71832 / 72149          1.4         718.3       1.0X
+UTF-8 is set                               101700 / 101819          1.0        1017.0       0.7X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-JSON per-line parsing:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                  16468 / 16553          6.1         164.7       1.0X
-UTF-8 is set                                 16420 / 16441          6.1         164.2       1.0X
+No encoding                                  16501 / 16519          6.1         165.0       1.0X
+UTF-8 is set                                 16477 / 16516          6.1         164.8       1.0X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-JSON parsing of wide lines:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                  39789 / 40053          0.3        3978.9       1.0X
-UTF-8 is set                                 39505 / 39584          0.3        3950.5       1.0X
+No encoding                                  39871 / 40242          0.3        3987.1       1.0X
+UTF-8 is set                                 39581 / 39721          0.3        3958.1       1.0X
 
+Preparing data for benchmarking ...
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Select 10 columns + count()                  16011 / 16033          0.6        1601.1       1.0X
+Select 1 column + count()                    14350 / 14392          0.7        1435.0       1.1X
+count()                                        3007 / 3034          3.3         300.7       5.3X
+
+Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Select 10 columns + count()                  15997 / 16015          0.6        1599.7       1.0X
-Select 1 column + count()                    13280 / 13326          0.8        1328.0       1.2X
-count()                                        3006 / 3021          3.3         300.6       5.3X
+Short column without encoding                  8334 / 8453          1.2         833.4       1.0X
+Short column with UTF-8                      13627 / 13784          0.7        1362.7       0.6X
+Wide column without encoding               155073 / 155351          0.1       15507.3       0.1X
+Wide column with UTF-8                     212114 / 212263          0.0       21211.4       0.0X
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index 04f724ec8638..f50c25ecfc1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -39,13 +39,17 @@ import org.apache.spark.sql.types._
 object JSONBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  def schemaInferring(rowsNum: Int): Unit = {
+  def prepareDataInfo(benchmark: Benchmark): Unit = {
+    // scalastyle:off println
+    benchmark.out.println("Preparing data for benchmarking ...")
+    // scalastyle:on println
+  }
+
+  def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark("JSON schema inferring", rowsNum, output = output)
 
     withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+      prepareDataInfo(benchmark)
 
       spark.sparkContext.range(0, rowsNum, 1)
         .map(_ => "a")
@@ -54,11 +58,11 @@ object JSONBenchmark extends SqlBasedBenchmark {
         .option("encoding", "UTF-8")
         .json(path.getAbsolutePath)
 
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read.json(path.getAbsolutePath)
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .json(path.getAbsolutePath)
@@ -68,28 +72,29 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def perlineParsing(rowsNum: Int): Unit = {
-    val benchmark = new Benchmark("JSON per-line parsing", rowsNum, output = output)
+  def writeShortColumn(path: String, rowsNum: Int): StructType = {
+    spark.sparkContext.range(0, rowsNum, 1)
+      .map(_ => "a")
+      .toDF("fieldA")
+      .write.json(path)
+    new StructType().add("fieldA", StringType)
+  }
 
-    withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+  def countShortColumn(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("count a short column", rowsNum, output = output)
 
-      spark.sparkContext.range(0, rowsNum, 1)
-        .map(_ => "a")
-        .toDF("fieldA")
-        .write.json(path.getAbsolutePath)
-      val schema = new StructType().add("fieldA", StringType)
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+      val schema = writeShortColumn(path.getAbsolutePath, rowsNum)
 
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read
           .schema(schema)
           .json(path.getAbsolutePath)
           .count()
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .schema(schema)
@@ -101,35 +106,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def perlineParsingOfWideColumn(rowsNum: Int): Unit = {
-    val benchmark = new Benchmark("JSON parsing of wide lines", rowsNum, output = output)
+  def writeWideColumn(path: String, rowsNum: Int): StructType = {
+    spark.sparkContext.range(0, rowsNum, 1)
+      .map { i =>
+        val s = "abcdef0123456789ABCDEF" * 20
+        s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
+      }
+      .toDF().write.text(path)
+    new StructType()
+      .add("a", StringType).add("b", LongType)
+      .add("c", StringType).add("d", LongType)
+      .add("e", StringType).add("f", LongType)
+      .add("x", StringType).add("y", LongType)
+      .add("z", StringType)
+  }
+
+  def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("count a wide column", rowsNum, output = output)
 
     withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+      prepareDataInfo(benchmark)
+      val schema = writeWideColumn(path.getAbsolutePath, rowsNum)
 
-      spark.sparkContext.range(0, rowsNum, 1)
-        .map { i =>
-          val s = "abcdef0123456789ABCDEF" * 20
-          s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
-        }
-        .toDF().write.text(path.getAbsolutePath)
-      val schema = new StructType()
-        .add("a", StringType).add("b", LongType)
-        .add("c", StringType).add("d", LongType)
-        .add("e", StringType).add("f", LongType)
-        .add("x", StringType).add("y", LongType)
-        .add("z", StringType)
-
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read
           .schema(schema)
           .json(path.getAbsolutePath)
           .count()
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .schema(schema)
@@ -141,12 +147,14 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def countBenchmark(rowsNum: Int): Unit = {
+  def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 10
     val benchmark =
-      new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)
+      new Benchmark(s"Select a subset of $colsNum columns", rowsNum, output = output)
 
     withTempPath { path =>
+      prepareDataInfo(benchmark)
+
       val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
       val schema = StructType(fields)
       val columnNames = schema.fieldNames
@@ -158,13 +166,13 @@ object JSONBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).json(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+      benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ =>
         ds.select("*").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+      benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", 3) { _ =>
+      benchmark.addCase(s"count()", numIters) { _ =>
         ds.count()
       }
 
@@ -172,12 +180,64 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  def jsonParserCreation(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("creation of JSON parser per line", rowsNum, output = output)
+
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+
+      val shortColumnPath = path.getAbsolutePath + "/short"
+      val shortSchema = writeShortColumn(shortColumnPath, rowsNum)
+
+      val wideColumnPath = path.getAbsolutePath + "/wide"
+      val wideSchema = writeWideColumn(wideColumnPath, rowsNum)
+
+      benchmark.addCase("Short column without encoding", numIters) { _ =>
+        spark.read
+          .schema(shortSchema)
+          .json(shortColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Short column with UTF-8", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(shortSchema)
+          .json(shortColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Wide column without encoding", numIters) { _ =>
+        spark.read
+          .schema(wideSchema)
+          .json(wideColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Wide column with UTF-8", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(wideSchema)
+          .json(wideColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.run()
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val numIters = 3
     runBenchmark("Benchmark for performance of JSON parsing") {
-      schemaInferring(100 * 1000 * 1000)
-      perlineParsing(100 * 1000 * 1000)
-      perlineParsingOfWideColumn(10 * 1000 * 1000)
-      countBenchmark(10 * 1000 * 1000)
+      schemaInferring(100 * 1000 * 1000, numIters)
+      countShortColumn(100 * 1000 * 1000, numIters)
+      countWideColumn(10 * 1000 * 1000, numIters)
+      selectSubsetOfColumns(10 * 1000 * 1000, numIters)
+      jsonParserCreation(10 * 1000 * 1000, numIters)
     }
   }
 }
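Reviewer note, not part of the patch: the new cases rely on a no-op typed filter, filter((_: Row) => true), to force every row to be deserialized, so the JSON parser has to do its full per-line work; a bare count() can skip most of the per-field parsing, which is consistent with the plain count() case being about 5x faster in the results above. A minimal standalone sketch of that pattern, assuming a local SparkSession and a pre-existing single-column JSON file at /tmp/short.json (both hypothetical, not from the patch):

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.types.{StringType, StructType}

  object NoOpFilterCountSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("json-noop-filter-sketch")
        .master("local[*]")
        .getOrCreate()

      // Hypothetical input: one short string field per line, like the data
      // produced by writeShortColumn in the benchmark above.
      val schema = new StructType().add("fieldA", StringType)
      val ds = spark.read.schema(schema).json("/tmp/short.json")

      // The no-op filter defeats count-only shortcuts, so every line is fully parsed.
      val fullyParsed = ds.filter((_: Row) => true).count()
      // A bare count() can avoid materializing the parsed fields and is typically much faster.
      val fastCount = ds.count()

      println(s"fullyParsed=$fullyParsed fastCount=$fastCount")
      spark.stop()
    }
  }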