diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0b7b67ed56d2..d285e007dac1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1452,7 +1452,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("orc")
+    .createWithDefault("")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 450828172b93..6bd4e0fb29a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
@@ -266,13 +266,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       provider.getTable(dsOptions) match {
         case table: SupportsBatchWrite =>
           lazy val relation = DataSourceV2Relation.create(table, options)
+          val isFileSource = table.isInstanceOf[FileTable]
           mode match {
-            case SaveMode.Append =>
+            case SaveMode.Append if !isFileSource =>
               runCommand(df.sparkSession, "save") {
                 AppendData.byName(relation, df.logicalPlan)
               }
 
-            case SaveMode.Overwrite =>
+            case SaveMode.Overwrite if !isFileSource =>
               // truncate the table
               runCommand(df.sparkSession, "save") {
                 OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 58522f7b1376..b6176a371f98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -329,7 +329,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
-      Seq(true).foreach { useV1 =>
+      Seq(true, false).foreach { useV1 =>
         val useV1List = if (useV1) {
           "orc"
         } else {
@@ -374,7 +374,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    Seq(true).foreach { useV1 =>
+    Seq(true, false).foreach { useV1 =>
       val useV1List = if (useV1) {
         "orc"
       } else {
@@ -469,6 +469,25 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("File data sources V2 supports overwriting with different schema") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "") {
+      Seq("orc", "parquet", "json").foreach { format =>
+        withTempPath { p =>
+          val path = p.getCanonicalPath
+          spark.range(10).write.format(format).save(path)
+          val newDF = spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
+          newDF.write.format(format).mode("overwrite").save(path)
+
+          val readDF = spark.read.format(format).load(path)
+          val expectedSchema = StructType(Seq(
+            StructField("double", DoubleType, true), StructField("string", StringType, true)))
+          assert(readDF.schema == expectedSchema)
+          checkAnswer(readDF, newDF)
+        }
+      }
+    }
+  }
+
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
     withTempPath { p =>
       val path = p.getAbsolutePath
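
The net effect of the patch: with the V1 writer fallback list defaulting to empty, `DataFrameWriter` still routes `SaveMode.Append` and `SaveMode.Overwrite` on a `FileTable` away from the V2 `AppendData`/`OverwriteByExpression` plans and back to the V1 write path, which is what allows overwriting a path with a different schema. Below is a minimal standalone sketch of that behavior from user code; it is not part of the patch, the local session setup and output path are illustrative assumptions, and the config is referenced through `SQLConf.USE_V1_SOURCE_WRITER_LIST.key` exactly as the new test does.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Sketch only: exercises the same overwrite-with-new-schema scenario as the
// test added in FileBasedDataSourceSuite, outside of the test harness.
object OverwriteWithNewSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")                          // assumption: local run for illustration
      .appName("OverwriteWithNewSchemaExample")
      .getOrCreate()
    import spark.implicits._

    // Keep the V1 writer fallback list at its new default (empty), so file
    // sources enter the V2 branch of DataFrameWriter.save and then fall back
    // to the V1 write for Append/Overwrite.
    spark.conf.set(SQLConf.USE_V1_SOURCE_WRITER_LIST.key, "")

    val path = "/tmp/overwrite-with-new-schema"    // hypothetical output location

    // First write: a single bigint column named "id".
    spark.range(10).write.format("orc").save(path)

    // Overwrite the same location with a completely different schema.
    val newDF = spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
    newDF.write.format("orc").mode("overwrite").save(path)

    // The reader should now see the new (double, string) schema.
    spark.read.format("orc").load(path).printSchema()

    spark.stop()
  }
}
```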