Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] {
if (i.userSpecifiedCols.size != i.query.output.size) {
if (i.userSpecifiedCols.size > i.query.output.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tblName, i.userSpecifiedCols, i.query)
tblName, i.userSpecifiedCols, i.query.output)
} else {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tblName, i.userSpecifiedCols, i.query)
tblName, i.userSpecifiedCols, i.query.output)
}
}
val projectByName = i.userSpecifiedCols.zip(i.query.output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object TableOutputResolver {

if (actualExpectedCols.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName, actualExpectedCols.map(_.name), query)
tableName, actualExpectedCols.map(_.name), query.output)
}

val errors = new mutable.ArrayBuffer[String]()
Expand Down Expand Up @@ -77,7 +77,7 @@ object TableOutputResolver {
}
if (actualExpectedCols.size > queryOutputCols.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tableName, actualExpectedCols.map(_.name), query)
tableName, actualExpectedCols.map(_.name), query.output)
}

resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
Expand Down Expand Up @@ -238,11 +238,17 @@ object TableOutputResolver {

if (reordered.length == expectedCols.length) {
if (matchedCols.size < inputCols.length) {
val extraCols = inputCols.filterNot(col => matchedCols.contains(col.name))
.map(col => s"${toSQLId(col.name)}").mkString(", ")
throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
tableName, colPath.quoted, extraCols
)
if (colPath.isEmpty) {
val cannotFindCol = expectedCols.filter(col => !matchedCols.contains(col.name)).head.name
throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName,
cannotFindCol)
} else {
val extraCols = inputCols.filterNot(col => matchedCols.contains(col.name))
.map(col => s"${toSQLId(col.name)}").mkString(", ")
throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
tableName, colPath.quoted, extraCols
)
}
} else {
reordered
}
Expand All @@ -263,16 +269,26 @@ object TableOutputResolver {
val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
.map(col => toSQLId(col.name))
.mkString(", ")
throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
tableName, colPath.quoted, extraColsStr
)
if (colPath.isEmpty) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(tableName,
expectedCols.map(_.name), inputCols.map(_.toAttribute))
} else {
throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
tableName, colPath.quoted, extraColsStr
)
}
} else if (inputCols.size < expectedCols.size) {
val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
.map(col => toSQLId(col.name))
.mkString(", ")
throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
tableName, colPath.quoted, missingColsStr
)
if (colPath.isEmpty) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(tableName,
expectedCols.map(_.name), inputCols.map(_.toAttribute))
} else {
throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
tableName, colPath.quoted, missingColsStr
)
}
}

inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2130,25 +2130,25 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
def cannotWriteTooManyColumnsToTableError(
tableName: String,
expected: Seq[String],
query: LogicalPlan): Throwable = {
queryOutput: Seq[Attribute]): Throwable = {
new AnalysisException(
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
messageParameters = Map(
"tableName" -> toSQLId(tableName),
"tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
"dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
"dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", ")))
}

def cannotWriteNotEnoughColumnsToTableError(
tableName: String,
expected: Seq[String],
query: LogicalPlan): Throwable = {
queryOutput: Seq[Attribute]): Throwable = {
new AnalysisException(
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
messageParameters = Map(
"tableName" -> toSQLId(tableName),
"tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
"dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
"dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", ")))
}

def incompatibleDataToTableCannotFindDataError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
exception = intercept[AnalysisException] {
processInsert("t1", df, overwrite = false, byName = true)
},
v1ErrorClass = "_LEGACY_ERROR_TEMP_1186",
v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
v1Parameters = Map.empty[String, String],
v1Parameters = Map("tableName" -> "`spark_catalog`.`default`.`t1`", "colName" -> "`c1`"),
v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`")
)
val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*)
Expand Down