[SPARK-35290][SQL] Append new nested struct fields rather than sort for unionByName with null filling #33040
Conversation
Rework of #32448 with just the unionByName fixes, without the StructType.merge changes that are no longer necessary for this.
Thanks, @Kimahriman.
ok to test
cc @viirya FYI
Kubernetes integration test unable to build dist. exiting with code: 1
- In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`.
- In Spark 3.2, `Dataset.unionByName` with `allowMissingColumns` set to true will add missing nested fields to the end of structs. In Spark 3.1, nested struct fields are sorted alphabetically.
union of top-level columns is also "left dominant", this makes sense.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
case (Some(cf), expectedType: StructType) if cf.dataType.isInstanceOf[StructType] =>
  val extractedValue = ExtractValue(col, Literal(cf.name), resolver)
  val combinedStruct = addFields(extractedValue, expectedType)
  if (extractedValue.nullable) {
It's hard to see why null handling is needed here. I think we should move the null handling to where we return CreateNamedStruct
Yeah, I just copied this from the old version. @viirya, is there a reason all these nullable checks needed to be added? I don't know what you mean by moving the null handling to where we return CreateNamedStruct, though.
Err maybe I didn't copy that directly from somewhere, just saw it used other places. Not sure if/where/when we need the nullable checks
Do you mean:
  val newExpr = CreateNamedStruct(existingExprs)
  if (expr.nullable) {
    If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
  } else {
    newExpr
  }
Otherwise I can't find a nullable check.
Yeah, the code posted by @viirya is more widely used in the Spark codebase. The rationale is pretty simple: CreateNamedStruct never returns null, so we need to handle the case where the input is null.
Yeah, why was that needed? I copied that I guess because I thought it might be needed here as well for some reason. Is that basically trying to keep nulls down to the lowest level of the struct, instead of taking a null struct and making a non-null struct with all null values?
Moved the null check to the CreateNamedStruct
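The null-handling point in this thread can be illustrated with a small model that does not use Spark at all. This is only a sketch under stated assumptions: `NullSafeStruct`, `rebuild`, and `rebuildNullSafe` are hypothetical names, a struct is modelled as an ordered list of name/value pairs, and `None` stands in for SQL NULL. It is not the code from ResolveUnion.scala.

```scala
object NullSafeStruct {
  // Hypothetical toy model: a struct is an ordered list of (name, value) pairs.
  // None stands in for SQL NULL, both for a field value and for a whole struct.
  type Struct = Vector[(String, Option[Int])]

  // Behaves like a struct constructor: it always returns a struct,
  // even when the input struct itself is null (None).
  def rebuild(input: Option[Struct], fields: Seq[String]): Struct =
    fields.toVector.map { name =>
      val value = input.flatMap(_.collectFirst { case (n, v) if n == name => v }.flatten)
      name -> value
    }

  // The guard discussed in the thread: a null input struct stays null
  // instead of becoming a non-null struct whose fields are all null.
  def rebuildNullSafe(input: Option[Struct], fields: Seq[String]): Option[Struct] =
    input.map(s => rebuild(Some(s), fields))
}
```

In this model, `rebuild(None, Seq("a", "b"))` yields a struct with two null fields, while `rebuildNullSafe(None, Seq("a", "b"))` yields a null struct, which is the distinction the `If(IsNull(expr), ...)` wrapper is meant to preserve.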
colType.fields
  .filter(f => targetType.fields.find(tf => resolver(f.name, tf.name)).isEmpty)
  .foreach { f =>
    newStructFields ++= Literal(f.name) :: ExtractValue(col, Literal(f.name), resolver) :: Nil
Is this to add the fields that are only on the left side to the end of the struct? Doesn't it match the original field order?
When the left is projected this should match the original, but when the right is projected this will contain things on the right that aren't in the left. Basically it's
rightChild = left ++ (right - left)
leftChild = rightChild ++ (left - rightChild) = rightChild
Where is the project? Do you mean
val rightChild = Project(rightProjectList ++ notFoundAttrs, right)?
It is top level column projection. I mean the nested column field order.
newStructFields contains the (nested) struct fields both in left and right column in right order.
Then here it adds the (nested) struct fields that are only in the left back to newStructFields, before creating the new struct (CreateNamedStruct).
Do we reorder the fields later?
rightProjectList contains the nested structs mapped in the order of left fields then remaining right fields recursively, so that's where all the reordering happens
And then leftChild is created from the fields in rightChild, which already has all the fields at that point: the left fields and then the right fields
rightProjectList contains the nested structs mapped in the order of left fields then remaining right fields recursively, so that's where all the reordering happens
The projection projects original right attributes to rightProjectList. If you have different nested column order, it will be projected to new order.
The projection is not for reordering the nested column.
I looked at the code in more detail.
targetType is actually the left-side type in the first call, so here we align a right struct column to the left struct column.
So it makes sense to add the left nested columns first (newStructFields), then add the nested columns that are only in the right struct.
Yeah I kept most of the naming which gets a little weird with how left/right/source/target are constructed and depends on where in the codepath you are
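The ordering rule discussed in this thread, `rightChild = left ++ (right - left)`, can be sketched on a miniature schema model. Everything here is an assumption made for illustration: `LeftDominantMerge`, `Type`, `Leaf`, `Struct`, and `merge` are hypothetical names, field names are matched by exact string equality rather than through a resolver, and no Spark classes are involved.

```scala
object LeftDominantMerge {
  // Hypothetical miniature schema: a field is either a leaf or a nested struct
  // with an ordered list of named fields.
  sealed trait Type
  case object Leaf extends Type
  final case class Struct(fields: Vector[(String, Type)]) extends Type

  // Keep every field of `left` in its original order, merging nested structs
  // recursively, then append the fields that only `right` has, in right's order.
  def merge(left: Struct, right: Struct): Struct = {
    val rightByName = right.fields.toMap
    val fromLeft: Vector[(String, Type)] = left.fields.map {
      case (name, l: Struct) =>
        rightByName.get(name) match {
          case Some(r: Struct) => name -> merge(l, r)
          case _               => name -> l
        }
      case other => other
    }
    val leftNames = left.fields.map(_._1).toSet
    val onlyRight = right.fields.filterNot { case (name, _) => leftNames(name) }
    Struct(fromLeft ++ onlyRight)
  }
}
```

Merging the result with `left` again adds nothing new, which mirrors the `leftChild = rightChild ++ (left - rightChild) = rightChild` line above: once the right side has been aligned, it already contains every field in the final order.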
  }
}

CreateNamedStruct(newStructFields.toSeq)
I think @cloud-fan means to add the null check here.
Yeah I think I understand now
Test build #140205 has finished for PR 33040 at commit
viirya
left a comment
After addressing the null check and method comment, I think this should be fine.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
…e unused variable
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #140261 has finished for PR 33040 at commit
Thanks @Kimahriman! Merging to master.
// like that. We will sort columns in the struct expression to make sure two sides of
// union have consistent schema.
// We have two structs with different types, so make sure the two structs have their
// fields in the same order by using `target`'s fields and then inluding any remaining
nit: inluding -> including
🤦
…or unionByName with null filling

This PR changes the unionByName with null filling logic to append new nested struct fields from the right side of the union to the schema versus sorting fields alphabetically. It removes the need to use UpdateField expressions, and just directly projects new nested structs from each side of the union with the correct schema. This changes the union'd schema from being alphabetically sorted previously to now "left dominant", where the fields from the left side of the union are included and then the missing ones from the right are added in the same order found originally.

Certain nested structs would cause unionByName with null filling to error out due to part of the logic for rewriting the expression tree to sort the structs.

Yes, nested struct fields will be in a different order after unionByName with null filling than before, though shouldn't cause much effective difference.

Updated existing tests based on the new StructField ordering and added a new test for the case that was broken originally.

Closes apache#33040 from Kimahriman/union-by-name-struct-order.

Authored-by: Adam Binford
Signed-off-by: Liang-Chi Hsieh
What changes were proposed in this pull request?
This PR changes the unionByName with null filling logic to append new nested struct fields from the right side of the union to the schema rather than sorting fields alphabetically. It removes the need to use UpdateField expressions and directly projects new nested structs from each side of the union with the correct schema. The unioned schema changes from alphabetically sorted to "left dominant": the fields from the left side of the union come first, and the missing ones from the right are then added in the order in which they were originally found.
Why are the changes needed?
Certain nested structs would cause unionByName with null filling to error out due to part of the logic for rewriting the expression tree to sort the structs.
Does this PR introduce any user-facing change?
Yes. Nested struct fields will be in a different order after unionByName with null filling than before, though this should make little practical difference.
How was this patch tested?
Updated existing tests based on the new StructField ordering and added a new test for the case that was broken originally.