diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e764e0510d96..0f0d2880ec5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1227,6 +1227,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val HIVE_METASTORE_DROP_PARTITION_BY_NAME =
+    buildConf("spark.sql.hive.dropPartitionByName.enabled")
+      .doc("When true, Spark will get partition names rather than partition objects " +
+        "when dropping partitions, which can improve the performance of drop operations.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val HIVE_METASTORE_PARTITION_PRUNING =
     buildConf("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -4460,6 +4468,8 @@ class SQLConf extends Serializable with Logging {
 
   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
 
+  def metastoreDropPartitionsByName: Boolean = getConf(HIVE_METASTORE_DROP_PARTITION_BY_NAME)
+
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
   def metastorePartitionPruningInSetThreshold: Int =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index 3f15533ca5fe..eaf305414f1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -284,4 +284,22 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
       }
     }
   }
+
+  test("SPARK-42480: drop partition when dropPartitionByName enabled") {
+    withSQLConf(SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
+      withNamespaceAndTable("ns", "tbl") { t =>
+        sql(s"CREATE TABLE $t(name STRING, age INT) USING PARQUET PARTITIONED BY (region STRING)")
+        sql(s"ALTER TABLE $t ADD PARTITION (region='=reg1') LOCATION 'loc1'")
+        checkPartitions(t, Map("region" -> "=reg1"))
+        sql(s"ALTER TABLE $t PARTITION (region='=reg1') RENAME TO PARTITION (region='=%reg1')")
+        checkPartitions(t, Map("region" -> "=%reg1"))
+        sql(s"ALTER TABLE $t DROP PARTITION (region='=%reg1')")
+        checkPartitions(t)
+        sql(s"ALTER TABLE $t ADD PARTITION (region='reg?2') LOCATION 'loc2'")
+        checkPartitions(t, Map("region" -> "reg?2"))
+        sql(s"ALTER TABLE $t DROP PARTITION (region='reg?2')")
+        checkPartitions(t)
+      }
+    }
+  }
 }
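The new flag is read through `SQLConf.get` at drop time, so it can be toggled per session with no metastore client restart. A minimal usage sketch, assuming a Hive-enabled `SparkSession` named `spark` and a hypothetical table `t` (neither is part of the patch); it mirrors the table syntax used in the test above:

```scala
// Hypothetical session-level usage of the flag added by this patch.
spark.conf.set("spark.sql.hive.dropPartitionByName.enabled", "true")

spark.sql("CREATE TABLE t(name STRING, age INT) USING PARQUET PARTITIONED BY (region STRING)")
spark.sql("ALTER TABLE t ADD PARTITION (region='US')")
// With the flag on, the drop resolves matching partitions via partition
// names instead of fetching full partition objects from the metastore.
spark.sql("ALTER TABLE t DROP PARTITION (region='US')")
```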
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index fe9bdef3d0e1..0b59011f4e2a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -28,6 +28,8 @@ import scala.util.Try
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -50,6 +52,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
 
 private[spark] object HiveUtils extends Logging {
+  private val PATTERN_FOR_KEY_EQ_VAL = "(.+)=(.+)".r
 
   /** The version of hive used internally by Spark SQL. */
   val builtinHiveVersion: String = HiveVersionInfo.getVersion
@@ -591,4 +594,14 @@ private[spark] object HiveUtils extends Logging {
       table.copy(schema = StructType((dataCols ++ partCols).toArray))
     }
   }
+
+  /**
+   * Extract the partition values from a partition name, e.g., if a partition name is
+   * "region=US/dt=2023-02-18", then we will return an array of values ("US", "2023-02-18").
+   */
+  def partitionNameToValues(name: String): Array[String] = {
+    name.split(Path.SEPARATOR).map {
+      case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
+    }
+  }
 }
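For reference, here is how the new helper behaves on an escaped name. This is a self-contained sketch of the same logic (the demo object and sample values are illustrative, not from the patch); it only assumes Hive's `FileUtils` and Hadoop's `Path` on the classpath:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.FileUtils

object PartitionNameDemo extends App {
  // Same pattern as HiveUtils.PATTERN_FOR_KEY_EQ_VAL: a key, '=', then a value.
  private val KeyEqVal = "(.+)=(.+)".r

  // Split "k1=v1/k2=v2" on '/', keep the value side of each segment, and
  // decode Hive's %XX path escaping back to the raw partition values.
  def partitionNameToValues(name: String): Array[String] =
    name.split(Path.SEPARATOR).map {
      case KeyEqVal(_, v) => FileUtils.unescapePathName(v)
    }

  // Hive escapes '=' as %3D and '%' as %25 in partition directory names,
  // which is exactly the "region='=%reg1'" case exercised by the test above.
  val values = partitionNameToValues("region=%3D%25reg1/dt=2023-02-18")
  println(values.mkString(", ")) // prints: =%reg1, 2023-02-18
}
```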
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 0a83ec2689c8..aaa0afc344de 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -61,7 +61,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -679,7 +679,6 @@ private[hive] class HiveClientImpl(
       purge: Boolean,
       retainData: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
-    val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
     // do the check at first and collect all the matching partitions
     val matchingParts = specs.flatMap { s =>
       // The provided spec here can be a partial spec, i.e. it will match all partitions
       // whose specs are supersets of this partial spec. E.g. If a table has partitions
       // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
-      val parts = shim.getPartitions(client, hiveTable, s.asJava)
-      if (parts.isEmpty && !ignoreIfNotExists) {
-        throw new NoSuchPartitionsException(db, table, Seq(s))
+      val dropPartitionByName = SQLConf.get.metastoreDropPartitionsByName
+      if (dropPartitionByName) {
+        val partitionNames = shim.getPartitionNames(client, db, table, s.asJava, -1)
+        if (partitionNames.isEmpty && !ignoreIfNotExists) {
+          throw new NoSuchPartitionsException(db, table, Seq(s))
+        }
+        partitionNames.map(HiveUtils.partitionNameToValues(_).toList.asJava)
+      } else {
+        val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
+        val parts = shim.getPartitions(client, hiveTable, s.asJava)
+        if (parts.isEmpty && !ignoreIfNotExists) {
+          throw new NoSuchPartitionsException(db, table, Seq(s))
+        }
+        parts.map(_.getValues)
       }
-      parts.map(_.getValues)
     }.distinct
     val droppedParts = ArrayBuffer.empty[java.util.List[String]]
     matchingParts.foreach { partition =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
index de995c120e6f..050332603513 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableDropPartitionSuite.scala
@@ -50,4 +50,29 @@ class AlterTableDropPartitionSuite
       }
     }
   }
+
+  test("SPARK-42480: hive client calls when dropPartitionByName enabled") {
+    Seq(false, true).foreach { statsOn =>
+      withSQLConf(
+        SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> statsOn.toString,
+        SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
+        withNamespaceAndTable("ns", "tbl") { t =>
+          sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
+          sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+          sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+          sql(s"ALTER TABLE $t ADD PARTITION (part=2)") // empty partition
+          checkHiveClientCalls(expected = if (statsOn) 25 else 17) {
+            sql(s"ALTER TABLE $t DROP PARTITION (part=2)")
+          }
+          checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
+            sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
+          }
+          sql(s"CACHE TABLE $t")
+          checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
+            sql(s"ALTER TABLE $t DROP PARTITION (part=1)")
+          }
+        }
+      }
+    }
+  }
 }
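The saving comes from which metastore RPCs each branch of `HiveClientImpl.dropPartitions` issues per spec. A condensed paraphrase of that branch, not a drop-in replacement; the identifiers `shim`, `client`, `db`, `table`, and `s` are assumed to be in scope exactly as in that method:

```scala
import scala.collection.JavaConverters._

// Resolve the concrete partition values matching the (possibly partial) spec `s`.
val matching: Seq[java.util.List[String]] =
  if (SQLConf.get.metastoreDropPartitionsByName) {
    // Name-based path: a single HMS call returning strings such as
    // "region=US/dt=2023-02-18". No getTable call is made and no full
    // Partition objects are fetched and deserialized.
    shim.getPartitionNames(client, db, table, s.asJava, -1)
      .map(HiveUtils.partitionNameToValues(_).toList.asJava)
  } else {
    // Object-based path: fetch the table and entire Partition objects,
    // only to read back their value lists.
    val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
    shim.getPartitions(client, hiveTable, s.asJava).map(_.getValues)
  }
```

Either way the result is the same shape, a list of partition-value lists, which is why the rest of the drop loop is untouched; the test above then pins the reduced Hive client call counts for both the stats-on and stats-off cases.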