Conversation

@SaurabhChawla100 (Contributor) commented Jun 19, 2021

What changes were proposed in this pull request?

unionByName does not support structs that have the same column names but in a different order.

case class Struct1(c1: Int, c2: Int)
case class Struct2(c2: Int, c1: Int)

val df1 = Seq((1, Struct1(1, 2))).toDF("a", "b")
val df2 = Seq((1, Struct2(1, 2))).toDF("a", "b")
val unionDF = df1.unionByName(df2)

This throws the following exception:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
struct<c2:int,c1:int> <> struct<c1:int,c2:int> at the second column of the second table;
'Union false, false
:- LocalRelation [_1#38, _2#39]
+- LocalRelation [_1#45, _2#46]

In this case the column names are the same, so unionByName should check inside the struct: if the field names match, it should not throw this exception and the union should work.

After the fix, the union succeeds:

val unionDF = df1.unionByName(df2)
scala>  unionDF.show
+---+------+                                                                    
|  a|     b|
+---+------+
|  1|{1, 2}|
|  1|{2, 1}|
+---+------+
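The core of the fix, reordering struct fields by name on both sides of the union, can be sketched outside Spark with a small Python illustration. This is not Spark code: `sort_struct_fields` is a hypothetical helper, and lists of `(name, type)` pairs stand in for `StructType`.

```python
# Sketch only: (name, type) pairs stand in for Spark's StructType fields.
# sort_struct_fields is a hypothetical helper, not a Spark API.

def sort_struct_fields(fields):
    """Sort struct fields by name, recursing into nested structs
    (represented here as lists of (name, type) pairs)."""
    return sorted(
        (name, sort_struct_fields(t) if isinstance(t, list) else t)
        for name, t in fields
    )

# Schemas of df1's and df2's "b" column: same names, different order.
left = [("c1", "int"), ("c2", "int")]
right = [("c2", "int"), ("c1", "int")]

assert left != right                                          # orders differ
assert sort_struct_fields(left) == sort_struct_fields(right)  # reconciled
```

Once both sides sort to the same field order, the struct columns compare as the same type and the by-name union can line them up.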

Why are the changes needed?

unionByName performs the union based on column names. For structs, the scenario where all field names are the same but the order differs was missing, so this adds that support.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests and also tested manually through the Spark shell.

@github-actions github-actions bot added the SQL label Jun 19, 2021
@viirya (Member) commented Jun 19, 2021

I think it works in current master, no?

scala> val df1 = Seq((1, Struct1(1, 2))).toDF("a", "b")
df1: org.apache.spark.sql.DataFrame = [a: int, b: struct<c1: int, c2: int>]

scala> val df2 = Seq((1, Struct2(1, 2))).toDF("a", "b")
df2: org.apache.spark.sql.DataFrame = [a: int, b: struct<c2: int, c1: int>]

scala> val unionDF = df1.unionByName(df2, true)
unionDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: struct<c1: int, c2: int>]

scala> unionDF.show
+---+------+
|  a|     b|
+---+------+
|  1|{1, 2}|
|  1|{2, 1}|
+---+------+

@SaurabhChawla100 (Contributor Author):

> I think it works in current master, no? [...]

Yes, val unionDF = df1.unionByName(df2, true) works on the current master, but val unionDF = df1.unionByName(df2) throws the exception. This change makes that scenario work as well.

@HyukjinKwon (Member):

How does it relate to #32448? StructType.merge can handle this case IIRC.

@Kimahriman (Contributor):

They're mostly different issues. This is more of a semantics thing. If you have two nested structs with the same fields, but in a different order, you have to set allowMissingCol to true in order for the structs to be sorted, which isn't very intuitive. This is trying to make the ByName part apply to nested structs as well, and leave allowMissingCol to just actually apply to missing (possibly nested) columns.

So I do think this idea makes sense, but I don't think the implementation handles multiple levels of nested structs correctly. addFields assumes adding missing columns, so I think you could end up with a case that adds null nested columns even if allowMissingCol is false.

I think the logic would have to be added to addFields to handle whether or not it should add null missing columns.
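The semantic split described above can be sketched in Python (illustration only; `reconcile` is a hypothetical helper and `(name, type)` pairs stand in for struct fields): reordering same-named fields is always safe, but padding genuinely missing fields should only happen when allowMissingCol is true.

```python
def reconcile(fields, other, allow_missing):
    """Sketch: reorder fields by name always; add fields that exist only on
    the other side (null-padded at runtime) only when allow_missing is True."""
    names = {name for name, _ in fields}
    merged = list(fields)
    for name, typ in other:
        if name not in names:
            if not allow_missing:
                raise ValueError(f"missing field {name!r}")
            merged.append((name, typ))
    return sorted(merged)

left = [("c2", "int"), ("c1", "int")]

# Same field names, different order: reordering alone is enough.
assert reconcile(left, [("c1", "int"), ("c2", "int")], False) == \
    [("c1", "int"), ("c2", "int")]

# A genuinely missing field is only filled in with allow_missing=True.
assert reconcile(left, [("c3", "int")], True) == \
    [("c1", "int"), ("c2", "int"), ("c3", "int")]
```

The bug being discussed is the case where the merge step runs on inner structs even when `allow_missing` is false.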

@SaurabhChawla100 (Contributor Author) commented Jun 20, 2021

> So I do think this idea makes sense, but I don't think the implementation handles multiple levels of nested structs correctly. addFields assumes adding missing columns, so I think you could end up with a case that adds null nested columns even if allowMissingCol is false.

@Kimahriman - I'm not able to see how this PR can add null nested columns when allowMissingCol is false.

case (source: StructType, target: StructType)
    if !allowMissingCol && !source.sameType(target) &&
      target.toAttributes.map(attr => attr.name).sorted ==
        source.toAttributes.map(x => x.name).sorted =>
  // Having an output with same name, but different struct type.
  // We will sort columns in the struct expression to make sure two sides of
  // union have consistent schema.
  aliased += foundAttr
  Alias(addFields(foundAttr, target), foundAttr.name)()

In this PR we only call addFields when the source and target sides have the same set of column names (target.toAttributes.map(attr => attr.name).sorted == source.toAttributes.map(x => x.name).sorted).

val missingFieldsOpt =
  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver)

so missingFieldsOpt is always empty, and when allowMissingCol is false we only do the sorting:

if (missingFieldsOpt.isEmpty) {
  sortStructFields(col)
}

Please let me know if my understanding is incorrect here.

@Kimahriman (Contributor):

Yeah, I'm saying it only handles one level of a nested struct properly, not recursively like it should. I got your code running to show an example:

>>> df1 = spark.createDataFrame([Row(a=Row(aa=Row(aaa=1)))])
>>> df2 = spark.createDataFrame([Row(a=Row(aa=Row(aab=1)))])
>>> df1
DataFrame[a: struct<aa:struct<aaa:bigint>>]
>>> df2
DataFrame[a: struct<aa:struct<aab:bigint>>]
>>> df1.unionByName(df2)
DataFrame[a: struct<aa:struct<aaa:bigint,aab:bigint>>]
>>> df1.unionByName(df2).explain()
== Physical Plan ==
Union
:- *(1) Project [if (isnull(a#21)) null else named_struct(aa, if (isnull(a#21.aa)) null else named_struct(aaa, a#21.aa.aaa, aab, null)) AS a#37]
:  +- *(1) Scan ExistingRDD[a#21]
+- *(2) Project [if (isnull(a#23)) null else named_struct(aa, if (isnull(a#23.aa)) null else named_struct(aaa, null, aab, a#23.aa.aab)) AS a#34]
   +- *(2) Scan ExistingRDD[a#23]

The inner struct gets merged, adding missing columns, even though allowMissingCol is false.

@SaurabhChawla100 (Contributor Author):

> Yeah I'm saying it only properly handles one level of a nested struct, not recursively like it should. [...] The inner struct gets merged adding missing columns even though allowMissingCol is false

Thank you for explaining the nested struct scenario. When allowMissingCol is false we only need to sort, so I replaced the addFields call with sortStructFields.

Added a unit test for this case:

case class UnionClass1d(c1: Int, c2: Int, c3: Struct3)
case class UnionClass1e(c2: Int, c1: Int, c3: Struct4)
case class Struct3(c3: Int)
case class Struct4(c4: Int)

val df1 = Seq((1, 2, UnionClass1d(1, 2, Struct3(1)))).toDF("a", "b", "c")
val df2 = Seq((1, 2, UnionClass1e(1, 2, Struct4(1)))).toDF("a", "b", "c")

df1.unionByName(df2) -> this does not add the missing column; instead it throws an exception:

"Union can only be performed on tables with the compatible column types." +
  " struct<c1:int,c2:int,c3:struct<c4:int>> <> struct<c1:int,c2:int,c3:struct<c3:int>>" +
  " at the third column of the second table"

@Kimahriman (Contributor):

This will definitely conflict with #32448, but I can update as necessary if this is accepted and goes in first. Still waiting on some feedback on that.

@SaurabhChawla100 (Contributor Author):

cc @viirya @HyukjinKwon @cloud-fan - please review this PR.

@cloud-fan (Contributor) commented Jun 22, 2021

how do we handle different column order for top-level columns right now?

@SaurabhChawla100 (Contributor Author) commented Jun 22, 2021

This is handled in the code where, for each left-side attribute, we find its corresponding attribute on the right side:

https://github.com/apache/spark/pull/32972/files#diff-84dd17265dcadd59f6ad9e649203d38b808485c7b5bd3937136222378f2ed27dR170

val rightProjectList = leftOutputAttrs.map { lattr =>
  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }
  if (found.isDefined) {
    // ...

Contributor:

This looks a bit fragile as it doesn't consider case sensitivity.

Contributor Author:

unionByName with allowMissingColumns = true adds a case-sensitive attribute as a missing column in both scenarios, spark.sql.caseSensitive = true and false:

case class UnionClass2(a: Int, c: String)
case class UnionClass4(A: Int, b: Long)
case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)
case class UnionClass1c(a: Int, b: Long, nested: UnionClass4)


 val df1 = Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2")))).toDF("id", "a")
 val df2 = Seq((1, UnionClass1c(1, 2L, UnionClass4(2, 3L)))).toDF("id", "a")

case 1 - set spark.sql.caseSensitive=false

scala> spark.sql("set spark.sql.caseSensitive=false")
res6: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> var unionDf = df1.unionByName(df2, true)
unionDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, a: struct<a: int, b: bigint ... 1 more field>]

scala> unionDf.schema.toDDL
res7: String = `id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, `nested`: STRUCT<`a`: INT, `b`: BIGINT, `c`: STRING, `A`: INT>>

case 2 - set spark.sql.caseSensitive=true

scala> spark.sql("set spark.sql.caseSensitive=true")
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> var unionDf = df1.unionByName(df2, true)
unionDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, a: struct<a: int, b: bigint ... 1 more field>]

scala> unionDf.schema.toDDL
res3: String = `id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, `nested`: STRUCT<`A`: INT, `a`: INT, `b`: BIGINT, `c`: STRING>>

For unionByName without allowMissing, we cannot add the missing column; it should throw an exception because the schemas are not the same and the union cannot be done. That is why we do not lower-case the names before comparing:

scala> var unionDf = df1.unionByName(df2)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<a:int,b:bigint,nested:struct<A:int,b:bigint>> <> struct<a:int,b:bigint,nested:struct<a:int,c:string>> at the second column of the second table;

Contributor:

I think he means more like

df1 = spark.createDataFrame([Row(nested=Row(a=1, b=2))])
df2 = spark.createDataFrame([Row(nested=Row(B=1, A=2))])
df1.unionByName(df2)

These wouldn't get merged without doing case insensitive comparisons

Contributor Author:

This validation is not required:
target.toAttributes.map(attr => attr.name).sorted == source.toAttributes.map(x => x.name).sorted
since we need to sort both the left and the right side for structs anyway, and the sortStructFields method sorts recursively through all nested structs.

After removing this validation, case-sensitive attributes in a different order also work, just as they already do for case-sensitive attributes in the same order.

@SaurabhChawla100 (Contributor Author) commented Jun 23, 2021

There is one scenario that fails with allowMissing = true on the existing master branch: when the fields C2 and c1 are sorted, the result is (C2, c1), while the other side sorts to (c1, c2), so the union fails with "Union can only be performed on tables with the compatible column types".

scala> case class Struct2(C2: Int, c1: Int)
defined class Struct2

scala> case class Struct1(c1: Int, c2: Int)
defined class Struct1

scala> var df2 = Seq((1, Struct2(1, 2))).toDF("a", "b")
df2: org.apache.spark.sql.DataFrame = [a: int, b: struct<C2: int, c1: int>]

scala> var df1 = Seq((1, Struct1(1, 2))).toDF("a", "b")
df1: org.apache.spark.sql.DataFrame = [a: int, b: struct<c1: int, c2: int>]

scala> var unionDF = df1.unionByName(df2, true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<C2:int,c1:int> <> struct<c1:int,c2:int> at the second column of the second table;
'Union false, false

We need to sort on the lower-cased field names to handle this case.
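The underlying problem is plain lexicographic sorting: uppercase ASCII letters sort before lowercase ones, so "C2" comes before "c1". A small Python check (illustration only, not Spark code) shows why a lower-cased sort key fixes it:

```python
left_names = ["C2", "c1"]    # fields of Struct2
right_names = ["c1", "c2"]   # fields of Struct1

# Plain lexicographic sort: 'C' (0x43) < 'c' (0x63), so the sides disagree.
assert sorted(left_names) == ["C2", "c1"]
assert sorted(right_names) == ["c1", "c2"]

# Sorting on the lower-cased name yields a consistent order on both sides.
assert sorted(left_names, key=str.lower) == ["c1", "C2"]
assert sorted(right_names, key=str.lower) == ["c1", "c2"]
assert [n.lower() for n in sorted(left_names, key=str.lower)] == \
    [n.lower() for n in sorted(right_names, key=str.lower)]
```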

After fix

scala> case class Struct2(C2: Int, c1: Int)
defined class Struct2

scala> case class Struct1(c1: Int, c2: Int)
defined class Struct1

scala> var df1 = Seq((1, Struct1(1, 2))).toDF("a", "b")
df1: org.apache.spark.sql.DataFrame = [a: int, b: struct<c1: int, c2: int>]

scala> var df2 = Seq((1, Struct2(1, 2))).toDF("a", "b")
df2: org.apache.spark.sql.DataFrame = [a: int, b: struct<C2: int, c1: int>]

scala> var unionDF = df1.unionByName(df2, true)
unionDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: struct<c1: int, c2: int ... 1 more field>]

@Kimahriman (Contributor):

#33040 was merged so this needs to be reworked based off that now

@SaurabhChawla100 (Contributor Author):

#33040 was merged so this needs to be reworked based off that now

Thanks for sharing the details. I will update the PR as per the new change.

var unionDF = df1.unionByName(df2)
var expected = Row(1, Row(1, 2)) :: Row(1, Row(2, 1)) :: Nil
val schema = "`a` INT,`b` STRUCT<`c1`: INT, `c2`: INT>"
assert(unionDF.schema.toDDL === schema)
Contributor:

It's a bit fragile to compare the DDL string, can we compare StructType instance directly?

Contributor Author:

Added the StructType comparison.

@SaurabhChawla100 (Contributor Author):

@cloud-fan - Shall we trigger the test build for this PR? It seems it has not been triggered.

@cloud-fan (Contributor):

ok to test

@SparkQA commented Jun 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44957/

@SparkQA commented Jun 30, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44957/

@SparkQA commented Jun 30, 2021

Test build #140443 has finished for PR 32972 at commit a29df92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SaurabhChawla100 (Contributor Author):

@cloud-fan - All the tests passed on this PR. If everything looks good, shall we merge it?

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in ca12176 Jul 1, 2021