diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
index 2574bf7ab485..98c529f57898 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
@@ -40,7 +40,8 @@ object ResolveUnion extends Rule[LogicalPlan] {
    * already contain them. Currently we don't support merging structs nested inside of arrays
    * or maps.
    */
-  private def addFields(col: Expression, targetType: StructType): Expression = {
+  private def addFields(col: Expression,
+      targetType: StructType, allowMissing: Boolean): Expression = {
     assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
 
     val resolver = conf.resolver
@@ -54,11 +55,18 @@ object ResolveUnion extends Rule[LogicalPlan] {
       val newExpression = (currentField, expectedField.dataType) match {
         case (Some(cf), expectedType: StructType) if cf.dataType.isInstanceOf[StructType] =>
           val extractedValue = ExtractValue(col, Literal(cf.name), resolver)
-          addFields(extractedValue, expectedType)
+          addFields(extractedValue, expectedType, allowMissing)
         case (Some(cf), _) =>
           ExtractValue(col, Literal(cf.name), resolver)
         case (None, expectedType) =>
-          Literal(null, expectedType)
+          if (allowMissing) {
+            // Missing fields are allowed: fill the absent field with a typed null.
+            Literal(null, expectedType)
+          } else {
+            // Missing fields are not allowed: fail analysis, naming the absent field.
+            throw QueryCompilationErrors.noSuchStructFieldInGivenFieldsError(
+              expectedField.name, colType.fields)
+          }
       }
       newStructFields ++= Literal(expectedField.name) :: newExpression :: Nil
     }
@@ -77,7 +85,6 @@ object ResolveUnion extends Rule[LogicalPlan] {
     }
   }
 
-
   /**
    * This method will compare right to left plan's outputs. If there is one struct attribute
    * at right side has same name with left side struct attribute, but two structs are not the
@@ -101,12 +108,12 @@ object ResolveUnion extends Rule[LogicalPlan] {
         val foundDt = foundAttr.dataType
         (foundDt, lattr.dataType) match {
           case (source: StructType, target: StructType)
-              if allowMissingCol && !source.sameType(target) =>
+              if !source.sameType(target) =>
             // We have two structs with different types, so make sure the two structs have their
-            // fields in the same order by using `target`'s fields and then inluding any remaining
-            // in `foundAttr`.
+            // fields in the same order by using `target`'s fields and then including any remaining
+            // in `foundAttr` when allowMissingCol is true.
             aliased += foundAttr
-            Alias(addFields(foundAttr, target), foundAttr.name)()
+            Alias(addFields(foundAttr, target, allowMissingCol), foundAttr.name)()
           case _ =>
             // We don't need/try to add missing fields if:
             // 1. The attributes of left and right side are the same struct type
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index e622528afc69..fcd3e8315b3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -922,12 +922,70 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-35756: unionByName support struct having same col names but different sequence") {
+    // struct having same col names but different sequence
+    var df1 = Seq(("d1", Struct1(1, 2))).toDF("a", "b")
+    var df2 = Seq(("d2", Struct2(1, 2))).toDF("a", "b")
+    var unionDF = df1.unionByName(df2)
+    var expected = Row("d1", Row(1, 2)) :: Row("d2", Row(2, 1)) :: Nil
+    val schema = StructType(Seq(StructField("a", StringType),
+      StructField("b", StructType(Seq(StructField("c1", IntegerType),
+        StructField("c2", IntegerType))))))
+
+    assert(unionDF.schema === schema)
+    checkAnswer(unionDF, expected)
+
+    // nested struct, inner struct having different col name
+    df1 = Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2")))).toDF("id", "a")
+    df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L)))).toDF("id", "a")
+    var errMsg = intercept[AnalysisException] {
+      df1.unionByName(df2)
+    }.getMessage
+    assert(errMsg.contains("No such struct field c in a, b"))
+
+    // If right side of the nested struct has extra col.
+    df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c")
+    df2 = Seq((1, 2, UnionClass1e(1, 2, Struct4(1, 5)))).toDF("a", "b", "c")
+    errMsg = intercept[AnalysisException] {
+      df1.unionByName(df2)
+    }.getMessage
+    assert(errMsg.contains("Union can only be performed on tables with" +
+      " the compatible column types." +
+      " struct<c1:int,c2:int,c3:struct<c3:int,c5:int>> <> struct<c1:int,c2:int,c3:struct<c3:int>>" +
+      " at the third column of the second table"))
+
+    // diff Case sensitive attributes names and diff sequence scenario for unionByName
+    df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c")
+    df2 = Seq((1, 2, UnionClass1f(1, 2, Struct3a(1)))).toDF("a", "b", "c")
+    expected =
+      Row(1, 2, Row(1, 2, Row(1))) :: Row(1, 2, Row(2, 1, Row(1))) :: Nil
+
+    unionDF = df1.unionByName(df2)
+    checkAnswer(unionDF, expected)
+
+    df1 = Seq((1, Struct1(1, 2))).toDF("a", "b")
+    df2 = Seq((1, Struct2a(1, 2))).toDF("a", "b")
+    expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil
+
+    unionDF = df1.unionByName(df2)
+    checkAnswer(unionDF, expected)
+  }
 }
 
 case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)
 case class UnionClass1b(a: Int, b: Long, nested: UnionClass3)
 case class UnionClass1c(a: Int, b: Long, nested: UnionClass4)
+case class UnionClass1d(c1: Int, c2: Int, c3: Struct3)
+case class UnionClass1e(c2: Int, c1: Int, c3: Struct4)
+case class UnionClass1f(c2: Int, c1: Int, c3: Struct3a)
 
 case class UnionClass2(a: Int, c: String)
 case class UnionClass3(a: Int, b: Long)
 case class UnionClass4(A: Int, b: Long)
+case class Struct1(c1: Int, c2: Int)
+case class Struct2(c2: Int, c1: Int)
+case class Struct2a(C2: Int, c1: Int)
+case class Struct3(c3: Int)
+case class Struct3a(C3: Int)
+case class Struct4(c3: Int, c5: Int)