@@ -346,10 +346,14 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       connectTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")),
       sparkTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")))
 
-    val e = intercept[AnalysisException](
-      transform(connectTestRelation.withColumnsRenamed(
-        Map("id" -> "duplicatedCol", "name" -> "duplicatedCol"))))
-    assert(e.getMessage.contains("Found duplicate column(s)"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        transform(
+          connectTestRelation.withColumnsRenamed(
+            Map("id" -> "duplicatedCol", "name" -> "duplicatedCol")))
+      },
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      parameters = Map("columnName" -> "`duplicatedcol`"))
   }
 
   test("Writes fails without path or table") {
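Note: the expected parameter is the lowercased `duplicatedcol` because, under the default case-insensitive analysis, SchemaUtils lowercases names before looking for duplicates (see the SchemaUtils hunks below). A minimal sketch of how the collision now surfaces to an end user — an assumed spark-shell session, not code from this change:

import org.apache.spark.sql.functions.lit

val df = spark.range(1).withColumn("name", lit("x"))
// Renaming two different columns to the same target now fails with the
// structured error instead of the legacy "Found duplicate column(s)" text:
df.withColumnsRenamed(Map("id" -> "duplicatedCol", "name" -> "duplicatedCol"))
// org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column
// `duplicatedcol` already exists. ...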
core/src/main/resources/error/error-classes.json (5 additions, 5 deletions)

@@ -94,6 +94,11 @@
     ],
     "sqlState" : "22005"
   },
+  "COLUMN_ALREADY_EXISTS" : {
+    "message" : [
+      "The column <columnName> already exists. Consider choosing a different name or renaming the existing column."
+    ]
+  },
   "COLUMN_NOT_IN_GROUP_BY_CLAUSE" : {
     "message" : [
       "The expression <expression> is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in `first()` (or `first_value()`) if you don't care which value you get."
@@ -2843,11 +2848,6 @@
       "Partition spec is invalid. The spec (<specKeys>) must match the partition spec (<partitionColumnNames>) defined in table '<tableName>'."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1233" : {
-    "message" : [
-      "Found duplicate column(s) <colType>: <duplicateCol>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1234" : {
     "message" : [
       "Temporary view <tableIdent> is not cached for analyzing columns."
@@ -3504,8 +3504,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   private def resolveUserSpecifiedColumns(i: InsertIntoStatement): Seq[NamedExpression] = {
-    SchemaUtils.checkColumnNameDuplication(
-      i.userSpecifiedCols, "in the column list", resolver)
+    SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)
 
     i.userSpecifiedCols.map { col =>
       i.table.resolve(Seq(col), resolver).getOrElse {
@@ -1294,7 +1294,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = {
       SchemaUtils.checkColumnNameDuplication(
         colsToAdd.map(_.name.quoted),
-        "in the user specified columns",
         alter.conf.resolver)
     }
 
@@ -196,11 +196,9 @@ object ResolveUnion extends Rule[LogicalPlan] {
 
     SchemaUtils.checkColumnNameDuplication(
       leftOutputAttrs.map(_.name),
-      "in the left attributes",
       caseSensitiveAnalysis)
     SchemaUtils.checkColumnNameDuplication(
       rightOutputAttrs.map(_.name),
-      "in the right attributes",
       caseSensitiveAnalysis)
   }
 
@@ -2273,12 +2273,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
         "tableName" -> tableName))
   }
 
-  def foundDuplicateColumnError(colType: String, duplicateCol: Seq[String]): Throwable = {
+  def columnAlreadyExistsError(columnName: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1233",
-      messageParameters = Map(
-        "colType" -> colType,
-        "duplicateCol" -> duplicateCol.sorted.mkString(", ")))
+      errorClass = "COLUMN_ALREADY_EXISTS",
+      messageParameters = Map("columnName" -> toSQLId(columnName)))
   }
 
   def noSuchTableError(db: String, table: String): Throwable = {
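Note: the new helper narrows the old contract — instead of a descriptive colType string plus the full sorted duplicate list, it reports a single offending name, quoted as an SQL identifier via toSQLId from QueryErrorsBase. A rough sketch of the observable result, assuming Spark's standard SparkThrowable/AnalysisException accessors (not part of this diff):

val e = QueryCompilationErrors.columnAlreadyExistsError("duplicatedcol")
  .asInstanceOf[org.apache.spark.sql.AnalysisException]
// Error class and parameters, rather than message substrings, are now the
// stable surface that tests assert against:
assert(e.getErrorClass == "COLUMN_ALREADY_EXISTS")
assert(e.messageParameters == Map("columnName" -> "`duplicatedcol`"))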
@@ -107,8 +107,7 @@ private[sql] object PartitioningUtils {
         normalizedFiled.name -> normalizedVal
     }
 
-    SchemaUtils.checkColumnNameDuplication(
-      normalizedPartSpec.map(_._1), "in the partition schema", resolver)
+    SchemaUtils.checkColumnNameDuplication(normalizedPartSpec.map(_._1), resolver)
 
     normalizedPartSpec.toMap
   }
@@ -39,24 +39,22 @@ private[spark] object SchemaUtils {
    * duplication exists.
    *
    * @param schema schema to check
-   * @param colType column type name, used in an exception message
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
   def checkSchemaColumnNameDuplication(
       schema: DataType,
-      colType: String,
       caseSensitiveAnalysis: Boolean = false): Unit = {
     schema match {
       case ArrayType(elementType, _) =>
-        checkSchemaColumnNameDuplication(elementType, colType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(elementType, caseSensitiveAnalysis)
       case MapType(keyType, valueType, _) =>
-        checkSchemaColumnNameDuplication(keyType, colType, caseSensitiveAnalysis)
-        checkSchemaColumnNameDuplication(valueType, colType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(keyType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(valueType, caseSensitiveAnalysis)
       case structType: StructType =>
         val fields = structType.fields
-        checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis)
+        checkColumnNameDuplication(fields.map(_.name), caseSensitiveAnalysis)
         fields.foreach { field =>
-          checkSchemaColumnNameDuplication(field.dataType, colType, caseSensitiveAnalysis)
+          checkSchemaColumnNameDuplication(field.dataType, caseSensitiveAnalysis)
         }
       case _ =>
     }
@@ -67,14 +65,10 @@
    * duplication exists.
    *
    * @param schema schema to check
-   * @param colType column type name, used in an exception message
    * @param resolver resolver used to determine if two identifiers are equal
    */
-  def checkSchemaColumnNameDuplication(
-      schema: StructType,
-      colType: String,
-      resolver: Resolver): Unit = {
-    checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver))
+  def checkSchemaColumnNameDuplication(schema: StructType, resolver: Resolver): Unit = {
+    checkSchemaColumnNameDuplication(schema, isCaseSensitiveAnalysis(resolver))
   }
 
   // Returns true if a given resolver is case-sensitive
@@ -95,32 +89,28 @@
    * the duplication exists.
    *
    * @param columnNames column names to check
-   * @param colType column type name, used in an exception message
    * @param resolver resolver used to determine if two identifiers are equal
    */
-  def checkColumnNameDuplication(
-      columnNames: Seq[String], colType: String, resolver: Resolver): Unit = {
-    checkColumnNameDuplication(columnNames, colType, isCaseSensitiveAnalysis(resolver))
+  def checkColumnNameDuplication(columnNames: Seq[String], resolver: Resolver): Unit = {
+    checkColumnNameDuplication(columnNames, isCaseSensitiveAnalysis(resolver))
   }
 
   /**
    * Checks if input column names have duplicate identifiers. This throws an exception if
    * the duplication exists.
    *
    * @param columnNames column names to check
-   * @param colType column type name, used in an exception message
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
-  def checkColumnNameDuplication(
-      columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = {
+  def checkColumnNameDuplication(columnNames: Seq[String], caseSensitiveAnalysis: Boolean): Unit = {
     // scalastyle:off caselocale
     val names = if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
     // scalastyle:on caselocale
     if (names.distinct.length != names.length) {
-      val duplicateColumns = names.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => s"`$x`"
-      }
-      throw QueryCompilationErrors.foundDuplicateColumnError(colType, duplicateColumns.toSeq)
+      val columnName = names.groupBy(identity).collectFirst {
+        case (x, ys) if ys.length > 1 => x
+      }.get
+      throw QueryCompilationErrors.columnAlreadyExistsError(columnName)
     }
   }
 
@@ -178,7 +168,7 @@ private[spark] object SchemaUtils {
       case b: BucketTransform =>
         val colNames = b.columns.map(c => UnresolvedAttribute(c.fieldNames()).name)
         // We need to check that we're not duplicating columns within our bucketing transform
-        checkColumnNameDuplication(colNames, "in the bucket definition", isCaseSensitive)
+        checkColumnNameDuplication(colNames, isCaseSensitive)
         b.name -> colNames
       case NamedTransform(transformName, refs) =>
         val fieldNameParts =
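Note: with colType gone, every overload is now just the names plus either a Resolver or a case-sensitivity flag. A minimal usage sketch of the slimmed-down API — assuming an internal caller, since the object is private[spark]:

import org.apache.spark.sql.util.SchemaUtils

// Case-insensitive: "a" and "A" collide after lowercasing, so this throws
// AnalysisException with errorClass COLUMN_ALREADY_EXISTS and columnName `a`.
SchemaUtils.checkColumnNameDuplication(Seq("a", "b", "A"), caseSensitiveAnalysis = false)

// Case-sensitive: the same input passes without error.
SchemaUtils.checkColumnNameDuplication(Seq("a", "b", "A"), caseSensitiveAnalysis = true)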
@@ -40,24 +40,27 @@ class SchemaUtilsSuite extends SparkFunSuite {
     val testType = if (caseSensitive) "case-sensitive" else "case-insensitive"
     test(s"Check column name duplication in $testType cases") {
       def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
-        val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
-          duplicatedColumns.sorted.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
         val schema = StructType.fromDDL(schemaStr)
-        var msg = intercept[AnalysisException] {
-          SchemaUtils.checkSchemaColumnNameDuplication(
-            schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
-        }.getMessage
-        assert(msg.contains(expectedErrorMsg))
-        msg = intercept[AnalysisException] {
-          SchemaUtils.checkColumnNameDuplication(
-            schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
-        }.getMessage
-        assert(msg.contains(expectedErrorMsg))
-        msg = intercept[AnalysisException] {
-          SchemaUtils.checkColumnNameDuplication(
-            schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
-        }.getMessage
-        assert(msg.contains(expectedErrorMsg))
+        checkError(
+          exception = intercept[AnalysisException] {
+            SchemaUtils.checkSchemaColumnNameDuplication(schema, caseSensitive)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`a`"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            SchemaUtils.checkColumnNameDuplication(schema.map(_.name), resolver(caseSensitive))
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`a`"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            SchemaUtils.checkColumnNameDuplication(
+              schema.map(_.name), caseSensitiveAnalysis = caseSensitive)
+          },
+          errorClass = "COLUMN_ALREADY_EXISTS",
+          parameters = Map("columnName" -> "`a`"))
       }
 
       checkExceptionCases(s"$a0 INT, b INT, $a1 INT", a0 :: Nil)
@@ -70,11 +73,11 @@
     def checkNoExceptionCases(schemaStr: String, caseSensitive: Boolean): Unit = {
       val schema = StructType.fromDDL(schemaStr)
       SchemaUtils.checkSchemaColumnNameDuplication(
-        schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+        schema, caseSensitiveAnalysis = caseSensitive)
       SchemaUtils.checkColumnNameDuplication(
-        schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
+        schema.map(_.name), resolver(caseSensitive))
       SchemaUtils.checkColumnNameDuplication(
-        schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+        schema.map(_.name), caseSensitiveAnalysis = caseSensitive)
     }
 
     checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = true)
@@ -99,11 +102,12 @@
     val schemaE = MapType(LongType, schemaD)
     val schemaF = MapType(schemaD, LongType)
     Seq(schemaA, schemaB, schemaC, schemaD, schemaE, schemaF).foreach { schema =>
-      val msg = intercept[AnalysisException] {
-        SchemaUtils.checkSchemaColumnNameDuplication(
-          schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false)
-      }.getMessage
-      assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          SchemaUtils.checkSchemaColumnNameDuplication(schema)
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`camelcase`"))
     }
   }
 }
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (0 additions, 2 deletions)

@@ -2755,7 +2755,6 @@ class Dataset[T] private[sql](
s"the size of columns: ${cols.size}")
SchemaUtils.checkColumnNameDuplication(
colNames,
"in given column names",
sparkSession.sessionState.conf.caseSensitiveAnalysis)

val resolver = sparkSession.sessionState.analyzer.resolver
@@ -2855,7 +2854,6 @@
     }
     SchemaUtils.checkColumnNameDuplication(
       projectList.map(_.name),
-      "in given column names for withColumnsRenamed",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
     withPlan(Project(projectList, logicalPlan))
   }
@@ -30,7 +30,7 @@ object CommandCheck extends (LogicalPlan => Unit) with SQLConfHelper {
     plan.foreach {
       case AnalyzeColumnCommand(_, colsOpt, allColumns) if !allColumns =>
         colsOpt.foreach(SchemaUtils.checkColumnNameDuplication(
-          _, "in analyze columns.", conf.caseSensitiveAnalysis))
+          _, conf.caseSensitiveAnalysis))
 
       case _ =>
     }
@@ -238,7 +238,6 @@ case class AlterTableAddColumnsCommand(
 
     SchemaUtils.checkColumnNameDuplication(
       (colsWithProcessedDefaults ++ catalogTable.schema).map(_.name),
-      "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
     DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))
 
@@ -470,8 +470,7 @@ object ViewHelper extends SQLConfHelper with Logging {
 
     // Generate the query column names, throw an AnalysisException if there exists duplicate column
     // names.
-    SchemaUtils.checkColumnNameDuplication(
-      fieldNames, "in the view definition", conf.resolver)
+    SchemaUtils.checkColumnNameDuplication(fieldNames, conf.resolver)
 
     // Generate the view default catalog and namespace, as well as captured SQL configs.
     val manager = session.sessionState.catalogManager
@@ -130,10 +130,8 @@ case class DataSource(
   }
 
   bucketSpec.foreach { bucket =>
-    SchemaUtils.checkColumnNameDuplication(
-      bucket.bucketColumnNames, "in the bucket definition", equality)
-    SchemaUtils.checkColumnNameDuplication(
-      bucket.sortColumnNames, "in the sort definition", equality)
+    SchemaUtils.checkColumnNameDuplication(bucket.bucketColumnNames, equality)
+    SchemaUtils.checkColumnNameDuplication(bucket.sortColumnNames, equality)
   }
 
   /**
@@ -220,7 +218,6 @@
     try {
       SchemaUtils.checkColumnNameDuplication(
         (dataSchema ++ partitionSchema).map(_.name),
-        "in the data schema and the partition schema",
         equality)
     } catch {
       case e: AnalysisException => logWarning(e.getMessage)
@@ -428,17 +425,14 @@
       case hs: HadoopFsRelation =>
         SchemaUtils.checkSchemaColumnNameDuplication(
           hs.dataSchema,
-          "in the data schema",
           equality)
         SchemaUtils.checkSchemaColumnNameDuplication(
           hs.partitionSchema,
-          "in the partition schema",
           equality)
         DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema)
       case _ =>
         SchemaUtils.checkSchemaColumnNameDuplication(
           relation.schema,
-          "in the data schema",
           equality)
     }
 
@@ -82,7 +82,6 @@ case class InsertIntoHadoopFsRelationCommand(
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkColumnNameDuplication(
       outputColumnNames,
-      s"when inserting into $outputPath",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
@@ -558,8 +558,7 @@ object PartitioningUtils extends SQLConfHelper {
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
 
-    SchemaUtils.checkColumnNameDuplication(
-      partitionColumns, partitionColumns.mkString(", "), caseSensitive)
+    SchemaUtils.checkColumnNameDuplication(partitionColumns, caseSensitive)
 
     partitionColumnsSchema(schema, partitionColumns).foreach {
       field => field.dataType match {
@@ -823,8 +823,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
 
     // checks duplicate columns in the user specified column types.
-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the createTableColumnTypes option value", conf.resolver)
+    SchemaUtils.checkColumnNameDuplication(userSchema.map(_.name), conf.resolver)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>
@@ -849,10 +848,7 @@
     if (null != customSchema && customSchema.nonEmpty) {
       val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
-      SchemaUtils.checkSchemaColumnNameDuplication(
-        userSchema,
-        "in the customSchema option value",
-        nameEquality)
+      SchemaUtils.checkSchemaColumnNameDuplication(userSchema, nameEquality)
 
       // This is resolved by names, use the custom filed dataType to replace the default dataType.
       val newSchema = tableSchema.map { col =>
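Note: one user-visible consequence of the JdbcUtils changes is that duplicate names in the customSchema (or createTableColumnTypes) option now raise the structured error. A hedged sketch, assuming a reachable JDBC source — the URL and table name are placeholders:

spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")  // hypothetical source
  .option("dbtable", "t")
  // "id" and "ID" collide under the default case-insensitive resolver:
  .option("customSchema", "id DECIMAL(38, 0), ID STRING")
  .load()
// org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column
// `id` already exists. ...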