diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index ae3b75dc3334..9cb71bb51294 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -159,6 +159,10 @@ object ExternalCatalogUtils {
     }
   }
 
+  private def isNullPartitionValue(value: String): Boolean = {
+    value == null || value == DEFAULT_PARTITION_NAME
+  }
+
   /**
    * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
    * partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -167,9 +171,15 @@ object ExternalCatalogUtils {
       spec1: TablePartitionSpec,
       spec2: TablePartitionSpec): Boolean = {
     spec1.forall {
+      case (partitionColumn, value) if isNullPartitionValue(value) =>
+        isNullPartitionValue(spec2(partitionColumn))
       case (partitionColumn, value) => spec2(partitionColumn) == value
     }
   }
+
+  def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
+  }
 }
 
 object CatalogUtils {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 31644a5ae4e3..c05950300279 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -541,7 +541,12 @@ class InMemoryCatalog(
 
     listPartitions(db, table, partialSpec).map { partition =>
       partitionColumnNames.map { name =>
-        escapePathName(name) + "=" + escapePathName(partition.spec(name))
+        val partValue = if (partition.spec(name) == null) {
+          DEFAULT_PARTITION_NAME
+        } else {
+          escapePathName(partition.spec(name))
+        }
+        escapePathName(name) + "=" + partValue
       }.mkString("/")
     }.sorted
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b79857cdccd2..389e41023415 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1104,7 +1104,7 @@ class SessionCatalog(
    */
   private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
     specs.foreach { s =>
-      if (s.values.exists(_.isEmpty)) {
+      if (s.values.exists(v => v != null && v.isEmpty)) {
         val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
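For context on the catalog changes above: `TablePartitionSpec` is an alias for `Map[String, String]`, and `DEFAULT_PARTITION_NAME` is Hive's `__HIVE_DEFAULT_PARTITION__`. The standalone sketch below (illustrative only, not the Spark source; the object name and `main` harness are invented for the example) mirrors the new matching rule: a spec written with a literal null now matches a partition stored under the default partition name.

    object NullPartitionMatchSketch {
      type TablePartitionSpec = Map[String, String]
      val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

      // A value written as `null` and a value already stored under the Hive default
      // partition name are both treated as the "null" partition.
      private def isNullPartitionValue(value: String): Boolean =
        value == null || value == DEFAULT_PARTITION_NAME

      // PARTITION (a=1) is a partial spec of PARTITION (a=1, b=2); null values now
      // compare against the default partition name instead of failing plain equality.
      def isPartialPartitionSpec(spec1: TablePartitionSpec, spec2: TablePartitionSpec): Boolean =
        spec1.forall {
          case (col, value) if isNullPartitionValue(value) => isNullPartitionValue(spec2(col))
          case (col, value) => spec2(col) == value
        }

      def main(args: Array[String]): Unit = {
        val stored: TablePartitionSpec = Map("a" -> DEFAULT_PARTITION_NAME, "b" -> "2")
        assert(isPartialPartitionSpec(Map("a" -> null), stored))   // e.g. DROP PARTITION (a = null)
        assert(!isPartialPartitionSpec(Map("b" -> "3"), stored))
      }
    }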
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2fcee5a9e86e..91519c021481 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -520,6 +520,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
     ctx match {
+      case _: NullLiteralContext => null
       case s: StringLiteralContext => createString(s)
       case o => o.getText
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index fdad69d1f1bb..8039c9b6f04b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3575,6 +3575,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     })
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index cf464d1c37d4..5986cdc78d6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1731,6 +1731,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     // use int literal as partition value for int type partition column
     sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
+
+    // null partition values
+    createTablePartition(catalog, Map("a" -> null, "b" -> null), tableIdent)
+    val nullPartValue = if (isUsingHiveMetastore) "__HIVE_DEFAULT_PARTITION__" else null
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> nullPartValue, "b" -> nullPartValue)))
+    sql("ALTER TABLE tab1 DROP PARTITION (a = null, b = null)")
+    assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
   protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
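The two test changes above describe the user-visible behavior. Roughly, a session like the following should now work end to end (a sketch assuming an existing `SparkSession` named `spark`, e.g. in spark-shell; the table and column names are illustrative, adapted from the tests rather than copied from them):

    // `null` in a static partition spec is now parsed as a real null (AstBuilder),
    // not as the four-character string "null".
    spark.sql("CREATE TABLE part_table (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
    spark.sql("INSERT INTO TABLE part_table PARTITION (p1 = null) SELECT 0")
    spark.sql("SELECT * FROM part_table").show()  // one row: 0, NULL
    // The null partition can also be addressed directly in DDL.
    spark.sql("ALTER TABLE part_table DROP PARTITION (p1 = null)")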
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 571c25e356c0..bd44b0b8f139 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -951,9 +951,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Hive metastore is not case preserving and the partition columns are always lower cased. We need
   // to lower case the column names in partition specification before calling partition related Hive
   // APIs, to match this behaviour.
-  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+  private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
     // scalastyle:off caselocale
-    spec.map { case (k, v) => k.toLowerCase -> v }
+    val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
+    ExternalCatalogUtils.convertNullPartitionValues(lowNames)
     // scalastyle:on caselocale
   }
 
@@ -1002,8 +1003,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
       p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
-    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
-    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
+    val metaStoreParts = partsWithLocation
+      .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
+    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -1015,7 +1017,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.dropPartitions(
-      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
+      db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
@@ -1024,7 +1026,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
-      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+      db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))
 
     val tableMeta = getTable(db, table)
     val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1040,7 +1042,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
         val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
-        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+        val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
         partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
       }
       alterPartitions(db, table, newParts)
@@ -1150,12 +1152,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
 
     val rawTable = getRawTable(db, table)
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map { p =>
+    val withStatsProps = metaStoreParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
@@ -1171,7 +1173,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
     restorePartitionMetadata(part, getTable(db, table))
   }
 
@@ -1209,7 +1211,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+    client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
       restorePartitionMetadata(part, getTable(db, table))
     }
   }
@@ -1224,7 +1226,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val catalogTable = getTable(db, table)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
-      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
+      client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
       val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
@@ -1243,11 +1245,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
     val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-    val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+    val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
+    val res = client.getPartitions(db, table, metaStoreSpec)
+      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
     }
 
-    partialSpec match {
+    metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
       // contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
       // treats dot as matching any single character and may return more partitions than we
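The rename from `lowerCasePartitionSpec` to `toMetaStorePartitionSpec` reflects that a spec is now normalized in two steps before it reaches the Hive client: column names are lower-cased, and null values are rewritten to the default partition name, since the metastore cannot hold a literal null. A minimal standalone sketch of that normalization (illustrative names, assuming `TablePartitionSpec` is `Map[String, String]`; this is not the Spark source):

    object MetaStoreSpecSketch {
      type TablePartitionSpec = Map[String, String]
      val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

      def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
        // 1. The Hive metastore is not case preserving, so lower-case the column names.
        val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
        // 2. A literal null value is stored under the default partition name instead.
        lowNames.map { case (k, v) => k -> (if (v == null) DEFAULT_PARTITION_NAME else v) }
      }

      def main(args: Array[String]): Unit = {
        val spec = toMetaStorePartitionSpec(Map("P1" -> null, "P2" -> "x"))
        assert(spec == Map("p1" -> DEFAULT_PARTITION_NAME, "p2" -> "x"))
      }
    }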
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 05acca46135c..39d5b711ab87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -131,6 +131,7 @@ case class InsertIntoHiveTable(
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val numStaticPartitions = partition.values.count(_.nonEmpty)
     val partitionSpec = partition.map {
+      case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
       case (key, Some(value)) => key -> value
       case (key, None) => key -> ""
     }
@@ -227,6 +228,7 @@ case class InsertIntoHiveTable(
     val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
 
     val updatedPartitionSpec = partition.map {
+      case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
       case (key, Some(value)) => key -> value
       case (key, None) if caseInsensitiveDpMap.contains(key) =>
        key -> caseInsensitiveDpMap(key)
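In the InsertIntoHiveTable hunks above, the partition clause is a `Map[String, Option[String]]`: a static value arrives as `Some(value)` and a dynamic partition column as `None`. With the AstBuilder change, `PARTITION (p = null)` now arrives as `Some(null)` and is folded into the default partition name instead of an empty string. The sketch below (standalone and illustrative; the object name and test values are invented) shows that mapping in isolation:

    object StaticPartitionSpecSketch {
      val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

      // Mirrors how a static partition spec is normalized before being handed to Hive.
      def toPartitionSpec(partition: Map[String, Option[String]]): Map[String, String] =
        partition.map {
          case (key, Some(null))  => key -> DEFAULT_PARTITION_NAME // PARTITION (p = null)
          case (key, Some(value)) => key -> value                  // PARTITION (p = 'v')
          case (key, None)        => key -> ""                     // dynamic partition column
        }

      def main(args: Array[String]): Unit = {
        val spec = toPartitionSpec(Map("p1" -> Some(null), "p2" -> Some("x"), "p3" -> None))
        assert(spec == Map("p1" -> DEFAULT_PARTITION_NAME, "p2" -> "x", "p3" -> ""))
      }
    }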