diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 188fce72efac..55e538f49fed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
 
-          val relation = DataSourceV2Relation.create(source, options)
           if (mode == SaveMode.Append) {
+            val relation = DataSourceV2Relation.create(source, options)
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
             }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 7cc8abc9f042..e8f291af13ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -351,6 +351,24 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing in other modes except append mode") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+      intercept[SchemaReadAttemptException] {
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      }
+    }
+  }
 }
@@ -640,3 +658,14 @@ object SpecificReaderFactory extends PartitionReaderFactory {
     }
   }
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky since this source implements read support but throws
+    // during schema retrieval. Might have to rewrite, but it's done this way
+    // to minimise changes.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a0f4404f4614..a7dfc2d1deac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
   with BatchWriteSupportProvider
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {
 
-    override def fullSchema(): StructType = schema
+    override def fullSchema(): StructType = SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
       val dataPath = new Path(path)
@@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
     val path = new Path(options.get("path").get())
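
For context, a minimal usage sketch (not part of the patch) of the behavior this change produces: non-append modes no longer trigger a schema read, while append still does. The session setup, output path, and object name below are illustrative assumptions, and it relies on the test-only `SimpleWriteOnlyDataSource` defined in this diff being on the classpath.

```scala
// Illustrative sketch only: WriteOnlyExample and /tmp/write-only are hypothetical names.
import org.apache.spark.sql.SparkSession

object WriteOnlyExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-25700").getOrCreate()
    import spark.implicits._

    val df = spark.range(5).select('id as 'i, -'id as 'j)
    val source = "org.apache.spark.sql.sources.v2.SimpleWriteOnlyDataSource"

    // With this patch, non-append modes no longer build a DataSourceV2Relation,
    // so the source's fullSchema() is never called and the write succeeds.
    df.write.format(source).option("path", "/tmp/write-only").mode("overwrite").save()

    // Append still creates the relation up front, which reads the schema; for
    // this write-only source that surfaces as a SchemaReadAttemptException.
    try {
      df.write.format(source).option("path", "/tmp/write-only").mode("append").save()
    } catch {
      case e: RuntimeException => println(s"append failed as expected: $e")
    }

    spark.stop()
  }
}
```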