Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,9 @@ object TableOutputResolver extends SQLConfHelper with Logging {
// TODO: Only DS v1 writing will set it to true. We should enable in for DS v2 as well.
supportColDefaultValue: Boolean = false): LogicalPlan = {

val actualExpectedCols = expected.map { attr =>
attr.withDataType(CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType))
}

if (actualExpectedCols.size < query.output.size) {
if (expected.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName, actualExpectedCols.map(_.name), query.output)
tableName, expected.map(_.name), query.output)
}

val errors = new mutable.ArrayBuffer[String]()
Expand All @@ -100,21 +96,21 @@ object TableOutputResolver extends SQLConfHelper with Logging {
reorderColumnsByName(
tableName,
query.output,
actualExpectedCols,
expected,
conf,
errors += _,
fillDefaultValue = supportColDefaultValue)
} else {
if (actualExpectedCols.size > query.output.size) {
if (expected.size > query.output.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tableName, actualExpectedCols.map(_.name), query.output)
tableName, expected.map(_.name), query.output)
}
resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
resolveColumnsByPosition(tableName, query.output, expected, conf, errors += _)
}

if (errors.nonEmpty) {
throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
tableName, actualExpectedCols.map(_.name).map(toSQLId).mkString(", "))
tableName, expected.map(_.name).map(toSQLId).mkString(", "))
}

if (resolved == query.output) {
Expand Down Expand Up @@ -246,22 +242,25 @@ object TableOutputResolver extends SQLConfHelper with Logging {
case a: Alias => a.withName(expectedName)
case other => other
}
(matchedCol.dataType, expectedCol.dataType) match {
val replacedExpectedCol = expectedCol.withDataType {
CharVarcharUtils.getRawType(expectedCol.metadata).getOrElse(expectedCol.dataType)
}
(matchedCol.dataType, replacedExpectedCol.dataType) match {
case (matchedType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, matchedCol, matchedType, expectedCol, expectedType,
tableName, matchedCol, matchedType, replacedExpectedCol, expectedType,
byName = true, conf, addError, newColPath)
case (matchedType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, matchedCol, matchedType, expectedCol, expectedType,
tableName, matchedCol, matchedType, replacedExpectedCol, expectedType,
byName = true, conf, addError, newColPath)
case (matchedType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, matchedCol, matchedType, expectedCol, expectedType,
tableName, matchedCol, matchedType, replacedExpectedCol, expectedType,
byName = true, conf, addError, newColPath)
case _ =>
checkField(
tableName, expectedCol, matchedCol, byName = true, conf, addError, newColPath)
tableName, replacedExpectedCol, matchedCol, byName = true, conf, addError, newColPath)
}
}
}
Expand All @@ -288,11 +287,13 @@ object TableOutputResolver extends SQLConfHelper with Logging {
private def resolveColumnsByPosition(
tableName: String,
inputCols: Seq[NamedExpression],
expectedCols: Seq[Attribute],
expected: Seq[Attribute],
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String] = Nil): Seq[NamedExpression] = {

val expectedCols = expected.map { attr =>
attr.withDataType { CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) }
}
if (inputCols.size > expectedCols.size) {
val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
.map(col => toSQLId(col.name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
checkPlainResult(spark.table("t"), typ, v)
}
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkPlainResult(spark.table("t"), typ, null)
(spark.table("t"), typ, null)
}
}
}
Expand Down Expand Up @@ -661,6 +661,17 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-48792: Fix INSERT with partial column list to a table with char/varchar") {
Seq("char", "varchar").foreach { typ =>
withTable("students") {
sql(s"CREATE TABLE students (name $typ(64), address $typ(64)) USING $format")
sql("INSERT INTO students VALUES ('Kent Yao', 'Hangzhou')")
sql("INSERT INTO students (address) VALUES ('<unknown>')")
checkAnswer(sql("SELECT count(*) FROM students"), Row(2))
}
}
}
}

// Some basic char/varchar tests which doesn't rely on table implementation.
Expand Down