[HUDI-1353] add incremental timeline support for pending clustering ops #2388
Codecov Report
@@              Coverage Diff               @@
##             master    #2388       +/-   ##
=============================================
+ Coverage     50.27%   69.43%   +19.15%
+ Complexity     3050      357     -2693
=============================================
  Files           419       53      -366
  Lines         18897     1930    -16967
  Branches       1937      230     -1707
=============================================
- Hits           9500     1340     -8160
+ Misses         8622      456     -8166
+ Partials        775      134      -641
Force-pushed from 500ad3d to 4423c0c.
@vinothchandar @n3nash: gentle reminder to review :) It's been a few days.
bvaradar left a comment
Reviewed most of the code. Regarding the final item in your description (quoted below), can you point me to the relevant section?
Change the replacecommit.inflight file to also include the clustering plan (previously only the requested file had the clustering plan). This is needed to correctly block updates on file groups in pending clustering. One disadvantage is that replacecommit.inflight sometimes has an Avro structure and sometimes a JSON structure (WorkloadProfile, used by insert_overwrite). So a hack is needed to figure out whether an inflight file was created by insert_overwrite or by clustering.
List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
    .filter(instantPair -> instantPair.getValue().getAction().equals(HoodieTimeline.COMMIT_ACTION)
        && instantPair.getValue().isCompleted())
List<HoodieInstant> finishedViewChangingInstants = viewChangingInstants.stream()
Can you construct a timeline and call timeline.viewAlteringInstants() instead, to avoid duplicating the logic?
List<Pair<HoodieInstant, HoodieInstant>> allTransitions = new ArrayList<>();
return oldTimeline.filterPendingCompactionTimeline().getInstants().map(instant -> {
allTransitions.addAll(oldTimeline.filterPendingCompactionTimeline().getInstants().map(instant -> {
Instead of oldTimeline.filterPendingCompactionTimeline().getInstants(), is there scope to use oldTimeline.viewChangingInstants() and consolidate both this statement and the one below, where we handle replace commits?
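To make the suggestion concrete, here is a minimal sketch of filtering compaction and replace-commit instants in a single pass. Everything in it is an assumption for illustration: ViewChangingFilter, PendingInstant, and the action-name strings are hypothetical stand-ins, not Hudi's HoodieTimeline or HoodieInstant API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Hudi: selects every instant whose action
// can change the file-system view, so compaction and replace commits share
// one code path instead of two separate filters.
final class ViewChangingFilter {
  // Assumed action names, used here only as plain strings.
  static final Set<String> VIEW_CHANGING_ACTIONS =
      new HashSet<>(Arrays.asList("compaction", "replacecommit"));

  // Simplified stand-in for an instant on the timeline.
  static final class PendingInstant {
    final String action;
    final String timestamp;

    PendingInstant(String action, String timestamp) {
      this.action = action;
      this.timestamp = timestamp;
    }
  }

  // Keeps only instants with a view-changing action, preserving input order.
  static List<PendingInstant> viewChangingInstants(List<PendingInstant> instants) {
    return instants.stream()
        .filter(instant -> VIEW_CHANGING_ACTIONS.contains(instant.action))
        .collect(Collectors.toList());
  }
}
```

With a single filter like this, the transition logic could iterate over one stream rather than building the compaction and replace-commit lists separately.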
void addFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
  fileGroups.forEach(fileGroupInstantPair -> {
    ValidationUtils.checkArgument(fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
    ValidationUtils.checkArgument(!fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
This is a bug in 0.7, right? It will fail when RocksDBFileSystemView is used.
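For readers following along, the fix flips the precondition: adding a file group should be rejected when it is already tracked, not when it is absent. A minimal sketch of that guard follows, using a hypothetical PendingClusteringIndex with plain strings in place of HoodieFileGroupId and HoodieInstant; it is not the actual view implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the view's pending-clustering bookkeeping.
final class PendingClusteringIndex {
  private final Map<String, String> fgIdToPendingClustering = new HashMap<>();

  // Registers a file group as pending clustering under the given instant.
  // The guard must hold when the key is NOT yet present; with the inverted
  // check, the very first add for any file group would be rejected.
  void add(String fileGroupId, String instantTime) {
    if (fgIdToPendingClustering.containsKey(fileGroupId)) {
      throw new IllegalArgumentException(
          "File group already in pending clustering: " + fileGroupId);
    }
    fgIdToPendingClustering.put(fileGroupId, instantTime);
  }

  // Reports whether the file group is currently tracked as pending.
  boolean isPending(String fileGroupId) {
    return fgIdToPendingClustering.containsKey(fileGroupId);
  }
}
```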
diffResult.getFinishedViewChangingInstants().stream().forEach(instant -> {
  try {
    removePendingCompactionInstant(timeline, instant);
    if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
Can we introduce something like HoodieInstant.isAction(String action) instead of checking the action names directly here? There could be many occurrences like this.
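A rough sketch of what such a helper might look like is below. SketchInstant is a hypothetical, simplified class and the constant values are assumptions; the method name isAction is taken from the suggestion above and is not confirmed to exist in Hudi.

```java
import java.util.Objects;

// Hypothetical, simplified instant used only to illustrate the helper.
final class SketchInstant {
  // Assumed action names for illustration.
  static final String COMPACTION_ACTION = "compaction";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  private final String action;
  private final String timestamp;

  SketchInstant(String action, String timestamp) {
    this.action = action;
    this.timestamp = timestamp;
  }

  String getTimestamp() {
    return timestamp;
  }

  // Null-safe comparison, so call sites do not repeat
  // CONSTANT.equals(instant.getAction()) everywhere.
  boolean isAction(String expectedAction) {
    return Objects.equals(action, expectedAction);
  }
}
```

A call site would then read instant.isAction(SketchInstant.COMPACTION_ACTION), which keeps the string comparison in one place.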
// Adding mandatory parameters - Last instants affecting file-slice
timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
builder.addParameter(TIMELINE_HASH, timeline.filterCompletedAndCompactionInstants().getTimelineHash());
Shouldn't this be filterViewChangingInstants?
this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator());
this.timeline = timeline;
this.lastInstant = timeline.lastInstant();
this.lastInstant = timeline.filterCompletedAndCompactionInstants().lastInstant();
We should avoid this. FileGroup should act only on the timeline it is given, so that file groups stay composable. Can you elaborate on why lastInstant needs to be restricted to completed instants?
bvaradar left a comment
Will review the rest of the section after the diff gets updated.
@satishkotha can you address @bvaradar's comments and rebase? I can take a pass after that.

@satishkotha gentle reminder

@n3nash I don't have time in the next 2-3 weeks to get this done. If you prefer, we can close this one. I can reopen (the same PR or a different one) when I'm ready.

@n3nash @satishkotha Any updates on this? Generally, I'd love to get these clustering follow-ups over the fence if we can.
What is the purpose of the pull request
Brief change log
Let me know if you have any suggestions.
Verify this pull request
This change added tests. See TestIncrementalFSViewSync.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.