Commit 41df7ed
[SPARK-33591][SQL] Recognize null in partition spec values
1. Recognize `null` while parsing partition specs, and put `null` instead of the string `"null"` as a partition value.
2. For the V1 catalog: replace `null` with `__HIVE_DEFAULT_PARTITION__`.
3. For V2 catalogs: pass `null` as is, and let catalog implementations decide how to handle `null` partition values in a spec.

Previously, `null` in a partition spec was recognized as the string `"null"`, which could lead to incorrect results, for example:

```sql
spark-sql> CREATE TABLE tbl5 (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1);
spark-sql> INSERT INTO TABLE tbl5 PARTITION (p1 = null) SELECT 0;
spark-sql> SELECT isnull(p1) FROM tbl5;
false
```

Even though we inserted a row into the partition with a `null` value, the resulting table does not contain `null`.

This is a user-facing change. After the changes, the example above works as expected:

```sql
spark-sql> SELECT isnull(p1) FROM tbl5;
true
```

Tested:
1. By running the affected test suites `SQLQuerySuite`, `AlterTablePartitionV2SQLSuite` and `v1/ShowPartitionsSuite`.
2. By compiling with Scala 2.13:
```
$ ./dev/change-scala-version.sh 2.13
$ ./build/sbt -Pscala-2.13 compile
```

Closes apache#30538 from MaxGekk/partition-spec-value-null.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 157b72a)
Signed-off-by: Max Gekk <[email protected]>
1 parent c8bf22e commit 41df7ed
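For readers who prefer the Scala API, here is a minimal, self-contained reproduction of the SQL example above. The object name and app name are illustrative only (not part of this commit), and it assumes a Spark build that includes this change.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone demo: mirrors the spark-sql example above.
object NullPartitionValueDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-33591-demo").getOrCreate()
    spark.sql("CREATE TABLE tbl5 (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
    spark.sql("INSERT INTO TABLE tbl5 PARTITION (p1 = null) SELECT 0")
    // With this change the query returns true; before, it returned false
    // because the partition value was stored as the string "null".
    spark.sql("SELECT isnull(p1) FROM tbl5").show()
    spark.stop()
  }
}
```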

11 files changed: +84 −18 lines
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -161,6 +161,10 @@ object ExternalCatalogUtils {
     }
   }
 
+  private def isNullPartitionValue(value: String): Boolean = {
+    value == null || value == DEFAULT_PARTITION_NAME
+  }
+
   /**
    * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
    * partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -169,9 +173,15 @@ object ExternalCatalogUtils {
       spec1: TablePartitionSpec,
       spec2: TablePartitionSpec): Boolean = {
     spec1.forall {
+      case (partitionColumn, value) if isNullPartitionValue(value) =>
+        isNullPartitionValue(spec2(partitionColumn))
       case (partitionColumn, value) => spec2(partitionColumn) == value
     }
   }
+
+  def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
+  }
 }
 
 object CatalogUtils {
```
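To make the new matching rule concrete, here is a small self-contained sketch (the object name and simplified type alias are illustrative, not Spark source): a null value in a partial spec matches either a literal null or the `__HIVE_DEFAULT_PARTITION__` marker in the stored spec.

```scala
// Hypothetical, standalone rendering of the matching rule added above.
object PartialSpecMatchSketch {
  type TablePartitionSpec = Map[String, String]
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  private def isNullPartitionValue(value: String): Boolean =
    value == null || value == DEFAULT_PARTITION_NAME

  def isPartialPartitionSpec(spec1: TablePartitionSpec, spec2: TablePartitionSpec): Boolean =
    spec1.forall {
      case (col, v) if isNullPartitionValue(v) => isNullPartitionValue(spec2(col))
      case (col, v) => spec2(col) == v
    }

  def main(args: Array[String]): Unit = {
    // PARTITION (p1 = null) now matches a stored __HIVE_DEFAULT_PARTITION__ value.
    println(isPartialPartitionSpec(Map("p1" -> null), Map("p1" -> DEFAULT_PARTITION_NAME))) // true
  }
}
```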

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -541,7 +541,12 @@ class InMemoryCatalog(
 
     listPartitions(db, table, partialSpec).map { partition =>
       partitionColumnNames.map { name =>
-        escapePathName(name) + "=" + escapePathName(partition.spec(name))
+        val partValue = if (partition.spec(name) == null) {
+          DEFAULT_PARTITION_NAME
+        } else {
+          escapePathName(partition.spec(name))
+        }
+        escapePathName(name) + "=" + partValue
       }.mkString("/")
     }.sorted
   }
```
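A rough standalone illustration of the partition-name rendering above; `escapePathName` here is a simplified stand-in for Spark's real path escaping, so only the null handling mirrors the change.

```scala
// Hypothetical sketch: render a partition path fragment, mapping null to the Hive marker.
object PartitionNameSketch {
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  // Simplified stand-in; the real escapePathName escapes many more characters.
  private def escapePathName(s: String): String =
    s.replace("/", "%2F").replace("=", "%3D")

  def partitionName(cols: Seq[String], spec: Map[String, String]): String =
    cols.map { name =>
      val v = if (spec(name) == null) DEFAULT_PARTITION_NAME else escapePathName(spec(name))
      escapePathName(name) + "=" + v
    }.mkString("/")

  def main(args: Array[String]): Unit = {
    println(partitionName(Seq("p1"), Map("p1" -> null))) // p1=__HIVE_DEFAULT_PARTITION__
  }
}
```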

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -1185,7 +1185,7 @@ class SessionCatalog(
    */
   private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
     specs.foreach { s =>
-      if (s.values.exists(_.isEmpty)) {
+      if (s.values.exists(v => v != null && v.isEmpty)) {
         val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -511,6 +511,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
     ctx match {
+      case _: NullLiteralContext => null
       case s: StringLiteralContext => createString(s)
       case o => o.getText
     }
```
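As a hypothetical, simplified model of this parser rule (the real `NullLiteralContext` and `StringLiteralContext` are ANTLR-generated contexts), the distinction it introduces looks like this: the literal `null` now yields a real null value, while the quoted string `'null'` stays a string.

```scala
// Illustrative stand-in only; these are not the ANTLR contexts used by AstBuilder.
object StringConstantSketch {
  sealed trait Constant
  case object NullLiteral extends Constant
  final case class StringLiteral(value: String) extends Constant
  final case class OtherLiteral(text: String) extends Constant

  def visitStringConstant(c: Constant): String = c match {
    case NullLiteral => null          // PARTITION (p1 = null)   -> real null partition value
    case StringLiteral(v) => v        // PARTITION (p1 = 'null') -> the string "null"
    case OtherLiteral(t) => t
  }

  def main(args: Array[String]): Unit = {
    println(visitStringConstant(NullLiteral) == null)   // true
    println(visitStringConstant(StringLiteral("null"))) // null (as text, not a null value)
  }
}
```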

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
         catalogTable.get.tracksPartitionsInCatalog
       if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
         // empty partition column value
-        if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
+        if (normalizedPartSpec.map(_._2)
+            .filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
           val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
           throw new AnalysisException(
             s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
```

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -3773,6 +3773,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
```

sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -281,4 +281,19 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       }
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING foo PARTITIONED BY (p1)")
+      sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      assert(partTable.partitionExists(InternalRow(null)))
+      sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
+      assert(!partTable.partitionExists(InternalRow(null)))
+    }
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 8 additions & 0 deletions
```diff
@@ -1730,6 +1730,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     // use int literal as partition value for int type partition column
     sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
+
+    // null partition values
+    createTablePartition(catalog, Map("a" -> null, "b" -> null), tableIdent)
+    val nullPartValue = if (isUsingHiveMetastore) "__HIVE_DEFAULT_PARTITION__" else null
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> nullPartValue, "b" -> nullPartValue)))
+    sql("ALTER TABLE tab1 DROP PARTITION (a = null, b = null)")
+    assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
   protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
```

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -67,6 +67,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
       assert(errMsg.contains("'SHOW PARTITIONS' expects a table"))
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
+        Row("p1=__HIVE_DEFAULT_PARTITION__"))
+    }
+  }
 }
 
 class ShowPartitionsSuite extends ShowPartitionsSuiteBase with SharedSparkSession {
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 18 additions & 15 deletions
```diff
@@ -950,9 +950,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Hive metastore is not case preserving and the partition columns are always lower cased. We need
   // to lower case the column names in partition specification before calling partition related Hive
   // APIs, to match this behaviour.
-  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+  private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
     // scalastyle:off caselocale
-    spec.map { case (k, v) => k.toLowerCase -> v }
+    val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
+    ExternalCatalogUtils.convertNullPartitionValues(lowNames)
     // scalastyle:on caselocale
   }
 
@@ -1001,8 +1002,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
       p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
-    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
-    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
+    val metaStoreParts = partsWithLocation
+      .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
+    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -1014,7 +1016,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.dropPartitions(
-      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
+      db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
@@ -1023,7 +1025,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
-      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+      db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))
 
     val tableMeta = getTable(db, table)
     val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1039,7 +1041,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val fs = tablePath.getFileSystem(hadoopConf)
     val newParts = newSpecs.map { spec =>
       val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
-      val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+      val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
       partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
     }
     alterPartitions(db, table, newParts)
@@ -1149,12 +1151,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
 
     val rawTable = getRawTable(db, table)
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map { p =>
+    val withStatsProps = metaStoreParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
@@ -1170,7 +1172,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
     restorePartitionMetadata(part, getTable(db, table))
   }
 
@@ -1208,7 +1210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+    client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
       restorePartitionMetadata(part, getTable(db, table))
     }
   }
@@ -1223,7 +1225,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val catalogTable = getTable(db, table)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
-      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
+      client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
       val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
@@ -1242,11 +1244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
     val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-    val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+    val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
+    val res = client.getPartitions(db, table, metaStoreSpec)
+      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
     }
 
-    partialSpec match {
+    metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
       // contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
      // treats dot as matching any single character and may return more partitions than we
```