From 6741c3e1b8fd99739f48ee4cda919f1e3eca51d0 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 3 Mar 2021 22:01:05 +0800
Subject: [PATCH] Static partition should also follow StoreAssignmentPolicy
 when insert into v2 tables

---
 .../sql/catalyst/analysis/Analyzer.scala      |  6 +++-
 .../apache/spark/sql/SQLInsertTestSuite.scala | 29 ++++++++++++++++++-
 .../spark/sql/sources/InsertSuite.scala       | 21 --------------
 3 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 282cb37d514f4..ed0d0fb4e1192 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1371,7 +1371,11 @@ class Analyzer(override val catalogManager: CatalogManager)
           relation.output.flatMap { col =>
             outputNameToStaticName.get(col.name).flatMap(staticPartitions.get) match {
               case Some(staticValue) =>
-                Some(Alias(Cast(Literal(staticValue), col.dataType), col.name)())
+                // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
+                // values. We can't follow it completely, because static type checking is not
+                // possible here: the parser has already erased the type info of static partition
+                // values and converted them to strings.
+                Some(Alias(AnsiCast(Literal(staticValue), col.dataType), col.name)())
               case _ if queryColumns.hasNext =>
                 Some(queryColumns.next)
               case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index e248f63355f1b..92a9dc520f455 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
- * The base trait for DML - insert syntax
+ * The base trait for SQL INSERT.
  */
 
 trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
@@ -278,6 +278,33 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
+    val testingPolicies = if (format == "foo") {
+      // DS v2 doesn't support the legacy policy
+      Seq(SQLConf.StoreAssignmentPolicy.ANSI, SQLConf.StoreAssignmentPolicy.STRICT)
+    } else {
+      SQLConf.StoreAssignmentPolicy.values
+    }
+    testingPolicies.foreach { policy =>
+      withSQLConf(
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
+        withTable("t") {
+          sql("create table t(a int, b string) using parquet partitioned by (a)")
+          policy match {
+            case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
+              val errorMsg = intercept[NumberFormatException] {
+                sql("insert into t partition(a='ansi') values('ansi')")
+              }.getMessage
+              assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
+            case SQLConf.StoreAssignmentPolicy.LEGACY =>
+              sql("insert into t partition(a='ansi') values('ansi')")
+              checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
+          }
+        }
+      }
+    }
+  }
 }
 
 class FileSourceSQLInsertTestSuite extends SQLInsertTestSuite with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index bfd04ffaaf754..62fa4e6960607 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -797,27 +797,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
-    SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
-      withSQLConf(
-        SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
-        withTable("t") {
-          sql("create table t(a int, b string) using parquet partitioned by (a)")
-          policy match {
-            case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
-              val errorMsg = intercept[NumberFormatException] {
-                sql("insert into t partition(a='ansi') values('ansi')")
-              }.getMessage
-              assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
-            case SQLConf.StoreAssignmentPolicy.LEGACY =>
-              sql("insert into t partition(a='ansi') values('ansi')")
-              checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
-          }
-        }
-      }
-    }
-  }
-
   test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
     withTempPath { path =>
       Seq((1, 1), (2, 2)).toDF("i", "part")
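
Note (illustration, not part of the patch): a minimal standalone Scala sketch of the behavior the relocated test pins down, assuming a local SparkSession, a table named `t`, and the ANSI store assignment policy (the object name and setup are hypothetical). Under ANSI or STRICT, a non-numeric static partition value such as 'ansi' for an int partition column should be rejected at insert time, surfacing as a NumberFormatException in this Spark version, rather than being silently cast to NULL as under the LEGACY policy; the Analyzer change above extends that check to inserts into v2 tables by wrapping static partition values in AnsiCast.

import org.apache.spark.sql.SparkSession

// Hypothetical demo; names and setup are assumptions, not taken from the patch.
object StaticPartitionPolicyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("static-partition-policy-demo")
      .master("local[1]")
      .config("spark.sql.storeAssignmentPolicy", "ANSI")
      .getOrCreate()

    // Same table shape as the test in SQLInsertTestSuite: int partition column `a`.
    spark.sql("create table t(a int, b string) using parquet partitioned by (a)")

    try {
      // 'ansi' cannot be cast to int under ANSI store assignment, so the insert
      // is expected to fail instead of writing a NULL partition value.
      spark.sql("insert into t partition(a='ansi') values('ansi')")
    } catch {
      case e: NumberFormatException =>
        println(s"insert rejected as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}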