[HUDI-2833][Design] Merge small archive files instead of expanding indefinitely. #4078
Conversation
Hi @bhasudha. Sorry to bother you. Would you mind taking a look at this Hudi-on-S3 related issue?
yihua
left a comment
@zhangyue19921010 Thanks for putting thought into improving the archived timeline. Some functionality still relies on the archived timeline, such as HoodieRepairTool. My concern is that simply keeping the most recent few archive files may cause side effects given the information loss. Some further improvements I can think of are:
(1) Rewrite the archived timeline content into a smaller number of files
(2) When deleting the archived files, make sure the table does not have any corresponding base or log files from the contained instants, so there is essentially no information loss of the table states.
Wdyt?
Hi @yihua, thanks a lot for your attention. I agree with you: deleting archived files does lose historical instant information and affects some Hudi functions such as HoodieRepairTool. At present, using Hudi still involves some judgment calls on the user's side.
Sometimes users need a clear understanding of their configuration; capping the number of archive files loses historical instant information, much like how the cleaner limits time travel. (Of course we need to call this out in the documentation.) Fortunately, users can adopt this feature according to their own circumstances: if they need to keep all instant information, they simply leave it disabled; if they do not care about instants after archiving, they can turn it on and keep a smaller value. On the other hand, I think some information loss is inevitable since we cannot keep all the data forever; the questions are when and how. The improvement you mentioned is very reasonable, e.g., letting Hudi support appending to archive files on file systems that do not support append. Do you think we need to get it done in this PR, or can we walk step by step toward the final state? As for (1), rewriting the archived timeline content into a smaller number of files would lead to archive file write amplification.
I also echo Ethan's comment, but if this patch is guarded by a config flag and the default is not to clean up any archive files, I guess we should be good. Until we have a way to fold N archive files into one, at least users will have some way to trim them so they do not keep expanding indefinitely. As @zhangyue19921010 pointed out, we can call it out in our documentation and let users decide if they are okay enabling the config.
nsivabalan
left a comment
LGTM at a high level. Left some minor comments.
    .withDocumentation("The numbers of kept archive files under archived");

public static final ConfigProperty<String> CLEAN_ARCHIVE_FILE_ENABLE_DROP = ConfigProperty
    .key("hoodie.archive.clean.enable")
This could be confused with the regular clean. Can we call it "hoodie.auto.trim.archive.files" or "hoodie.auto.delete.archive.files" or something along those lines, and "hoodie.max.archive.files" for the previous config?
Thanks for your review. Changed to hoodie.auto.trim.archive.files and hoodie.max.archive.files.
while (iter.hasNext()) {
  files.add(iter.next());
}
assertEquals(archiveFilesToKeep, files.size());
Can we also add an assertion that the earliest files are deleted and not the latest ones in the archive folder?
Changed.
As discussed offline, we should warn users to avoid the config if they don't understand the mechanism. They should only use it if they know what they are doing. We can follow up with a more comprehensive mechanism around cleaning the archived timeline. @zhangyue19921010, you can create a Jira ticket to track the future directions.
public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
    .key("hoodie.max.archive.files")
    .defaultValue("10")
Let's make this noDefault() in case it's accidentally invoked?
Sure thing. Changed.
public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
    .key("hoodie.auto.trim.archive.files")
    .defaultValue("false")
    .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
Let's add a WARNING to both configs, something like: WARNING: do not use this config unless you know what you're doing. If enabled, details of older archived instants are deleted, resulting in information loss in the archived timeline, which may affect tools like the CLI and repair. Only enable this if you hit severe performance issues when retrieving the archived timeline. (Feel free to add more details.)
Appreciate it. Changed
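For context, a minimal sketch of how the two configs might look with the warning text folded in, assuming the builder methods shown elsewhere in this PR (key / noDefaultValue / defaultValue / withDocumentation); the exact keys, defaults, and wording in the merged code may differ:

```java
// Sketch only: config keys and warning text as discussed in this thread, not the final merged code.
public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
    .key("hoodie.max.archive.files")
    .noDefaultValue()
    .withDocumentation("The number of archive files to keep under the archived folder. "
        + "WARNING: do not use this config unless you know what you are doing; older archived "
        + "instants are deleted, which may affect tools like the CLI and repair.");

public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
    .key("hoodie.auto.trim.archive.files")
    .defaultValue("false")
    .withDocumentation("When enabled, Hoodie keeps only the most recent archive files and deletes older ones. "
        + "WARNING: enabling this loses part of the archived timeline; only enable it if you hit severe "
        + "performance issues when retrieving the archived timeline.");
```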
public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
    .key("hoodie.max.archive.files")
    .defaultValue("10")
    .withDocumentation("The numbers of kept archive files under archived.");

public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
    .key("hoodie.auto.trim.archive.files")
    .defaultValue("false")
    .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
        + " archive files and delete older one which lose part of archived instants information.");
Should these configs live in HoodieWriteConfig instead of HoodieCompactionConfig?
Emmm, because all the archive-related configs such as hoodie.archive.automatic, hoodie.commits.archival.batch, and hoodie.keep.min.commits already live in HoodieCompactionConfig, maybe it's better to keep them together :)
Got it. Ideally, the archive configs should not be in HoodieCompactionConfig. Let's keep it as is for now and clean this up in a follow-up PR.
if (!skipped.isEmpty()) {
  LOG.info("Deleting archive files : " + skipped);
  context.setJobStatus(this.getClass().getSimpleName(), "Delete archive files");
  Map<String, Boolean> result = deleteFilesParallelize(metaClient, skipped, context, true);
Remove the local variable assignment since it's not used?
Changed.
| ""); | ||
| List<HoodieLogFile> sortedLogFilesList = allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList()); | ||
| if (!sortedLogFilesList.isEmpty()) { | ||
| List<String> skipped = sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList()); |
nit: skipped -> archiveFilesToDelete
Changed.
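For reference, a consolidated sketch of the trim step with the rename applied, stitched together from the hunks quoted above (this mirrors the review discussion, not necessarily the exact merged code):

```java
// Sort archive log files newest-first, keep the most recent maxArchiveFilesToKeep, delete the rest.
List<HoodieLogFile> sortedLogFiles = allLogFiles
    .sorted(HoodieLogFile.getReverseLogFileComparator())
    .collect(Collectors.toList());
if (!sortedLogFiles.isEmpty()) {
  List<String> archiveFilesToDelete = sortedLogFiles.stream()
      .skip(maxArchiveFilesToKeep)
      .map(HoodieLogFile::getPath)
      .map(Path::toString)
      .collect(Collectors.toList());
  if (!archiveFilesToDelete.isEmpty()) {
    LOG.info("Deleting archive files : " + archiveFilesToDelete);
    context.setJobStatus(this.getClass().getSimpleName(), "Delete archive files");
    deleteFilesParallelize(metaClient, archiveFilesToDelete, context, true);
  }
}
```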
assertFalse(currentExistArchiveFiles.containsAll(archiveFilesDeleted));
// assert most recent archive files are preserved
assertTrue(currentExistArchiveFiles.containsAll(archiveFilesKept));
}
Add a check when archive trim is disabled as well?
Added.
Just raised a new ticket to track the further improvements: https://issues.apache.org/jira/browse/HUDI-3038
yihua
left a comment
Overall LGTM. Left a couple of nits on the naming.
| + " This is critical in computing the insert parallelism and bin-packing inserts into small files."); | ||
|
|
||
| public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty | ||
| .key("hoodie.max.archive.files") |
nit: after thinking about the naming again, let's use the hoodie.archive prefix for the archive configs and update the variable naming accordingly.
For this one, it can be hoodie.archive.max.files.
I was thinking more like "hoodie.max.archive.files.to.retain" sort of.
This falls under the archived timeline, so it's better to have the same hoodie.archive prefix. Given that the writer archives instants instead of files, this shouldn't create confusion.
Changed.
    .withDocumentation("The numbers of kept archive files under archived.");

public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
    .key("hoodie.auto.trim.archive.files")
This one can be hoodie.archive.auto.trim.enable.
+1
Changed.
@hudi-bot run azure
} catch (Exception originalException) {
  // merge small archive files may left uncompleted archive file which will cause exception.
  // need to ignore this kind of exception here.
  try {
    Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
    HoodieWrapperFileSystem fileSystem = metaClient.getFs();
    if (fileSystem.exists(planPath)) {
      HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
      String mergedArchiveFileName = plan.getMergedArchiveFileName();
      if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
        LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here.");
        continue;
      }
    }
    throw originalException;
  } catch (Exception e) {
    // If anything wrong during parsing merge archive plan, we need to throw the original exception.
    // For example corrupted archive file and corrupted plan are both existed.
    throw originalException;
  }
We use this code to check whether originalException is caused by a corrupted merged archive file and ignore it in that case. Anything else needs to be thrown again.
Hi @nsivabalan and @yihua,
The common concern is incomplete/duplicate data left behind when the last merge of small archive files fails and the current Hudi writer/commit is configured with archive file merging disabled.
Ideally we need to check and clean up dirty data before every archive. The reasons I think we need this flag before doing the cleanup work are:
- This is a new feature, so it's safer to have a default-false switch here.
- I am pretty worried about multi-writer scenarios; at least this gives us a way to ensure only one writer does the merge work.
As for making sure that incomplete data causes no damage when loading the archived timeline until the next cleanup:
- we use a HashSet to avoid duplicate instants when loading archived instants.
- we use this try-catch to handle exceptions caused by loading an incomplete merged archive file.
As the next step, maybe we can take care of multi-writer, let this run stably for some time in my staging/production environment, and finally remove the strict restriction around verifyLastMergeArchiveFilesIfNecessary here :)
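To make the recovery idea above concrete, here is a hedged sketch of what a pre-archive verification could look like (the actual verifyLastMergeArchiveFilesIfNecessary in the PR may differ in signature and details; it presumably also handles a corrupted plan by deleting it):

```java
// If a previous merge left a plan behind, remove the possibly half-written merged archive
// file and the plan itself so the next archive run starts from a clean state.
private void verifyLastMergeArchiveFilesIfNecessary() throws IOException {
  Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
  HoodieWrapperFileSystem fileSystem = metaClient.getFs();
  if (fileSystem.exists(planPath)) {
    HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(
        FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
    Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
    if (fileSystem.exists(mergedArchiveFile)) {
      fileSystem.delete(mergedArchiveFile, false);
    }
    fileSystem.delete(planPath, false);
  }
}
```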
I agree that we should have a feature flag to turn all new logic off and skip the corrupted merged archive files when loading the archive timeline, in case there is an incomplete archive merge operation and the feature is turned off in the next run.
@hudi-bot run azure
Hi @yihua, all comments are addressed and the Azure CI passed. PTAL. Thanks a lot :)
nsivabalan
left a comment
LGTM. @yihua: can you please follow up?
  }
}

public static void createFileInPath(FileSystem fileSystem, org.apache.hadoop.fs.Path fullPath, Option<byte[]> content) {
nit: declare throws HoodieIOException in the method signature, so that the caller can decide whether the exception can be ignored?
Sure. Changed.
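For illustration, a hedged sketch of the signature change (declaring the unchecked HoodieIOException so callers can see it and decide whether to swallow it); the body is illustrative, not the exact code in FileIOUtils:

```java
public static void createFileInPath(FileSystem fileSystem, org.apache.hadoop.fs.Path fullPath,
                                    Option<byte[]> content) throws HoodieIOException {
  try {
    if (content.isPresent()) {
      // create (or overwrite) the file and write the given bytes
      try (FSDataOutputStream out = fileSystem.create(fullPath, true)) {
        out.write(content.get());
      }
    } else {
      fileSystem.createNewFile(fullPath);
    }
  } catch (IOException e) {
    // surface I/O failures as the declared unchecked HoodieIOException
    throw new HoodieIOException("Failed to create file " + fullPath, e);
  }
}
```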
  }
}

public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
Similar here.
Changed.
private final int minInstantsToKeep;
private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient;
private final String mergeArchivePlanName = "mergeArchivePlan";
Should this be a public static final variable?
Changed. Just added public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan"; in HoodieArchivedTimeline.java.
Sg
// merge small archive files may left uncompleted archive file which will cause exception.
// need to ignore this kind of exception here.
try {
  Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
Reuse HoodieTimelineArchiveLog::mergeArchivePlanName?
We have to add public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan"; in HoodieArchivedTimeline.java and let HoodieTimelineArchiveLog.java use it, because of a dependency issue.
Got it.
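For clarity, a small sketch of the resolution (class bodies elided; only the constant and one reference are shown):

```java
// In HoodieArchivedTimeline (timeline side), so the archiver can reference it
// without introducing a circular dependency:
public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";

// Used when locating the plan file, e.g. in HoodieTimelineArchiveLog:
Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
```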
Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
HoodieWrapperFileSystem fileSystem = metaClient.getFs();
if (fileSystem.exists(planPath)) {
  HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
The logic here looks okay to me. Could you add a few unit tests to guard this logic and the failure recovery in the archival merging path as well, since this logic is critical?
I'm thinking about the following two cases:
(1) Construct a corrupted mergeArchivePlan file with random content so that it cannot be deserialized.
(1.1) When archival merging is enabled, the plan should be deleted first.
(1.2) When archival merging is disabled, the archived timeline can still be read successfully.
(1.3) If there are other corrupted archived files not from merging, the loading of archived timeline should fail and original exception should be thrown.
(2) Construct a working mergeArchivePlan file and a corrupted merged archive file with random content so that it cannot be deserialized.
(2.1) When archival merging is enabled, the corrupted merged archive file should be deleted first and proceed.
(2.2) When archival merging is disabled, the archived timeline can still be read successfully and the corrupted archive file is skipped.
Sure thing, added:
- testMergeSmallArchiveFilesRecoverFromBuildPlanFailed to cover (1)
- testMergeSmallArchiveFilesRecoverFromMergeFailed to cover (2)
- Also added testLoadArchiveTimelineWithDamagedPlanFile and testLoadArchiveTimelineWithUncompletedMergeArchiveFile to guard the archived timeline loading logic.
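For illustration, a hedged sketch of what case (1.2)-style coverage might look like (the metaClient setup and exact assertions are assumptions; the actual tests in the PR may differ):

```java
@Test
public void testLoadArchiveTimelineWithDamagedPlanFile() throws Exception {
  // Write random bytes to the merge plan path so it cannot be deserialized as Avro.
  Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
  try (FSDataOutputStream out = metaClient.getFs().create(planPath, true)) {
    out.write("not-a-valid-avro-plan".getBytes(StandardCharsets.UTF_8));
  }
  // With merging disabled, loading the archived timeline should still succeed
  // despite the damaged plan file.
  HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
  assertNotNull(archivedTimeline);
}
```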
@hudi-bot run azure
yihua
left a comment
LGTM. I made one small nit fix to your PR.
cc @vinothchandar: this PR adds new functionality in the archived timeline behind a feature flag, plus a piece of error-handling logic which cannot be feature-flagged. You may want to take another look.
https://issues.apache.org/jira/browse/HUDI-2833
What is the purpose of the pull request
As we know, most storage systems do not support the append action, so Hudi creates a new archive file under the archived directory every time it archives.
As time goes by, there may be thousands of archive files, most of which are no longer useful.
It may be worthwhile to have a function that merges small archive files into bigger ones.
Add three configs to control the small-archive-file merging behavior (a brief usage example follows the list):
hoodie.archive.auto.merge.enable (default false): When enabled, Hudi will automatically merge several small archive files into a larger one. It's useful when the storage scheme doesn't support the append operation.
hoodie.archive.files.merge.batch.size (default 10): The number of small archive files merged at once.
hoodie.archive.merge.small.file.limit.bytes (default 20971520): The archive file size limit below which an archive file becomes a candidate to be selected as a small file.
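For example, a minimal sketch of enabling the feature via writer properties (how these properties are plumbed into your HoodieWriteConfig or job depends on your setup):

```java
Properties props = new Properties();
// Opt in to merging small archive files (off by default).
props.setProperty("hoodie.archive.auto.merge.enable", "true");
// Merge up to 10 small archive files at a time.
props.setProperty("hoodie.archive.files.merge.batch.size", "10");
// Archive files smaller than ~20 MB are merge candidates.
props.setProperty("hoodie.archive.merge.small.file.limit.bytes", "20971520");
```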
Add a new plan named HoodieMergeArchiveFilePlan.
We use this plan to record which candidate small archive files are merged into which bigger archive file.
It's useful for dealing with merge action failures.
Code Flow
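A hedged outline of the flow (helper names such as getSmallArchiveFiles, buildArchiveMergePlan, mergeArchiveFiles, deleteOriginArchiveFiles, and deleteMergeArchivePlan are illustrative, not necessarily the names in the PR; the point is that the plan is persisted before the merge so a failure can be detected and recovered):

```java
private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
  // 1. Pick archive files below hoodie.archive.merge.small.file.limit.bytes,
  //    up to hoodie.archive.files.merge.batch.size of them.
  List<FileStatus> candidates = getSmallArchiveFiles();
  if (candidates.isEmpty()) {
    return;
  }
  // 2. Persist the HoodieMergeArchiveFilePlan first (candidate files + target file name),
  //    so an interrupted merge can be cleaned up on the next run.
  String mergedArchiveFileName = computeMergedArchiveFileName(candidates);
  buildArchiveMergePlan(candidates, mergedArchiveFileName);
  // 3. Write the merged (bigger) archive file.
  mergeArchiveFiles(candidates, mergedArchiveFileName);
  // 4. Delete the original small archive files, then the plan marker.
  deleteOriginArchiveFiles(context, candidates);
  deleteMergeArchivePlan();
}
```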