Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ object ResolveUnion extends Rule[LogicalPlan] {
* already contain them. Currently we don't support merging structs nested inside of arrays
* or maps.
*/
private def addFields(col: Expression, targetType: StructType): Expression = {
private def addFields(col: Expression,
targetType: StructType, allowMissing: Boolean): Expression = {
assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")

val resolver = conf.resolver
Expand All @@ -54,11 +55,18 @@ object ResolveUnion extends Rule[LogicalPlan] {
val newExpression = (currentField, expectedField.dataType) match {
case (Some(cf), expectedType: StructType) if cf.dataType.isInstanceOf[StructType] =>
val extractedValue = ExtractValue(col, Literal(cf.name), resolver)
addFields(extractedValue, expectedType)
addFields(extractedValue, expectedType, allowMissing)
case (Some(cf), _) =>
ExtractValue(col, Literal(cf.name), resolver)
case (None, expectedType) =>
Literal(null, expectedType)
if (allowMissing) {
// allowMissing is true: fill the absent field with a null literal of the expected type
Literal(null, expectedType)
} else {
// allowMissing is false: a field missing from the struct is reported as an error
throw QueryCompilationErrors.noSuchStructFieldInGivenFieldsError(
expectedField.name, colType.fields)
}
}
newStructFields ++= Literal(expectedField.name) :: newExpression :: Nil
}
Expand All @@ -77,7 +85,6 @@ object ResolveUnion extends Rule[LogicalPlan] {
}
}


/**
* This method will compare right to left plan's outputs. If there is one struct attribute
* at right side has same name with left side struct attribute, but two structs are not the
Expand All @@ -101,12 +108,12 @@ object ResolveUnion extends Rule[LogicalPlan] {
val foundDt = foundAttr.dataType
(foundDt, lattr.dataType) match {
case (source: StructType, target: StructType)
if allowMissingCol && !source.sameType(target) =>
if !source.sameType(target) =>
// We have two structs with different types, so make sure the two structs have their
// fields in the same order by using `target`'s fields and then inluding any remaining
// in `foundAttr`.
// fields in the same order by using `target`'s fields and then including any remaining
// in `foundAttr` when allowMissingCol is true.
aliased += foundAttr
Alias(addFields(foundAttr, target), foundAttr.name)()
Alias(addFields(foundAttr, target, allowMissingCol), foundAttr.name)()
case _ =>
// We don't need/try to add missing fields if:
// 1. The attributes of left and right side are the same struct type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,12 +922,70 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
}
}
}

test("SPARK-35756: unionByName support struct having same col names but different sequence") {
  // Scenario 1: flat structs whose fields share names but are declared in opposite order.
  // The right-hand struct is expected to be reordered to the left-hand layout.
  val flatLeft = Seq(("d1", Struct1(1, 2))).toDF("a", "b")
  val flatRight = Seq(("d2", Struct2(1, 2))).toDF("a", "b")
  val flatUnion = flatLeft.unionByName(flatRight)
  val innerFields = Seq(StructField("c1", IntegerType), StructField("c2", IntegerType))
  val schema = StructType(Seq(
    StructField("a", StringType),
    StructField("b", StructType(innerFields))))

  assert(flatUnion.schema === schema)
  checkAnswer(flatUnion, Seq(Row("d1", Row(1, 2)), Row("d2", Row(2, 1))))

  // Scenario 2: the innermost structs disagree on a field name (`c` vs `b`), which is
  // expected to fail analysis with a missing-field message.
  val renamedLeft = Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2")))).toDF("id", "a")
  val renamedRight = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L)))).toDF("id", "a")
  val missingFieldMsg = intercept[AnalysisException] {
    renamedLeft.unionByName(renamedRight)
  }.getMessage
  assert(missingFieldMsg.contains("No such struct field c in a, b"))

  // Scenario 3: the right side's nested struct carries an additional field (`c5`), which is
  // expected to be rejected as an incompatible column type.
  val narrowLeft = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c")
  val widerRight = Seq((1, 2, UnionClass1e(1, 2, Struct4(1, 5)))).toDF("a", "b", "c")
  val incompatibleTypesMsg = "Union can only be performed on tables with" +
    " the compatible column types." +
    " struct<c1:int,c2:int,c3:struct<c3:int,c5:int>> <> struct<c1:int,c2:int,c3:struct<c3:int>>" +
    " at the third column of the second table"
  val extraFieldMsg = intercept[AnalysisException] {
    narrowLeft.unionByName(widerRight)
  }.getMessage
  assert(extraFieldMsg.contains(incompatibleTypesMsg))

  // Scenario 4: nested struct whose field name differs only in letter case (`C3` vs `c3`),
  // combined with a different outer field order; the union is expected to succeed.
  val nestedLower = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c")
  val nestedUpper = Seq((1, 2, UnionClass1f(1, 2, Struct3a(1)))).toDF("a", "b", "c")
  val nestedCaseRows = Seq(Row(1, 2, Row(1, 2, Row(1))), Row(1, 2, Row(2, 1, Row(1))))
  checkAnswer(nestedLower.unionByName(nestedUpper), nestedCaseRows)

  // Scenario 5: flat struct with one field name differing only in letter case (`C2` vs `c2`)
  // and the fields declared in opposite order; the union is expected to succeed.
  val flatLower = Seq((1, Struct1(1, 2))).toDF("a", "b")
  val flatUpper = Seq((1, Struct2a(1, 2))).toDF("a", "b")
  val flatCaseRows = Seq(Row(1, Row(1, 2)), Row(1, Row(2, 1)))
  checkAnswer(flatLower.unionByName(flatUpper), flatCaseRows)
}
}

// Outer records that differ only in the type of `nested`; UnionClass1a and UnionClass1b are
// paired in the unionByName tests to exercise nested structs with different field names.
case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)
case class UnionClass1b(a: Int, b: Long, nested: UnionClass3)
case class UnionClass1c(a: Int, b: Long, nested: UnionClass4)
// Baseline record with fields declared as c1, c2 and a nested Struct3.
case class UnionClass1d(c1: Int, c2: Int, c3: Struct3)
// Declares c2 before c1 and nests Struct4, which has one more field than Struct3.
case class UnionClass1e(c2: Int, c1: Int, c3: Struct4)
// Declares c2 before c1 and nests Struct3a, whose field name differs from Struct3 only in case.
case class UnionClass1f(c2: Int, c1: Int, c3: Struct3a)

// Nested payloads for UnionClass1a/1b/1c: the first field is an Int named a (A in UnionClass4),
// the second is either c: String or b: Long.
case class UnionClass2(a: Int, c: String)
case class UnionClass3(a: Int, b: Long)
case class UnionClass4(A: Int, b: Long)
// Flat two-field struct; Struct2 declares the same names in the opposite order.
case class Struct1(c1: Int, c2: Int)
case class Struct2(c2: Int, c1: Int)
// Same order as Struct2, but the first field is spelled C2 (upper case).
case class Struct2a(C2: Int, c1: Int)
// Single-field struct; Struct3a spells the field C3, Struct4 adds a second field c5.
case class Struct3(c3: Int)
case class Struct3a(C3: Int)
case class Struct4(c3: Int, c5: Int)