Add a nested schema pruning test case for Update #1371
Conversation
If this method is acceptable I have a working version for delete, and merge might be able to benefit from this as well.
```scala
// If no files need to be updated, the observation never gets initialized,
```
Can you reword this comment a little bit? It was confusing at first because I thought the following code would be the case when no files need to be updated.
I see what you mean, how about:
```scala
// Only get the metrics if there are files to be updated. If there are no files that need
// to be updated, the observation may never get initialized and `.get` will hang.
```
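A minimal sketch of the guard being discussed, assuming Spark's `Observation` API (Spark 3.3+); the names `metricsObservation`, `candidateFiles`, and `updateCondition` are hypothetical stand-ins, not the PR's actual code:

```scala
import org.apache.spark.sql.{Observation, functions => F}

// Hypothetical sketch: `Observation.get` blocks until an action on the
// observed Dataset finishes, so it must only be read on code paths where
// that action is guaranteed to have run.
val metricsObservation = Observation("update_metrics")
val touchedFiles: Seq[String] =
  if (candidateFiles.isEmpty) {
    // No files can match the condition, so the scan is skipped entirely.
    // The observed action never runs here; reading the observation would hang.
    Nil
  } else {
    data
      .observe(metricsObservation, F.count(F.lit(1)).as("numTouchedRows"))
      .filter(updateCondition)
      .select(F.input_file_name())
      .distinct()
      .collect()
      .map(_.getString(0))
      .toSeq
  }

// Only get the metrics if there are files to be updated.
val numTouchedRows: Long =
  if (candidateFiles.nonEmpty) metricsObservation.get("numTouchedRows").asInstanceOf[Long]
  else 0L
```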
My understanding is that this uses the Observable API, which is based on query listener metrics, so perhaps it's a bit more brittle for file pruning. I've seen cases where the query listener can miss some events when `spark.scheduler.listenerbus.eventqueue.capacity` is set.
Yeah, I guess you're right; it's all based on the QueryExecutionListener, which fires async events that could get dropped. I'm trying to create a separate wrapper that can use the same CollectMetrics expression but pulls the metrics directly off the queryExecution.
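One way such a wrapper might look (a sketch, assuming Spark's `QueryExecution.observedMetrics` map, which is populated once the action completes; the method name `collectWithMetrics` is made up):

```scala
import org.apache.spark.sql.{DataFrame, Row, functions => F}

// Hypothetical util: name the metric via Dataset.observe (which inserts a
// CollectMetrics node into the plan), run the action, then read the result
// synchronously from the plan's QueryExecution instead of waiting on an
// async QueryExecutionListener event that could be dropped.
def collectWithMetrics(df: DataFrame, metricName: String): (Array[Row], Option[Row]) = {
  val observed = df.observe(metricName, F.count(F.lit(1)).as("numRows"))
  val rows = observed.collect()
  // observedMetrics is keyed by the observation name given to observe().
  val metrics = observed.queryExecution.observedMetrics.get(metricName)
  (rows, metrics)
}
```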
Created a util method to pull the metrics directly from
```scala
withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
  data.filter(new Column(updateCondition))
    .select(input_file_name())
    .filter(updatedRowUdf())
```
Do you know why Spark doesn't do the column pruning properly? Although we can change the code to work around it, it's better to fix it on the Spark side as well.
I think it's because PhysicalOperation is used to determine which nested fields to prune, and non-deterministic Projects aren't included in PhysicalOperations. I feel like there are a lot of optimizations that just don't apply to non-deterministic functions, so if there are ways to avoid using them it's probably good, especially when it's just to collect metrics.
Or in this case, non-deterministic filter expressions don't get included in PhysicalOperations. I'm not exactly sure when/how the empty Project gets added to the plan in the description. Interestingly, nested fields don't get pushed through a CollectMetrics operation, which prevents full schema pruning with this method for merge, but I already have a working Spark patch to enable that for the next Spark release if we go down the CollectMetrics vs. non-deterministic UDF approach.
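An illustrative repro of the pruning behavior described here (the path and schema are made up, and `rand()` stands in for a non-deterministic metrics UDF):

```scala
import org.apache.spark.sql.functions.{col, rand}

// Hypothetical table with schema: a: struct<b: int, c: string>
val df = spark.read.parquet("/tmp/nested")

// Deterministic plan: PhysicalOperation collects the project/filter and the
// scan can be pruned down to just a.b.
df.filter(col("a.b") > 0).select(col("a.b")).explain(true)

// With a non-deterministic expression in the way, PhysicalOperation stops
// collecting, so the scan ends up reading the full struct `a`.
df.filter(col("a.b") > 0 && rand() >= 0).select(col("a.b")).explain(true)
```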
And actually it's not just the metrics, but also the input_file_name; I think it's the two combined, or one of them alone, that causes the problem; it's hard to say exactly. But you could potentially use metadata fields instead of input_file_name in the future to help address these weird cases caused by non-deterministic functions (which is a problem for merge).
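For reference, the metadata-field alternative mentioned above could look like this (the hidden `_metadata` column is only available on file-source tables in Spark 3.3+; treat this as a sketch):

```scala
import org.apache.spark.sql.functions.{col, input_file_name}

// input_file_name() is a non-deterministic expression, which can block
// nested schema pruning and other optimizations:
val pathsViaUdf = df.select(input_file_name().as("path"))

// _metadata.file_path carries the same information as an ordinary
// (deterministic) attribute, so it avoids those limitations:
val pathsViaMetadata = df.select(col("_metadata.file_path").as("path"))
```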
> I think it's because PhysicalOperation is used to determine which nested fields to prune, and non-deterministic Projects aren't included in PhysicalOperations.

IIRC, the non-deterministic UDF makes Spark unable to know which columns are needed? Technically, Spark could still do nested column pruning around a non-deterministic UDF if the UDF doesn't read those columns, right? Do you know the exact optimization rule?
Trying to understand the entire problem better so that we can apply the same approach to other places that are using non-deterministic UDFs.
Yeah, still trying to figure out the exact reason/code path. But I already have a fix for Delete, which has the same problem, and a partial fix for Merge (which has additional issues that might only be fixable come Spark 3.4); those are the only other places where the non-deterministic-function-for-metrics pattern is used.
Ok, so while digging into the why, I was testing on Spark master and couldn't recreate the issue. After a lot of digging, I'm pretty sure apache/spark#37176 will fix this issue in Spark 3.4+. Gonna try to test all the cases (update/delete/merge) with the latest Spark master and see if it addresses all the current limitations.
Delete still needs the same update as #1202, because even with the PhysicalOperation updates, columns still can't get pushed through a Filter - Filter(non-deterministic) - Project combo. I still haven't tested merge behavior with the updates. I can either close this or add a new UT with the current behavior (no nested pruning), with a comment that it should fail when upgrading to Spark 3.4.
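The plan combo being described is roughly the following shape (illustrative, not actual explain output); nested-column pushdown stops at the non-deterministic filter in the middle:

```
Filter <further condition>
+- Filter updatedRowUdf()        // non-deterministic
   +- Project [input_file_name(), ...]
      +- Relation ... parquet
```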
This sounds better. It's worth having a unit test here, but not worth changing the prod code since it's going to be fixed soon.
Sounds good. I'll update this PR and then make a separate issue/PR for delete, and see if anything needs to be done with merge.
Force-pushed 3f52832 to b0c61f8.
Just have the updated test here now.
Made #1412 for delete.
Could you resolve the conflicts?
Conflicts: core/src/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
Oops, sorry, I meant to do that when I updated it; done.
zsxwing
left a comment
LGTM
Closing this. It has been merged: acd0bd4
Description
#1370 will be fixed shortly after we upgrade to Spark 3.4.0. This PR just adds a test case.
How was this patch tested?
New UT.
Does this PR introduce any user-facing changes?
No