@zhuanshenbsj1
Contributor

Change Logs

Refer to #7401 (closed accidentally).

In some scenarios, such as offline clustering or online synchronous (parallel) clustering, a later plan can complete before an earlier plan. If the later plan belongs to the next partition and cleaning catches up with it, the incremental cleaning mode in getPartitionPathsForIncrementalCleaning ignores the earlier plans, which eventually leads to duplicate data.

Partitions: day=2022-12-06/hour=10/minute_per_10=0, day=2022-12-06/hour=10/minute_per_10=1
image

Cleaning catches up with the clustering plan that belongs to partition day=2022-12-06/hour=10/minute_per_10=1.
log:
image
instant:
image

The files in the earlier clustering plan, which belongs to partition day=2022-12-06/hour=10/minute_per_10=0, are never cleaned, and this causes duplicate data once the instants are archived.
image

image
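
The scenario above can be sketched with a toy model. This is not Hudi code: the `Instant` class, `exampleTimeline`, and `partitionsToScan` are hypothetical names, times are fixed-width strings compared lexicographically, and the scan rule (completed instants at or after the previous clean's earliest commit to retain) is a simplifying assumption about incremental cleaning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model, not Hudi code: shows how an incremental clean can skip a plan that completes late. */
final class IncrementalCleanSketch {

  /** Simplified instant: schedule time, completion time, and the single partition it touched. */
  static final class Instant {
    final String timestamp;
    final String completionTime;
    final String partition;

    Instant(String timestamp, String completionTime, String partition) {
      this.timestamp = timestamp;
      this.completionTime = completionTime;
      this.partition = partition;
    }
  }

  /** Two clustering plans; the later one ("002") completes before the earlier one ("001"). */
  static List<Instant> exampleTimeline() {
    return Arrays.asList(
        new Instant("001", "010", "minute_per_10=0"),
        new Instant("002", "005", "minute_per_10=1"));
  }

  /**
   * Partitions scanned by a clean that runs at time {@code now}: those touched by instants
   * that have completed by then and whose timestamp is at or after the previous clean's
   * earliest commit to retain (assumed rule).
   */
  static List<String> partitionsToScan(List<Instant> timeline, String prevEarliestCommitToRetain, String now) {
    List<String> partitions = new ArrayList<>();
    for (Instant i : timeline) {
      boolean completed = i.completionTime.compareTo(now) <= 0;
      boolean afterCutoff = i.timestamp.compareTo(prevEarliestCommitToRetain) >= 0;
      if (completed && afterCutoff && !partitions.contains(i.partition)) {
        partitions.add(i.partition);
      }
    }
    return partitions;
  }
}
```

With this timeline, a clean at time 006 with a previous cutoff of 000 scans only minute_per_10=1, because plan 001 is still pending. If that clean advances the cutoff to 002, a later clean at time 011 again scans only minute_per_10=1: plan 001 has completed by then, but its timestamp is below the cutoff, so minute_per_10=0 is never visited.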

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low, medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default values of configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@zhuanshenbsj1
Contributor Author

zhuanshenbsj1 commented Dec 8, 2022

image

@danny0405 How about adding a check before archiving that verifies, one by one, that every instant's completion time is earlier than the start time of the last completed clean instant?
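
A minimal sketch of the check proposed in this comment, under stated assumptions: it is a toy model rather than Hudi code, `allCompletedBeforeLastClean` is a hypothetical helper, and times are fixed-width strings compared lexicographically.

```java
import java.util.List;

/** Toy model, not Hudi code: the pre-archive check proposed in this comment. */
final class ArchiveCheckSketch {

  /**
   * Returns true only if every candidate instant completed strictly before the last
   * completed clean started. Instants are checked one by one.
   */
  static boolean allCompletedBeforeLastClean(List<String> completionTimes, String lastCleanStartTime) {
    for (String completionTime : completionTimes) {
      if (completionTime.compareTo(lastCleanStartTime) >= 0) {
        // This instant finished after the clean began, so the clean may have missed it.
        return false;
      }
    }
    return true;
  }
}
```

For example, completion times 003 and 005 pass against a clean that started at 006, while 003 and 007 do not, because 007 finished after the clean began.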

instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain())
|| (instant.getMarkerFileAccessTimestamp().isPresent()
&& (HoodieTimeline.compareTimestamps(instant.getMarkerFileAccessTimestamp().get(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
Contributor

There are many cases in which the instant meta file can be accessed, not just clustering.

Contributor Author

Adjusted: changed access time to modification time.

earlestUnCleanCompletedInstant.map(instant ->
compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true)
).filter(s ->
Contributor

We had better check the clean metadata files on the timeline: a clustering instant can be archived only when all of its replaced files have been cleaned.

Contributor Author

no completed clustering instant -> no need to check
firstCompletedClusteringInstant is not null && no completed clean instant -> return firstCompletedClusteringInstant
firstCompletedClusteringInstant is not null && earliestCleanInstant is not null -> return firstUncleanClusteringInstant
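
The three cases above can be sketched as follows. This is a toy model, not the PR's implementation: `earliestInstantToKeep` is a hypothetical helper, instant times are fixed-width strings, and the rule used for "first unclean clustering instant" (the first clustering instant at or after the last clean's earliest commit to retain) is an assumption.

```java
import java.util.List;
import java.util.Optional;

/** Toy model, not Hudi code: the three-case rule listed in this comment. */
final class EarliestUncleanClusteringSketch {

  /**
   * @param completedClusteringTimes instant times of completed clustering instants, ascending
   * @param lastCleanEarliestCommitToRetain cutoff of the last completed clean, empty if none
   * @return the earliest clustering instant the archiver should keep, if any
   */
  static Optional<String> earliestInstantToKeep(
      List<String> completedClusteringTimes, Optional<String> lastCleanEarliestCommitToRetain) {
    // Case 1: no completed clustering instant, so there is nothing to check.
    if (completedClusteringTimes.isEmpty()) {
      return Optional.empty();
    }
    // Case 2: clustering completed but no clean has completed, so keep the first one.
    if (!lastCleanEarliestCommitToRetain.isPresent()) {
      return Optional.of(completedClusteringTimes.get(0));
    }
    // Case 3: keep the first clustering instant the last clean has not covered (assumed rule).
    String cutoff = lastCleanEarliestCommitToRetain.get();
    return completedClusteringTimes.stream()
        .filter(time -> time.compareTo(cutoff) >= 0)
        .findFirst();
  }
}
```

With clustering instants 002, 004 and 007 and a clean cutoff of 003, the sketch returns 004; with no completed clean it returns 002.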

this.state = isInflight ? State.INFLIGHT : State.COMPLETED;
this.action = action;
this.timestamp = timestamp;
this.markerFileModificationTimestamp = Option.ofNullable(null);
Member

Remove this line.

Contributor Author

Removed.

this.state = state;
this.action = action;
this.timestamp = timestamp;
this.markerFileModificationTimestamp = Option.ofNullable(null);
Member

Ditto.

Contributor Author

Removed

@danny0405
Contributor

5341.patch.zip
Hi @zhuanshenbsj1, I have reviewed this and applied a patch that adds a weak guarantee to the archiver precondition check. Do you think that is okay? It would be costly if we resolved all the clean metadata to see whether the replaced clustering files have all been cleaned.

We can go ahead if you think it is okay, and it would be great if you could test it as well.

cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain())
|| (instant.getMarkerFileModificationTimestamp().isPresent()
Member

I don't think the condition is needed for commits that are not replacecommits.

Contributor Author

It guarantees compatibility. Maybe other new kinds of instants will need to be cleaned later?

Contributor

If an out-of-order replace commit finishes before the clean starts and the instant time of the replace commit is before the earliest commit to retain, it won't be cleaned and will be left in the timeline. The archiver will then archive it, since its last modified time is earlier than the last clean in the timeline. What do you think?

Contributor Author

You are right, the clustering instant still won't be cleaned in this scenario.
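
The gap agreed on in this thread can be illustrated with a toy model. It is not Hudi code: both rules below are simplified assumptions taken from the prose of the comments, and all names are hypothetical.

```java
/** Toy model, not Hudi code: an out-of-order replace commit archived without being cleaned. */
final class OutOfOrderReplaceGapSketch {

  /** Assumed cleaner rule: a clean only considers instants at or after its earliest commit to retain. */
  static boolean consideredByClean(String instantTime, String earliestCommitToRetain) {
    return instantTime.compareTo(earliestCommitToRetain) >= 0;
  }

  /** Assumed archiver rule: an instant is archivable once its last-modified time precedes the last clean. */
  static boolean archivable(String lastModifiedTime, String lastCleanTime) {
    return lastModifiedTime.compareTo(lastCleanTime) < 0;
  }

  /** True when an instant would be archived although no clean ever considered it. */
  static boolean archivedWithoutCleaning(
      String instantTime, String lastModifiedTime, String earliestCommitToRetain, String lastCleanTime) {
    return !consideredByClean(instantTime, earliestCommitToRetain)
        && archivable(lastModifiedTime, lastCleanTime);
  }
}
```

Take a replace commit with instant time 003 that finishes at 006, and a clean that starts at 008 with earliest commit to retain 005. The clean skips the commit because 003 is below 005, yet the archiver accepts it because 006 precedes 008, so its replaced files are never cleaned.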

HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<HoodieInstant> firstCompletedClusteringInstant = activeTimeline.getCommitsTimeline().filterCompletedInstants()
.filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).firstInstant();
if (!firstCompletedClusteringInstant.isPresent()) {
Member

If firstCompletedClusteringInstant isn't present, there is no need to get the latest clean time from the clean metadata.

Contributor Author

Moved it into the else branch.

table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
: Option.empty();

Option<HoodieInstant> earlestUnCleanCompletedInstant = CleanerUtils.getEarliestUnCleanCompletedInstant(metaClient);
Contributor Author

Fixed

@zhuanshenbsj1
Contributor Author

But doesn't this patch still ignore the clustering completion time? If an earlier clustering instant in the timeline completes later than the cutoff time of the following clean instant, there are still risks.

@SteNicholas
Member

SteNicholas commented Dec 27, 2022

@danny0405, @zhuanshenbsj1, @stream2000, IMO the clean operation should modify getEarliestCommitToRetain as follows to keep cleaning linear:

if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
    && commitTimeline.countInstants() > commitsRetained) {
  Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
      .getActiveTimeline()
      .getCommitsTimeline()
      .filter(s -> !s.isCompleted()).firstInstant();
  if (earliestPendingCommits.isPresent()) {
    earliestCommitToRetain =
        commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
          if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
            return Option.of(nthInstant);
          } else {
            return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
          }
        }).orElse(Option.empty());
  } else {
    // 15 instants total, 10 commits to retain: this gives the 6th instant in the list
    earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
  }
}

WDYT?
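
To make the proposal above concrete, here is a toy version over plain lists. It is a sketch under assumptions, not Hudi code: `earliestCommitToRetain` here is a hypothetical standalone helper, instant times are fixed-width strings, `commitsRetained` is assumed to be at least 1, and an empty result is assumed when there are not more commits than the retention count.

```java
import java.util.List;
import java.util.Optional;

/** Toy model, not Hudi code: earliest commit to retain, capped by the earliest pending commit. */
final class LinearCleanSketch {

  /**
   * @param completedCommits completed commit times, ascending
   * @param earliestPending time of the earliest pending commit, empty if none
   * @param commitsRetained number of latest commits to retain (assumed >= 1)
   */
  static Optional<String> earliestCommitToRetain(
      List<String> completedCommits, Optional<String> earliestPending, int commitsRetained) {
    if (completedCommits.size() <= commitsRetained) {
      return Optional.empty(); // nothing is old enough to clean
    }
    String nth = completedCommits.get(completedCommits.size() - commitsRetained);
    if (!earliestPending.isPresent() || nth.compareTo(earliestPending.get()) <= 0) {
      return Optional.of(nth);
    }
    // A pending commit sits before the nth instant: fall back to the last
    // completed commit strictly before it, so cleaning never passes a pending commit.
    String lastBeforePending = null;
    for (String commit : completedCommits) {
      if (commit.compareTo(earliestPending.get()) < 0) {
        lastBeforePending = commit;
      }
    }
    return Optional.ofNullable(lastBeforePending);
  }
}
```

With completed commits 001, 002, 004, 005, 006, a pending commit 003, and two commits retained, the plain rule would pick 005, but the capped rule returns 002, the last completed commit before the pending one.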

@zhuanshenbsj1
Contributor Author

It still needs to be confirmed, when archiving, whether the clustering instant has been cleaned.

@SteNicholas
Member

SteNicholas commented Dec 27, 2022

@zhuanshenbsj1, +1. I have modified getOldestInstantToRetainForClustering in the above patch as follows to cover the case you mentioned:

  /**
   * Checks whether the latest clustering instant has a subsequent cleaning action. Returns
   * the clustering instant if there is such cleaning action or empty.
   *
   * @param activeTimeline The active timeline
   * @return the oldest instant to retain for clustering
   */
  public static Option<HoodieInstant> getOldestInstantToRetainForClustering(HoodieActiveTimeline activeTimeline)
      throws IOException {
    Option<HoodieInstant> cleanInstantOpt =
        activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
    if (cleanInstantOpt.isPresent()) {
      // The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of
      // the clean metadata.
      HoodieInstant cleanInstant = cleanInstantOpt.get();
      String earliestCommitToRetain =
          TimelineMetadataUtils.deserializeHoodieCleanMetadata(
                  activeTimeline.getInstantDetails(cleanInstant.isRequested()
                      ? cleanInstant
                      : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get())
              .getEarliestCommitToRetain();
      return activeTimeline.getCompletedReplaceTimeline()
          .filter(instant ->
              HoodieTimeline.compareTimestamps(
                  instant.getTimestamp(),
                  HoodieTimeline.GREATER_THAN_OR_EQUALS,
                  earliestCommitToRetain))
          .firstInstant();
    }
    return Option.empty();
  }

PTAL.

@SteNicholas
Member

@danny0405, IMO this pull request could be closed because #7568 replaces it.

@leesf
Contributor

leesf commented Jan 6, 2023

Closing the PR as we have PR #7568.

@leesf leesf closed this Jan 6, 2023
@zhuanshenbsj1 zhuanshenbsj1 deleted the clean_later_completed_clustering_1 branch January 8, 2024 07:42