diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index fdda13b1bf6e..cad580ef615d 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -400,6 +400,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("topic", topic)
+      .mode("append")
       .save()
     checkAnswer(
       createKafkaReader(topic, includeHeaders = true).selectExpr(
@@ -423,6 +424,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
       df.write
         .format("kafka")
         .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .mode("append")
         .save()
     }
     TestUtils.assertExceptionMsg(ex, "null topic present in the data")
@@ -457,6 +459,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
         .format("kafka")
         .option("kafka.bootstrap.servers", testUtils.brokerAddress)
         .option("topic", topic)
+        .mode("append")
         .save()
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0ec661fc16c8..73302521b639 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1643,7 +1643,7 @@ object SQLConf {
       "implementation class names for which Data Source V2 code path is disabled. These data " +
       "sources will fallback to Data Source V1 code path.")
     .stringConf
-    .createWithDefault("")
+    .createWithDefault("kafka")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index edf0963e71e8..3f7016df2eb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -67,7 +67,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def mode(saveMode: SaveMode): DataFrameWriter[T] = {
-    this.mode = Some(saveMode)
+    this.mode = saveMode
     this
   }
 
@@ -267,7 +267,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             "if partition columns are specified.")
         }
         lazy val relation = DataSourceV2Relation.create(table, dsOptions)
-        modeForDSV2 match {
+        mode match {
           case SaveMode.Append =>
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
@@ -308,7 +308,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(modeForDSV1, df.logicalPlan)
+        options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
     }
   }
 
@@ -319,6 +319,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @note Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
    * resolution. For example:
    *
+   * @note SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in `insertInto` as
+   *   `insertInto` is not a table creating operation.
+   *
    * {{{
    *    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
    *    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
@@ -380,8 +383,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           DataSourceV2Relation.create(t)
       }
 
-    val command = modeForDSV2 match {
-      case SaveMode.Append =>
+    val command = mode match {
+      case SaveMode.Append | SaveMode.ErrorIfExists | SaveMode.Ignore =>
         AppendData.byPosition(table, df.logicalPlan, extraOptions.toMap)
 
       case SaveMode.Overwrite =>
@@ -394,10 +397,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         } else {
           OverwriteByExpression.byPosition(table, df.logicalPlan, Literal(true), extraOptions.toMap)
         }
-
-      case other =>
-        throw new AnalysisException(s"insertInto does not support $other mode, " +
-          s"please use Append or Overwrite mode instead.")
     }
 
     runCommand(df.sparkSession, "insertInto") {
@@ -411,7 +410,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partitionSpec = Map.empty[String, Option[String]],
         query = df.logicalPlan,
-        overwrite = modeForDSV1 == SaveMode.Overwrite,
+        overwrite = mode == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }
   }
@@ -490,12 +489,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case CatalogObjectIdentifier(Some(catalog), ident) =>
-        saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
+        saveAsTable(catalog.asTableCatalog, ident)
 
       case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
-        // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
-        // for now.
-        saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1)
+        saveAsTable(sessionCatalog.asTableCatalog, ident)
 
       case AsTableIdentifier(tableIdentifier) =>
         saveAsTable(tableIdentifier)
@@ -507,7 +504,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
-  private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = {
+  private def saveAsTable(catalog: TableCatalog, ident: Identifier): Unit = {
     val partitioning = partitioningColumns.map { colNames =>
       colNames.map(name => IdentityTransform(FieldReference(name)))
     }.getOrElse(Seq.empty[Transform])
 
@@ -568,7 +565,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val tableIdentWithDB = tableIdent.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
 
-    (tableExists, modeForDSV1) match {
+    (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
         // Do nothing
 
@@ -624,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       bucketSpec = getBucketSpec)
 
     runCommand(df.sparkSession, "saveAsTable")(
-      CreateTable(tableDesc, modeForDSV1, Some(df.logicalPlan)))
+      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
   }
 
   /**
@@ -830,10 +827,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     SQLExecution.withNewExecutionId(session, qe, Some(name))(qe.toRdd)
   }
 
-  private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists)
-
-  private def modeForDSV2 = mode.getOrElse(SaveMode.Append)
-
   private def lookupV2Provider(): Option[TableProvider] = {
     DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
       // TODO(SPARK-28396): File source v2 write path is currently broken.
@@ -848,7 +841,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
 
-  private var mode: Option[SaveMode] = None
+  private var mode: SaveMode = SaveMode.ErrorIfExists
 
   private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
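
Note on the `insertInto` change above: SaveMode.ErrorIfExists and SaveMode.Ignore no longer hit the removed AnalysisException and instead behave as a plain append, because `insertInto` never creates a table. The following is a minimal sketch of the resulting user-facing behavior, not part of the patch; the table name t1, the local master, and the parquet format are illustrative assumptions.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object InsertIntoModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("insertInto-modes").getOrCreate()
        import spark.implicits._

        spark.sql("DROP TABLE IF EXISTS t1")
        spark.sql("CREATE TABLE t1 (i INT, j INT) USING parquet")

        // After this patch both calls below simply append: insertInto is not a
        // table-creating operation, so ErrorIfExists and Ignore degrade to Append.
        Seq((1, 2)).toDF("i", "j").write.mode(SaveMode.ErrorIfExists).insertInto("t1")
        Seq((3, 4)).toDF("i", "j").write.mode(SaveMode.Ignore).insertInto("t1")

        // Overwrite still replaces the existing data, as before.
        Seq((5, 6)).toDF("i", "j").write.mode(SaveMode.Overwrite).insertInto("t1")

        spark.table("t1").show()
        spark.stop()
      }
    }
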
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index cd811bb7afb5..8e921aaeb66f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 
 class DataSourceV2DataFrameSuite
   extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) {
@@ -75,13 +76,15 @@ class DataSourceV2DataFrameSuite
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
       val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
-      // Default saveMode is append, therefore this doesn't throw a table already exists exception
-      df.write.saveAsTable(t1)
+      // Default saveMode is ErrorIfExists
+      intercept[TableAlreadyExistsException] {
+        df.write.saveAsTable(t1)
+      }
+      assert(spark.table(t1).count() === 0)
+
+      // appends are by name not by position
+      df.select('data, 'id).write.mode("append").saveAsTable(t1)
       checkAnswer(spark.table(t1), df)
-
-      // also appends are by name not by position
-      df.select('data, 'id).write.saveAsTable(t1)
-      checkAnswer(spark.table(t1), df.union(df))
     }
   }
 
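
The suite change above captures the new `saveAsTable` default: with no explicit mode, writing to an existing table now fails instead of silently appending, and an append has to be requested with mode("append") (appends resolve columns by name). A rough equivalent outside the test harness, assuming a plain local SparkSession and an illustrative table name people; against the default session catalog the failure surfaces as an AnalysisException (the V2 catalog path exercised by the test raises TableAlreadyExistsException).

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object SaveAsTableDefaultSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("saveAsTable-default").getOrCreate()
        import spark.implicits._

        spark.sql("DROP TABLE IF EXISTS people")
        // The first write creates the table; ErrorIfExists is fine when it does not exist yet.
        Seq((1L, "a")).toDF("id", "data").write.saveAsTable("people")

        try {
          // Default mode is now ErrorIfExists, so a second bare saveAsTable fails.
          Seq((2L, "b")).toDF("id", "data").write.saveAsTable("people")
        } catch {
          case e: AnalysisException => println(s"expected failure: ${e.getMessage}")
        }

        // Appends must be explicit; saveAsTable matches columns by name, not position.
        Seq(("c", 3L)).toDF("data", "id").write.mode("append").saveAsTable("people")
        spark.table("people").show()
        spark.stop()
      }
    }
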
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 00a8b430d33c..138bbc3f04f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -225,8 +225,12 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
         spark.read.format(cls.getName).option("path", path).load(),
         spark.range(10).select('id, -'id))
 
-      // default save mode is append
-      spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
+      // default save mode is ErrorIfExists
+      intercept[AnalysisException] {
+        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
+          .option("path", path).save()
+      }
+      spark.range(10).select('id as 'i, -'id as 'j).write.mode("append").format(cls.getName)
         .option("path", path).save()
       checkAnswer(
         spark.read.format(cls.getName).option("path", path).load(),
@@ -281,7 +285,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
 
         val numPartition = 6
         spark.range(0, 10, 1, numPartition).select('id as 'i, -'id as 'j).write.format(cls.getName)
-          .option("path", path).save()
+          .mode("append").option("path", path).save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).select('id, -'id))
@@ -368,7 +372,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
 
       val format = classOf[SimpleWritableDataSource].getName
       val df = Seq((1L, 2L)).toDF("i", "j")
-      df.write.format(format).option("path", optionPath).save()
+      df.write.format(format).mode("append").option("path", optionPath).save()
       assert(!new File(sessionPath).exists)
       checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
index c5a03cb8ef6d..b4073bedf559 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
@@ -32,6 +32,7 @@ class NoopSuite extends SharedSparkSession {
       }
       .write
       .format("noop")
+      .mode("append")
       .save()
     assert(accum.value == numElems)
   }
@@ -54,7 +55,7 @@ class NoopSuite extends SharedSparkSession {
           accum.add(1)
           x
         }
-        .write.format("noop").save()
+        .write.mode("append").format("noop").save()
       assert(accum.value == numElems)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 01a03e484c90..cef0e5ab4756 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -289,18 +289,20 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(plan.isInstanceOf[OverwriteByExpression])
 
     // By default the save mode is `ErrorIfExists` for data source v2.
-    spark.range(10).write
-      .format(classOf[NoopDataSource].getName)
-      .save()
-    sparkContext.listenerBus.waitUntilEmpty()
-    assert(plan.isInstanceOf[AppendData])
+    val e = intercept[AnalysisException] {
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .save()
+    }
+    assert(e.getMessage.contains("ErrorIfExists"))
 
-    spark.range(10).write
-      .format(classOf[NoopDataSource].getName)
-      .mode("default")
-      .save()
-    sparkContext.listenerBus.waitUntilEmpty()
-    assert(plan.isInstanceOf[AppendData])
+    val e2 = intercept[AnalysisException] {
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode("default")
+        .save()
+    }
+    assert(e2.getMessage.contains("ErrorIfExists"))
   } finally {
     spark.listenerManager.unregister(listener)
   }
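
The same default applies to path- and sink-style V2 writes, as the NoopSuite and DataFrameReaderWriterSuite updates show: a bare save() now fails with an AnalysisException mentioning ErrorIfExists, and an append must be requested explicitly. A small sketch against the built-in noop sink (which accepts any batch write and discards the rows); the session settings are illustrative.

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object NoopSaveModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("noop-save-mode").getOrCreate()

        try {
          // Fails after this patch: the default save mode for the V2 path is ErrorIfExists.
          spark.range(10).write.format("noop").save()
        } catch {
          case e: AnalysisException => println(s"expected: ${e.getMessage}")
        }

        // Opting into append restores the previous behavior.
        spark.range(10).write.format("noop").mode("append").save()

        spark.stop()
      }
    }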