@@ -1227,6 +1227,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HIVE_METASTORE_DROP_PARTITION_BY_NAME =
buildConf("spark.sql.hive.dropPartitionByName.enabled")
.doc("When true, Spark will get partition name rather than partition object " +
"to drop partition, which can improve the performance of drop partition.")
.version("3.4.0")
Member: Hi, @sunchao. You need to backport this to branch-3.4.

dongjoon-hyun (Member), Mar 9, 2023: You can still do the backporting if you need this. Otherwise, we need to change this to 3.5.0.

Member: Thanks for pointing this out, @dongjoon-hyun! Yes, let me backport this to 3.4.0 too and update the JIRA accordingly.

Member: I think it's pretty safe to backport to branch-3.4 since the feature is turned off by default.

Member: Thank you for the decision. I support it as well. Here is my +1.

.booleanConf
.createWithDefault(false)

val HIVE_METASTORE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -4460,6 +4468,8 @@ class SQLConf extends Serializable with Logging {

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

def metastoreDropPartitionsByName: Boolean = getConf(HIVE_METASTORE_DROP_PARTITION_BY_NAME)

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

def metastorePartitionPruningInSetThreshold: Int =
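The two SQLConf hunks above add the opt-in flag and its typed accessor. A minimal sketch of how a user would turn it on, assuming a Hive-enabled SparkSession (the session setup and table name are illustrative; only the config key and its false default come from the diff):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("drop-partition-by-name-demo") // hypothetical app name
    .enableHiveSupport()
    .getOrCreate()

  // The feature is off by default (createWithDefault(false)); opt in per session.
  spark.conf.set("spark.sql.hive.dropPartitionByName.enabled", "true")
  spark.sql("ALTER TABLE ns.tbl DROP PARTITION (region='US')") // placeholder table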
@@ -284,4 +284,22 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtils {
}
}
}

test("SPARK-42480: drop partition when dropPartitionByName enabled") {
withSQLConf(SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t(name STRING, age INT) USING PARQUET PARTITIONED BY (region STRING)")
sql(s"ALTER TABLE $t ADD PARTITION (region='=reg1') LOCATION 'loc1'")
checkPartitions(t, Map("region" -> "=reg1"))
sql(s"ALTER TABLE $t PARTITION (region='=reg1') RENAME TO PARTITION (region='=%reg1')")
checkPartitions(t, Map("region" -> "=%reg1"))
sql(s"ALTER TABLE $t DROP PARTITION (region='=%reg1')")
checkPartitions(t)
sql(s"ALTER TABLE $t ADD PARTITION (region='reg?2') LOCATION 'loc2'")
checkPartitions(t, Map("region" -> "reg?2"))
sql(s"ALTER TABLE $t DROP PARTITION (region='reg?2')")
checkPartitions(t)
}
}
}
}
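The partition values in this test ('=reg1', '=%reg1', 'reg?2') deliberately contain characters that Hive escapes in partition path names, so the by-name drop path must unescape them correctly. A minimal round-trip sketch, assuming Hive's FileUtils is on the classpath (the escaped form shown is illustrative):

  import org.apache.hadoop.hive.common.FileUtils

  // '=' and '%' are among the characters escaped in path names,
  // e.g. "=%reg1" -> "%3D%25reg1"
  val escaped = FileUtils.escapePathName("=%reg1")
  assert(FileUtils.unescapePathName(escaped) == "=%reg1")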
13 changes: 13 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -28,6 +28,8 @@ import scala.util.Try

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
@@ -50,6 +52,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}


private[spark] object HiveUtils extends Logging {
private val PATTERN_FOR_KEY_EQ_VAL = "(.+)=(.+)".r

/** The version of hive used internally by Spark SQL. */
val builtinHiveVersion: String = HiveVersionInfo.getVersion
@@ -591,4 +594,14 @@ private[spark] object HiveUtils extends Logging {
table.copy(schema = StructType((dataCols ++ partCols).toArray))
}
}

/**
* Extract the partition values from a partition name, e.g., if a partition name is
* "region=US/dt=2023-02-18", then we will return an array of values ("US", "2023-02-18").
*/
def partitionNameToValues(name: String): Array[String] = {
name.split(Path.SEPARATOR).map {
case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
}
}
}
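A standalone sketch of the helper's behavior on the example from its doc comment (re-implemented here for illustration; the real helper splits on Path.SEPARATOR and unescapes each value via FileUtils.unescapePathName):

  object PartitionNameToValuesDemo {
    private val KeyEqVal = "(.+)=(.+)".r

    def main(args: Array[String]): Unit = {
      // Mirrors HiveUtils.partitionNameToValues, minus Hive's unescaping step.
      val values = "region=US/dt=2023-02-18".split("/").map { case KeyEqVal(_, v) => v }
      assert(values.sameElements(Array("US", "2023-02-18")))
    }
  }

Note that the match is partial: a segment without '=' would throw a MatchError, which is acceptable here because metastore partition names always come in key=value form.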
@@ -61,7 +61,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -679,19 +679,28 @@ private[hive] class HiveClientImpl(
purge: Boolean,
retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
val matchingParts =
specs.flatMap { s =>
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
// The provided spec here can be a partial spec, i.e. it will match all partitions
// whose specs are supersets of this partial spec. E.g. If a table has partitions
// (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
val parts = shim.getPartitions(client, hiveTable, s.asJava)
if (parts.isEmpty && !ignoreIfNotExists) {
throw new NoSuchPartitionsException(db, table, Seq(s))
val dropPartitionByName = SQLConf.get.metastoreDropPartitionsByName
if (dropPartitionByName) {
val partitionNames = shim.getPartitionNames(client, db, table, s.asJava, -1)
if (partitionNames.isEmpty && !ignoreIfNotExists) {
throw new NoSuchPartitionsException(db, table, Seq(s))
}
partitionNames.map(HiveUtils.partitionNameToValues(_).toList.asJava)
} else {
val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
val parts = shim.getPartitions(client, hiveTable, s.asJava)
if (parts.isEmpty && !ignoreIfNotExists) {
throw new NoSuchPartitionsException(db, table, Seq(s))
}
parts.map(_.getValues)
}
parts.map(_.getValues)
}.distinct
val droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
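To make the new branch concrete, a hedged trace of the by-name data flow for one spec (values illustrative; it assumes shim.getPartitionNames returns Hive-escaped names, matching the escaping exercised by the tests above):

  // spec: Map("region" -> "=%reg1"), possibly partial
  // 1. shim.getPartitionNames(client, db, table, spec.asJava, -1)
  //      -> Seq("region=%3D%25reg1")        // name strings only
  // 2. HiveUtils.partitionNameToValues("region=%3D%25reg1")
  //      -> Array("=%reg1")                 // values, unescaped
  // 3. The values are passed straight to dropPartition, skipping the
  //    getTable + getPartitions round trip of the by-object branch.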
@@ -50,4 +50,29 @@ class AlterTableDropPartitionSuite
}
}
}

test("SPARK-42480: hive client calls when dropPartitionByName enabled") {
Seq(false, true).foreach { statsOn =>
withSQLConf(
SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> statsOn.toString,
SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
sql(s"ALTER TABLE $t ADD PARTITION (part=2)") // empty partition
checkHiveClientCalls(expected = if (statsOn) 25 else 17) {
sql(s"ALTER TABLE $t DROP PARTITION (part=2)")
}
checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
}
sql(s"CACHE TABLE $t")
checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
sql(s"ALTER TABLE $t DROP PARTITION (part=1)")
}
}
}
}
}
}