[HUDI-2711] Fallback to fulltable scan for IncrementalRelation if underlying files have been cleared or moved by cleaner #3946
Conversation
@nsivabalan Could you please review this?
| log.info("Checking if paths exists took " + timeTaken + "ms") | ||
|
|
||
| val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key) | ||
| val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first |
From a user standpoint, I would expect we should fall back to the first valid commit in the active timeline which the cleaner has not cleaned up. But I guess from an impl standpoint, we can't find this commit that easily. And so is that the rationale to fall back to a snapshot query?
My bad. From re-reading the description, I guess the fix does not sit well. The cleaner will not touch the timeline, right? So, how do we know if a commit has been cleaned up or not (because it could still be part of the active timeline)? Maybe I am missing something.
I revisited this patch. I get it now.
So, we are fixing two things:
1. A commit is valid in the active timeline, but the corresponding data files have been cleaned up.
2. The begin commit is archived.
Makes sense to me.
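A rough sketch of how these two triggers combine, with `commitTimeline`, `optStartTs`, `allFilesToCheck`, and `fs` assumed in scope as in the quoted snippets (the combined variable name is illustrative, not the exact identifier in the patch):

```scala
import org.apache.hadoop.fs.Path

// (2) begin instant predates the first instant on the active timeline => archived
val isInstantArchived =
  optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0
// (1) instant may still be on the active timeline, but the cleaner has already
// removed some data files the incremental scan would need
val anyFileMissing = allFilesToCheck.exists(path => !fs.exists(new Path(path)))
// Either condition forces the fallback to a snapshot (full table) scan
val fallbackToSnapshot = isInstantArchived || anyFileMissing
```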
nsivabalan left a comment:
Left a high-level comment.
Something to think about as a potential solution.
nsivabalan left a comment:
Left some clarifying comments. Changes look good to me in general.
| log.info("Checking if paths exists took " + timeTaken + "ms") | ||
|
|
||
| val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key) | ||
| val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I revisited this patch. I get it now.
So, we are fixing two things.
1: a commit is valid in active timeline, but corresponding data files are cleaned up.
2: begin commit is archived.
Makes sense to me.
    val timer = new HoodieTimer().startTimer();
    ...
    val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
    val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
fs.exists() should be routed to the metadata table.
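A rough sketch of that suggestion, assuming a HoodieTableMetadata-style listing API; `HoodieTableMetadata.create` and `getAllFilesInPartition` are assumptions whose exact signatures vary across Hudi versions, and `engineContext`, `metadataConfig`, `affectedPartitions`, and `spillableDir` are illustrative:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hudi.metadata.HoodieTableMetadata

// Instead of one fs.exists() RPC per file, list each affected partition once
// through the metadata table and check membership locally.
val metadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath, spillableDir)
val existingFiles: Set[String] = affectedPartitions.flatMap { partition =>
  metadata.getAllFilesInPartition(new Path(basePath, partition)).map(_.getPath.toString)
}.toSet
val firstNotFoundPath = allFilesToCheck.find(path => !existingFiles.contains(path))
```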
    .schema(usedSchema)
    .format("hudi")
    .load(basePath)
    .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, // Notice the > in place of >= because we are working with optParam instead of first commit > optParam
Can you help me understand how this works?
Let's take the example in the tests added:
C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9
C0 to C3 are archived.
C4 and C5 are cleaned.
Active timeline: C4 to C9.
If someone tries an incremental query with C4 and C5 as begin and end, do we do a full scan of the table for records with commit time > C4 and <= C5?
What's the checkpoint returned at the end? Is it C5, so that next time the caller will make an incremental query with begin time C5?
So, in this case, if records pertaining to C4 and C5 have been updated by future commits, we may return an empty df, is that right?
I guess my question on the incremental query checkpoint may not make sense. If the consumer is a deltastreamer, it will keep track of commits consumed and will send back C5 for the next round. The query as such may not return any explicit checkpoint. Correct me if my understanding is wrong.
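For reference, a minimal sketch of the fallback under discussion, assuming `spark`, `usedSchema`, `basePath`, `startTs`, and `endTs` are in scope; COMMIT_TIME_METADATA_FIELD is Hudi's `_hoodie_commit_time` metadata column:

```scala
import org.apache.hudi.common.model.HoodieRecord

// Snapshot-read the whole table, then keep only records whose commit time falls
// in (startTs, endTs]. '>' rather than '>=' because startTs was already consumed.
val incrementalDf = spark.read
  .schema(usedSchema)
  .format("hudi")
  .load(basePath)
  .filter(s"${HoodieRecord.COMMIT_TIME_METADATA_FIELD} > '$startTs'")
  .filter(s"${HoodieRecord.COMMIT_TIME_METADATA_FIELD} <= '$endTs'")
```

Since records updated by later commits carry the later commit time, C4/C5 rows rewritten by C6+ fall outside the window, consistent with the "empty df" observation above.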
nsivabalan left a comment:
LGTM. I have created a follow-up to address some of the feedback. Will go ahead and land this.
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala (outdated; resolved)
nsivabalan left a comment:
LGTM. Have filed a follow-up JIRA to address feedback: https://issues.apache.org/jira/browse/HUDI-3189
nsivabalan left a comment:
@jsbali: Can you review the patch once? I made some minor updates, but had to resolve conflicts with latest master. Just wanted to ensure things are in good shape.
Adding more context here. There is one other way, where we skip the HoodieFileIndex path and directly take in a dir glob pattern, but that is one extra config which needs to be maintained (sketched below).
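For context, a sketch of that alternative, assuming Hudi's incremental path-glob option (`DataSourceReadOptions.INCR_PATH_GLOB`, key `hoodie.datasource.read.incr.path.glob`; worth verifying against the Hudi version in use). The glob value and `beginTs` are illustrative:

```scala
import org.apache.hudi.DataSourceReadOptions

// Restrict the incremental scan to partition dirs matching a glob, bypassing
// the HoodieFileIndex path; the glob itself is the extra config to maintain.
val df = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, beginTs)
  .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/year=2021/*/*")
  .load(basePath)
```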
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala (outdated; resolved)
nsivabalan left a comment:
Let's clarify with some experts. I am not comfortable changing the DefaultSource.
I am removing this one from 0.10.1 as it needs some discussion to be resolved, but let's try to get closure soon.
@jsbali: I have made some fixes and removed the changes in DefaultSource. Can you take a look?
…erlying files have been cleared or moved by cleaner
…tion column gets added to the end and fails the union of DF while doing incr scan fallback
@nsivabalan Thanks a lot for seeing this through. LGTM
@jsbali: Do you think you can take up similar work for MOR? I can assist you if need be.
…erlying files have been cleared or moved by cleaner (apache#3946) Co-authored-by: sivabalan <[email protected]>
