@@ -167,9 +167,15 @@ object ExternalCatalogUtils {
spec1: TablePartitionSpec,
spec2: TablePartitionSpec): Boolean = {
spec1.forall {
case (partitionColumn, null | DEFAULT_PARTITION_NAME) =>

Contributor:

Can we add a util function isInvalidPartitionValue? Then the code could be:

case (partitionColumn, value) if isInvalidPartitionValue(value) =>
  isInvalidPartitionValue(spec2(partitionColumn))

Contributor:

Will we hit an empty string partition value here?

Member Author (@MaxGekk, Jan 6, 2021):

> Can we add a util function isInvalidPartitionValue?

1. Why are the partition values invalid? They are still valid here.
2. Where else would the function be used? Since this is the only place, wouldn't it be better to keep the code embedded here?

> Will we hit an empty string partition value here?

Empty strings are handled earlier; we cannot get one here. For example, SessionCatalog.createPartitions calls requireNonEmptyValueInPartitionSpec before externalCatalog.createPartitions, where null is converted to __HIVE_DEFAULT_PARTITION__.
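
For illustration, a rough sketch of that ordering with a made-up spec (names and values are illustrative only; the real call sites are SessionCatalog.createPartitions and the Hive external catalog):

val spec: Map[String, String] = Map("p1" -> null)        // a null partition value
// requireNonEmptyValueInPartitionSpec only rejects empty strings, so null passes
assert(!spec.values.exists(v => v != null && v.isEmpty))
// later, on the Hive external catalog path, null is mapped to the default partition name
val converted = spec.map { case (k, v) => k -> (if (v == null) "__HIVE_DEFAULT_PARTITION__" else v) }
// converted == Map("p1" -> "__HIVE_DEFAULT_PARTITION__")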

Contributor:

How about isNullPartitionValue?
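
If that suggestion were adopted, the helper and the case above might look roughly like this (a sketch only; isNullPartitionValue is the reviewer's proposed name, not code in this diff):

def isNullPartitionValue(value: String): Boolean =
  value == null || value == DEFAULT_PARTITION_NAME

// the case above would then become:
case (partitionColumn, value) if isNullPartitionValue(value) =>
  isNullPartitionValue(spec2(partitionColumn))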

spec2(partitionColumn) == null || spec2(partitionColumn) == DEFAULT_PARTITION_NAME
case (partitionColumn, value) => spec2(partitionColumn) == value
}
}

def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v)
}
}

object CatalogUtils {
@@ -1141,7 +1141,7 @@ class SessionCatalog(
*/
private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
specs.foreach { s =>
if (s.values.exists(_.isEmpty)) {
if (s.values.exists(v => v != null && v.isEmpty)) {
val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
@@ -510,6 +510,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
ctx match {
case _: NullLiteralContext => null
case s: StringLiteralContext => createString(s)
case o => o.getText
}
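The practical effect on a partition spec, sketched with plain maps and hypothetical values (visitStringConstant itself only returns the string value; previously the NULL literal fell through to o.getText):

// PARTITION (p1 = null)
val before: Map[String, String] = Map("p1" -> "null")   // the four-character string "null"
val after: Map[String, String] = Map("p1" -> null)      // a real null, later stored as __HIVE_DEFAULT_PARTITION__
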
@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
catalogTable.get.tracksPartitionsInCatalog
if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
// empty partition column value
if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
if (normalizedPartSpec.map(_._2)
.filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
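To see what the reworked check now accepts, a small standalone sketch with made-up values (normalizedPartSpec maps partition column names to Option[String]):

val normalizedPartSpec = Map("p1" -> Some(null: String), "p2" -> None, "p3" -> Some("a"))
// Some(null) -- static partition with a null value: no longer trips the check
// None       -- dynamic partition column: skipped by filter(_.isDefined)
// Some("")   -- an empty string would still be rejected
normalizedPartSpec.map(_._2)
  .filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)   // false here; true if any value were Some("")
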
@@ -3718,6 +3718,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}

test("SPARK-33591: null as a partition value") {
val t = "part_table"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
}
}
}

case class Foo(bar: Option[String])
@@ -239,8 +239,23 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
|""".stripMargin
sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
assert(partTable.partitionExists(expectedPartition))
sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)")
sql(s"ALTER TABLE $t DROP PARTITION ($partSpec)")
assert(!partTable.partitionExists(expectedPartition))
}
}

test("SPARK-33591: null as a partition value") {
val t = "testpart.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
val partTable = catalog("testpart").asTableCatalog
.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
.asPartitionable
val expectedIdent = InternalRow.fromSeq(Seq(null))
assert(partTable.partitionExists(expectedIdent))
sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
assert(!partTable.partitionExists(expectedIdent))
}
}
}
@@ -149,6 +149,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
assert(errMsg.contains(s"Table or view '$viewName' not found"))
}
}

test("SPARK-33591: null as a partition value") {
val t = "part_table"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
checkAnswer(
sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
Row("p1=__HIVE_DEFAULT_PARTITION__"))
}
}
}

class ShowPartitionsSuite extends ShowPartitionsSuiteBase with SharedSparkSession {
@@ -950,9 +950,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Hive metastore is not case preserving and the partition columns are always lower cased. We need
// to lower case the column names in partition specification before calling partition related Hive
// APIs, to match this behaviour.
private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
// scalastyle:off caselocale
spec.map { case (k, v) => k.toLowerCase -> v }
val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
ExternalCatalogUtils.convertNullPartitionValues(lowNames)
// scalastyle:on caselocale
}
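
Restated as a standalone sketch with hypothetical values (DEFAULT_PARTITION_NAME is __HIVE_DEFAULT_PARTITION__), the renamed helper both lower-cases the keys and replaces null values:

val spec: Map[String, String] = Map("P1" -> null, "p2" -> "a")
val metaStoreSpec = spec
  .map { case (k, v) => k.toLowerCase -> v }                                          // Hive metastore is not case preserving
  .map { case (k, v) => k -> (if (v == null) "__HIVE_DEFAULT_PARTITION__" else v) }   // convertNullPartitionValues
// metaStoreSpec == Map("p1" -> "__HIVE_DEFAULT_PARTITION__", "p2" -> "a")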

@@ -1001,8 +1002,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
}
val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
val metaStoreParts = partsWithLocation
.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
}

override def dropPartitions(
@@ -1014,7 +1016,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
retainData: Boolean): Unit = withClient {
requireTableExists(db, table)
client.dropPartitions(
db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
}

override def renamePartitions(
@@ -1023,7 +1025,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
client.renamePartitions(
db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))

val tableMeta = getTable(db, table)
val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1039,7 +1041,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val fs = tablePath.getFileSystem(hadoopConf)
val newParts = newSpecs.map { spec =>
val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
}
alterPartitions(db, table, newParts)
@@ -1149,12 +1151,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))

val rawTable = getRawTable(db, table)

// convert partition statistics to properties so that we can persist them through hive api
val withStatsProps = lowerCasedParts.map { p =>
val withStatsProps = metaStoreParts.map { p =>
if (p.stats.isDefined) {
val statsProperties = statsToProperties(p.stats.get)
p.copy(parameters = p.parameters ++ statsProperties)
@@ -1170,7 +1172,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
restorePartitionMetadata(part, getTable(db, table))
}

@@ -1208,7 +1210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
restorePartitionMetadata(part, getTable(db, table))
}
}
@@ -1223,7 +1225,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val catalogTable = getTable(db, table)
val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
val clientPartitionNames =
client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
clientPartitionNames.map { partitionPath =>
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
partSpec.map { case (partName, partValue) =>
@@ -1242,11 +1244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
val res = client.getPartitions(db, table, metaStoreSpec)
.map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
}

partialSpec match {
metaStoreSpec match {
// This might be a bug of Hive: When the partition value inside the partial partition spec
// contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
// treats dot as matching any single character and may return more partitions than we
@@ -133,6 +133,7 @@ case class InsertIntoHiveTable(
val numDynamicPartitions = partition.values.count(_.isEmpty)
val numStaticPartitions = partition.values.count(_.nonEmpty)
val partitionSpec = partition.map {
case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME

Contributor:


How about an empty string?

Member Author:

The test:

  test("SPARK-33591: '' as a partition value") {
    val t = "part_table"
    withTable(t) {
      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
      sql(s"INSERT INTO TABLE $t PARTITION (p1 = '') SELECT 0")
    }
  }

fails with:

Partition spec is invalid. The spec ([p1=Some()]) contains an empty partition column value
org.apache.spark.sql.AnalysisException: Partition spec is invalid. The spec ([p1=Some()]) contains an empty partition column value
	at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:412)

at this check:

// empty partition column value
if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {

case (key, Some(value)) => key -> value
case (key, None) => key -> ""
}
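
Putting the three cases together on a hypothetical mixed static/dynamic spec:

val partition = Map("p1" -> Some(null: String), "p2" -> Some("a"), "p3" -> None)
val partitionSpec = partition.map {
  case (key, Some(null))  => key -> "__HIVE_DEFAULT_PARTITION__"   // ExternalCatalogUtils.DEFAULT_PARTITION_NAME
  case (key, Some(value)) => key -> value
  case (key, None)        => key -> ""                             // dynamic partition column
}
// partitionSpec == Map("p1" -> "__HIVE_DEFAULT_PARTITION__", "p2" -> "a", "p3" -> "")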
@@ -229,6 +230,7 @@ case class InsertIntoHiveTable(
val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)

val updatedPartitionSpec = partition.map {
case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
case (key, Some(value)) => key -> value
case (key, None) if caseInsensitiveDpMap.contains(key) =>
key -> caseInsensitiveDpMap(key)