[HUDI-7207] Sequentially delete complete instant files in archival to prevent inconsistency during data reads #10325
Conversation
…lly to prevent data errors during data reads.
// and instants are deleted in ascending order to prevent the occurrence of such holes.
completedInstants.stream()
    .forEach(instant -> activeTimeline.deleteInstantFileIfExists(instant));
}
The archived instants should be old; did you encounter data loss for a table in production?
Yes, I encountered this issue in an older version of Hudi, but I believe that the current version of Hudi has not addressed this problem.
As the example illustrates, let's assume that the instants to archive (instantToArchive), after sorting, are 1, 2, 3, 4. Given a concurrency level of 2, instants 1 and 2 will be processed on one thread, while instants 3 and 4 will be processed on another. Suppose the deletion of instant 1 is slow, so instants 3 and 4 are deleted first. This creates an "instant hole" in the timeline. If a query reads the timeline at this point, then, according to the timeline visibility rules, the files corresponding to instants 3 and 4 are considered invisible.
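To make the race concrete, here is a toy, self-contained illustration in plain Java (not Hudi code; all names here are made up for the example) of how two deletion workers can leave such a hole: worker A handles instants 1 and 2, worker B handles 3 and 4, and if A stalls on instant 1, a concurrent reader sees 3 and 4 already gone while 1 and 2 remain.

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OutOfOrderDeletionDemo {
  public static void main(String[] args) throws Exception {
    // Four archived instants still present on the active timeline.
    Set<String> timeline = ConcurrentHashMap.newKeySet();
    timeline.addAll(List.of("1", "2", "3", "4"));

    ExecutorService pool = Executors.newFixedThreadPool(2);
    // Worker A: slow deletion of instants 1 and 2.
    pool.submit(() -> { sleep(200); timeline.remove("1"); timeline.remove("2"); });
    // Worker B: fast deletion of instants 3 and 4.
    pool.submit(() -> { timeline.remove("3"); timeline.remove("4"); });

    sleep(100);
    // A concurrent reader observes the transient "hole": 3 and 4 are gone while 1 and 2 remain.
    System.out.println("Timeline seen by a reader mid-archival: " + timeline);

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.SECONDS);
  }

  private static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}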
Before release 1.0, Hudi relies on the existing file naming for file slicing. As long as there is a parquet file in the file group, the log files would finally use the parquet instant time as their base instant time. The file slicing does not depend on the commit metadata, so the archiving sequence should not affect the file slicing version.
Since release 1.0, we use completion-time-based file slicing, and the removal sequence does not matter because the completion time of a log file is deterministic.
Hmm, guess you are talking about this line:
hudi/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
Line 153 in dd31c9b
if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
@majian1998 This is a good find. The issue you mentioned is legit, though it can only happen during the parallelized deletion of instant files in the active timeline.
@danny0405 yeah, you pointed to the right method. The file group instance uses the loaded active timeline to determine whether a file slice is committed; the slice is considered committed if either is true: (1) the active timeline contains the instant time, or (2) the instant time is smaller than the start of the active timeline.
/**
 * A FileSlice is considered committed, if one of the following is true - There is a committed data file - There are
 * some log files, that are based off a commit or delta commit.
 */
private boolean isFileSliceCommitted(FileSlice slice) {
  if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
    return false;
  }
  return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime());
}
In the case @majian1998 mentions, i.e., 3.deltacommit is deleted first so that the active timeline transiently contains only 1.deltacommit, 2.deltacommit, and 4.deltacommit, the data files written by 3.deltacommit are considered non-committed in that transient state, since neither of the conditions above holds.
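To make that concrete, here is a minimal, self-contained sketch (a hypothetical standalone class, with plain string comparison standing in for Hudi's timestamp comparison) of the check in that transient state, where the active timeline holds instants 1, 2, and 4 and the file slice was written by 3.deltacommit:

import java.util.List;

public class TransientHoleCheck {

  // Simplified stand-in for HoodieTimeline#containsOrBeforeTimelineStarts.
  static boolean containsOrBeforeTimelineStarts(List<String> activeInstants, String instantTime) {
    return activeInstants.contains(instantTime)
        || instantTime.compareTo(activeInstants.get(0)) < 0; // strictly before the start of the timeline
  }

  public static void main(String[] args) {
    List<String> activeInstants = List.of("1", "2", "4"); // 3.deltacommit already deleted
    String lastInstant = "4";
    String baseInstantTime = "3"; // base instant of the file slice written by 3.deltacommit

    // First check in isFileSliceCommitted: the base instant is not after the last instant, so we continue.
    boolean notAfterLast = baseInstantTime.compareTo(lastInstant) <= 0; // true
    // Second check: 3 is neither contained in [1, 2, 4] nor before its start (1), so the slice
    // is treated as non-committed and its data file becomes invisible to the query.
    boolean committed = notAfterLast && containsOrBeforeTimelineStarts(activeInstants, baseInstantTime);

    System.out.println("Slice of 3.deltacommit considered committed? " + committed); // prints false
  }
}

The result is false only because 3 falls inside the hole; once 1 and 2 are also deleted, or if 3 had not been deleted yet, the same slice would be considered committed.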
So, the instant files have to be deleted in order based on the instant time (not the case as of master). Within the same instant time, the files should be deleted in the order of REQUESTED, INFLIGHT, COMPLETED (<ts>.commit etc. should be deleted last, which is the case now).
After looking at the history, the deletion did happen in order before, until #3920 introduced the parallelized deletion. We need to revert that. In normal cases where archival runs for every commit, usually only a handful of instant files are deleted, so the performance hit may not be noticeable.
A side note: Hudi makes sure the inflight data files are cleaned before archival happens on the relevant instants. So the base files with the instant time before the start of the active timeline are always committed, thus the check logic in isFileSliceCommitted(FileSlice).
Yes! @yihua You got it! Haha. Just as @danny0405 pointed out, that piece of code isFileSliceCommitted(FileSlice) will have issues during concurrent deletion of completed instants. The order of deletion doesn't matter for inflight ones.
So, I believe that we just need to serialize the deletion of completed instants in order right here, while we can still maintain the concurrency of deleting inflight ones. cc @danny0405 @yihua
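Something along these lines, as a rough fragment sketch (pendingInstants, deleteParallelism, and the engine-context foreach call are assumed names for this example, not necessarily the exact upstream API): keep the parallel path for pending instant files, and delete completed instant files serially in ascending instant-time order.

// Pending (requested/inflight) instant files: order does not matter, keep parallel deletion.
context.foreach(pendingInstants,
    instant -> activeTimeline.deleteInstantFileIfExists(instant),
    Math.min(pendingInstants.size(), deleteParallelism));

// Completed instant files: delete serially in ascending instant time so the active timeline
// never has a hole in the middle while archival is in progress.
completedInstants.stream()
    .sorted(Comparator.comparing(HoodieInstant::getTimestamp))
    .forEach(activeTimeline::deleteInstantFileIfExists);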
That makes sense. @majian1998 do you want to take a stab to fix the issue in this PR?
yihua left a comment
+1
We encountered a similar issue internally. I think a viz will better help users understand the issue here: the digits in the square parentheses represent the deletion order of the instants.

Explanation

There are 2 jobs that were technically running concurrently, 1 writing to BR, the other writing to CL. BR started and ended right before CL started.

BR start: 2024-04-16 12:22:42.348
CL start: 2024-04-16 12:26:20.836

The HoodieSparkCOW table was initialised at around
The archival was not performed in order, causing holes (non-contiguity) in the timeline while
The smallest instant
This causes the executors to be initialised with a metaclient that had the same holes/gaps in the timeline, as the table is SerDe'd to the executors.

The bottom-right snippet represents the state the executor is initialized in when requesting the base file. Although the timeline hash doesn't match, after performing a
When running

public boolean containsOrBeforeTimelineStarts(String instant) {
  return instants.stream().anyMatch(s -> s.getTimestamp().equals(instant)) || isBeforeTimelineStarts(instant);
}

Due to the non-contiguity of the timeline where

This PR is more about discussing with everyone how to fix the existing issues.
Assuming there are 4 instants in a Hudi table that need to be archived, with timestamps in ascending order (as they have been sorted after obtaining instantToArchive): these are 1.deltacommit, 2.deltacommit, 3.deltacommit, and 4.deltacommit, corresponding to the files a.parquet, b.parquet, c.parquet, and d.parquet, respectively.
In the archiving code, the deletion of instants is handled by the following code snippet:
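(Sketched below with assumed names, an engine-context foreach helper and an archive-delete parallelism setting, rather than the exact code on master.)

// Sketch of the parallelized deletion: instants are handed to tasks with no ordering guarantee.
context.foreach(instantsToBeDeleted,
    instant -> activeTimeline.deleteInstantFileIfExists(instant),
    Math.min(instantsToBeDeleted.size(), config.getArchiveDeleteParallelism()));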
Different instants are distributed across different threads for execution. For instance, in Spark with a parallelism of 2, they could be split into {1, 2} and {3, 4}, so instant 3 may be deleted before instant 2. If instants 1 and 3 have been deleted while 2 and 4 have not, a query obtaining visibleCommitsAndCompactionTimeline at this point would see a timeline with instants 2, 4, and so on.
During a query, this would result in the data under c.parquet, corresponding to instant 3, becoming completely invisible. I believe this is a very problematic situation, as users could unknowingly retrieve incorrect data.
Here are a few potential solutions I've considered:
1. Prohibit concurrent deletion of completed instant files. While this would ensure the order of deletions, it could impact performance, so it is not an optimal solution. Serially deleting instants may be slower, but as there are usually only a few to remove, it is an acceptable stopgap; the change can be undone when a superior solution is found.
2. Implement a solution similar to a marker file, recording which instants are in the process of being deleted, and then remove these instants directly from the timeline during reads.
3. Based on the second solution, incorporate archiving by adding archive instants to the timeline, allowing pending archives to be retrieved directly during data reads. Here I have a question: why don't previous archives have a corresponding instant action?
Change Logs
Delete completed instants serially.
Impact
None
Risk level (write none, low medium or high below)
Low
Documentation Update
None
Contributor's checklist