@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit
purge: Boolean,
retainData: Boolean): Unit

/**
* Override the specs of one or many existing table partitions, assuming they exist.
@@ -385,7 +385,8 @@ class InMemoryCatalog(
table: String,
partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = synchronized {
purge: Boolean,
retainData: Boolean): Unit = synchronized {
requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@
}
}

val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
val shouldRemovePartitionLocation = if (retainData) {
false
} else {
getTable(db, table).tableType == CatalogTableType.MANAGED
}

// TODO: we should follow hive to roll back if one partition path failed to delete, and support
// partial partition spec.
partSpecs.foreach { p =>
@@ -687,13 +687,14 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
purge: Boolean,
retainData: Boolean): Unit = {
Contributor: shall we provide a default value? looks like most of the time we want it to be false

Contributor Author: It seemed a little safer to make it mandatory to avoid some component forgetting to propagate it.
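
To illustrate the trade-off discussed above, here is a hypothetical sketch (simplified signatures, not the actual Spark API): with a defaulted retainData, a call path that should retain data but omits the argument still compiles and silently deletes files, whereas a mandatory parameter turns that omission into a compile error.

object RetainDataDefaultSketch {
  trait CatalogWithDefault {
    def dropPartitions(
        db: String,
        table: String,
        specs: Seq[Map[String, String]],
        ignoreIfNotExists: Boolean,
        purge: Boolean,
        retainData: Boolean = false): Unit  // hypothetical defaulted variant
  }

  def overwriteCleanup(catalog: CatalogWithDefault): Unit = {
    // Compiles even though this call path should pass retainData = true, so the
    // files that were just written would be deleted along with the catalog entries.
    catalog.dropPartitions("db", "tbl", Seq(Map("p" -> "1")),
      ignoreIfNotExists = true, purge = false)
  }
  // With retainData mandatory (as in this change), the call above no longer
  // compiles until the caller states retainData explicitly.
}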

val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
}

/**
@@ -361,36 +361,40 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
catalog.dropPartitions(
"db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
"db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
resetState()
val catalog2 = newBasicCatalog()
assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
catalog2.dropPartitions(
"db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
"db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false,
retainData = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}

test("drop partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
"does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
"does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false,
retainData = false)
}
intercept[AnalysisException] {
catalog.dropPartitions(
"db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
"db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false,
retainData = false)
}
}

test("drop partitions that do not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
"db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
"db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false,
retainData = false)
}
catalog.dropPartitions(
"db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
"db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false)
}

test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(exists(tableLocation, "partCol1=5", "partCol2=6"))

catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
purge = false)
purge = false, retainData = false)
assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))

@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val fs = partPath.getFileSystem(new Configuration)
assert(fs.exists(partPath))

catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
catalog.dropPartitions(
"db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(fs.exists(partPath))
}
}
@@ -618,15 +618,17 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
// Drop partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
sessionCatalog.dropPartitions(
TableIdentifier("tbl2"),
Seq(part2.spec),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
// Drop multiple partitions at once
sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec, part2.spec),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
}

@@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl1", Some("unknown_db")),
Seq(),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
}
intercept[NoSuchTableException] {
catalog.dropPartitions(
TableIdentifier("does_not_exist", Some("db2")),
Seq(),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
}
}

@@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
}
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
ignoreIfNotExists = true,
purge = false)
purge = false,
retainData = false)
}

test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(partWithMoreColumns.spec),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(partWithUnknownColumns.spec),
ignoreIfNotExists = false,
purge = false)
purge = false,
retainData = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, unknown) must be contained within " +
@@ -843,8 +843,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.EXISTS != null,
ctx.PURGE != null)
ifExists = ctx.EXISTS != null,
purge = ctx.PURGE != null,
retainData = false)
}

/**
@@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ifExists: Boolean,
purge: Boolean)
purge: Boolean,
retainData: Boolean)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,8 @@
}

catalog.dropPartitions(
table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
retainData = retainData)
Seq.empty[Row]
}

@@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
l.catalogTable.get.identifier, deletedPartitions.toSeq,
ifExists = true, purge = true).run(t.sparkSession)
ifExists = true, purge = false,
retainData = true /* already deleted */).run(t.sparkSession)
}
}
}
t.location.refresh()
}

val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
overwrite.staticPartitionKeys.map { case (k, v) =>
(partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
}
} else {
Map.empty
}

val insertCmd = InsertIntoHadoopFsRelationCommand(
outputPath,
if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
staticPartitionKeys,
customPartitionLocations,
partitionSchema,
t.bucketSpec,
@@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest {
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = true,
purge = false)
purge = false,
retainData = false)
val expected2_table = expected1_table.copy(ifExists = false)
val expected1_purge = expected1_table.copy(purge = true)

@@ -848,9 +848,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = withClient {
purge: Boolean,
retainData: Boolean): Unit = withClient {
requireTableExists(db, table)
client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
client.dropPartitions(
db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
}

override def renamePartitions(
@@ -125,7 +125,8 @@ private[hive] trait HiveClient {
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit
purge: Boolean,
retainData: Boolean): Unit

/**
* Rename one or many existing table partitions, assuming they exist.
@@ -453,7 +453,8 @@ private[hive] class HiveClientImpl(
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = withHiveState {
purge: Boolean,
retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
@@ -473,8 +474,7 @@
var droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
try {
val deleteData = true
shim.dropPartition(client, db, table, partition, deleteData, purge)
shim.dropPartition(client, db, table, partition, !retainData, purge)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
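
For orientation, a condensed, hypothetical sketch of how the new retainData flag flows from the two drop paths down to the metastore call, where it is inverted into deleteData = !retainData; the names and signatures below are simplified stand-ins, not the actual Spark APIs.

object RetainDataFlowSketch {
  type PartitionSpec = Map[String, String]

  // ALTER TABLE ... DROP PARTITION: metadata and data are both removed.
  def ddlDrop(specs: Seq[PartitionSpec], ifExists: Boolean, purge: Boolean): Unit =
    dropPartitions(specs, ignoreIfNotExists = ifExists, purge = purge, retainData = false)

  // INSERT OVERWRITE cleanup: the writer has already deleted the files,
  // so only the catalog entries should be dropped.
  def overwriteDrop(specs: Seq[PartitionSpec]): Unit =
    dropPartitions(specs, ignoreIfNotExists = true, purge = false, retainData = true)

  private def dropPartitions(
      specs: Seq[PartitionSpec],
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      retainData: Boolean): Unit = {
    specs.foreach { spec =>
      // At the bottom of the stack the flag is inverted for Hive: deleteData = !retainData.
      metastoreDropPartition(spec, deleteData = !retainData, purge = purge)
    }
  }

  private def metastoreDropPartition(
      spec: PartitionSpec, deleteData: Boolean, purge: Boolean): Unit = {
    // Stand-in for shim.dropPartition(client, db, table, partition, deleteData, purge).
  }
}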
@@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite
}
}
}

test(s"SPARK-18659 insert overwrite table files - partition management $enabled") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
withTable("test") {
spark.range(10)
.selectExpr("id", "id as A", "'x' as B")
.write.partitionBy("A", "B").mode("overwrite")
.saveAsTable("test")
spark.sql("insert overwrite table test select id, id, 'x' from range(1)")
assert(spark.sql("select * from test").count() == 1)

spark.range(10)
.selectExpr("id", "id as A", "'x' as B")
.write.partitionBy("A", "B").mode("overwrite")
.saveAsTable("test")
spark.sql(
"insert overwrite table test partition (A, B) select id, id, 'x' from range(1)")
assert(spark.sql("select * from test").count() == 1)
}
}
}

test(s"SPARK-18659 insert overwrite table with lowercase - partition management $enabled") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
withTable("test") {
spark.range(10)
.selectExpr("id", "id as A", "'x' as B")
.write.partitionBy("A", "B").mode("overwrite")
.saveAsTable("test")
// note that the partition columns are written in lowercase ('a', 'b') instead of their original case ('A', 'B')
spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
assert(spark.sql("select * from test").count() == 10)
}
}
}
}
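
The count of 10 in the lowercase test above appears to rely on the staticPartitionKeys normalization added in DataSourceAnalysis: the user-written key a is matched case-insensitively against the table's partition schema and rewritten to A, so only the A=1 partition is overwritten. A minimal, hypothetical sketch of that normalization (illustrative names and values only, mirroring partitionSchema.map(_.name).find(_.equalsIgnoreCase(k))):

object StaticKeyNormalizationSketch {
  val partitionColumnNames: Seq[String] = Seq("A", "B")    // as declared in the table schema
  val userStaticKeys: Map[String, String] = Map("a" -> "1") // as written in the INSERT statement

  // Match each user-supplied key case-insensitively against the schema and keep
  // the schema's spelling of the column name.
  val normalized: Map[String, String] = userStaticKeys.map { case (k, v) =>
    (partitionColumnNames.find(_.equalsIgnoreCase(k)).get, v)
  }
  // normalized == Map("A" -> "1"), so only the A=1 partition is targeted.
}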

/**
@@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
// with a version that is older than the minimum (1.2 in this case).
try {
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
purge = true)
purge = true, retainData = false)
assert(!versionsWithoutPurge.contains(version))
} catch {
case _: UnsupportedOperationException =>
assert(versionsWithoutPurge.contains(version))
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
purge = false)
purge = false, retainData = false)
}

assert(client.getPartitionOption("default", "src_part", spec).isEmpty)