[HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS #4739
Conversation
nsivabalan left a comment:
@prashantwason : Can you review this patch? We are modifying the way we combine multiple metadata records.
Here is the rationale:
With delta writes we only get the delta size, but with rollback metadata we only get full file sizes. So on HDFS storage we might run into issues.
So, in this patch, we are fixing how we set the size with delta commits in the metadata, and the combining logic becomes simpler: always pick the higher value if not deleted.
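A minimal sketch of that combining rule (toy code, not Hudi's actual HoodieMetadataPayload API; the class name, method names, and the negative-size encoding of deletes are all made up for illustration): a delete wins, otherwise always keep the larger reported size.

```java
import java.util.HashMap;
import java.util.Map;

public class MergeSketch {
    // Toy encoding: a negative size marks the file as deleted
    static final long DELETED = -1L;

    // Combine two metadata records for the same file:
    // deletes win; otherwise pick the higher of the two sizes
    static long merge(long oldSize, long newSize) {
        if (oldSize == DELETED || newSize == DELETED) {
            return DELETED;
        }
        return Math.max(oldSize, newSize);
    }

    public static void main(String[] args) {
        Map<String, Long> combined = new HashMap<>();
        combined.put("f1.log", 200L);
        // A later record reports the full (larger) file size:
        combined.merge("f1.log", 300L, MergeSketch::merge);
        System.out.println(combined.get("f1.log")); // prints 300
    }
}
```

Because `max` ignores arrival order, this rule stays correct even if records for the same file reach the metadata table out of order (the concern discussed further down in this thread).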
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
Force-pushed 071c618 to ea49cc7
  }

-  protected long computeFileSizeInBytes() throws IOException {
+  protected long computeTotalWrittenBytes() throws IOException {
computeFileSizeInBytes is a better name here and matches FSUtils.getFileSize.
It is an internal detail that for CreateHandle the total written bytes will be equal to the file size.
Good point
  // Combine previous record w/ the new one
  // NOTE: New records _always_ take precedence over the old one, as such no special case
  //       handling is actually necessary
  combinedFileInfo.putAll(filesystemMetadata);
This does not take care of deletes.
If filesystemMetadata has a delete, we want to remove the entry rather than add a delete record into combinedFileInfo.
@prashantwason can you please elaborate on why we want to have this behavior?
Scrap that comment, figured out what you're referring to
Force-pushed ea49cc7 to 11f1b68
  // - First we merge records from all of the delta log-files
  // - Then we merge records from base-files with the delta ones (coming as a result
  //   of the previous step)
  (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);
don't we need to take the max of the two sizes?
@prashantwason : aren't there multi-writer scenarios where a new commit could report a file with a lower size compared to a previous commit?
@nsivabalan I don't think this is possible, since we never delete in-place from files.
ok, let me take a stab to see if this is a valid scenario. This could be applicable only for HDFS.
Let's say there is a concurrent write to a log file by two different writers: one of them is doing a rollback and the other is appending a log block. Say writer1 (who is doing the rollback) updates the log file first and records a size of, say, 200. Later writer2 appends to the same log file and records a size of 300. Even though the order in which these writers appended to the file was writer1 followed by writer2, it's not guaranteed that the same order will be maintained when their updates reach the metadata table. For various reasons writer2 could complete its write earlier (for example, writer2 is updating only one file group, whereas writer1 is updating some 100 file groups) and apply its changes to the metadata table before writer1.
Wouldn't the final resolved size from metadata be 200 in this case when both writers are done updating the MDT?
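To make the arithmetic of this scenario concrete, here is a toy comparison (illustrative only, not Hudi code; the two merge functions are hypothetical stand-ins) of a "latest record wins" merge versus a "max size wins" merge when writer2's size (300) reaches the metadata table before writer1's stale size (200):

```java
public class SizeMergeScenario {
    // Hypothetical merge strategies for illustration
    static long latestWins(long oldSize, long newSize) { return newSize; }
    static long maxWins(long oldSize, long newSize) { return Math.max(oldSize, newSize); }

    public static void main(String[] args) {
        // Ingestion order into the metadata table: writer2's 300 arrives first,
        // then writer1's stale 200
        long latest = latestWins(latestWins(0L, 300L), 200L);
        long max    = maxWins(maxWins(0L, 300L), 200L);
        System.out.println(latest); // 200 -- understates the actual file size
        System.out.println(max);    // 300 -- matches the file's real size on disk
    }
}
```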
You've brought up an interesting point. First of all, MT updates and actual FS changes are part of the commit operation, which is atomic (as in: it's performed under a lock).
But if we extend your train of thought and assume for a moment that MT updates and FS operations aren't atomic, we would actually not be able to guarantee consistency b/w the actual state of the FS and the MT, b/c at any point MT operations (accompanying the actual FS ops) could get re-ordered. That would practically entail a potentially incorrect MT state, b/c the "merge" of MT records is not a commutative operation (it inherently relies on ordering for deletes). An example of such an inconsistency would be a "resurrecting file", where (in order):
- File F is Appended (A_1)
- File F is Deleted (D_1)
- MT record (deleted=true) is added (for D_1)
- MT record (size=..., deleted=false) is added (for A_1)
Therefore, we will have to provide for such atomicity (which we do)
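The non-commutativity argument above can be sketched as follows (toy code, not Hudi's implementation; the `apply` helper and its encoding are made up): applying the same two records, an append and a delete, in opposite orders leaves the merged state different.

```java
import java.util.HashMap;
import java.util.Map;

public class CommutativityDemo {
    // Apply one metadata record to the merged state:
    // a delete record removes the entry, an append keeps the max size
    static void apply(Map<String, Long> state, String file, long size, boolean deleted) {
        if (deleted) {
            state.remove(file);
        } else {
            state.merge(file, size, Math::max);
        }
    }

    public static void main(String[] args) {
        // FS order: append A_1, then delete D_1 -> file F should be gone
        Map<String, Long> inOrder = new HashMap<>();
        apply(inOrder, "F", 100L, false);
        apply(inOrder, "F", 0L, true);

        // Re-ordered MT records: delete D_1 first, then append A_1 -> F "resurrects"
        Map<String, Long> reordered = new HashMap<>();
        apply(reordered, "F", 0L, true);
        apply(reordered, "F", 100L, false);

        System.out.println(inOrder.containsKey("F"));   // prints false
        System.out.println(reordered.containsKey("F")); // prints true
    }
}
```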
I agree, but the scenario you quoted is unlikely to happen in general:
a. the cleaner will not touch the latest file slice, where regular writes go (MOR table).
b. in case of a COW rollback, there is no log file concept, and hence a rollback and a commit touching the same file is very unlikely.
So, for two concurrent writes where one of them deletes a file while the other updates or adds it: I can't think of a valid scenario. Maybe there is one, but we'd need to think hard to come up with it.
But the scenario of a rollback and concurrent updates to the same file slice might happen in a MOR table.
Also, the fix that is being considered may not cause any regression and could possibly help thwart some of these scenarios, i.e. taking the max file size rather than the latest file size. Do you see any issues specifically with going with the max of file sizes in the merge function? At least for the rollback plus regular writer use-case in MOR, we have to make the fix. I don't think we can ignore the use-case.
@nsivabalan we're not ignoring the use-case. My point was that FS changes and MT updates have to be atomic. Otherwise we can't guarantee MT consistency. They're atomic currently, so the use-case you've described is impossible.
Chatted offline w/ @nsivabalan:
- We can’t assume that MT update records will be ordered the same
  way as actual FS operations (these are not atomic)
  - AI: MT record merging should be a commutative operation (not
    assuming any ordering of the records)
    - This is possible for file-sizes
    - This is not possible for deletes. However, we’re assuming
      that the case of concurrent write and deletion of the same
      file is not possible
      - This would only be possible with a concurrent upsert and
        rollback operation (affecting the same log-file), which
        is implausible, b/c either of the following would have to be true:
        - We’re appending to a failed log-file (and the other
          writer is trying to roll it back concurrently, before
          its own write)
        - Rollback (of a completed instant) is running
          concurrently with an append (meaning that restore is
          running concurrently with a write, which shouldn’t
          occur, see below)
- Rollback should not be used to roll back completed instants (except
  in the Restore use-case)
  - AI: We need to guard against that
- [HUDI-3407] Restore can NOT be run concurrently with any writes
  - AI: We need to guard against that (for ex., we can take a lock for the
    whole duration of the Restore)
thanks a lot for capturing the gist. So, I assume you plan to fix the merge function to do max(old size, new size).
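As a sanity check on that proposal (a toy sketch, not the actual patch): a max-based size merge is commutative and associative, so the merged result is independent of the order in which records reach the metadata table.

```java
public class MaxMergeCommutes {
    // Proposed merge rule for file sizes: keep the larger value
    static long merge(long oldSize, long newSize) {
        return Math.max(oldSize, newSize);
    }

    public static void main(String[] args) {
        long a = 200L, b = 300L;
        // Order of arrival does not matter with a max-based merge
        System.out.println(merge(a, b) == merge(b, a)); // prints true
        System.out.println(merge(merge(a, b), 250L));   // prints 300
    }
}
```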
nsivabalan left a comment:
one nit. LGTM.
Force-pushed b24eaa1 to 3130966
@hudi-bot run azure
@hudi-bot run azure
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
  // - First we merge records from all of the delta log-files
  // - Then we merge records from base-files with the delta ones (coming as a result
  //   of the previous step)
  (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);
Much better API usage than the original verbose code.
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
prashantwason left a comment:
looks good.
Force-pushed 9c39119 to 6498d01
Force-pushed db249ff to 4021121
@hudi-bot run azure
Force-pushed ebf8acc to ac98ca7
Force-pushed f88dbd7 to ab452d9
Force-pushed 2790e24 to 2f65bc5
@hudi-bot run azure
… records creation; Tidying up
…lication across engines
Wired it up
Tidying up
…blic facing API; Cleaned up dead-code, restricted access for other methods
Force-pushed cb35475 to 157095d
@hudi-bot run azure
1 similar comment
@hudi-bot run azure
nsivabalan left a comment:
LGTM.
…y on HDFS (apache#4739) - This change makes sure MT records are updated appropriately on HDFS: previously after Log File append operations MT records were updated w/ just the size of the deltas being appended to the original files, which have been found to be the cause of issues in case of Rollbacks that were instead updating MT with records bearing the full file-size. - To make sure that we hedge against similar issues going f/w, this PR alleviates this discrepancy and streamlines the flow of MT table always ingesting records bearing full file-sizes.
  // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
  List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);

  Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
May I know why we roll back the failed commits before doing the upgrade? We already try to do that when starting a commit:
hudi/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
Line 935 in 1562bb6
  CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
The code was migrated as is, so I can't speak to the historical context, but my hunch is that we do that to make sure the table is in a consistent state (no leftovers of failed commits) when we start the upgrade process.
Regarding "the table is in a consistent state (no leftovers of failed commits) when we start the upgrade process":
My confusion is why we need to do that for the upgrade. Is there any restriction here for correctness? The code before the patch does not do so.
The code was doing it in some branches (for Spark) but not in others; this PR is simply reconciling that and making the behavior consistent across the board. In general, given that the upgrade/downgrade could be acting upon and modifying the table's current state, the table has to be in a consistent state.
Regarding "given that upgrade/downgrade could be acting upon and modifying table's current state it has to be in a consistent one":
I'm wondering about the exact background. I have two questions here:
- we already do a rollback when starting a new instant, so why do we roll back again here?
- the intermediate/corrupted data would sooner or later be rolled back, and I see that most of the upgrade/downgrade logic does not touch the data files but only the table configs, so why is there a need to roll back here?
this is how the code was in 0.9.0
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
  this.txnManager.beginTransaction();
  try {
    // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
    this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER));
    new SparkUpgradeDowngrade(metaClient, config, context)
        .run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
  } finally {
    this.txnManager.endTransaction();
  }
} else {
  upgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
}
and here is how it looks in 0.10.0
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
  // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
  List<String> instantsToRollback = getInstantsToRollback(
      metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
  Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
  instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
  this.rollbackFailedWrites(pendingRollbacks, true);
  new UpgradeDowngrade(metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
      .run(HoodieTableVersion.current(), instantTime);
  metaClient.reloadActiveTimeline();
  initializeMetadataTable(Option.of(instantTime));
}
So, at least from 0.9.0, it's evident that we were adding this only for the multi-writer scenario, i.e. when someone migrates from a single writer to multi-writer, we just wanted to ensure we roll back any partially failed commits.
But I don't see any gaps as such with the current state of things, because this code gets exercised only when an upgrade is required. So, as per the guideline, we should have only one writer in flight when an upgrade happens. Hence we can do eager rollbacks irrespective of whether multi-writer is enabled or not.
Let me know if you can think of a scenario where we can't trigger eager rollbacks during upgrade?
@danny0405 : happy to jam to see if there are any gaps here.
I have filed a fix here; I think there needs to be some improvement here: #5535
What is the purpose of the pull request
This change makes sure MT records are updated appropriately on HDFS: previously, after log-file append operations, MT records were updated with just the size of the deltas being appended to the original files, which was found to be the cause of issues in the case of Rollbacks, which were instead updating the MT with records bearing the full file size.
To make sure that we hedge against similar issues going forward, this PR eliminates this discrepancy and streamlines the flow so that the MT always ingests records bearing full file sizes.
Brief change log
Verify this pull request
This pull request is already covered by existing tests, such as (please describe tests).
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.