DateTimeBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.sql.Timestamp

 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf

 /**
@@ -36,7 +37,12 @@ import org.apache.spark.sql.internal.SQLConf
  */
 object DateTimeBenchmark extends SqlBasedBenchmark {
   private def doBenchmark(cardinality: Int, exprs: String*): Unit = {
-    spark.range(cardinality).selectExpr(exprs: _*).write.format("noop").save()
+    spark.range(cardinality)
+      .selectExpr(exprs: _*)
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }

   private def run(cardinality: Int, name: String, exprs: String*): Unit = {
@@ -132,7 +138,10 @@ object DateTimeBenchmark extends SqlBasedBenchmark
     benchmark.addCase("From java.sql.Timestamp", numIters) { _ =>
       spark.range(rowsNum)
         .map(millis => new Timestamp(millis))
-        .write.format("noop").save()
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
     benchmark.addCase("Collect longs", numIters) { _ =>
       spark.range(0, rowsNum, 1, 1)
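The change above is the whole pattern, repeated in each file below. The noop data source executes the full query plan and discards every row, so elapsed time reflects computation rather than I/O; the explicit mode matters because DataFrameWriter.save() defaults to ErrorIfExists, a mode that DataSource V2 sinks such as noop reject in Spark 3.x, so Append or Overwrite must be requested. A minimal self-contained sketch of the pattern (master, app name, and row count are illustrative, not from the patch):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode.Overwrite

object NoopWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // illustrative; the benchmarks build their own session
      .appName("noop-write-sketch")
      .getOrCreate()

    // The noop sink runs the whole plan and throws the rows away, so the
    // measured time is expression evaluation, not file or network I/O.
    spark.range(10 * 1000 * 1000)
      .selectExpr("id * 2 AS doubled")
      .write
      .format("noop")
      .mode(Overwrite)               // the default ErrorIfExists would be rejected here
      .save()

    spark.stop()
  }
}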
ExtractBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.time.Instant

 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf

 /**
@@ -44,6 +45,7 @@ object ExtractBenchmark extends SqlBasedBenchmark
         .selectExpr(exprs: _*)
         .write
         .format("noop")
+        .mode(Overwrite)
         .save()
     }
   }
MakeDateTimeBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark

 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf

 /**
@@ -41,6 +42,7 @@ object MakeDateTimeBenchmark extends SqlBasedBenchmark
         .selectExpr(exprs: _*)
         .write
         .format("noop")
+        .mode(Overwrite)
         .save()
     }
   }
NestedSchemaPruningBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark

 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf

 /**
@@ -47,7 +48,11 @@ abstract class NestedSchemaPruningBenchmark extends SqlBasedBenchmark

   private def addCase(benchmark: Benchmark, name: String, sql: String): Unit = {
     benchmark.addCase(name) { _ =>
-      spark.sql(sql).write.format("noop").save()
+      spark.sql(sql)
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
   }

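The pruning benchmark above times SQL text through the noop sink; what it exercises is whether Spark reads only the nested fields a query touches. A rough sketch of that kind of measurement, assuming a spark-shell style session named spark (the path, column names, and config literal are illustrative, not taken from the patch):

// Enable nested schema pruning so a scan of col.f1 can skip sibling fields.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

// Write a small Parquet dataset with a two-field struct.
spark.range(1000 * 1000)
  .selectExpr("NAMED_STRUCT('f1', id, 'f2', id + 1) AS col")
  .write.mode("overwrite").parquet("/tmp/nested")     // illustrative path

// Read back only one nested field and discard the rows via noop.
spark.read.parquet("/tmp/nested")
  .selectExpr("col.f1")
  .write.format("noop").mode("overwrite").save()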
UDFBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark

 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
@@ -42,16 +43,25 @@ object UDFBenchmark extends SqlBasedBenchmark
     val nullableIntCol = when(
       idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType))
     val stringCol = idCol.cast(StringType)
-    spark.range(cardinality).select(
-      udf(idCol, nullableIntCol, stringCol)).write.format("noop").save()
+    spark.range(cardinality)
+      .select(udf(idCol, nullableIntCol, stringCol))
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }

   private def doRunBenchmarkWithPrimitiveTypes(
       udf: UserDefinedFunction, cardinality: Int): Unit = {
     val idCol = col("id")
     val nullableIntCol = when(
       idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType))
-    spark.range(cardinality).select(udf(idCol, nullableIntCol)).write.format("noop").save()
+    spark.range(cardinality)
+      .select(udf(idCol, nullableIntCol))
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }

   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
@@ -104,16 +114,25 @@ object UDFBenchmark extends SqlBasedBenchmark
     val benchmark = new Benchmark("UDF identity overhead", cardinality, output = output)

     benchmark.addCase(s"Baseline", numIters = 5) { _ =>
-      spark.range(cardinality).select(
-        col("id"), col("id") * 2, col("id") * 3).write.format("noop").save()
+      spark.range(cardinality)
+        .select(col("id"), col("id") * 2, col("id") * 3)
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }

     val identityUDF = udf { x: Long => x }
     benchmark.addCase(s"With identity UDF", numIters = 5) { _ =>
-      spark.range(cardinality).select(
-        identityUDF(col("id")),
-        identityUDF(col("id") * 2),
-        identityUDF(col("id") * 3)).write.format("noop").save()
+      spark.range(cardinality)
+        .select(
+          identityUDF(col("id")),
+          identityUDF(col("id") * 2),
+          identityUDF(col("id") * 3))
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }

     benchmark.run()
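Beyond the reflowed writes, the structure of the cases above is worth spelling out: the baseline projects plain column expressions that whole-stage codegen evaluates directly, while the identity UDF wraps the same expressions, so the gap between the two timings isolates the per-row UDF cost (encode arguments, invoke the closure, decode the result). A spark-shell style sketch of that comparison (session name and row count assumed):

import org.apache.spark.sql.functions.{col, udf}

val identityUdf = udf { x: Long => x }

// Baseline: codegen evaluates the projection directly.
spark.range(10 * 1000 * 1000)
  .select(col("id") * 2)
  .write.format("noop").mode("overwrite").save()

// Same arithmetic routed through a UDF: adds per-row (de)serialization.
spark.range(10 * 1000 * 1000)
  .select(identityUdf(col("id") * 2))
  .write.format("noop").mode("overwrite").save()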
CSVBenchmark.scala
@@ -21,6 +21,7 @@ import java.time.{Instant, LocalDate}

 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{Column, Dataset, Row}
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -42,7 +43,9 @@ import org.apache.spark.sql.types._
 object CSVBenchmark extends SqlBasedBenchmark {
   import spark.implicits._

-  private def toNoop(ds: Dataset[_]): Unit = ds.write.format("noop").save()
+  private def toNoop(ds: Dataset[_]): Unit = {
+    ds.write.format("noop").mode(Overwrite).save()
+  }

   private def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = output)
JSONBenchmark.scala
@@ -21,6 +21,7 @@ import java.time.{Instant, LocalDate}

 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -49,7 +50,7 @@ object JSONBenchmark extends SqlBasedBenchmark
   }

   private def run(ds: Dataset[_]): Unit = {
-    ds.write.format("noop").save()
+    ds.write.format("noop").mode(Overwrite).save()
   }

   def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
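CSVBenchmark's toNoop and JSONBenchmark's run are the same helper under two names: funnel any Dataset into the noop sink so each case is forced to parse and materialize every row. A hedged usage sketch (helper placement, schema, and file path are illustrative, not from the patch; assumes a spark-shell style session named spark):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SaveMode.Overwrite

// Mirrors the helper shape used by the CSV and JSON benchmarks.
def toNoop(ds: Dataset[_]): Unit = {
  ds.write.format("noop").mode(Overwrite).save()
}

// Forces a full parse of the input; an explicit schema skips inference,
// so the timing is dominated by per-row parsing.
val df = spark.read
  .schema("id INT, name STRING")
  .option("header", "true")
  .csv("/tmp/people.csv")          // illustrative path
toNoop(df)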