Skip to content

Commit 34bfb3a

Browse files
MaxGekk authored and cloud-fan committed
[SPARK-33787][SQL] Allow partition purge for v2 tables
### What changes were proposed in this pull request? 1. Add new methods `purgePartition()`/`purgePartitions()` to the interfaces `SupportsPartitionManagement`/`SupportsAtomicPartitionManagement`. 2. The default implementations of the new methods throw `UnsupportedOperationException`. 3. Add tests for the new methods to `SupportsPartitionManagementSuite`/`SupportsAtomicPartitionManagementSuite`. 4. Add `ALTER TABLE .. DROP PARTITION` tests for DS v1 and v2. Closes #30776 Closes #30821 ### Why are the changes needed? Currently, the `PURGE` option that a user can set in `ALTER TABLE .. DROP PARTITION` is completely ignored. We should pass this flag to the catalog implementation so that the catalog can decide how to handle it. ### Does this PR introduce _any_ user-facing change? The changes can affect the behavior of `ALTER TABLE .. DROP PARTITION` for v2 tables. ### How was this patch tested? By running the affected test suites, for instance: ``` $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableDropPartitionSuite" ``` Closes #30886 from MaxGekk/purge-partition. Authored-by: Max Gekk <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent d98c216 commit 34bfb3a

File tree

8 files changed

+99
-5
lines changed

8 files changed

+99
-5
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.spark.annotation.Experimental;
2323
import org.apache.spark.sql.catalyst.InternalRow;
24+
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
2425
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
2526
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
2627

@@ -33,6 +34,9 @@
3334
* add an array of partitions and any data they contain to the table
3435
* ${@link #dropPartitions}:
3536
* remove an array of partitions and any data they contain from the table
37+
* ${@link #purgePartitions}:
38+
* remove an array of partitions and any data they contain from the table by skipping
39+
* a trash even if it is supported
3640
*
3741
* @since 3.1.0
3842
*/
@@ -82,4 +86,23 @@ void createPartitions(
8286
* @return true if partitions were deleted, false if any partition not exists
8387
*/
8488
boolean dropPartitions(InternalRow[] idents);
89+
90+
/**
91+
* Drop an array of partitions atomically from table, and completely remove partitions data
92+
* by skipping a trash even if it is supported.
93+
* <p>
94+
* If any partition doesn't exist,
95+
* the operation of purgePartitions needs to be safely rolled back.
96+
*
97+
* @param idents an array of partition identifiers
98+
* @return true if partitions were deleted, false if any partition does not exist
99+
* @throws NoSuchPartitionException If any partition identifier to alter doesn't exist
100+
* @throws UnsupportedOperationException If partition purging is not supported
101+
*
102+
* @since 3.2.0
103+
*/
104+
default boolean purgePartitions(InternalRow[] idents)
105+
throws NoSuchPartitionException, UnsupportedOperationException {
106+
throw new UnsupportedOperationException("Partition purge is not supported");
107+
}
85108
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
* add a partition and any data it contains to the table
3737
* ${@link #dropPartition}:
3838
* remove a partition and any data it contains from the table
39+
* ${@link #purgePartition}:
40+
* remove a partition and any data it contains from the table by skipping a trash
41+
* even if it is supported.
3942
* ${@link #replacePartitionMetadata}:
4043
* point a partition to a new location, which will swap one location's data for the other
4144
*
@@ -72,6 +75,22 @@ void createPartition(
7275
*/
7376
boolean dropPartition(InternalRow ident);
7477

78+
/**
79+
* Drop a partition from the table and completely remove partition data by skipping a trash
80+
* even if it is supported.
81+
*
82+
* @param ident a partition identifier
83+
* @return true if a partition was deleted, false if no partition exists for the identifier
84+
* @throws NoSuchPartitionException If the partition identifier to alter doesn't exist
85+
* @throws UnsupportedOperationException If partition purging is not supported
86+
*
87+
* @since 3.2.0
88+
*/
89+
default boolean purgePartition(InternalRow ident)
90+
throws NoSuchPartitionException, UnsupportedOperationException {
91+
throw new UnsupportedOperationException("Partition purge is not supported");
92+
}
93+
7594
/**
7695
* Test whether a partition exists using an {@link InternalRow ident} from the table.
7796
*

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,20 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
110110
assert(!hasPartitions(partTable))
111111
}
112112

113+
test("purgePartitions") {
114+
val table = catalog.loadTable(ident)
115+
val partTable = new InMemoryAtomicPartitionTable(
116+
table.name(), table.schema(), table.partitioning(), table.properties())
117+
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
118+
partTable.createPartitions(
119+
partIdents,
120+
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
121+
val errMsg = intercept[UnsupportedOperationException] {
122+
partTable.purgePartitions(partIdents)
123+
}.getMessage
124+
assert(errMsg.contains("purge is not supported"))
125+
}
126+
113127
test("dropPartitions failed if partition not exists") {
114128
val table = catalog.loadTable(ident)
115129
val partTable = new InMemoryAtomicPartitionTable(

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
8585
assert(!hasPartitions(partTable))
8686
}
8787

88+
test("purgePartition") {
89+
val table = catalog.loadTable(ident)
90+
val partTable = new InMemoryPartitionTable(
91+
table.name(), table.schema(), table.partitioning(), table.properties())
92+
val errMsg = intercept[UnsupportedOperationException] {
93+
partTable.purgePartition(InternalRow.apply("3"))
94+
}.getMessage
95+
assert(errMsg.contains("purge is not supported"))
96+
}
97+
8898
test("replacePartitionMetadata") {
8999
val table = catalog.loadTable(ident)
90100
val partTable = new InMemoryPartitionTable(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableDropPartitionExec.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement
2828
case class AlterTableDropPartitionExec(
2929
table: SupportsPartitionManagement,
3030
partSpecs: Seq[ResolvedPartitionSpec],
31-
ignoreIfNotExists: Boolean) extends V2CommandExec {
31+
ignoreIfNotExists: Boolean,
32+
purge: Boolean) extends V2CommandExec {
3233
import DataSourceV2Implicits._
3334

3435
override def output: Seq[Attribute] = Seq.empty
@@ -45,9 +46,11 @@ case class AlterTableDropPartitionExec(
4546
existsPartIdents match {
4647
case Seq() => // Nothing will be done
4748
case Seq(partIdent) =>
48-
table.dropPartition(partIdent)
49+
if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
4950
case _ if table.isInstanceOf[SupportsAtomicPartitionManagement] =>
50-
table.asAtomicPartitionable.dropPartitions(existsPartIdents.toArray)
51+
val idents = existsPartIdents.toArray
52+
val atomicTable = table.asAtomicPartitionable
53+
if (purge) atomicTable.purgePartitions(idents) else atomicTable.dropPartitions(idents)
5154
case _ =>
5255
throw new UnsupportedOperationException(
5356
s"Nonatomic partition table ${table.name()} can not drop multiple partitions.")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,9 +348,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
348348
table, parts.asResolvedPartitionSpecs, ignoreIfExists) :: Nil
349349

350350
case AlterTableDropPartition(
351-
ResolvedTable(_, _, table: SupportsPartitionManagement), parts, ignoreIfNotExists, _) =>
351+
ResolvedTable(_, _, table: SupportsPartitionManagement), parts, ignoreIfNotExists, purge) =>
352352
AlterTableDropPartitionExec(
353-
table, parts.asResolvedPartitionSpecs, ignoreIfNotExists) :: Nil
353+
table, parts.asResolvedPartitionSpecs, ignoreIfNotExists, purge) :: Nil
354354

355355
case AlterTableRenamePartition(_: ResolvedTable, _: ResolvedPartitionSpec, _) =>
356356
throw new AnalysisException(

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableDropPartitionSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ import org.apache.spark.sql.execution.command
2121

2222
trait AlterTableDropPartitionSuiteBase extends command.AlterTableDropPartitionSuiteBase {
2323
override protected val notFullPartitionSpecErr = "The following partitions not found in table"
24+
25+
test("purge partition data") {
26+
withNamespaceAndTable("ns", "tbl") { t =>
27+
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
28+
sql(s"ALTER TABLE $t ADD PARTITION (id = 1)")
29+
checkPartitions(t, Map("id" -> "1"))
30+
sql(s"ALTER TABLE $t DROP PARTITION (id = 1) PURGE")
31+
checkPartitions(t) // no partitions
32+
}
33+
}
2434
}
2535

2636
class AlterTableDropPartitionSuite

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,19 @@ class AlterTableDropPartitionSuite
3535
assert(errMsg.contains("can not alter partitions"))
3636
}
3737
}
38+
39+
test("purge partition data") {
40+
withNamespaceAndTable("ns", "tbl") { t =>
41+
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
42+
sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
43+
try {
44+
val errMsg = intercept[UnsupportedOperationException] {
45+
sql(s"ALTER TABLE $t DROP PARTITION (id=1) PURGE")
46+
}.getMessage
47+
assert(errMsg.contains("purge is not supported"))
48+
} finally {
49+
sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
50+
}
51+
}
52+
}
3853
}

0 commit comments

Comments
 (0)