diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 404581445d0c..e87d9d5a5663 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -346,10 +346,14 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       connectTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")),
       sparkTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")))
 
-    val e = intercept[AnalysisException](
-      transform(connectTestRelation.withColumnsRenamed(
-        Map("id" -> "duplicatedCol", "name" -> "duplicatedCol"))))
-    assert(e.getMessage.contains("Found duplicate column(s)"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        transform(
+          connectTestRelation.withColumnsRenamed(
+            Map("id" -> "duplicatedCol", "name" -> "duplicatedCol")))
+      },
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`duplicatedcol`"))
   }
 
   test("Writes fails without path or table") {
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index f4ebed68f49c..da62d30b81c4 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -94,6 +94,11 @@
     ],
     "sqlState" : "22005"
   },
+  "COLUMN_ALREADY_EXISTS" : {
+    "message" : [
+      "The column <columnName> already exists. Consider choosing another name or renaming the existing column."
+    ]
+  },
   "COLUMN_NOT_IN_GROUP_BY_CLAUSE" : {
     "message" : [
       "The expression <expression> is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in `first()` (or `first_value()`) if you don't care which value you get."
@@ -2843,11 +2848,6 @@
       "Partition spec is invalid. The spec (<specKeys>) must match the partition spec (<partitionColumnNames>) defined in table '<tableName>'."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1233" : {
-    "message" : [
-      "Found duplicate column(s) <colType>: <duplicateCol>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1234" : {
     "message" : [
       "Temporary view <viewName> is not cached for analyzing columns."
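For reference, here is how the renamed error class surfaces to callers — a minimal sketch, assuming a Spark 3.4-era SparkSession in scope as `spark`; the DataFrame and the `dup` column name are illustrative, not part of the patch:

    // Sketch: triggering COLUMN_ALREADY_EXISTS via the Map overload of
    // withColumnsRenamed (added by SPARK-40311, exercised in the tests below).
    import org.apache.spark.sql.AnalysisException

    val df = spark.range(1).selectExpr("id", "CAST(id AS STRING) AS name")
    try {
      // Renaming two different columns to the same target name now fails with
      // COLUMN_ALREADY_EXISTS instead of the legacy "Found duplicate column(s)" text.
      df.withColumnsRenamed(Map("id" -> "dup", "name" -> "dup"))
    } catch {
      case e: AnalysisException =>
        assert(e.getErrorClass == "COLUMN_ALREADY_EXISTS")
        assert(e.getMessageParameters().get("columnName") == "`dup`")
    }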
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b84a03e77d6e..013e71b737e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3504,8 +3504,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   private def resolveUserSpecifiedColumns(i: InsertIntoStatement): Seq[NamedExpression] = {
-    SchemaUtils.checkColumnNameDuplication(
-      i.userSpecifiedCols, "in the column list", resolver)
+    SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)
 
     i.userSpecifiedCols.map { col =>
       i.table.resolve(Seq(col), resolver).getOrElse {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 115b10144048..05835928de54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1294,7 +1294,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = {
       SchemaUtils.checkColumnNameDuplication(
         colsToAdd.map(_.name.quoted),
-        "in the user specified columns",
         alter.conf.resolver)
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
index 6209e076573e..bdffce0b9af5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
@@ -196,11 +196,9 @@ object ResolveUnion extends Rule[LogicalPlan] {
 
     SchemaUtils.checkColumnNameDuplication(
       leftOutputAttrs.map(_.name),
-      "in the left attributes",
       caseSensitiveAnalysis)
     SchemaUtils.checkColumnNameDuplication(
       rightOutputAttrs.map(_.name),
-      "in the right attributes",
       caseSensitiveAnalysis)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index e6ce12756ca7..db27d9064028 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2273,12 +2273,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
         "tableName" -> tableName))
   }
 
-  def foundDuplicateColumnError(colType: String, duplicateCol: Seq[String]): Throwable = {
+  def columnAlreadyExistsError(columnName: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1233",
-      messageParameters = Map(
-        "colType" -> colType,
-        "duplicateCol" -> duplicateCol.sorted.mkString(", ")))
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      messageParameters = Map("columnName" -> toSQLId(columnName)))
   }
 
   def noSuchTableError(db: String, table: String): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 87f140cb3c4a..35a30431616c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -107,8 +107,7 @@ private[sql] object PartitioningUtils {
         normalizedFiled.name -> normalizedVal
     }
 
-    SchemaUtils.checkColumnNameDuplication(
-      normalizedPartSpec.map(_._1), "in the partition schema", resolver)
+    SchemaUtils.checkColumnNameDuplication(normalizedPartSpec.map(_._1), resolver)
 
     normalizedPartSpec.toMap
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index 63c1f1869d2b..aac96a9b56c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -39,24 +39,22 @@ private[spark] object SchemaUtils {
    * duplication exists.
    *
    * @param schema schema to check
-   * @param colType column type name, used in an exception message
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
   def checkSchemaColumnNameDuplication(
       schema: DataType,
-      colType: String,
       caseSensitiveAnalysis: Boolean = false): Unit = {
     schema match {
       case ArrayType(elementType, _) =>
-        checkSchemaColumnNameDuplication(elementType, colType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(elementType, caseSensitiveAnalysis)
       case MapType(keyType, valueType, _) =>
-        checkSchemaColumnNameDuplication(keyType, colType, caseSensitiveAnalysis)
-        checkSchemaColumnNameDuplication(valueType, colType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(keyType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(valueType, caseSensitiveAnalysis)
       case structType: StructType =>
         val fields = structType.fields
-        checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis)
+        checkColumnNameDuplication(fields.map(_.name), caseSensitiveAnalysis)
         fields.foreach { field =>
-          checkSchemaColumnNameDuplication(field.dataType, colType, caseSensitiveAnalysis)
+          checkSchemaColumnNameDuplication(field.dataType, caseSensitiveAnalysis)
         }
       case _ =>
     }
@@ -67,14 +65,10 @@ private[spark] object SchemaUtils {
    * duplication exists.
    *
    * @param schema schema to check
-   * @param colType column type name, used in an exception message
    * @param resolver resolver used to determine if two identifiers are equal
    */
-  def checkSchemaColumnNameDuplication(
-      schema: StructType,
-      colType: String,
-      resolver: Resolver): Unit = {
-    checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver))
+  def checkSchemaColumnNameDuplication(schema: StructType, resolver: Resolver): Unit = {
+    checkSchemaColumnNameDuplication(schema, isCaseSensitiveAnalysis(resolver))
   }
 
   // Returns true if a given resolver is case-sensitive
@@ -95,12 +89,10 @@ private[spark] object SchemaUtils {
    * the duplication exists.
    *
    * @param columnNames column names to check
-   * @param colType column type name, used in an exception message
    * @param resolver resolver used to determine if two identifiers are equal
    */
-  def checkColumnNameDuplication(
-      columnNames: Seq[String], colType: String, resolver: Resolver): Unit = {
-    checkColumnNameDuplication(columnNames, colType, isCaseSensitiveAnalysis(resolver))
+  def checkColumnNameDuplication(columnNames: Seq[String], resolver: Resolver): Unit = {
+    checkColumnNameDuplication(columnNames, isCaseSensitiveAnalysis(resolver))
   }
 
   /**
@@ -108,19 +100,17 @@ private[spark] object SchemaUtils {
    * the duplication exists.
    *
    * @param columnNames column names to check
-   * @param colType column type name, used in an exception message
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
-  def checkColumnNameDuplication(
-      columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = {
+  def checkColumnNameDuplication(columnNames: Seq[String], caseSensitiveAnalysis: Boolean): Unit = {
     // scalastyle:off caselocale
     val names = if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
     // scalastyle:on caselocale
     if (names.distinct.length != names.length) {
-      val duplicateColumns = names.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => s"`$x`"
-      }
-      throw QueryCompilationErrors.foundDuplicateColumnError(colType, duplicateColumns.toSeq)
+      val columnName = names.groupBy(identity).collectFirst {
+        case (x, ys) if ys.length > 1 => x
+      }.get
+      throw QueryCompilationErrors.columnAlreadyExistsError(columnName)
     }
   }
 
@@ -178,7 +168,7 @@ private[spark] object SchemaUtils {
       case b: BucketTransform =>
         val colNames = b.columns.map(c => UnresolvedAttribute(c.fieldNames()).name)
         // We need to check that we're not duplicating columns within our bucketing transform
-        checkColumnNameDuplication(colNames, "in the bucket definition", isCaseSensitive)
+        checkColumnNameDuplication(colNames, isCaseSensitive)
         b.name -> colNames
       case NamedTransform(transformName, refs) =>
         val fieldNameParts =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
index 75caab414593..c5f19b438f27 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
@@ -40,24 +40,27 @@ class SchemaUtilsSuite extends SparkFunSuite {
     val testType = if (caseSensitive) "case-sensitive" else "case-insensitive"
     test(s"Check column name duplication in $testType cases") {
       def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
-        val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
-          duplicatedColumns.sorted.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
         val schema = StructType.fromDDL(schemaStr)
-        var msg = intercept[AnalysisException] {
-          SchemaUtils.checkSchemaColumnNameDuplication(
-            schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
-        }.getMessage
-        assert(msg.contains(expectedErrorMsg))
-        msg = intercept[AnalysisException] {
-          SchemaUtils.checkColumnNameDuplication(
-            schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
-        }.getMessage
-        assert(msg.contains(expectedErrorMsg))
-        msg = intercept[AnalysisException] {
-          SchemaUtils.checkColumnNameDuplication(
-            schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
-        }.getMessage
-        assert(msg.contains(expectedErrorMsg))
+        checkError(
+          exception = intercept[AnalysisException] {
+            SchemaUtils.checkSchemaColumnNameDuplication(schema, caseSensitive)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`a`"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            SchemaUtils.checkColumnNameDuplication(schema.map(_.name), resolver(caseSensitive))
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`a`"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            SchemaUtils.checkColumnNameDuplication(
+              schema.map(_.name), caseSensitiveAnalysis = caseSensitive)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`a`"))
       }
 
       checkExceptionCases(s"$a0 INT, b INT, $a1 INT", a0 :: Nil)
@@ -70,11 +73,11 @@ class SchemaUtilsSuite extends SparkFunSuite {
     def checkNoExceptionCases(schemaStr: String, caseSensitive: Boolean): Unit = {
       val schema = StructType.fromDDL(schemaStr)
       SchemaUtils.checkSchemaColumnNameDuplication(
-        schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+        schema, caseSensitiveAnalysis = caseSensitive)
       SchemaUtils.checkColumnNameDuplication(
-        schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
+        schema.map(_.name), resolver(caseSensitive))
       SchemaUtils.checkColumnNameDuplication(
-        schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+        schema.map(_.name), caseSensitiveAnalysis = caseSensitive)
     }
 
     checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = true)
@@ -99,11 +102,12 @@ class SchemaUtilsSuite extends SparkFunSuite {
     val schemaE = MapType(LongType, schemaD)
     val schemaF = MapType(schemaD, LongType)
     Seq(schemaA, schemaB, schemaC, schemaD, schemaE, schemaF).foreach { schema =>
-      val msg = intercept[AnalysisException] {
-        SchemaUtils.checkSchemaColumnNameDuplication(
-          schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false)
-      }.getMessage
-      assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          SchemaUtils.checkSchemaColumnNameDuplication(schema)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`camelcase`"))
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5215858a9fd5..5f6512d4e4b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2755,7 +2755,6 @@ class Dataset[T] private[sql](
       s"the size of columns: ${cols.size}")
     SchemaUtils.checkColumnNameDuplication(
       colNames,
-      "in given column names",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     val resolver = sparkSession.sessionState.analyzer.resolver
@@ -2855,7 +2854,6 @@ class Dataset[T] private[sql](
     }
     SchemaUtils.checkColumnNameDuplication(
       projectList.map(_.name),
-      "in given column names for withColumnsRenamed",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
     withPlan(Project(projectList, logicalPlan))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala
index 216636c7ea14..4c8d434c9a4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala
@@ -30,7 +30,7 @@ object CommandCheck extends (LogicalPlan => Unit) with SQLConfHelper {
     plan.foreach {
       case AnalyzeColumnCommand(_, colsOpt, allColumns) if !allColumns =>
         colsOpt.foreach(SchemaUtils.checkColumnNameDuplication(
-          _, "in analyze columns.", conf.caseSensitiveAnalysis))
+          _, conf.caseSensitiveAnalysis))
 
       case _ =>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ccdce5ec0ce2..e6fac8a8774a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -238,7 +238,6 @@ case class AlterTableAddColumnsCommand(
 
     SchemaUtils.checkColumnNameDuplication(
       (colsWithProcessedDefaults ++ catalogTable.schema).map(_.name),
-      "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
     DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3a2e32cfdc7a..3ad98fa0d0c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -470,8 +470,7 @@ object ViewHelper extends SQLConfHelper with Logging {
 
     // Generate the query column names, throw an AnalysisException if there exists duplicate column
    // names.
-    SchemaUtils.checkColumnNameDuplication(
-      fieldNames, "in the view definition", conf.resolver)
+    SchemaUtils.checkColumnNameDuplication(fieldNames, conf.resolver)
 
     // Generate the view default catalog and namespace, as well as captured SQL configs.
     val manager = session.sessionState.catalogManager
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d50fd88f65c2..b2bc7301ade8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -130,10 +130,8 @@ case class DataSource(
     }
 
     bucketSpec.foreach { bucket =>
-      SchemaUtils.checkColumnNameDuplication(
-        bucket.bucketColumnNames, "in the bucket definition", equality)
-      SchemaUtils.checkColumnNameDuplication(
-        bucket.sortColumnNames, "in the sort definition", equality)
+      SchemaUtils.checkColumnNameDuplication(bucket.bucketColumnNames, equality)
+      SchemaUtils.checkColumnNameDuplication(bucket.sortColumnNames, equality)
     }
 
     /**
@@ -220,7 +218,6 @@ case class DataSource(
       try {
         SchemaUtils.checkColumnNameDuplication(
           (dataSchema ++ partitionSchema).map(_.name),
-          "in the data schema and the partition schema",
           equality)
       } catch {
         case e: AnalysisException => logWarning(e.getMessage)
@@ -428,17 +425,14 @@ case class DataSource(
       case hs: HadoopFsRelation =>
         SchemaUtils.checkSchemaColumnNameDuplication(
           hs.dataSchema,
-          "in the data schema",
           equality)
         SchemaUtils.checkSchemaColumnNameDuplication(
           hs.partitionSchema,
-          "in the partition schema",
           equality)
         DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema)
       case _ =>
         SchemaUtils.checkSchemaColumnNameDuplication(
           relation.schema,
-          "in the data schema",
           equality)
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 41b55a3b6e93..fe6ec094812e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -82,7 +82,6 @@ case class InsertIntoHadoopFsRelationCommand(
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkColumnNameDuplication(
       outputColumnNames,
-      s"when inserting into $outputPath",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index d4989606927b..4f43c1305259 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -558,8 +558,7 @@ object PartitioningUtils extends SQLConfHelper {
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
 
-    SchemaUtils.checkColumnNameDuplication(
-      partitionColumns, partitionColumns.mkString(", "), caseSensitive)
+    SchemaUtils.checkColumnNameDuplication(partitionColumns, caseSensitive)
 
     partitionColumnsSchema(schema, partitionColumns).foreach { field =>
       field.dataType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 60ecd2ff60b3..76599c53db9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -823,8 +823,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
 
     // checks duplicate columns in the user specified column types.
-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the createTableColumnTypes option value", conf.resolver)
+    SchemaUtils.checkColumnNameDuplication(userSchema.map(_.name), conf.resolver)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>
@@ -849,10 +848,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     if (null != customSchema && customSchema.nonEmpty) {
       val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
-      SchemaUtils.checkSchemaColumnNameDuplication(
-        userSchema,
-        "in the customSchema option value",
-        nameEquality)
+      SchemaUtils.checkSchemaColumnNameDuplication(userSchema, nameEquality)
 
       // This is resolved by names, use the custom filed dataType to replace the default dataType.
       val newSchema = tableSchema.map { col =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 4a0541c04ca4..dc4fed49c1cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -250,7 +250,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     val flattenedSchema = SchemaUtils.explodeNestedFieldNames(schema)
     SchemaUtils.checkColumnNameDuplication(
       flattenedSchema,
-      s"in the table definition of $identifier",
       isCaseSensitive)
 
     // Check that columns are not duplicated in the partitioning statement
@@ -289,7 +288,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
   private def normalizeCatalogTable(schema: StructType, table: CatalogTable): CatalogTable = {
     SchemaUtils.checkSchemaColumnNameDuplication(
       schema,
-      "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
 
     val normalizedPartCols = normalizePartitionColumns(schema, table)
@@ -316,10 +314,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       partCols = table.partitionColumnNames,
       resolver = conf.resolver)
 
-    SchemaUtils.checkColumnNameDuplication(
-      normalizedPartitionCols,
-      "in the partition schema",
-      conf.resolver)
+    SchemaUtils.checkColumnNameDuplication(normalizedPartitionCols, conf.resolver)
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
       failAnalysis("Cannot use all columns for partition columns")
@@ -344,11 +339,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
 
     SchemaUtils.checkColumnNameDuplication(
       normalizedBucketSpec.bucketColumnNames,
-      "in the bucket definition",
       conf.resolver)
     SchemaUtils.checkColumnNameDuplication(
       normalizedBucketSpec.sortColumnNames,
-      "in the sort definition",
       conf.resolver)
 
     normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 29a2e1be7acd..91e6ef70c760 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -79,16 +79,14 @@ abstract class FileTable(
 
   override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    SchemaUtils.checkSchemaColumnNameDuplication(dataSchema,
-      "in the data schema", caseSensitive)
+    SchemaUtils.checkSchemaColumnNameDuplication(dataSchema, caseSensitive)
     dataSchema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
         throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field)
       }
     }
     val partitionSchema = fileIndex.partitionSchema
-    SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema,
-      "in the partition schema", caseSensitive)
+    SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema, caseSensitive)
     val partitionNameSet: Set[String] =
       partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index ccc467aae1f1..e65ff13ba228 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -89,8 +89,7 @@ trait FileWrite extends Write {
         s"got: ${paths.mkString(", ")}")
     }
     val pathName = paths.head
-    SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name),
-      s"when inserting into $pathName", caseSensitiveAnalysis)
+    SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name), caseSensitiveAnalysis)
     DataSource.validateSchema(schema)
 
     // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert.
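The behavioral core of this patch is the new duplicate detection in SchemaUtils.checkColumnNameDuplication above: instead of collecting and sorting every duplicate for the message, it now surfaces a single duplicate via collectFirst. Below is a standalone sketch of that selection logic under the same case-sensitivity rules — simplified, since the real code throws an AnalysisException through QueryCompilationErrors.columnAlreadyExistsError rather than returning an Option:

    // Simplified model of the post-patch duplicate check in SchemaUtils.
    def firstDuplicate(columnNames: Seq[String], caseSensitive: Boolean): Option[String] = {
      val names = if (caseSensitive) columnNames else columnNames.map(_.toLowerCase)
      // groupBy returns an unordered Map, so collectFirst reports an arbitrary
      // duplicate rather than necessarily the first one in input order.
      names.groupBy(identity).collectFirst {
        case (name, occurrences) if occurrences.length > 1 => name
      }
    }

    firstDuplicate(Seq("id", "name", "ID"), caseSensitive = false) // Some("id")
    firstDuplicate(Seq("id", "name", "ID"), caseSensitive = true)  // None

Note the trade-off: the removed code reported all duplicates (sorted), while COLUMN_ALREADY_EXISTS names exactly one, which is why tests below such as "SPARK-8072: Better Exception for Duplicate Columns" no longer assert on the second duplicate (`column3`).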
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 2d74a8936151..dda9390f4b94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
+import java.util.Locale
 
 import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
 import org.apache.spark.sql.catalyst.plans.logical.Union
@@ -585,16 +586,20 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         var df1 = Seq((1, 1)).toDF(c0, c1)
         var df2 = Seq((1, 1)).toDF("c0", "c1")
-        var errMsg = intercept[AnalysisException] {
-          df1.unionByName(df2)
-        }.getMessage
-        assert(errMsg.contains("Found duplicate column(s) in the left attributes:"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            df1.unionByName(df2)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         df1 = Seq((1, 1)).toDF("c0", "c1")
         df2 = Seq((1, 1)).toDF(c0, c1)
-        errMsg = intercept[AnalysisException] {
-          df1.unionByName(df2)
-        }.getMessage
-        assert(errMsg.contains("Found duplicate column(s) in the right attributes:"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            df1.unionByName(df2)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6be1e424719e..aab680653192 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -688,11 +688,13 @@ class DataFrameSuite extends QueryTest
     assert(
       err.getMessage.contains("The size of column names: 1 isn't equal to the size of columns: 2"))
 
-    val err2 = intercept[AnalysisException] {
-      testData.toDF().withColumns(Seq("newCol1", "newCOL1"),
-        Seq(col("key") + 1, col("key") + 2))
-    }
-    assert(err2.getMessage.contains("Found duplicate column(s)"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        testData.toDF().withColumns(Seq("newCol1", "newCOL1"),
+          Seq(col("key") + 1, col("key") + 2))
+      },
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`newcol1`"))
   }
 
   test("withColumns: internal method, case sensitive") {
@@ -706,11 +708,13 @@ class DataFrameSuite extends QueryTest
       }.toSeq)
     assert(df.schema.map(_.name) === Seq("key", "value", "newCol1", "newCOL1"))
 
-    val err = intercept[AnalysisException] {
-      testData.toDF().withColumns(Seq("newCol1", "newCol1"),
-        Seq(col("key") + 1, col("key") + 2))
-    }
-    assert(err.getMessage.contains("Found duplicate column(s)"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        testData.toDF().withColumns(Seq("newCol1", "newCol1"),
+          Seq(col("key") + 1, col("key") + 2))
+      },
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`newCol1`"))
   }
 
@@ -923,12 +927,12 @@ class DataFrameSuite extends QueryTest
   }
 
   test("SPARK-40311: withColumnsRenamed duplicate column names simple") {
-    val e = intercept[AnalysisException] {
-      person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "renamed"))
-    }
-    assert(e.getMessage.contains("Found duplicate column(s)"))
-    assert(e.getMessage.contains("in given column names for withColumnsRenamed:"))
-    assert(e.getMessage.contains("`renamed`"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "renamed"))
+      },
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`renamed`"))
   }
 
   test("SPARK-40311: withColumnsRenamed duplicate column names simple case sensitive") {
@@ -939,12 +943,12 @@ class DataFrameSuite extends QueryTest
   }
 
   test("SPARK-40311: withColumnsRenamed duplicate column names indirect") {
-    val e = intercept[AnalysisException] {
-      person.withColumnsRenamed(Map("id" -> "renamed1", "renamed1" -> "age"))
-    }
-    assert(e.getMessage.contains("Found duplicate column(s)"))
-    assert(e.getMessage.contains("in given column names for withColumnsRenamed:"))
-    assert(e.getMessage.contains("`age`"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        person.withColumnsRenamed(Map("id" -> "renamed1", "renamed1" -> "age"))
+      },
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`age`"))
   }
 
   test("SPARK-20384: Value class filter") {
@@ -1759,24 +1763,25 @@ class DataFrameSuite extends QueryTest
 
   test("SPARK-8072: Better Exception for Duplicate Columns") {
     // only one duplicate column present
-    val e = intercept[org.apache.spark.sql.AnalysisException] {
+    val e = intercept[AnalysisException] {
       Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1")
         .write.format("parquet").save("temp")
     }
-    assert(e.getMessage.contains("Found duplicate column(s) when inserting into"))
-    assert(e.getMessage.contains("column1"))
-    assert(!e.getMessage.contains("column2"))
+    checkError(
+      exception = e,
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`column1`"))
 
     // multiple duplicate columns present
-    val f = intercept[org.apache.spark.sql.AnalysisException] {
+    val f = intercept[AnalysisException] {
       Seq((1, 2, 3, 4, 5), (2, 3, 4, 5, 6), (3, 4, 5, 6, 7))
         .toDF("column1", "column2", "column3", "column1", "column3")
         .write.format("json").save("temp")
     }
-    assert(f.getMessage.contains("Found duplicate column(s) when inserting into"))
-    assert(f.getMessage.contains("column1"))
-    assert(f.getMessage.contains("column3"))
-    assert(!f.getMessage.contains("column2"))
+    checkError(
+      exception = f,
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`column1`"))
   }
 
   test("SPARK-6941: Better error message for inserting into RDD-based Table") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
index 78b314272aa0..ded2a80c6fa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
@@ -62,9 +62,10 @@ trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession {
             .schema(caseInsensitiveSchema)
             .format(format)
             .load(path)
-            .show
+            .collect()
         }
-        assert(e.getMessage.contains(s"Found duplicate column(s) $colType: `camelcase`"))
+        assert(e.getErrorClass == "COLUMN_ALREADY_EXISTS")
+        assert(e.getMessageParameters().get("columnName") == "`camelcase`")
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 3173f0594182..f620c0b4c868 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -156,8 +156,11 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     withTable("t1") {
       val cols = Seq("c1", "c2", "c3")
       createTable("t1", cols, Seq("int", "long", "string"))
-      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c2) values(1, 2, 3)"))
-      assert(e1.getMessage.contains("Found duplicate column(s) in the column list: `c2`"))
+      checkError(
+        exception = intercept[AnalysisException](
+          sql(s"INSERT INTO t1 (c1, c2, c2) values(1, 2, 3)")),
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`c2`"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 95cc5f297208..d4077274d5c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -721,10 +721,12 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       withTable(table) {
         sql(s"CREATE TABLE $table (value string, name string) USING PARQUET")
         val dupCol = if (caseSensitive) "value" else "VaLuE"
-        val errorMsg = intercept[AnalysisException] {
-          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS value, name, $dupCol")
-        }.getMessage
-        assert(errorMsg.contains("Found duplicate column(s)"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS value, name, $dupCol")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`value`"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index fc078e997449..b69a0628f3e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -25,11 +25,12 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
-trait AlterTableTests extends SharedSparkSession {
+trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
 
   protected def getTableMetadata(tableName: String): Table
 
@@ -431,10 +432,12 @@ trait AlterTableTests extends SharedSparkSession {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
       sql(s"CREATE TABLE $t (id int) USING $v2Format")
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD COLUMNS (data string, data1 string, data string)")
-      }
-      assert(e.message.contains("Found duplicate column(s) in the user specified columns: `data`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD COLUMNS (data string, data1 string, data string)")
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`data`"))
     }
   }
 
@@ -442,11 +445,12 @@ trait AlterTableTests extends SharedSparkSession {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
       sql(s"CREATE TABLE $t (id int, point struct<x: double, y: double>) USING $v2Format")
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD COLUMNS (point.z double, point.z double, point.xx double)")
-      }
-      assert(e.message.contains(
-        "Found duplicate column(s) in the user specified columns: `point.z`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD COLUMNS (point.z double, point.z double, point.xx double)")
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> toSQLId("point.z")))
     }
   }
 
@@ -1221,10 +1225,12 @@ trait AlterTableTests extends SharedSparkSession {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
       sql(s"CREATE TABLE $t (data string) USING $v2Format")
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data1 string, data string)")
-      }
-      assert(e.message.contains("Found duplicate column(s) in the user specified columns: `data`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data1 string, data string)")
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`data`"))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index de8612c3348e..1375042be78e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -68,7 +69,11 @@ abstract class DataSourceV2SQLSuite
   }
 }
 
-class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableTests {
+class DataSourceV2SQLSuiteV1Filter
+  extends DataSourceV2SQLSuite
+  with AlterTableTests
+  with QueryErrorsBase {
+
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
   override protected val catalogAndNamespace = "testcat.ns1.ns2."
@@ -1395,69 +1400,51 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableTests {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         checkError(
           exception = analysisException(s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of default.t",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         checkError(
           exception = analysisException(
             s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of t",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         checkError(
           exception = analysisException(
             s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of default.t",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         checkError(
           exception = analysisException(
             s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of t",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
 
   test("tableCreation: duplicate nested column names in the table definition") {
-    val errorMsg = "Found duplicate column(s) in the table definition of"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         checkError(
           exception = analysisException(
             s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of default.t",
-            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"
-          )
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> toSQLId(s"d.${c0.toLowerCase(Locale.ROOT)}"))
         )
         checkError(
           exception = analysisException(
             s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of t",
-            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> toSQLId(s"d.${c0.toLowerCase(Locale.ROOT)}")))
         checkError(
           exception = analysisException(
             s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of default.t",
-            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> toSQLId(s"d.${c0.toLowerCase(Locale.ROOT)}")))
         checkError(
           exception = analysisException(
             s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the table definition of t",
-            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> toSQLId(s"d.${c0.toLowerCase(Locale.ROOT)}")))
       }
     }
   }
@@ -1538,34 +1525,27 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableTests {
         exception = analysisException(
           s"CREATE TABLE t ($c0 INT) USING $v2Source " +
             s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          errorClass = "COLUMN_ALREADY_EXISTS",
           parameters = Map(
-            "colType" -> "in the bucket definition",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+            "columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         checkError(
           exception = analysisException(
             s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
               s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the bucket definition",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         checkError(
          exception = analysisException(
            s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
              s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the bucket definition",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
         checkError(
           exception = analysisException(
             s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
               s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
-          errorClass = "_LEGACY_ERROR_TEMP_1233",
-          parameters = Map(
-            "colType" -> "in the bucket definition",
-            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 08de29be27e4..ffe8bbb2833f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -23,12 +23,17 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.Expressions
+import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.execution.datasources.PreprocessTableCreation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{LongType, StringType}
 
-class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTest {
+class V2CommandsCaseSensitivitySuite
+  extends SharedSparkSession
+  with AnalysisTest
+  with QueryErrorsBase {
+
   import CreateTablePartitioningValidationSuite._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
@@ -238,7 +243,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTest {
   }
 
   test("SPARK-36372: Adding duplicate columns should not be allowed") {
-    alterTableTest(
+    assertAnalysisErrorClass(
       AddColumns(
         table,
         Seq(QualifiedColType(
@@ -257,8 +262,9 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTest {
           None,
           None,
           None))),
-      Seq("Found duplicate column(s) in the user specified columns: `point.z`"),
-      expectErrorOnCaseSensitive = false)
+      "COLUMN_ALREADY_EXISTS",
+      Map("columnName" -> toSQLId("point.z")),
+      caseSensitive = false)
   }
 
   test("SPARK-36381: Check column name exist case sensitive and insensitive when add column") {
@@ -341,13 +347,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTest {
   }
 
   test("SPARK-36449: Replacing columns with duplicate name should not be allowed") {
-    alterTableTest(
+    assertAnalysisErrorClass(
       ReplaceColumns(
         table,
         Seq(QualifiedColType(None, "f", LongType, true, None, None, None),
           QualifiedColType(None, "F", LongType, true, None, None, None))),
-      Seq("Found duplicate column(s) in the user specified columns: `f`"),
-      expectErrorOnCaseSensitive = false)
+      "COLUMN_ALREADY_EXISTS",
+      Map("columnName" -> toSQLId("f")),
+      caseSensitive = false)
   }
 
   private def alterTableTest(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5aa364717704..d486ba49dfb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -519,12 +519,12 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
   test("create table - duplicate column names in the table definition") {
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        val errMsg = intercept[AnalysisException] {
-          sql(s"CREATE TABLE t($c0 INT, $c1 INT) USING parquet")
-        }.getMessage
-        assert(errMsg.contains(
-          "Found duplicate column(s) in the table definition of " +
-          s"`$SESSION_CATALOG_NAME`.`default`.`t`"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"CREATE TABLE t($c0 INT, $c1 INT) USING parquet")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -558,10 +558,12 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
   test("create table - column repeated in partition columns") {
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        val errMsg = intercept[AnalysisException] {
-          sql(s"CREATE TABLE t($c0 INT) USING parquet PARTITIONED BY ($c0, $c1)")
-        }.getMessage
-        assert(errMsg.contains("Found duplicate column(s) in the partition schema"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"CREATE TABLE t($c0 INT) USING parquet PARTITIONED BY ($c0, $c1)")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -569,18 +571,22 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
   test("create table - column repeated in bucket/sort columns") {
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        var errMsg = intercept[AnalysisException] {
-          sql(s"CREATE TABLE t($c0 INT) USING parquet CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS")
-        }.getMessage
-        assert(errMsg.contains("Found duplicate column(s) in the bucket definition"))
-
-        errMsg = intercept[AnalysisException] {
-          sql(s"""
-            |CREATE TABLE t($c0 INT, col INT) USING parquet CLUSTERED BY (col)
-            | SORTED BY ($c0, $c1) INTO 2 BUCKETS
-          """.stripMargin)
-        }.getMessage
-        assert(errMsg.contains("Found duplicate column(s) in the sort definition"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"CREATE TABLE t($c0 INT) USING parquet CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"""
+              |CREATE TABLE t($c0 INT, col INT) USING parquet CLUSTERED BY (col)
+              | SORTED BY ($c0, $c1) INTO 2 BUCKETS
+            """.stripMargin)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -667,10 +673,12 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
   test("create view - duplicate column names in the view definition") {
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        val errMsg = intercept[AnalysisException] {
-          sql(s"CREATE VIEW t AS SELECT * FROM VALUES (1, 1) AS t($c0, $c1)")
-        }.getMessage
-        assert(errMsg.contains("Found duplicate column(s) in the view definition"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"CREATE VIEW t AS SELECT * FROM VALUES (1, 1) AS t($c0, $c1)")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -1905,10 +1913,12 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
   test("alter table add columns with existing column name") {
     withTable("t1") {
       sql("CREATE TABLE t1 (c1 int) USING PARQUET")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
-      }.getMessage
-      assert(e.contains("Found duplicate column(s)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`c1`"))
     }
   }
 
@@ -1918,10 +1928,12 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
     withTable("t1") {
       sql("CREATE TABLE t1 (c1 int) USING PARQUET")
       if (!caseSensitive) {
-        val e = intercept[AnalysisException] {
-          sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-        }.getMessage
-        assert(e.contains("Found duplicate column(s)"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`c1`"))
       } else {
         sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
         assert(spark.table("t1").schema ==
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
index 2835f0cc03c3..eaa7b057374e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
@@ -49,8 +49,10 @@ class JdbcUtilsSuite extends SparkFunSuite {
       JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) ===
         StructType(Seq(StructField("c1", DateType, false), StructField("c1", StringType, false)))
     }
-    assert(duplicate.getMessage.contains(
-      "Found duplicate column(s) in the customSchema option value"))
+    checkError(
+      exception = duplicate,
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`c1`"))
 
     // Throw ParseException
     checkError(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a1722d1b09c1..6cf5ec74ab0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3032,10 +3032,12 @@ abstract class JsonSuite
       checkAnswer(readback.filter($"AAA" === 0 && $"bbb" === 1), Seq(Row(0, 1)))
       checkAnswer(readback.filter($"AAA" === 2 && $"bbb" === 3), Seq())
       // Schema inferring
-      val errorMsg = intercept[AnalysisException] {
-        spark.read.json(path.getCanonicalPath).collect()
-      }.getMessage
-      assert(errorMsg.contains("Found duplicate column(s) in the data schema"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.json(path.getCanonicalPath).collect()
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`aaa`"))
     }
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       val readback = spark.read.schema("aaa integer, BBB integer")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index cee9efa16d0b..e19df4b9bd47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -521,13 +521,15 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
   test("SPARK-10849: jdbc CreateTableColumnTypes duplicate columns") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
-      val msg = intercept[AnalysisException] {
+      val e = intercept[AnalysisException] {
         df.write.mode(SaveMode.Overwrite)
           .option("createTableColumnTypes", "name CHAR(20), id int, NaMe VARCHAR(100)")
           .jdbc(url1, "TEST.USERDBTYPETEST", properties)
-      }.getMessage()
-      assert(msg.contains(
-        "Found duplicate column(s) in the createTableColumnTypes option value: `name`"))
+      }
+      checkError(
+        exception = e,
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`name`"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 9085eff69dc1..730b63850d99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -164,9 +164,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
 
   test("SPARK-31968: duplicate partition columns check") {
     withTempPath { f =>
-      val e = intercept[AnalysisException](
-        Seq((3, 2)).toDF("a", "b").write.partitionBy("b", "b").csv(f.getAbsolutePath))
-      assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq((3, 2)).toDF("a", "b").write.partitionBy("b", "b").csv(f.getAbsolutePath)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`b`"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 49cbbe152d34..568b1df4c400 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -371,10 +371,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
         }
       }
 
-      val errorMsg = intercept[AnalysisException] {
-        spark.read.schema(s"$c0 INT, $c1 INT").json(outputDir).as[(Int, Int)]
-      }.getMessage
-      assert(errorMsg.contains("Found duplicate column(s) in the data schema: "))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.schema(s"$c0 INT, $c1 INT").json(outputDir).as[(Int, Int)]
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 5517a33dc21b..3f2414d21784 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -962,15 +962,18 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   test("SPARK-20460 Check name duplication in buckets") {
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        var errorMsg = intercept[AnalysisException] {
-          Seq((1, 1)).toDF("col", c0).write.bucketBy(2, c0, c1).saveAsTable("t")
-        }.getMessage
-        assert(errorMsg.contains("Found duplicate column(s) in the bucket definition"))
-
-        errorMsg = intercept[AnalysisException] {
-          Seq((1, 1)).toDF("col", c0).write.bucketBy(2, "col").sortBy(c0, c1).saveAsTable("t")
-        }.getMessage
-        assert(errorMsg.contains("Found duplicate column(s) in the sort definition"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            Seq((1, 1)).toDF("col", c0).write.bucketBy(2, c0, c1).saveAsTable("t")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            Seq((1, 1)).toDF("col", c0).write.bucketBy(2, "col").sortBy(c0, c1).saveAsTable("t")
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -978,22 +981,26 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   test("SPARK-20460 Check name duplication in schema") {
     def checkWriteDataColumnDuplication(
         format: String, colName0: String, colName1: String, tempDir: File): Unit = {
-      val errorMsg = intercept[AnalysisException] {
-        Seq((1, 1)).toDF(colName0, colName1).write.format(format).mode("overwrite")
-          .save(tempDir.getAbsolutePath)
-      }.getMessage
-      assert(errorMsg.contains("Found duplicate column(s) when inserting into"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq((1, 1)).toDF(colName0, colName1).write.format(format).mode("overwrite")
+            .save(tempDir.getAbsolutePath)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> s"`${colName1.toLowerCase(Locale.ROOT)}`"))
     }
 
     def checkReadUserSpecifiedDataColumnDuplication(
         df: DataFrame, format: String, colName0: String, colName1: String, tempDir: File): Unit = {
       val testDir = Utils.createTempDir(tempDir.getAbsolutePath)
       df.write.format(format).mode("overwrite").save(testDir.getAbsolutePath)
-      val errorMsg = intercept[AnalysisException] {
-        spark.read.format(format).schema(s"$colName0 INT, $colName1 INT")
-          .load(testDir.getAbsolutePath)
-      }.getMessage
-      assert(errorMsg.contains("Found duplicate column(s) in the data schema:"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.format(format).schema(s"$colName0 INT, $colName1 INT")
+            .load(testDir.getAbsolutePath)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> s"`${colName1.toLowerCase(Locale.ROOT)}`"))
     }

     def checkReadPartitionColumnDuplication(
@@ -1001,10 +1008,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       val testDir = Utils.createTempDir(tempDir.getAbsolutePath)
       Seq(1).toDF("col").write.format(format).mode("overwrite")
         .save(s"${testDir.getAbsolutePath}/$colName0=1/$colName1=1")
-      val errorMsg = intercept[AnalysisException] {
-        spark.read.format(format).load(testDir.getAbsolutePath)
-      }.getMessage
-      assert(errorMsg.contains("Found duplicate column(s) in the partition schema:"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.format(format).load(testDir.getAbsolutePath)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> s"`${colName1.toLowerCase(Locale.ROOT)}`"))
     }

     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
@@ -1030,10 +1039,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       testDir = Utils.createTempDir(src.getAbsolutePath)
       Seq(s"""{"$c0":3, "$c1":5}""").toDF().write.mode("overwrite")
         .text(testDir.getAbsolutePath)
-      val errorMsg = intercept[AnalysisException] {
-        spark.read.format("json").option("inferSchema", true).load(testDir.getAbsolutePath)
-      }.getMessage
-      assert(errorMsg.contains("Found duplicate column(s) in the data schema:"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.format("json").option("inferSchema", true).load(testDir.getAbsolutePath)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> s"`${c1.toLowerCase(Locale.ROOT)}`"))
       checkReadPartitionColumnDuplication("json", c0, c1, src)

       // Check Parquet format
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index a200959441d1..fb15432013d9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -61,7 +61,6 @@ case class InsertIntoHiveDirCommand(
     assert(storage.locationUri.nonEmpty)
     SchemaUtils.checkColumnNameDuplication(
       outputColumnNames,
-      s"when inserting into ${storage.locationUri.get}",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)

     val table = CatalogTable(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 7e9052fb530e..f62d941746b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -792,15 +792,18 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
         s"(caseSensitivity=$caseSensitivity, format=$format)") {
         withTempDir { dir =>
           withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitivity") {
-            val m = intercept[AnalysisException] {
+            val e = intercept[AnalysisException] {
               sql(
                 s"""
                    |INSERT OVERWRITE $local DIRECTORY '${dir.toURI}'
                    |STORED AS $format
                    |SELECT 'id', 'id2' ${if (caseSensitivity) "id" else "ID"}
                  """.stripMargin)
-            }.getMessage
-            assert(m.contains("Found duplicate column(s) when inserting into"))
+            }
+            checkError(
+              exception = e,
+              errorClass = "COLUMN_ALREADY_EXISTS",
+              parameters = Map("columnName" -> "`id`"))
           }
         }
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 653906366b3a..fbcc6f8caa94 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -549,10 +549,10 @@ class HiveDDLSuite
   }

   test("create table: partition column names exist in table definition") {
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       "CREATE TABLE tbl(a int) PARTITIONED BY (a string)",
-      "Found duplicate column(s) in the table definition of " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`tbl`: `a`")
+      "COLUMN_ALREADY_EXISTS",
+      Map("columnName" -> "`a`"))
   }

   test("create partitioned table without specifying data type for the partition columns") {
@@ -2356,14 +2356,16 @@ class HiveDDLSuite
         sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET")
         if (!caseSensitive) {
           // duplicating partitioning column name
-          assertAnalysisError(
+          assertAnalysisErrorClass(
             "ALTER TABLE tab ADD COLUMNS (C2 string)",
-            "Found duplicate column(s)")
+            "COLUMN_ALREADY_EXISTS",
+            Map("columnName" -> "`c2`"))
           // duplicating data column name
-          assertAnalysisError(
+          assertAnalysisErrorClass(
             "ALTER TABLE tab ADD COLUMNS (C1 string)",
-            "Found duplicate column(s)")
+            "COLUMN_ALREADY_EXISTS",
+            Map("columnName" -> "`c1`"))
         } else {
           // hive catalog will still complains that c1 is duplicate column name because hive
           // identifiers are case insensitive.