diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 15281f24fa628..f35938a80ef9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         )
         df.sparkSession.sessionState.executePlan(
           CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-        if (tableDesc.partitionColumnNames.nonEmpty &&
-            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-          // Need to recover partitions into the metastore so our saved data is visible.
-          df.sparkSession.sessionState.executePlan(
-            AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-        }
     }
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7e16e43f2bb0e..d8cd716901f26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -208,7 +208,8 @@ case class CreateDataSourceTableAsSelectCommand(
         className = provider,
         partitionColumns = table.partitionColumnNames,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption)
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(table))

       val result = try {
         dataSource.write(mode, df)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index a1aa07456fd36..cace5fa95cad0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -188,6 +188,25 @@ class PartitionProviderCompatibilitySuite
     }
   }

+  for (enabled <- Seq(true, false)) {
+    test(s"SPARK-18544 append with saveAsTable - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          withTempDir { dir =>
+            setupPartitionedDatasourceTable("test", dir)
+            if (enabled) {
+              spark.sql("msck repair table test")
+            }
+            assert(spark.sql("select * from test").count() == 5)
+            spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+              .write.partitionBy("partCol").mode("append").saveAsTable("test")
+            assert(spark.sql("select * from test").count() == 15)
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Runs a test against a multi-level partitioned table, then validates that the custom locations
    * were respected by the output writer.
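The patch makes an append via saveAsTable reuse the existing table's storage and table type instead of deriving a fresh default location, and passes the catalog table through to the DataSource so partitions are registered correctly. The following standalone sketch (not part of the patch) illustrates the scenario the new test covers; the table name test and columns fieldOne/partCol mirror the test above, and a local SparkSession is assumed.

import org.apache.spark.sql.SparkSession

object AppendSaveAsTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-18544 append with saveAsTable")
      .master("local[*]")
      .getOrCreate()

    // Create a partitioned datasource table registered in the catalog.
    spark.range(5).selectExpr("id as fieldOne", "id as partCol")
      .write.partitionBy("partCol").mode("overwrite").saveAsTable("test")

    // Append more rows. With the change above, the existing table's storage
    // (including its location) and tableType are reused, so the appended
    // files land in the table's original location and remain visible.
    spark.range(10).selectExpr("id as fieldOne", "id as partCol")
      .write.partitionBy("partCol").mode("append").saveAsTable("test")

    assert(spark.table("test").count() == 15)
    spark.stop()
  }
}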