Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-common</artifactId>
<scope>${hive.common.scope}</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.sql.util

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.FileUtils

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
Expand All @@ -30,6 +33,7 @@ import org.apache.spark.sql.types.{CharType, DataType, StringType, StructField,
import org.apache.spark.unsafe.types.UTF8String

private[sql] object PartitioningUtils {
private val PATTERN_FOR_KEY_EQ_VAL = "(.+)=(.+)".r
Copy link
Contributor

@LuciferYang LuciferYang Feb 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too idealistic; not all partitioned tables follow this rule. For example, we can use
alter table ... partition(...) set location ... to relocate a partition to any directory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if the data corresponding to the partition a=1 is stored in dir /1/, will there be a bad case with this pr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LuciferYang Thanks for your review. The partition name always follows this rule in Hive's makePartName.
The partition name depends only on the partition keys and values; other partition fields, such as location, do not affect it.

Copy link
Contributor

@LuciferYang LuciferYang Feb 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember seeing similar cases in a production environment, but I can't remember the details. We need tests that cover the corner cases we can think of.

cc @wangyum @sunchao FYI


def castPartitionSpec(value: String, dt: DataType, conf: SQLConf): Expression = {
conf.storeAssignmentPolicy match {
Expand Down Expand Up @@ -136,4 +140,14 @@ private[sql] object PartitioningUtils {
partitionColumnNames, tableName)
}
}

/**
 * Extract the partition values from a partition name, e.g., if a partition name is
 * "region=US/dt=2023-02-18", then we will return an array of values ("US", "2023-02-18").
 *
 * The name is split on `Path.SEPARATOR`; every resulting segment must have the form
 * `key=value`. The key is discarded and the value is unescaped with
 * `FileUtils.unescapePathName` before it is returned.
 *
 * @param name partition name made of `key=value` segments separated by `Path.SEPARATOR`
 * @return the unescaped partition values, in the order they appear in `name`
 * @throws IllegalArgumentException if any segment is not of the form `key=value`
 */
def partitionNameToValues(name: String): Array[String] = {
  name.split(Path.SEPARATOR).map {
    case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
    // Without this fallback a malformed segment surfaces as an opaque scala.MatchError
    // that names neither the partition nor the offending segment.
    case segment =>
      throw new IllegalArgumentException(
        s"Invalid partition name '$name': segment '$segment' is not of the form key=value")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.PartitioningUtils
import org.apache.spark.util.{CircularBuffer, Utils}

/**
Expand Down Expand Up @@ -679,19 +680,18 @@ private[hive] class HiveClientImpl(
purge: Boolean,
retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
val matchingParts =
specs.flatMap { s =>
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
// The provided spec here can be a partial spec, i.e. it will match all partitions
// whose specs are supersets of this partial spec. E.g. If a table has partitions
// (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
val parts = shim.getPartitions(client, hiveTable, s.asJava)
if (parts.isEmpty && !ignoreIfNotExists) {
val partitionNames = shim.getPartitionNames(client, db, table, s.asJava, -1)
if (partitionNames.isEmpty && !ignoreIfNotExists) {
throw new NoSuchPartitionsException(db, table, Seq(s))
}
parts.map(_.getValues)
partitionNames.map(PartitioningUtils.partitionNameToValues(_).toList.asJava)
}.distinct
val droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
Expand Down