diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
index df0f87e483cd..9cfa9070aca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.sql.Timestamp
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -36,7 +37,12 @@ import org.apache.spark.sql.internal.SQLConf
  */
 object DateTimeBenchmark extends SqlBasedBenchmark {
   private def doBenchmark(cardinality: Int, exprs: String*): Unit = {
-    spark.range(cardinality).selectExpr(exprs: _*).write.format("noop").save()
+    spark.range(cardinality)
+      .selectExpr(exprs: _*)
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }
 
   private def run(cardinality: Int, name: String, exprs: String*): Unit = {
@@ -132,7 +138,10 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       benchmark.addCase("From java.sql.Timestamp", numIters) { _ =>
         spark.range(rowsNum)
           .map(millis => new Timestamp(millis))
-          .write.format("noop").save()
+          .write
+          .format("noop")
+          .mode(Overwrite)
+          .save()
       }
       benchmark.addCase("Collect longs", numIters) { _ =>
         spark.range(0, rowsNum, 1, 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
index a109b11b2d6d..2bd73c1dc14f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.time.Instant
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -44,6 +45,7 @@ object ExtractBenchmark extends SqlBasedBenchmark {
         .selectExpr(exprs: _*)
         .write
         .format("noop")
+        .mode(Overwrite)
         .save()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MakeDateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MakeDateTimeBenchmark.scala
index 7f7908544693..9e5aca70ac62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MakeDateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MakeDateTimeBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -41,6 +42,7 @@ object MakeDateTimeBenchmark extends SqlBasedBenchmark {
         .selectExpr(exprs: _*)
         .write
         .format("noop")
+        .mode(Overwrite)
         .save()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
index 96f90f29707d..4b6da5a02eac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -47,7 +48,11 @@ abstract class NestedSchemaPruningBenchmark extends SqlBasedBenchmark {
 
   private def addCase(benchmark: Benchmark, name: String, sql: String): Unit = {
     benchmark.addCase(name) { _ =>
-      spark.sql(sql).write.format("noop").save()
+      spark.sql(sql)
+        .write
+        .format("noop")
+        .mode(Overwrite)
+        .save()
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala
index 9cbd6423f667..04c1b5ade12c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
@@ -42,8 +43,12 @@ object UDFBenchmark extends SqlBasedBenchmark {
     val nullableIntCol = when(
       idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType))
     val stringCol = idCol.cast(StringType)
-    spark.range(cardinality).select(
-      udf(idCol, nullableIntCol, stringCol)).write.format("noop").save()
+    spark.range(cardinality)
+      .select(udf(idCol, nullableIntCol, stringCol))
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }
 
   private def doRunBenchmarkWithPrimitiveTypes(
@@ -51,7 +56,12 @@ object UDFBenchmark extends SqlBasedBenchmark {
     val idCol = col("id")
     val nullableIntCol = when(
       idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType))
-    spark.range(cardinality).select(udf(idCol, nullableIntCol)).write.format("noop").save()
+    spark.range(cardinality)
+      .select(udf(idCol, nullableIntCol))
+      .write
+      .format("noop")
+      .mode(Overwrite)
+      .save()
   }
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
@@ -104,16 +114,25 @@ object UDFBenchmark extends SqlBasedBenchmark {
       val benchmark = new Benchmark("UDF identity overhead", cardinality, output = output)
 
       benchmark.addCase(s"Baseline", numIters = 5) { _ =>
-        spark.range(cardinality).select(
-          col("id"), col("id") * 2, col("id") * 3).write.format("noop").save()
+        spark.range(cardinality)
+          .select(col("id"), col("id") * 2, col("id") * 3)
+          .write
+          .format("noop")
+          .mode(Overwrite)
+          .save()
       }
 
       val identityUDF = udf { x: Long => x }
       benchmark.addCase(s"With identity UDF", numIters = 5) { _ =>
-        spark.range(cardinality).select(
-          identityUDF(col("id")),
-          identityUDF(col("id") * 2),
-          identityUDF(col("id") * 3)).write.format("noop").save()
+        spark.range(cardinality)
+          .select(
+            identityUDF(col("id")),
+            identityUDF(col("id") * 2),
+            identityUDF(col("id") * 3))
+          .write
+          .format("noop")
+          .mode(Overwrite)
+          .save()
       }
       benchmark.run()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index e41e81af508f..a4cffedaf82d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -21,6 +21,7 @@ import java.time.{Instant, LocalDate}
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{Column, Dataset, Row}
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -42,7 +43,9 @@ import org.apache.spark.sql.types._
 object CSVBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  private def toNoop(ds: Dataset[_]): Unit = ds.write.format("noop").save()
+  private def toNoop(ds: Dataset[_]): Unit = {
+    ds.write.format("noop").mode(Overwrite).save()
+  }
 
   private def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = output)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index f486e603e255..83856541618d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -21,6 +21,7 @@ import java.time.{Instant, LocalDate}
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.SaveMode.Overwrite
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -49,7 +50,7 @@ object JSONBenchmark extends SqlBasedBenchmark {
   }
 
   private def run(ds: Dataset[_]): Unit = {
-    ds.write.format("noop").save()
+    ds.write.format("noop").mode(Overwrite).save()
   }
 
   def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
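Note: every hunk above makes the same change, adding an explicit SaveMode to writes
against the `noop` data source. The `noop` sink is implemented on the DataSource V2
API, where a plain `save()` runs under the default ErrorIfExists mode, which V2 table
providers reject with an AnalysisException asking for Append or Overwrite instead;
passing `mode(Overwrite)` is presumably what keeps these benchmark writes runnable.
Below is a minimal, self-contained sketch of the resulting pattern; the session
setup, app name, and row count are illustrative, not taken from the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode.Overwrite

object NoopWriteExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session, only for demonstration.
    val spark = SparkSession.builder()
      .appName("noop-write-example")
      .master("local[*]")
      .getOrCreate()

    // Fully executes the plan without producing any output files, which is
    // what the benchmarks rely on to measure query execution time alone.
    spark.range(10000000L)
      .selectExpr("id", "id % 2 AS parity")
      .write
      .format("noop")
      .mode(Overwrite) // explicit mode; the default ErrorIfExists would fail here
      .save()

    spark.stop()
  }
}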