diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 51f275f50dc5..21575f7b96be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -103,22 +103,11 @@ object TableOutputResolver {
         errors += _,
         fillDefaultValue = supportColDefaultValue)
     } else {
-      // If the target table needs more columns than the input query, fill them with
-      // the columns' default values, if the `supportColDefaultValue` parameter is true.
-      val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size
-      val queryOutputCols = if (fillDefaultValue) {
-        query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol =>
-          getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
-        }
-      } else {
-        query.output
-      }
-      if (actualExpectedCols.size > queryOutputCols.size) {
+      if (actualExpectedCols.size > query.output.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, actualExpectedCols.map(_.name), query)
       }
-
-      resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
+      resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
     }
 
     if (errors.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 4cbd54e6d209..f9b3f73ff022 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -404,7 +404,11 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
     }
     val newQuery = try {
       TableOutputResolver.resolveOutputColumns(
-        tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf,
+        tblName,
+        expectedColumns,
+        query,
+        byName = hasColumnList || insert.byName,
+        conf,
         supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
index b2cc4e3b746a..29b2796d25aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
@@ -35,9 +35,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
       // INSERT without user-defined columns
      sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', null").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }
 
@@ -57,9 +63,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
      // INSERT without user-defined columns
       sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', timestamp'2020-01-01'").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }
 
@@ -67,8 +79,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
     sql("create table t(c1 int, c2 int, c3 int, c4 int) using parquet partitioned by (c3, c4)")
 
     // INSERT without static partitions
-    sql("insert into t values (1, 2, 3)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t values (1, 2, 3)")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`, `col3`"))
 
     // INSERT without static partitions but with column list
     sql("truncate table t")
@@ -77,8 +96,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
     // INSERT with static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4=4) values (1)")
-    checkAnswer(spark.table("t"), Row(1, null, 3, 4))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4=4) values (1)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`",
+        "staticPartCols" -> "`c3`, `c4`"))
 
     // INSERT with static partitions and with column list
     sql("truncate table t")
@@ -87,8 +114,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
     // INSERT with partial static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4) values (1, 2)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4) values (1, 2)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`",
+        "staticPartCols" -> "`c3`"))
 
     // INSERT with partial static partitions and with column list is not allowed
     intercept[AnalysisException](sql("insert into t partition(c3=3, c4) (c1) values (1, 4)"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index ff82d178c34a..cf1f4d4d4f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -962,11 +962,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       (1 to 10).map(i => Row(i, null))
     )
 
-    sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      (1 to 10).map(i => Row(i, null))
-    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`unknown`",
+        "tableColumns" -> "`a`, `b`",
+        "dataColumns" -> "`a`"))
 
     sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
     checkAnswer(
@@ -1027,7 +1031,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42L, null))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1495,7 +1499,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, null))
     }
     // The table has a partitioning column and a default value is injected.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 420b4fc83ec9..ea43f1d2c672 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -391,7 +391,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
 
       // The data is missing a column. The default value for the missing column is null.
-      sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+      sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) (a) SELECT 13")
 
       // c is defined twice. Analyzer will complain.
       intercept[ParseException] {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4eae3933bf51..82b88ec9f35d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1258,11 +1258,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """INSERT INTO TABLE dp_test PARTITION(dp)
             |SELECT key, value, key % 5 FROM src""".stripMargin)
       },
-      errorClass = "_LEGACY_ERROR_TEMP_1169",
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       parameters = Map(
         "tableName" -> "`spark_catalog`.`default`.`dp_test`",
-        "normalizedPartSpec" -> "dp",
-        "partColNames" -> "dp,sp")
+        "tableColumns" -> "`key`, `value`, `dp`, `sp`",
+        "dataColumns" -> "`key`, `value`, `(key % 5)`"))
 
     sql("SET hive.exec.dynamic.partition.mode=nonstrict")