Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions sql/core/benchmarks/JSONBenchmark-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,42 @@ OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
JSON schema inferring: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
No encoding 62946 / 63310 1.6 629.5 1.0X
UTF-8 is set 112814 / 112866 0.9 1128.1 0.6X
No encoding 71832 / 72149 1.4 718.3 1.0X
UTF-8 is set 101700 / 101819 1.0 1017.0 0.7X

Preparing data for benchmarking ...
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
JSON per-line parsing: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
count a short column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
No encoding 16468 / 16553 6.1 164.7 1.0X
UTF-8 is set 16420 / 16441 6.1 164.2 1.0X
No encoding 16501 / 16519 6.1 165.0 1.0X
UTF-8 is set 16477 / 16516 6.1 164.8 1.0X

Preparing data for benchmarking ...
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
JSON parsing of wide lines: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
count a wide column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
No encoding 39789 / 40053 0.3 3978.9 1.0X
UTF-8 is set 39505 / 39584 0.3 3950.5 1.0X
No encoding 39871 / 40242 0.3 3987.1 1.0X
UTF-8 is set 39581 / 39721 0.3 3958.1 1.0X

Preparing data for benchmarking ...
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Select a subset of 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Select 10 columns + count() 16011 / 16033 0.6 1601.1 1.0X
Select 1 column + count() 14350 / 14392 0.7 1435.0 1.1X
count() 3007 / 3034 3.3 300.7 5.3X

Preparing data for benchmarking ...
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
creation of JSON parser per line: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Select 10 columns + count() 15997 / 16015 0.6 1599.7 1.0X
Select 1 column + count() 13280 / 13326 0.8 1328.0 1.2X
count() 3006 / 3021 3.3 300.6 5.3X
Short column without encoding 8334 / 8453 1.2 833.4 1.0X
Short column with UTF-8 13627 / 13784 0.7 1362.7 0.6X
Wide column without encoding 155073 / 155351 0.1 15507.3 0.1X
Wide column with UTF-8 212114 / 212263 0.0 21211.4 0.0X


Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ import org.apache.spark.sql.types._
object JSONBenchmark extends SqlBasedBenchmark {
import spark.implicits._

def schemaInferring(rowsNum: Int): Unit = {
def prepareDataInfo(benchmark: Benchmark): Unit = {
// scalastyle:off println
benchmark.out.println("Preparing data for benchmarking ...")
// scalastyle:on println
}

def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark("JSON schema inferring", rowsNum, output = output)

withTempPath { path =>
// scalastyle:off println
benchmark.out.println("Preparing data for benchmarking ...")
// scalastyle:on println
prepareDataInfo(benchmark)

spark.sparkContext.range(0, rowsNum, 1)
.map(_ => "a")
Expand All @@ -54,11 +58,11 @@ object JSONBenchmark extends SqlBasedBenchmark {
.option("encoding", "UTF-8")
.json(path.getAbsolutePath)

benchmark.addCase("No encoding", 3) { _ =>
benchmark.addCase("No encoding", numIters) { _ =>
spark.read.json(path.getAbsolutePath)
}

benchmark.addCase("UTF-8 is set", 3) { _ =>
benchmark.addCase("UTF-8 is set", numIters) { _ =>
spark.read
.option("encoding", "UTF-8")
.json(path.getAbsolutePath)
Expand All @@ -68,28 +72,29 @@ object JSONBenchmark extends SqlBasedBenchmark {
}
}

def perlineParsing(rowsNum: Int): Unit = {
val benchmark = new Benchmark("JSON per-line parsing", rowsNum, output = output)
def writeShortColumn(path: String, rowsNum: Int): StructType = {
spark.sparkContext.range(0, rowsNum, 1)
.map(_ => "a")
.toDF("fieldA")
.write.json(path)
new StructType().add("fieldA", StringType)
}

withTempPath { path =>
// scalastyle:off println
benchmark.out.println("Preparing data for benchmarking ...")
// scalastyle:on println
def countShortColumn(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark("count a short column", rowsNum, output = output)

spark.sparkContext.range(0, rowsNum, 1)
.map(_ => "a")
.toDF("fieldA")
.write.json(path.getAbsolutePath)
val schema = new StructType().add("fieldA", StringType)
withTempPath { path =>
prepareDataInfo(benchmark)
val schema = writeShortColumn(path.getAbsolutePath, rowsNum)

benchmark.addCase("No encoding", 3) { _ =>
benchmark.addCase("No encoding", numIters) { _ =>
spark.read
.schema(schema)
.json(path.getAbsolutePath)
.count()
}

benchmark.addCase("UTF-8 is set", 3) { _ =>
benchmark.addCase("UTF-8 is set", numIters) { _ =>
spark.read
.option("encoding", "UTF-8")
.schema(schema)
Expand All @@ -101,35 +106,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
}
}

def perlineParsingOfWideColumn(rowsNum: Int): Unit = {
val benchmark = new Benchmark("JSON parsing of wide lines", rowsNum, output = output)
def writeWideColumn(path: String, rowsNum: Int): StructType = {
spark.sparkContext.range(0, rowsNum, 1)
.map { i =>
val s = "abcdef0123456789ABCDEF" * 20
s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
}
.toDF().write.text(path)
new StructType()
.add("a", StringType).add("b", LongType)
.add("c", StringType).add("d", LongType)
.add("e", StringType).add("f", LongType)
.add("x", StringType).add("y", LongType)
.add("z", StringType)
}

def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark("count a wide column", rowsNum, output = output)

withTempPath { path =>
// scalastyle:off println
benchmark.out.println("Preparing data for benchmarking ...")
// scalastyle:on println
prepareDataInfo(benchmark)
val schema = writeWideColumn(path.getAbsolutePath, rowsNum)

spark.sparkContext.range(0, rowsNum, 1)
.map { i =>
val s = "abcdef0123456789ABCDEF" * 20
s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
}
.toDF().write.text(path.getAbsolutePath)
val schema = new StructType()
.add("a", StringType).add("b", LongType)
.add("c", StringType).add("d", LongType)
.add("e", StringType).add("f", LongType)
.add("x", StringType).add("y", LongType)
.add("z", StringType)

benchmark.addCase("No encoding", 3) { _ =>
benchmark.addCase("No encoding", numIters) { _ =>
spark.read
.schema(schema)
.json(path.getAbsolutePath)
.count()
}

benchmark.addCase("UTF-8 is set", 3) { _ =>
benchmark.addCase("UTF-8 is set", numIters) { _ =>
spark.read
.option("encoding", "UTF-8")
.schema(schema)
Expand All @@ -141,12 +147,14 @@ object JSONBenchmark extends SqlBasedBenchmark {
}
}

def countBenchmark(rowsNum: Int): Unit = {
def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = {
val colsNum = 10
val benchmark =
new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)
new Benchmark(s"Select a subset of $colsNum columns", rowsNum, output = output)

withTempPath { path =>
prepareDataInfo(benchmark)

val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
val schema = StructType(fields)
val columnNames = schema.fieldNames
Expand All @@ -158,26 +166,78 @@ object JSONBenchmark extends SqlBasedBenchmark {

val ds = spark.read.schema(schema).json(path.getAbsolutePath)

benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ =>
ds.select("*").filter((_: Row) => true).count()
}
benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
ds.select($"col1").filter((_: Row) => true).count()
}
benchmark.addCase(s"count()", 3) { _ =>
benchmark.addCase(s"count()", numIters) { _ =>
ds.count()
}

benchmark.run()
}
}

def jsonParserCreation(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark("creation of JSON parser per line", rowsNum, output = output)

withTempPath { path =>
prepareDataInfo(benchmark)

val shortColumnPath = path.getAbsolutePath + "/short"
val shortSchema = writeShortColumn(shortColumnPath, rowsNum)

val wideColumnPath = path.getAbsolutePath + "/wide"
val wideSchema = writeWideColumn(wideColumnPath, rowsNum)

benchmark.addCase("Short column without encoding", numIters) { _ =>
spark.read
.schema(shortSchema)
.json(shortColumnPath)
.filter((_: Row) => true)
.count()
}

benchmark.addCase("Short column with UTF-8", numIters) { _ =>
spark.read
.option("encoding", "UTF-8")
.schema(shortSchema)
.json(shortColumnPath)
.filter((_: Row) => true)
.count()
}

benchmark.addCase("Wide column without encoding", numIters) { _ =>
spark.read
.schema(wideSchema)
.json(wideColumnPath)
.filter((_: Row) => true)
.count()
}

benchmark.addCase("Wide column with UTF-8", numIters) { _ =>
spark.read
.option("encoding", "UTF-8")
.schema(wideSchema)
.json(wideColumnPath)
.filter((_: Row) => true)
.count()
}

benchmark.run()
}
}

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
val numIters = 3
runBenchmark("Benchmark for performance of JSON parsing") {
schemaInferring(100 * 1000 * 1000)
perlineParsing(100 * 1000 * 1000)
perlineParsingOfWideColumn(10 * 1000 * 1000)
countBenchmark(10 * 1000 * 1000)
schemaInferring(100 * 1000 * 1000, numIters)
countShortColumn(100 * 1000 * 1000, numIters)
countWideColumn(10 * 1000 * 1000, numIters)
selectSubsetOfColumns(10 * 1000 * 1000, numIters)
jsonParserCreation(10 * 1000 * 1000, numIters)
}
}
}