
Commit 46b2e49

cloud-fan authored and rxin committed
[SPARK-18173][SQL] data source tables should support truncating partition
## What changes were proposed in this pull request?

Previously `TRUNCATE TABLE ... PARTITION` would always truncate the whole table for data source tables. This PR fixes that and improves `InMemoryCatalog` so the command works with it as well.

## How was this patch tested?

existing tests

Author: Wenchen Fan <[email protected]>

Closes #15688 from cloud-fan/truncate.
1 parent 556a3b7 commit 46b2e49
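
For context, here is a minimal, self-contained sketch of the behavior this commit enables (the table and column names below are illustrative, not taken from the patch): truncating a single partition of a partitioned data source table now removes only the matching rows rather than the whole table.

import org.apache.spark.sql.SparkSession

// Illustrative sketch: truncate one partition of a partitioned data source table.
val spark = SparkSession.builder().appName("truncate-partition-demo").getOrCreate()
import spark.implicits._

val data = (1 to 10).map(i => (i % 3, i)).toDF("part", "value")
data.write.partitionBy("part").saveAsTable("demo_table")

// Before SPARK-18173 this truncated every partition of a data source table;
// with this patch only the rows in partition part=1 are removed.
spark.sql("TRUNCATE TABLE demo_table PARTITION (part=1)")
assert(spark.table("demo_table").filter($"part" === 1).count() == 0)
assert(spark.table("demo_table").filter($"part" =!= 1).count() > 0)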

File tree

5 files changed (+146, -17 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 19 additions & 4 deletions
@@ -487,11 +487,26 @@ class InMemoryCatalog(
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized {
     requireTableExists(db, table)
-    if (partialSpec.nonEmpty) {
-      throw new UnsupportedOperationException(
-        "listPartition with partial partition spec is not implemented")
+
+    partialSpec match {
+      case None => catalog(db).tables(table).partitions.values.toSeq
+      case Some(partial) =>
+        catalog(db).tables(table).partitions.toSeq.collect {
+          case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition
+        }
+    }
+  }
+
+  /**
+   * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
+   * partial partition spec w.r.t. PARTITION (a=1,b=2).
+   */
+  private def isPartialPartitionSpec(
+      spec1: TablePartitionSpec,
+      spec2: TablePartitionSpec): Boolean = {
+    spec1.forall {
+      case (partitionColumn, value) => spec2(partitionColumn) == value
     }
-    catalog(db).tables(table).partitions.values.toSeq
   }
 
   override def listPartitionsByFilter(
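
To make the partial-spec matching above concrete, here is a standalone sketch of the rule (`TablePartitionSpec` is `Map[String, String]` in Spark's catalog API; the values below are made up for illustration): a partial spec matches a full spec when every column it mentions carries the same value.

// Standalone sketch of the matching rule used by the new listPartitions above.
type TablePartitionSpec = Map[String, String]

def isPartialPartitionSpec(spec1: TablePartitionSpec, spec2: TablePartitionSpec): Boolean =
  spec1.forall { case (partitionColumn, value) => spec2(partitionColumn) == value }

// PARTITION (a=1) is a partial spec of PARTITION (a=1, b=2).
assert(isPartialPartitionSpec(Map("a" -> "1"), Map("a" -> "1", "b" -> "2")))
// Values must agree for every column mentioned in the partial spec.
assert(!isPartialPartitionSpec(Map("a" -> "2"), Map("a" -> "1", "b" -> "2")))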

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -320,6 +320,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
   }
 
+  test("list partitions with partial partition spec") {
+    val catalog = newBasicCatalog()
+    val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
+    assert(parts.length == 1)
+    assert(parts.head.spec == part1.spec)
+
+    // if no partition is matched for the given partition spec, an empty list should be returned.
+    assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty)
+    assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
+  }
+
   test("drop partitions") {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 11 additions & 5 deletions
@@ -343,13 +343,19 @@ case class TruncateTableCommand(
       DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
     }
     val locations =
-      // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
-      if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
-        Seq(table.storage.locationUri)
-      } else if (table.partitionColumnNames.isEmpty) {
+      if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
-        catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
+        // Here we diverge from Hive when the given partition spec contains all partition columns
+        // but no partition is matched: Hive will throw an exception and we just do nothing.
+        val normalizedSpec = partitionSpec.map { spec =>
+          PartitioningUtils.normalizePartitionSpec(
+            spec,
+            table.partitionColumnNames,
+            table.identifier.quotedString,
+            spark.sessionState.conf.resolver)
+        }
+        catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
       }
     val hadoopConf = spark.sessionState.newHadoopConf()
     locations.foreach { location =>
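
Put differently, the updated location selection in TruncateTableCommand boils down to the logic sketched below (a simplified stand-in for illustration only; the real code goes through the session catalog and PartitioningUtils.normalizePartitionSpec).

// Simplified sketch of the new location-selection logic; the case class and
// helper below are stand-ins, not Spark APIs.
case class PartitionInfo(spec: Map[String, String], location: String)

def locationsToTruncate(
    tableLocation: String,
    partitionColumns: Seq[String],
    partitions: Seq[PartitionInfo],
    partitionSpec: Option[Map[String, String]]): Seq[String] = {
  if (partitionColumns.isEmpty) {
    // Non-partitioned table: truncate the single table location.
    Seq(tableLocation)
  } else {
    // Partitioned table: truncate only the locations whose spec matches the
    // (possibly partial) partition spec; if nothing matches, nothing is truncated.
    val spec = partitionSpec.getOrElse(Map.empty[String, String])
    partitions
      .filter(p => spec.forall { case (k, v) => p.spec.get(k).contains(v) })
      .map(_.location)
  }
}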

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 41 additions & 8 deletions
@@ -1628,29 +1628,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   test("truncate table - datasource table") {
     import testImplicits._
-    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
 
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
     // Test both a Hive compatible and incompatible code path.
     Seq("json", "parquet").foreach { format =>
       withTable("rectangles") {
         data.write.format(format).saveAsTable("rectangles")
         assume(spark.table("rectangles").collect().nonEmpty,
           "bad test; table was empty to begin with")
+
         sql("TRUNCATE TABLE rectangles")
         assert(spark.table("rectangles").collect().isEmpty)
+
+        // not supported since the table is not partitioned
+        assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
       }
     }
+  }
 
-    withTable("rectangles", "rectangles2") {
-      data.write.saveAsTable("rectangles")
-      data.write.partitionBy("length").saveAsTable("rectangles2")
+  test("truncate partitioned table - datasource table") {
+    import testImplicits._
 
-      // not supported since the table is not partitioned
-      assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")
 
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
       // supported since partitions are stored in the metastore
-      sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
-      assert(spark.table("rectangles2").collect().isEmpty)
+      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
+      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // support partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
+      assert(spark.table("partTable").collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // do nothing if no partition is matched for the given partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // do nothing if no partition is matched for the given non-partial partition spec
+      // TODO: This behaviour is different from Hive, we should decide whether we need to follow
+      // Hive's behaviour or stick with our existing behaviour later.
+      sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // throw exception if the column in partition spec is not a partition column.
+      val e = intercept[AnalysisException] {
+        sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
+      }
+      assert(e.message.contains("unknown is not a valid partition column"))
     }
   }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 64 additions & 0 deletions
@@ -1098,4 +1098,68 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    // Test both a Hive compatible and incompatible code path.
+    Seq("json", "parquet").foreach { format =>
+      withTable("rectangles") {
+        data.write.format(format).saveAsTable("rectangles")
+        assume(spark.table("rectangles").collect().nonEmpty,
+          "bad test; table was empty to begin with")
+
+        sql("TRUNCATE TABLE rectangles")
+        assert(spark.table("rectangles").collect().isEmpty)
+
+        // not supported since the table is not partitioned
+        val e = intercept[AnalysisException] {
+          sql("TRUNCATE TABLE rectangles PARTITION (width=1)")
+        }
+        assert(e.message.contains("Operation not allowed"))
+      }
+    }
+  }
+
+  test("truncate partitioned table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // supported since partitions are stored in the metastore
+      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
+      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // support partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
+      assert(spark.table("partTable").collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // do nothing if no partition is matched for the given partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // do nothing if no partition is matched for the given non-partial partition spec
+      // TODO: This behaviour is different from Hive, we should decide whether we need to follow
+      // Hive's behaviour or stick with our existing behaviour later.
+      sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // throw exception if the column in partition spec is not a partition column.
+      val e = intercept[AnalysisException] {
+        sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
+      }
+      assert(e.message.contains("unknown is not a valid partition column"))
+    }
+  }
 }
