docs/sql-migration-guide.md (1 addition, 0 deletions)

@@ -34,6 +34,7 @@ license: |
- Valid hexadecimal strings should include only allowed symbols (0-9A-Fa-f).
- Valid values for `fmt` are case-insensitive `hex`, `base64`, `utf-8`, `utf8`.
- Since Spark 3.4, Spark throws only `PartitionsAlreadyExistException` when it creates partitions but some of them exist already. In Spark 3.3 or earlier, Spark can throw either `PartitionsAlreadyExistException` or `PartitionAlreadyExistsException`.
- Since Spark 3.4, Spark validates the partition spec in ALTER PARTITION to follow the behavior of `spark.sql.storeAssignmentPolicy`, which may cause an exception if type conversion fails, e.g. `ALTER TABLE .. ADD PARTITION(p='a')` when column `p` is of int type. To restore the legacy behavior, set `spark.sql.legacy.skipPartitionSpecTypeValidation` to `true`.
Contributor: The conf should be `spark.sql.legacy.skipTypeValidationOnAlterPartition`.

Member: nice catch

Member: just made a PR :-) #38667


## Upgrading from Spark SQL 3.2 to 3.3

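To make the migration-guide entry above concrete, here is a minimal behavior sketch (not part of the PR; the table name and `USING parquet` clause are made up, and the conf name used is the corrected one from the follow-up PR #38667):

```scala
// Spark 3.4+ behavior sketch, run against a plain SparkSession `spark`.
spark.sql("CREATE TABLE t (c INT) USING parquet PARTITIONED BY (p INT)")

// Under the default store assignment policy (ANSI), 'aaa' cannot be cast to
// INT, so this now throws SparkNumberFormatException (CAST_INVALID_INPUT):
// spark.sql("ALTER TABLE t ADD PARTITION (p='aaa')")

// Opting back into the pre-3.4 behavior:
spark.conf.set("spark.sql.legacy.skipTypeValidationOnAlterPartition", "true")
spark.sql("ALTER TABLE t ADD PARTITION (p='aaa')")  // accepted again (see the legacy tests below for what gets stored)
```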
@@ -2995,6 +2995,16 @@ object SQLConf {
      .booleanConf
      .createWithDefault(false)

  val SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION =
    buildConf("spark.sql.legacy.skipTypeValidationOnAlterPartition")
      .internal()
      .doc("When true, skip validation for partition spec in ALTER PARTITION. E.g., " +
        "`ALTER TABLE .. ADD PARTITION(p='a')` would work even if the partition type is int. " +
        s"When false, the behavior follows ${STORE_ASSIGNMENT_POLICY.key}.")
      .version("3.4.0")
      .booleanConf
      .createWithDefault(false)

  val SORT_BEFORE_REPARTITION =
    buildConf("spark.sql.execution.sortBeforeRepartition")
      .internal()
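A quick sketch of how such an internal legacy conf is read at runtime (mirroring the consumer added in PartitioningUtils below; illustrative, not from the PR):

```scala
import org.apache.spark.sql.internal.SQLConf

// SQLConf.get resolves the active session's conf; internal confs like this
// one can still be set per session via spark.conf.set(...).
val skipValidation: Boolean =
  SQLConf.get.getConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION)
```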
@@ -20,14 +20,47 @@ package org.apache.spark.sql.util
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{CharType, StructType, VarcharType}
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{CharType, DataType, StringType, StructField, StructType, VarcharType}
import org.apache.spark.unsafe.types.UTF8String

private[sql] object PartitioningUtils {

  def castPartitionSpec(value: String, dt: DataType, conf: SQLConf): Expression = {
    conf.storeAssignmentPolicy match {
      // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
      // values, though not completely, because we can't do static type checking: the
      // parser has already erased the type info of static partition values and
      // converted them to strings.
      case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
        val cast = Cast(Literal(value), dt, Option(conf.sessionLocalTimeZone),
          ansiEnabled = true)
        cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
        cast
      case _ =>
        Cast(Literal(value), dt, Option(conf.sessionLocalTimeZone),
          ansiEnabled = false)
    }
  }

  private def normalizePartitionStringValue(value: String, field: StructField): String = {
    // Round-trip the value through the policy-aware cast and back to string,
    // so an invalid value either throws (ANSI/STRICT) or becomes null (LEGACY).
    val casted = Cast(
      castPartitionSpec(value, field.dataType, SQLConf.get),
      StringType,
      Option(SQLConf.get.sessionLocalTimeZone)
    ).eval()
    if (casted != null) {
      casted.asInstanceOf[UTF8String].toString
    } else {
      null
    }
  }

  /**
   * Normalize the column names in partition specification, w.r.t. the real partition column names
   * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a

@@ -61,6 +94,14 @@ private[sql] object PartitioningUtils {
            case other => other
          }
          v.asInstanceOf[T]
        case _ if !SQLConf.get.getConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION) &&
            value != null && value != DEFAULT_PARTITION_NAME =>
          // New in this PR: unless the legacy flag is set, run string partition values
          // through the policy-aware cast so invalid values fail (or degrade) up front.
          val v = value match {
            case Some(str: String) => Some(normalizePartitionStringValue(str, normalizedFiled))
            case str: String => normalizePartitionStringValue(str, normalizedFiled)
            case other => other
          }
          v.asInstanceOf[T]
        case _ => value
      }
    normalizedFiled.name -> normalizedVal
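To see what the two branches of `castPartitionSpec` actually do, here is a standalone catalyst-level sketch (assumes Spark 3.4 on the classpath; not from the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.IntegerType

// ANSI/STRICT branch: a strict cast, so evaluating it on a bad input throws.
// This is what surfaces as CAST_INVALID_INPUT in the tests further down.
val ansiCast = Cast(Literal("aaa"), IntegerType, None, ansiEnabled = true)
// ansiCast.eval()  // throws SparkNumberFormatException

// LEGACY branch: the non-ANSI cast silently yields null, which is why the
// LEGACY test branch expects the default (null) partition to be created.
val legacyCast = Cast(Literal("aaa"), IntegerType, None, ansiEnabled = false)
assert(legacyCast.eval() == null)
```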
@@ -48,9 +48,9 @@ import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.streaming.StreamingRelation
-import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String

@@ -106,22 +106,8 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
        None
      } else if (potentialSpecs.size == 1) {
        val partValue = potentialSpecs.head._2
-       conf.storeAssignmentPolicy match {
-         // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
-         // values but not completely follow because we can't do static type checking due to
-         // the reason that the parser has erased the type info of static partition values
-         // and converted them to string.
-         case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
-           val cast = Cast(Literal(partValue), field.dataType, Option(conf.sessionLocalTimeZone),
-             ansiEnabled = true)
-           cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
-           Some(Alias(cast, field.name)())
-         case _ =>
-           val castExpression =
-             Cast(Literal(partValue), field.dataType, Option(conf.sessionLocalTimeZone),
-               ansiEnabled = false)
-           Some(Alias(castExpression, field.name)())
-       }
+       Some(Alias(CatalystPartitioningUtils.castPartitionSpec(
+         partValue, field.dataType, conf), field.name)())
      } else {
        throw QueryCompilationErrors.multiplePartitionColumnValuesSpecifiedError(
          field, potentialSpecs)
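The net effect of this hunk is deduplication: static partition values in INSERT now go through the same helper as ALTER PARTITION values. A self-contained sketch of the shared call (the value and type here are arbitrary examples):

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}

// Builds the same Cast expression both call sites now rely on; whether it is
// ANSI-strict depends on the active spark.sql.storeAssignmentPolicy.
val castExpr = CatalystPartitioningUtils.castPartitionSpec(
  "2023", IntegerType, SQLConf.get)
```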
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command

import java.time.{Duration, Period}

import org.apache.spark.SparkNumberFormatException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -40,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
 */
trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils {
  override val command = "ALTER TABLE .. ADD PARTITION"
  def defaultPartitionName: String

  test("one partition") {
    withNamespaceAndTable("ns", "tbl") { t =>

@@ -213,4 +215,46 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils
        Row(Period.ofYears(1), Duration.ofDays(-1), "bbb")))
    }
  }

  test("SPARK-40798: Alter partition should verify partition value") {
    def shouldThrowException(policy: SQLConf.StoreAssignmentPolicy.Value): Boolean = policy match {
      case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
        true
      case SQLConf.StoreAssignmentPolicy.LEGACY =>
        false
    }

    SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
      withNamespaceAndTable("ns", "tbl") { t =>
        sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")

        withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
          if (shouldThrowException(policy)) {
            checkError(
              exception = intercept[SparkNumberFormatException] {
                sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
              },
              errorClass = "CAST_INVALID_INPUT",
              parameters = Map(
                "ansiConfig" -> "\"spark.sql.ansi.enabled\"",
                "expression" -> "'aaa'",
                "sourceType" -> "\"STRING\"",
                "targetType" -> "\"INT\""),
              context = ExpectedContext(
                fragment = s"ALTER TABLE $t ADD PARTITION (p='aaa')",
                start = 0,
                // the fragment is 36 + t.length chars, so its inclusive end offset is:
                stop = 35 + t.length))
          } else {
            // LEGACY: the invalid value silently casts to null, i.e. the default partition.
            sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
            checkPartitions(t, Map("p" -> defaultPartitionName))
            sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
          }

          // A literal null partition value is always allowed.
          sql(s"ALTER TABLE $t ADD PARTITION (p=null)")
          checkPartitions(t, Map("p" -> defaultPartitionName))
          sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
        }
      }
    }
  }
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.v1

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.internal.SQLConf

@@ -33,6 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
 * `org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite`
 */
trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
  override def defaultPartitionName: String = DEFAULT_PARTITION_NAME

  test("empty string as partition value") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")

@@ -157,6 +160,18 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
}
}

test("SPARK-40798: Alter partition should verify partition value - legacy") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")

withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") {
sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
checkPartitions(t, Map("p" -> "aaa"))
sql(s"ALTER TABLE $t DROP PARTITION (p='aaa')")
}
}
}
}

@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.v2
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.internal.SQLConf

/**
 * The class contains tests for the `ALTER TABLE .. ADD PARTITION` command

@@ -28,6 +29,8 @@ import org.apache.spark.sql.execution.command
class AlterTableAddPartitionSuite
  extends command.AlterTableAddPartitionSuiteBase
  with CommandSuiteBase {
  override def defaultPartitionName: String = "null"

  test("SPARK-33650: add partition into a table which doesn't support partition management") {
    withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>
      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")

@@ -121,4 +124,16 @@ class AlterTableAddPartitionSuite
checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
}
}

test("SPARK-40798: Alter partition should verify partition value - legacy") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")

withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") {
sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
checkPartitions(t, Map("p" -> defaultPartitionName))
sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
}
}
}
}
@@ -608,7 +608,7 @@ class HiveDDLSuite
  }

  test("SPARK-19129: drop partition with a empty string will drop the whole table") {
-    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    val df = spark.createDataFrame(Seq(("0", "a"), ("1", "b"))).toDF("partCol1", "name")
Contributor (author): change partition column from int type to string to make the test work

    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
    assertAnalysisError(
      "alter table partitionedTable drop partition(partCol1='')",