[HUDI-6047] Clustering operation on consistent hashing index resulting in duplicate data #8503
Conversation
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
table.getIndex().updateMetadata(table);
Why trigger the fix during archival?
The archival process archives the replace commit from the active timeline. Once it does that, all Hudi writers will start referring to the default metadata index file, 00000000000000.hashing_meta (check the function loadMetadata(HoodieTable table, String partition)). That's why it is necessary to trigger the update-metadata function before archival, so that 00000000000000.hashing_meta is brought in sync with the latest metadata commit file.
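For illustration, here is a minimal, hedged sketch of the fallback behaviour described above; it is not Hudi's actual loadMetadata implementation, and all names are placeholders.

import java.util.Optional;
import java.util.Set;

// Hedged sketch only: illustrates the fallback described above, not the real Hudi code.
class HashingMetadataFallbackSketch {
  static final String BOOTSTRAP_META_FILE = "00000000000000.hashing_meta";

  // If the replacecommit that produced <instant>.hashing_meta is still on the active
  // timeline, that file is used; otherwise readers/writers fall back to the bootstrap
  // file, which is why it must be synced before the replacecommit is archived.
  static String resolveMetadataFile(Optional<String> latestClusteringInstant,
                                    Set<String> activeReplaceInstants) {
    if (latestClusteringInstant.isPresent()
        && activeReplaceInstants.contains(latestClusteringInstant.get())) {
      return latestClusteringInstant.get() + ".hashing_meta";
    }
    return BOOTSTRAP_META_FILE;
  }
}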
cc @xiarixiaoyao, can you review the code?
@rohan-uptycs, could updateMetadata be invoked after the archive, or by the caller of getCommitInstantsToArchive? IMO, getCommitInstantsToArchive should only get the commit instants to archive, without any update behavior. Therefore, updateMetadata should not be triggered inside getCommitInstantsToArchive.
@SteNicholas, makes sense. Updated the code to call it after getCommitInstantsToArchive.
Please review.
cc @SteNicholas for reviewing.
Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
HoodieTimeline hoodieTimeline = table.getActiveTimeline().getCompletedReplaceTimeline();
hoodieTimeline.getInstants().forEach(instant -> {
  Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
When the replacecommit comes from an INSERT OVERWRITE action, is there any problem here?
I don't think so. Every clustering operation on the consistent hashing index engine creates a new metadata file, <instant_time>.hashing_meta. This particular piece of code brings 00000000000000.hashing_meta in sync with <instant_time>.hashing_meta before triggering archival, since the replace commit might get archived off the active timeline.
@rohan-uptycs, getCompletedReplaceTimeline() returns completed replacecommits, which could be generated either by a clustering operation or by an insert overwrite action. Is that not a problem?
Oh okay, got it. I don't see any issue with the insert overwrite action: for an insert overwrite action, ClusteringUtils.getClusteringPlan will return an empty plan. Just added a check to ignore empty plans.
Thanks for pointing that out.
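For illustration, the added guard presumably looks something like the following sketch; it reuses the loop shown in the diff above and may not match the PR exactly.

// Sketch of the empty-plan guard (simplified; assumes the surrounding forEach from the diff above).
hoodieTimeline.getInstants().forEach(instant -> {
  Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
      ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
  if (!instantPlanPair.isPresent()) {
    // Replacecommit from INSERT OVERWRITE carries no clustering plan: nothing to sync, skip it.
    return;
  }
  // ... sync the hashing metadata for the partitions referenced by the clustering plan ...
});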
@rohan-uptycs, BTW, do the replacecommits in the archived timeline also need a metadata update?
@SteNicholas, I don't think so. The loadMetadata function refers to the active timeline (replace commits) to load metadata for readers and writers, so we are only interested in replace commits on the active timeline.
And this code makes sure the metadata is kept in sync before the replace commit is archived.
@rohan-uptycs, another question: in HoodieTimelineArchiver, the replacecommits after oldestInstantToRetainForClustering aren't archived, but the replacecommits before oldestInstantToRetainForClustering would be archived. So do all completed replacecommits need to update metadata, or only the unarchived replacecommits?
@SteNicholas, using all replace commits to update metadata will not cause any issue. It's just that in some cases it will update the default metadata file for particular partitions multiple times, until the commit gets archived.
Ideally it should only update metadata for the replace commits which are going to be archived, i.e. the replacecommits before oldestInstantToRetainForClustering.
@SteNicholas, updated the code to update metadata only for replace commits which are eligible for archival in the current run. This avoids unnecessary metadata updates.
Please review.
 */
public void updateMetadata(HoodieTable table) {
  Map<String, Boolean> partitionVisiteddMap = new HashMap<>();
  Option<HoodieInstant> hoodieOldestReplaceInstantToKeep = getOldestInstantToRetain(table);
Could the oldest instant to retain be passed in as an interface parameter? Otherwise the oldest instant is computed twice, which costs some performance here, and hoodieOldestReplaceInstantToKeep may become inconsistent with oldestInstantToRetainForClustering in getCommitInstantsToArchive when the timeline changes.
Yeah, I can make the change in the interface. Let me do it.
Removed the redundant code and modified the interface.
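For reference, the reworked shape is roughly the following; this is a sketch based on the discussion and the call site shown later in this thread, and the final committed signature may differ slightly.

// HoodieIndex (sketch): accept the already-computed oldest instant instead of recomputing it.
public void updateMetadata(HoodieTable table, Option<HoodieInstant> oldestInstantToRetain) {
  // default: no-op for index implementations that keep no external metadata
}

// Caller side in the archiver (sketch): reuse the instant computed during getCommitInstantsToArchive.
table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));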
...park-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
FSDataOutputStream fsOut = fs.create(fullPath, true);
byte[] bytes = metadata.toBytes();
fsOut.write(bytes);
fsOut.close();
After the above suggestion, fsOut.close() will be invoked automatically in an implicit finally block.
Got it, thanks. Committed the suggestions.
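A minimal sketch of the suggested pattern, assuming Hadoop's FSDataOutputStream (which is Closeable), so try-with-resources closes the stream even if the write fails:

// Sketch: the stream is closed in an implicit finally block, even when write() throws.
try (FSDataOutputStream fsOut = fs.create(fullPath, true)) {
  byte[] bytes = metadata.toBytes();
  fsOut.write(bytes);
}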
  byte[] bytes = metadata.toBytes();
  fsOut.write(bytes);
}
byte[] bytes = metadata.toBytes();
You need to remove lines 338~340.
done
@rohan-uptycs, could you add test cases for this change?
@SteNicholas, sure, will do.
@SteNicholas, added the test cases, please check.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
 * if hoodieOldestInstantToArchive is null that means nothing is getting archived, so no need to update metadata
 */
if (hoodieOldestInstantToArchive != null) {
  table.getIndex().updateMetadata(table, Option.of(hoodieOldestInstantToArchive));
I don't think invoking updateMetadata in getInstantsToArchive makes sense.
Okay, so where should this function be called from?
getInstantsToArchive is where the archival process gets the list of commits to archive. What issues do you see with making the update-metadata call after getInstantsToArchive? Just wanted to understand.
@rohan-uptycs, getInstantsToArchive is likewise only meant to get the instants to archive; therefore, by design, the update behavior shouldn't be invoked in this method.
@SteNicholas, understood your concern. Should we run the metadata update in the post-commit of the clustering operation instead?
hudi/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java (line 522 in b690346):
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
Would that make sense?
A new metadata file is created only by a clustering operation in the consistent hashing engine.
But there is a concern: what if the post-commit operation fails for some reason? Then the metadata sync will not happen, and the metadata will remain in an inconsistent state.
@rohan-uptycs, when postCommit executes successfully, updateMetadata could be invoked.
@SteNicholas, yeah, it can be invoked, but I see a few problems with it.
What if the underlying file system is down and updateMetadata fails to sync the metadata? Then there is no mechanism to bring it back in sync with the latest committed metadata, archival will eventually remove the replace commit, and we end up in an inconsistent state.
In the archival process, on the other hand, the metadata will eventually be in sync with the committed metadata before the replace commit gets archived.
I think the consistent hashing metadata has a strong dependency on the archival process, as it relies on the replace commits on the active timeline to load metadata.
Would it be okay if we rename the interface method to updateArchivalDependentIndexMetadata and call it in the archiveIfRequired method?
@rohan-uptycs, makes sense to me.
@SteNicholas, done. Please review.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@rohan-uptycs, please rebase on the master branch instead of merging.
Sure.
String partition = partitionMap.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
if (!partitionVisiteddMap.containsKey(partition)) {
  Option<HoodieConsistentHashingMetadata> hoodieConsistentHashingMetadataOption = loadMetadata(table, partition);
  if (hoodieConsistentHashingMetadataOption.isPresent()) {
Still confused why we defer the metadata fix until archiving. Shouldn't we update the consistent hashing metadata right after we finish the clustering commit?
@danny0405, there are consistency issues we might face if we try updating the metadata after finishing the clustering commit.
What if the underlying file system is down and updateMetadata fails to sync the metadata? Then there is no mechanism to bring it back in sync with the latest committed metadata, archival will eventually remove the replace commit, and we end up in an inconsistent state.
In the archival process, on the other hand, the metadata will eventually be in sync with the committed metadata before the replace commit gets archived.
I think the consistent hashing metadata has a strong dependency on the archival process, as it relies on the replace commits on the active timeline to load metadata.
Let me give a few examples which can cause inconsistency.
Example 1
Let's say we update the metadata before the clustering commit:
1 - write the data files to storage
2 - update the metadata
3 - commit the clustering operation on the Hudi timeline by creating a replace commit
Now let's say step 2 succeeds but step 3 fails. In this case the metadata got synced but the clustering failed, and all subsequent write operations will read from the metadata file synced by the failed clustering.
Example 2
Let's say we update the metadata after the clustering commit:
1 - write the data files to storage
2 - commit the clustering operation on the Hudi timeline by creating a replace commit
3 - update the metadata
Now let's say step 3 fails. Then the latest committed metadata file will not be in sync with the default metadata file (0000000*.meta), and there is no scheduled mechanism to bring it back in sync. So once archival removes the replace commit associated with the latest metadata file from the timeline, all writers will start reading from the default metadata file (0000000*.meta), and this will cause data duplication.
"What if underlying file system is down and updateMetadata fails to sync metadata, then there is no mechanism to bring it in sync with latest committed metadata"
Just put the update into the same transaction as the clustering operation for the consistent hashing index.
That is, update the metadata first, then transition the state from inflight to complete for this replace commit.
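A sketch of that ordering; the method names below are hypothetical placeholders, not the actual Hudi write-client API.

// Hypothetical outline of the suggested commit ordering for the clustering replacecommit.
class ClusteringCommitOrderSketch {
  void writeClusteredFileGroups() { /* 1. data files are written to storage */ }
  void syncConsistentHashingMetadata() { /* 2. bring the hashing metadata in sync */ }
  void transitionReplaceCommitToComplete() { /* 3. inflight -> complete on the timeline */ }

  void completeClustering() {
    writeClusteredFileGroups();
    syncConsistentHashingMetadata();      // metadata is updated before the commit completes,
    transitionReplaceCommitToComplete();  // so a failure here leaves metadata ahead, never behind
  }
}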
@danny0405, understood. One question: let's say the transaction fails while transitioning the state from inflight to complete for the replace commit. Does the update to the metadata file (default file 00000.meta) get reverted?
Yes, but I guess we do not have a very good way to put them into one atomic operation. We can make some adjustments, like fixing the metadata during job recovery if there is an inconsistency. But in general, a failure on the file transition is a very rare case, I think.
Okay, will add the trigger in the clustering operation itself. Let me see how we can do that.
@danny0405 @SteNicholas, modified the approach.
A commit marker file is created for the completed clustering operation post-commit.
Readers use the commit marker as the indicator to get the latest metadata for consistent hashing.
If the commit marker creation fails post-commit, there is a recovery job which decides whether it is a valid latest metadata file and creates the marker.
Please review.
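At a high level, the reader-side decision described above is roughly the following hedged sketch; the names are placeholders, not the actual Hudi code.

import java.util.Set;

// Hedged sketch of the marker-based resolution: placeholder names, simplified logic.
class HashingMetaCommitSketch {
  static boolean isMetadataCommitted(String metaInstant,
                                     Set<String> commitMarkers,
                                     Set<String> activeReplaceInstants,
                                     boolean fileGroupsMatchMetadata) {
    if (commitMarkers.contains(metaInstant)) {
      return true;   // marker already present: metadata was committed
    }
    if (activeReplaceInstants.contains(metaInstant)) {
      return true;   // replacecommit still on the active timeline: safe to create the marker
    }
    // No marker and no replacecommit: the recovery path compares the file groups on disk
    // with the hashing metadata and (re)creates the marker only if it is the latest.
    return fileGroupsMatchMetadata;
  }
}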
Thanks for the contribution, I have reviewed and created a patch. BTW, why are there 34 commits? Can you squash them into one for review convenience?
Sure, will do.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
...park-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
/***
 * Create commit marker corresponding to hashing metadata file after post commit clustering operation
 * @param table
Add comments for all parameters and the return value.
done
...park-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
}

/***
 * Load consistent hashing metadata from given file
Suggested change:
- * Load consistent hashing metadata from given file
+ * Loads consistent hashing metadata of table from the given meta file.
/***
 * Load consistent hashing metadata from given file
 * @param table
Add comments for all parameters and the return value.
sure
done
 * Note : we will end up calling this method if there is no marker file and no replace commit on active timeline, if replace commit is not present on
 * active timeline that means old file group id's before clustering operation got cleaned and only new file group id's of current clustering operation
 * are present on the disk.
 * @param table
Add comments for all parameters and the return value.
sure
done
...ent/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
Force-pushed from 97db49b to bacce6b.
 */
public void commitIndexMetadataIfNeeded(HoodieTable table, String hoodieInstant) {
}
Do we need a new interface? Can we just make the metadata right in updateLocation?
updateLocation is a pre-commit operation; that's the reason I haven't used it. I was thinking we could use the commitIndexMetadata interface for the post-commit operation.
Let me know your thoughts on this. The idea is that we need to create the commit marker file once the clustering operation gets committed.
One thing we can do, if we don't want to use the commitIndexMetadata interface, is to let the loadMetadata function create the commit marker file when a reader asks for updated metadata: if the replace commit is on the active timeline, it creates the marker file for it; otherwise the commit-marker recovery function (recommitMetadataFile) decides whether this metadata file is the latest and creates the marker file if it is.
The only cost is listing all file groups for that partition and comparing them against the metadata file, and this is done only once, when the commit marker file is not present.
So updateLocation cannot in any way ensure the atomicity of the index metadata and the instant commit; we can fix that in a similar way to what we do in loadMetadata.
Yeah, in updateLocation itself we can't ensure the atomicity of the clustering operation.
In updateLocation we are already creating the metadata (through saveMetadata); we can treat that metadata as in-flight metadata, and loadMetadata will decide whether that metadata can be committed or not. I think we can use this mechanism to ensure the consistency of the metadata.
Cool, then we can get rid of the inconsistency and also the method commitIndexMetadataIfNeeded.
Yeah sure, let me add the code change.
Done, removed the commitIndexMetadataIfNeeded interface. Please review.
Force-pushed from bacce6b to ad6db98.
danny0405 left a comment:
+1, nice findings.
public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
  Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
  Path partitionPath = FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
Is it a typo here? partitionPath is now the data partition path, e.g. hdfs://.../hudi_table/date=2023-12-01/, which does not match the logic in hashingMetaCommitFilePredicate. Correct me if I'm wrong, did I miss anything?
According to the code below, partitionPath will be used to create a marker file in the method createCommitMarker. If partitionPath is the path of the data files, it will create the marker file in the partition folder, e.g. hdfs://.../hudi_table/date=2023-12-01/00000000000000.commit.
However, the code below looks for the committed metadata files in the folder metadataPath:
final FileStatus[] metaFiles = metaClient.getFs().listStatus(metadataPath);
final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
    .map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
    .sorted()
    .collect(Collectors.toCollection(TreeSet::new));

And in the test case, TestSparkConsistentBucketClustering.testLoadMetadata, it also looks for the committed metadata files in the folder metadataPath:

Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), p);
try {
  Arrays.stream(table.getMetaClient().getFs().listStatus(metadataPath)).forEach(fl -> {
    if (fl.getPath().getName().contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX)) {
      try {
        // delete commit marker to test recovery job
        table.getMetaClient().getFs().delete(fl.getPath());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  });
BTW, the code has been refactored in a new class, https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java#L106
From the code, it creates a marker file under the data partition and a normal meta file under the metadata path; the marker file is used for recovery consistency.
Yeah, correct, the marker file is under the data partition. So, as I understand, commitMetaTss will always be an empty set, because metaFiles is listed not from the data partition but from FSUtils.getPartitionPath(metaClient.getHashingMetadataPath(), partition):
final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
May I ask how the recovery consistency works? Which part of the code should I refer to? Really appreciate it.
@TengHuo, yeah, it looks like a typo. It should be the metadata path. The idea is to create a commit marker file for every hashing_metadata file so that we are able to identify the latest metadata file.
@danny0405, thoughts?
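For illustration only, a hedged sketch of what the suspected one-line fix might look like; the createCommitMarker signature and the metaFilePath variable are assumptions here, not the actual committed change.

// Sketch: create the commit marker under the hashing metadata directory, not the data partition,
// so that hashingMetaCommitFilePredicate can find it when listStatus(metadataPath) is called.
Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
createCommitMarker(table, metaFilePath, metadataPath);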
Yeah, can you fire a fix @TengHuo @rohan-uptycs?
@danny0405 @TengHuo, I will raise a PR in some time.
Change Logs
Following changes added
Impact
None
Risk level (write none, low medium or high below)
none
Documentation Update
none
Contributor's checklist