diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 6ce24bc3b90d..c6c9d421d208 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1025,6 +1025,11 @@
         "Cannot safely cast <colName> <srcType> to <targetType>."
       ]
     },
+    "EXTRA_COLUMNS" : {
+      "message" : [
+        "Cannot write extra columns <extraColumns>."
+      ]
+    },
     "EXTRA_STRUCT_FIELDS" : {
       "message" : [
         "Cannot write extra fields <extraFields> to the struct <colName>."
diff --git a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
index f70b69ba6c5b..0dd28e9d55c5 100644
--- a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
+++ b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
@@ -37,6 +37,10 @@ Cannot find data for the output column `<colName>`.
 
 Cannot safely cast `<colName>` `<srcType>` to `<targetType>`.
 
+## EXTRA_COLUMNS
+
+Cannot write extra columns `<extraColumns>`.
+
 ## EXTRA_STRUCT_FIELDS
 
 Cannot write extra fields `<extraFields>` to the struct `<colName>`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
index 8b120095bc60..ad89005a093e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
@@ -36,10 +36,10 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] {
     if (i.userSpecifiedCols.size != i.query.output.size) {
       if (i.userSpecifiedCols.size > i.query.output.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
-          tblName, i.userSpecifiedCols, i.query)
+          tblName, i.userSpecifiedCols, i.query.output)
       } else {
         throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
-          tblName, i.userSpecifiedCols, i.query)
+          tblName, i.userSpecifiedCols, i.query.output)
       }
     }
     val projectByName = i.userSpecifiedCols.zip(i.query.output)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 51f275f50dc5..ddb17d6c43e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -87,7 +87,7 @@
 
     if (actualExpectedCols.size < query.output.size) {
       throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
-        tableName, actualExpectedCols.map(_.name), query)
+        tableName, actualExpectedCols.map(_.name), query.output)
     }
 
     val errors = new mutable.ArrayBuffer[String]()
@@ -115,7 +115,7 @@
     }
     if (actualExpectedCols.size > queryOutputCols.size) {
       throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
-        tableName, actualExpectedCols.map(_.name), query)
+        tableName, actualExpectedCols.map(_.name), query.output)
     }
 
     resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
@@ -278,9 +278,13 @@
     if (matchedCols.size < inputCols.length) {
       val extraCols = inputCols.filterNot(col => matchedCols.contains(col.name))
         .map(col => s"${toSQLId(col.name)}").mkString(", ")
-      throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
-        tableName, colPath.quoted, extraCols
-      )
+      if (colPath.isEmpty) {
+        throw QueryCompilationErrors.incompatibleDataToTableExtraColumnsError(tableName,
+          extraCols)
+      } else {
+        throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+          tableName, colPath.quoted, extraCols)
+      }
     } else {
       reordered
     }
@@ -301,16 +305,26 @@
       val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
         .map(col => toSQLId(col.name))
         .mkString(", ")
-      throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
-        tableName, colPath.quoted, extraColsStr
-      )
+      if (colPath.isEmpty) {
+        throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(tableName,
+          expectedCols.map(_.name), inputCols.map(_.toAttribute))
+      } else {
+        throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+          tableName, colPath.quoted, extraColsStr
+        )
+      }
     } else if (inputCols.size < expectedCols.size) {
       val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
         .map(col => toSQLId(col.name))
         .mkString(", ")
-      throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
-        tableName, colPath.quoted, missingColsStr
-      )
+      if (colPath.isEmpty) {
+        throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(tableName,
+          expectedCols.map(_.name), inputCols.map(_.toAttribute))
+      } else {
+        throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
+          tableName, colPath.quoted, missingColsStr
+        )
+      }
     }
 
     inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 52ccba8541de..32688c8641e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2148,25 +2148,25 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
   def cannotWriteTooManyColumnsToTableError(
       tableName: String,
       expected: Seq[String],
-      query: LogicalPlan): Throwable = {
+      queryOutput: Seq[Attribute]): Throwable = {
     new AnalysisException(
       errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
       messageParameters = Map(
         "tableName" -> toSQLId(tableName),
         "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
-        "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
+        "dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", ")))
   }
 
   def cannotWriteNotEnoughColumnsToTableError(
       tableName: String,
       expected: Seq[String],
-      query: LogicalPlan): Throwable = {
+      queryOutput: Seq[Attribute]): Throwable = {
     new AnalysisException(
       errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       messageParameters = Map(
         "tableName" -> toSQLId(tableName),
         "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
-        "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
+        "dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", ")))
   }
 
   def incompatibleDataToTableCannotFindDataError(
@@ -2191,6 +2191,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     )
   }
 
+  def incompatibleDataToTableExtraColumnsError(
+      tableName: String, extraColumns: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "extraColumns" -> extraColumns
+      )
+    )
+  }
+
   def incompatibleDataToTableExtraStructFieldsError(
       tableName: String, colName: String, extraFields: String): Throwable = {
     new AnalysisException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 0bbed51d0a90..34e4ded09b5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -213,9 +213,10 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
       exception = intercept[AnalysisException] {
         processInsert("t1", df, overwrite = false, byName = true)
       },
-      v1ErrorClass = "_LEGACY_ERROR_TEMP_1186",
+      v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS",
       v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-      v1Parameters = Map.empty[String, String],
+      v1Parameters = Map("tableName" -> "`spark_catalog`.`default`.`t1`",
+        "extraColumns" -> "`x1`"),
       v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`")
     )
     val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*)