diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 14dd707fa0f1..259008f183b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
    * Override the specs of one or many existing table partitions, assuming they exist.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a3ffeaa63f69..880a7a0dc422 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -385,7 +385,8 @@ class InMemoryCatalog(
       table: String,
       partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = synchronized {
+      purge: Boolean,
+      retainData: Boolean): Unit = synchronized {
     requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@ class InMemoryCatalog(
       }
     }
 
-    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val shouldRemovePartitionLocation = if (retainData) {
+      false
+    } else {
+      getTable(db, table).tableType == CatalogTableType.MANAGED
+    }
+
     // TODO: we should follow hive to roll back if one partition path failed to delete, and support
     // partial partition spec.
     partSpecs.foreach { p =>
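The InMemoryCatalog hunk above boils down to a single rule: partition directories are removed only for MANAGED tables, and never when the caller asks to retain the data. A minimal standalone sketch of that rule (local stand-in types, not the actual Spark catalog classes):

```scala
// Stand-in for CatalogTableType, just to make the rule executable in isolation.
sealed trait TableType
case object Managed extends TableType
case object External extends TableType

// Mirrors the decision added to InMemoryCatalog.dropPartitions above.
def shouldRemovePartitionLocation(tableType: TableType, retainData: Boolean): Boolean =
  if (retainData) false else tableType == Managed

// Files are left alone for external tables and whenever retainData is set.
assert(!shouldRemovePartitionLocation(External, retainData = false))
assert(!shouldRemovePartitionLocation(Managed, retainData = true))
assert(shouldRemovePartitionLocation(Managed, retainData = false))
```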
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0b6a91fff71f..da3a2079f42d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,13 +687,14 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = {
+      purge: Boolean,
+      retainData: Boolean): Unit = {
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
-    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 303a8662d3f4..3b39f420af49 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -361,13 +361,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false,
+      retainData = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -375,11 +376,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
   }
 
@@ -387,10 +390,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false,
+      retainData = false)
   }
 
   test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
 
     catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
-      purge = false)
+      purge = false, retainData = false)
 
     assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
     assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val fs = partPath.getFileSystem(new Configuration)
     assert(fs.exists(partPath))
 
-    catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+    catalog.dropPartitions(
+      "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(fs.exists(partPath))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3f27160d6393..f9c4b2687bf7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -618,7 +618,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
@@ -626,7 +627,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2"),
       Seq(part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl1", Some("unknown_db")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     intercept[NoSuchTableException] {
       catalog.dropPartitions(
         TableIdentifier("does_not_exist", Some("db2")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
   }
 
@@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(part3.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part3.spec),
       ignoreIfNotExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
   }
 
   test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithMoreColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithUnknownColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ffd6b0146b0b..4400174e9272 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -843,8 +843,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)
+      ifExists = ctx.EXISTS != null,
+      purge = ctx.PURGE != null,
+      retainData = false)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0f126d0200ef..c62c14200c24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
-    purge: Boolean)
+    purge: Boolean,
+    retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,8 @@ case class AlterTableDropPartitionCommand(
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      retainData = retainData)
 
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f3d92bf7cc24..4468dc58e404 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
             if (deletedPartitions.nonEmpty) {
               AlterTableDropPartitionCommand(
                 l.catalogTable.get.identifier, deletedPartitions.toSeq,
-                ifExists = true, purge = true).run(t.sparkSession)
+                ifExists = true, purge = false,
+                retainData = true /* already deleted */).run(t.sparkSession)
             }
           }
         }
         t.location.refresh()
       }
 
+      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+        overwrite.staticPartitionKeys.map { case (k, v) =>
+          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+        }
+      } else {
+        Map.empty
+      }
+
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
-        if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+        staticPartitionKeys,
         customPartitionLocations,
         partitionSchema,
         t.bucketSpec,
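The DataSourceAnalysis change above does two things: the internal drop issued for overwritten partitions now passes `retainData = true` (the files are already gone, so the catalog must not delete them again), and user-supplied static partition keys are resolved case-insensitively against the partition schema before reaching `InsertIntoHadoopFsRelationCommand`. A standalone sketch of that resolution step (the helper name and error message are illustrative, not part of the patch, which simply calls `.get` on the lookup):

```scala
// Resolve user-written partition keys (possibly lowercased by the parser) to the
// column names declared in the partition schema, matching case-insensitively.
def normalizeStaticKeys(
    staticKeys: Map[String, String],
    partitionColumnNames: Seq[String]): Map[String, String] = {
  staticKeys.map { case (key, value) =>
    val resolved = partitionColumnNames
      .find(_.equalsIgnoreCase(key))
      .getOrElse(sys.error(s"'$key' is not a partition column"))
    resolved -> value
  }
}

// "a" from `INSERT OVERWRITE ... PARTITION (a=1, b)` resolves to the column "A".
assert(normalizeStaticKeys(Map("a" -> "1"), Seq("A", "B")) == Map("A" -> "1"))
```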
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d31e7aeb3a78..5ef5f8ee7741 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest {
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
     val expected2_table = expected1_table.copy(ifExists = false)
     val expected1_purge = expected1_table.copy(purge = true)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1a9943bc3105..57ecd1ba0d61 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -848,9 +848,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withClient {
+      purge: Boolean,
+      retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
+    client.dropPartitions(
+      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 569a9c11398e..4c76932b6175 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -125,7 +125,8 @@ private[hive] trait HiveClient {
       table: String,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.
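At the Hive boundary the new flag is simply inverted: the metastore API takes a `deleteData` flag, so `HiveClientImpl` (next file) passes `!retainData` to the shim. A tiny sketch of that mapping (the case class and helper are illustrative, not the real shim API):

```scala
// Illustrative only: how the two booleans reaching the Hive client translate
// into the options Hive's drop-partition call understands.
final case class HiveDropOptions(deleteData: Boolean, purgeData: Boolean)

def toHiveDropOptions(retainData: Boolean, purge: Boolean): HiveDropOptions =
  HiveDropOptions(deleteData = !retainData, purgeData = purge)

// The internal drop issued during INSERT OVERWRITE retains data, so Hive must not
// touch the files; ALTER TABLE ... DROP PARTITION ... PURGE deletes and purges them.
assert(toHiveDropOptions(retainData = true, purge = false) == HiveDropOptions(false, false))
assert(toHiveDropOptions(retainData = false, purge = true) == HiveDropOptions(true, true))
```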
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 590029a517e0..bd840af5b164 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -453,7 +453,8 @@ private[hive] class HiveClientImpl(
       table: String,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withHiveState {
+      purge: Boolean,
+      retainData: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
     // do the check at first and collect all the matching partitions
@@ -473,8 +474,7 @@ private[hive] class HiveClientImpl(
     var droppedParts = ArrayBuffer.empty[java.util.List[String]]
     matchingParts.foreach { partition =>
       try {
-        val deleteData = true
-        shim.dropPartition(client, db, table, partition, deleteData, purge)
+        shim.dropPartition(client, db, table, partition, !retainData, purge)
       } catch {
         case e: Exception =>
           val remainingParts = matchingParts.toBuffer -- droppedParts
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index e8e4238d1c5a..c2ac03276078 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test(s"SPARK-18659 insert overwrite table files - partition management $enabled") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+      withTable("test") {
+        spark.range(10)
+          .selectExpr("id", "id as A", "'x' as B")
+          .write.partitionBy("A", "B").mode("overwrite")
+          .saveAsTable("test")
+        spark.sql("insert overwrite table test select id, id, 'x' from range(1)")
+        assert(spark.sql("select * from test").count() == 1)
+
+        spark.range(10)
+          .selectExpr("id", "id as A", "'x' as B")
+          .write.partitionBy("A", "B").mode("overwrite")
+          .saveAsTable("test")
+        spark.sql(
+          "insert overwrite table test partition (A, B) select id, id, 'x' from range(1)")
+        assert(spark.sql("select * from test").count() == 1)
+      }
+    }
+  }
+
+  test(s"SPARK-18659 insert overwrite table with lowercase - partition management $enabled") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+      withTable("test") {
+        spark.range(10)
+          .selectExpr("id", "id as A", "'x' as B")
+          .write.partitionBy("A", "B").mode("overwrite")
+          .saveAsTable("test")
+        // note that 'A', 'B' are lowercase instead of their original case here
+        spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
+        assert(spark.sql("select * from test").count() == 10)
+      }
+    }
+  }
 }
 
 /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 081b0ed9bd68..16ae345de6d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
       // with a version that is older than the minimum (1.2 in this case).
      try {
        client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
-          purge = true)
+          purge = true, retainData = false)
        assert(!versionsWithoutPurge.contains(version))
      } catch {
        case _: UnsupportedOperationException =>
          assert(versionsWithoutPurge.contains(version))
          client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
-            purge = false)
+            purge = false, retainData = false)
      }
 
      assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
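Taken together, a session along these lines (a sketch assuming an active `SparkSession` named `spark`, mirroring the SPARK-18659 tests added above) shows the fixed behaviour: a full dynamic overwrite leaves exactly the newly written row, and a lowercase static key such as `a=1` resolves to the partition column `A`, so only that partition is replaced:

```scala
// Seed a table partitioned by mixed-case columns A and B.
spark.range(10)
  .selectExpr("id", "id as A", "'x' as B")
  .write.partitionBy("A", "B").mode("overwrite")
  .saveAsTable("test")

// Dynamic overwrite of every partition: only the single new row remains.
spark.sql("insert overwrite table test select id, id, 'x' from range(1)")
assert(spark.table("test").count() == 1)

// Re-seed, then overwrite only partition A=1 using a lowercase static key.
spark.range(10)
  .selectExpr("id", "id as A", "'x' as B")
  .write.partitionBy("A", "B").mode("overwrite")
  .saveAsTable("test")
spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
assert(spark.table("test").count() == 10)  // the other nine partitions are untouched
```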