
Commit e374b24

ericl authored and cloud-fan committed
[SPARK-18659][SQL] Incorrect behaviors in overwrite table for datasource tables
## What changes were proposed in this pull request?

Two bugs are addressed here:

1. INSERT OVERWRITE TABLE sometimes crashed when catalog partition management was enabled. This was because, when dropping partitions after an overwrite operation, the Hive client would attempt to delete the partition files. If the entire partition directory had already been dropped, this failed. The PR fixes this by adding a flag to control whether the Hive client should attempt to delete files.

2. The static partition spec for OVERWRITE TABLE was not correctly resolved to the case-sensitive original partition names. This resulted in the entire table being overwritten if you did not correctly capitalize your partition names.

cc yhuai cloud-fan

## How was this patch tested?

Unit tests. Surprisingly, the existing overwrite table tests did not catch these edge cases.

Author: Eric Liang <[email protected]>

Closes #16088 from ericl/spark-18659.

(cherry picked from commit 7935c84)
Signed-off-by: Wenchen Fan <[email protected]>
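To make the second bug concrete, here is a hedged sketch of the failure mode (the table and column names are illustrative, not taken from this patch): a datasource table partitioned by a lower-case column, overwritten with the partition column spelled in upper case. Before this fix, the mis-capitalized static partition spec failed to match the declared partition column, so the overwrite could clobber the whole table rather than the single partition.

    // Illustrative only; `t` and `part` are hypothetical names.
    spark.sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")
    spark.sql("INSERT INTO t PARTITION (part=1) SELECT 1")
    spark.sql("INSERT INTO t PARTITION (part=2) SELECT 2")
    // Note the capitalization: before this patch, `PART` did not resolve back
    // to `part`, and the overwrite could wipe both partitions instead of one.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION (PART=1) SELECT 3")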
1 parent 415730e commit e374b24

File tree

14 files changed: +110 -37 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 2 additions & 1 deletion
@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
    * Override the specs of one or many existing table partitions, assuming they exist.
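Every ExternalCatalog implementation now has to accept the extra flag. A minimal sketch of a call site against the new signature, with made-up database, table, and spec values (only the parameter list mirrors the patch):

    // Drop the partition's metadata but keep its files on disk.
    catalog.dropPartitions(
      "db1",                           // hypothetical database
      "logs",                          // hypothetical table
      Seq(Map("ds" -> "2016-12-01")),  // hypothetical partition spec
      ignoreIfNotExists = true,
      purge = false,
      retainData = true)               // the new flag: skip file deletion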

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 8 additions & 2 deletions
@@ -385,7 +385,8 @@ class InMemoryCatalog(
       table: String,
       partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = synchronized {
+      purge: Boolean,
+      retainData: Boolean): Unit = synchronized {
     requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@
       }
     }
 
-    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val shouldRemovePartitionLocation = if (retainData) {
+      false
+    } else {
+      getTable(db, table).tableType == CatalogTableType.MANAGED
+    }
+
     // TODO: we should follow hive to roll back if one partition path failed to delete, and support
     // partial partition spec.
     partSpecs.foreach { p =>
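The new guard reduces to a two-input rule: partition data on disk is removed only when the caller has not asked to retain it and the table is managed by the catalog. A self-contained restatement (the helper name is ours, not Spark's):

    // Equivalent to the if/else introduced above.
    def shouldRemovePartitionLocation(retainData: Boolean, isManaged: Boolean): Boolean =
      !retainData && isManaged

External tables never lose their files here, exactly as before; retainData merely gives managed tables a way to opt out as well.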

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 3 additions & 2 deletions
@@ -687,13 +687,14 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = {
+      purge: Boolean,
+      retainData: Boolean): Unit = {
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
-    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
   /**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 13 additions & 8 deletions
@@ -361,36 +361,40 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false,
+      retainData = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
   test("drop partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
   }
 
   test("drop partitions that do not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false)
   }
 
   test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
 
     catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
-      purge = false)
+      purge = false, retainData = false)
     assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
     assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
 
@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val fs = partPath.getFileSystem(new Configuration)
     assert(fs.exists(partPath))
 
-    catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+    catalog.dropPartitions(
+      "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(fs.exists(partPath))
   }
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 18 additions & 9 deletions
@@ -618,15 +618,17 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2"),
       Seq(part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -647,14 +650,16 @@
         TableIdentifier("tbl1", Some("unknown_db")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     intercept[NoSuchTableException] {
       catalog.dropPartitions(
         TableIdentifier("does_not_exist", Some("db2")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
   }
 
@@ -665,13 +670,15 @@
         TableIdentifier("tbl2", Some("db2")),
         Seq(part3.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part3.spec),
       ignoreIfNotExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
   }
 
   test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithMoreColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -691,7 +699,8 @@
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithUnknownColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 3 additions & 2 deletions
@@ -833,8 +833,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)
+      ifExists = ctx.EXISTS != null,
+      purge = ctx.PURGE != null,
+      retainData = false)
   }
 
   /**
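The parser hard-codes retainData = false, so user-issued DDL keeps its old semantics of deleting partition data; only internal callers (such as the overwrite path in DataSourceStrategy below) opt into retaining files. For reference, a hedged example of the statement shape this builder handles, against a hypothetical table:

    // EXISTS and PURGE in the grammar map to ifExists and purge above.
    spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (ds='2016-12-01') PURGE")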

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 4 additions & 2 deletions
@@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
-    purge: Boolean)
+    purge: Boolean,
+    retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,8 @@
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      retainData = retainData)
     Seq.empty[Row]
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 11 additions & 2 deletions
@@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           if (deletedPartitions.nonEmpty) {
             AlterTableDropPartitionCommand(
               l.catalogTable.get.identifier, deletedPartitions.toSeq,
-              ifExists = true, purge = true).run(t.sparkSession)
+              ifExists = true, purge = false,
+              retainData = true /* already deleted */).run(t.sparkSession)
           }
         }
       }
       t.location.refresh()
     }
 
+    val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+      overwrite.staticPartitionKeys.map { case (k, v) =>
+        (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+      }
+    } else {
+      Map.empty
+    }
+
     val insertCmd = InsertIntoHadoopFsRelationCommand(
       outputPath,
-      if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+      staticPartitionKeys,
       customPartitionLocations,
       partitionSchema,
       t.bucketSpec,
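Two things happen in this hunk. First, partitions the overwrite has already physically deleted are dropped from the catalog with retainData = true (and purge flipped to false), so the Hive client no longer tries to delete files that are already gone; this is the fix for the crash. Second, each user-supplied static partition key is resolved back to the case-sensitive column name declared in the partition schema, which is the fix for the accidental full-table overwrite. A standalone sketch of that normalization over plain strings (the helper name is ours):

    // Hypothetical restatement of the equalsIgnoreCase lookup above.
    def normalizeStaticKeys(
        staticKeys: Map[String, String],
        partitionColumns: Seq[String]): Map[String, String] = {
      staticKeys.map { case (k, v) =>
        // Resolve e.g. "PART" back to the declared "part"; the .get mirrors
        // the patch's assumption that keys were validated upstream.
        partitionColumns.find(_.equalsIgnoreCase(k)).get -> v
      }
    }

    // normalizeStaticKeys(Map("PART" -> "1"), Seq("part")) == Map("part" -> "1")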

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest {
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
     val expected2_table = expected1_table.copy(ifExists = false)
     val expected1_purge = expected1_table.copy(purge = true)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 4 additions & 2 deletions
@@ -850,9 +850,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withClient {
+      purge: Boolean,
+      retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
+    client.dropPartitions(
+      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
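From here the flag travels into the HiveClient, whose shim (changed in one of the files not shown in this excerpt) uses it to decide whether the metastore should delete the partition data. As an assumption about that layer rather than a quote from it, the mapping plausibly looks like:

    // Illustrative sketch assuming Hive's PartitionDropOptions API; not the patch's code.
    import org.apache.hadoop.hive.metastore.PartitionDropOptions

    def toDropOptions(purge: Boolean, retainData: Boolean): PartitionDropOptions = {
      val opts = new PartitionDropOptions
      opts.deleteData(!retainData)  // keep files on disk when retainData is set
      opts.purgeData(purge)         // bypass the trash when PURGE was requested
      opts
    }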
