@@ -161,6 +161,10 @@ object ExternalCatalogUtils {
}
}

private def isNullPartitionValue(value: String): Boolean = {
value == null || value == DEFAULT_PARTITION_NAME
}

/**
* Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
* partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -169,9 +173,15 @@ object ExternalCatalogUtils {
spec1: TablePartitionSpec,
spec2: TablePartitionSpec): Boolean = {
spec1.forall {
case (partitionColumn, value) if isNullPartitionValue(value) =>
isNullPartitionValue(spec2(partitionColumn))
case (partitionColumn, value) => spec2(partitionColumn) == value
}
}

def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
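// Replace null values with DEFAULT_PARTITION_NAME so the spec can be handed to external
// catalogs (e.g. the Hive metastore), which represent a null partition value this way.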
spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
}
}

object CatalogUtils {

@@ -541,7 +541,12 @@ class InMemoryCatalog(

listPartitions(db, table, partialSpec).map { partition =>
partitionColumnNames.map { name =>
escapePathName(name) + "=" + escapePathName(partition.spec(name))
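// A null partition value cannot be escaped as a path component, so render it as the
// default partition name instead.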
val partValue = if (partition.spec(name) == null) {
DEFAULT_PARTITION_NAME
} else {
escapePathName(partition.spec(name))
}
escapePathName(name) + "=" + partValue
}.mkString("/")
}.sorted
}

@@ -1178,7 +1178,7 @@ class SessionCatalog(
*/
private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
specs.foreach { s =>
if (s.values.exists(_.isEmpty)) {
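// A null value is a valid partition value here (it denotes the null partition);
// only empty strings are rejected.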
if (s.values.exists(v => v != null && v.isEmpty)) {
val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw QueryCompilationErrors.invalidPartitionSpecError(
s"The spec ($spec) contains an empty partition column value")

@@ -511,6 +511,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
ctx match {
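// A NULL literal (e.g. PARTITION (p1 = null)) yields a null value rather than the
// literal text "null"; later phases map it to the default partition name where needed.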
case _: NullLiteralContext => null
case s: StringLiteralContext => createString(s)
case o => o.getText
}

@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
catalogTable.get.tracksPartitionsInCatalog
if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
// empty partition column value
if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
if (normalizedPartSpec.map(_._2)
.filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")

@@ -3843,6 +3843,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark

assert(unions.size == 1)
}

test("SPARK-33591: null as a partition value") {
val t = "part_table"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
}
}
}

case class Foo(bar: Option[String])

@@ -39,6 +39,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
override val command = "ALTER TABLE .. DROP PARTITION"

protected def notFullPartitionSpecErr: String
protected def nullPartitionValue: String

protected def checkDropPartition(
t: String,
@@ -170,4 +171,14 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 1)))
}
}

test("SPARK-33591: null as a partition value") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
checkPartitions(t, Map("p1" -> nullPartitionValue))
sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
checkPartitions(t)
}
}
}

@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command
*/
trait AlterTableDropPartitionSuiteBase extends command.AlterTableDropPartitionSuiteBase {
override protected val notFullPartitionSpecErr = "The following partitions not found in table"
override protected def nullPartitionValue: String = "__HIVE_DEFAULT_PARTITION__"

test("purge partition data") {
withNamespaceAndTable("ns", "tbl") { t =>

@@ -69,6 +69,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
assert(errMsg.contains("'SHOW PARTITIONS' expects a table"))
}
}

test("SPARK-33591: null as a partition value") {
val t = "part_table"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
checkAnswer(
sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
Row("p1=__HIVE_DEFAULT_PARTITION__"))
}
}
}

/**

@@ -27,8 +27,8 @@ import org.apache.spark.sql.execution.command
class AlterTableDropPartitionSuite
extends command.AlterTableDropPartitionSuiteBase
with CommandSuiteBase {

override protected val notFullPartitionSpecErr = "Partition spec is invalid"
override protected def nullPartitionValue: String = "null"

test("SPARK-33650: drop partition into a table which doesn't support partition management") {
withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>

@@ -942,9 +942,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Hive metastore is not case preserving and the partition columns are always lower cased. We need
// to lower case the column names in partition specification before calling partition related Hive
// APIs, to match this behaviour.
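// In addition, null partition values are converted to the default partition name
// (__HIVE_DEFAULT_PARTITION__) before the spec is passed to the metastore.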
private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
// scalastyle:off caselocale
spec.map { case (k, v) => k.toLowerCase -> v }
val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
ExternalCatalogUtils.convertNullPartitionValues(lowNames)
// scalastyle:on caselocale
}

@@ -993,8 +994,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
}
val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
val metaStoreParts = partsWithLocation
.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
}

override def dropPartitions(
@@ -1006,7 +1008,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
retainData: Boolean): Unit = withClient {
requireTableExists(db, table)
client.dropPartitions(
db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
}

override def renamePartitions(
@@ -1015,7 +1017,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
client.renamePartitions(
db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))

val tableMeta = getTable(db, table)
val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1031,7 +1033,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val fs = tablePath.getFileSystem(hadoopConf)
val newParts = newSpecs.map { spec =>
val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
}
alterPartitions(db, table, newParts)
@@ -1141,12 +1143,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))

val rawTable = getRawTable(db, table)

// convert partition statistics to properties so that we can persist them through hive api
val withStatsProps = lowerCasedParts.map { p =>
val withStatsProps = metaStoreParts.map { p =>
if (p.stats.isDefined) {
val statsProperties = statsToProperties(p.stats.get)
p.copy(parameters = p.parameters ++ statsProperties)
@@ -1162,7 +1164,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
restorePartitionMetadata(part, getTable(db, table))
}

Expand Down Expand Up @@ -1200,7 +1202,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
restorePartitionMetadata(part, getTable(db, table))
}
}
@@ -1215,7 +1217,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val catalogTable = getTable(db, table)
val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
val clientPartitionNames =
client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
clientPartitionNames.map { partitionPath =>
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
partSpec.map { case (partName, partValue) =>
@@ -1234,11 +1236,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
val res = client.getPartitions(db, table, metaStoreSpec)
.map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
}

partialSpec match {
metaStoreSpec match {
// This might be a bug of Hive: When the partition value inside the partial partition spec
// contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
// treats dot as matching any single character and may return more partitions than we

@@ -133,6 +133,7 @@ case class InsertIntoHiveTable(
val numDynamicPartitions = partition.values.count(_.isEmpty)
val numStaticPartitions = partition.values.count(_.nonEmpty)
val partitionSpec = partition.map {
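// A null static partition value is written under the default partition name.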
case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME

Contributor:

how about empty string?


Member Author:

The test:

  test("SPARK-33591: '' as a partition value") {
    val t = "part_table"
    withTable(t) {
      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
      sql(s"INSERT INTO TABLE $t PARTITION (p1 = '') SELECT 0")
    }
  }

fails with:

Partition spec is invalid. The spec ([p1=Some()]) contains an empty partition column value
org.apache.spark.sql.AnalysisException: Partition spec is invalid. The spec ([p1=Some()]) contains an empty partition column value
	at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:412)

at

// empty partition column value
if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {

case (key, Some(value)) => key -> value
case (key, None) => key -> ""
}
@@ -229,6 +230,7 @@ case class InsertIntoHiveTable(
val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)

val updatedPartitionSpec = partition.map {
case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
case (key, Some(value)) => key -> value
case (key, None) if caseInsensitiveDpMap.contains(key) =>
key -> caseInsensitiveDpMap(key)
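
For reference, a minimal end-to-end sketch of the behaviour the new tests cover (table and column names mirror the test suites above; with the session catalog a null value surfaces in SHOW PARTITIONS as __HIVE_DEFAULT_PARTITION__):

sql("CREATE TABLE part_table (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE part_table PARTITION (p1 = null) SELECT 0")
sql("SHOW PARTITIONS part_table").show()  // p1=__HIVE_DEFAULT_PARTITION__
sql("SELECT * FROM part_table").show()    // 0, null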