
[HUDI-5553] Prevent partition(s) from being dropped if there are pending… #7669

Merged
XuQianJin-Stars merged 12 commits into apache:master from voonhous:HUDI-5553
Jan 31, 2023

Conversation

@voonhous
Member

@voonhous voonhous commented Jan 13, 2023

… table service actions

Change Logs

Fixes #7634

Added a validation step to prevent partition(s) from being dropped if there are pending table service actions.

Implementing such a fail-fast behaviour will prevent any downstream write tasks from failing.

There are 4 types of table service actions:

  1. clustering
  2. compaction
  3. log_compaction
  4. clean

Why clean is omitted

We are only considering [clustering, compaction, log_compaction]. Please refer to #7669 (comment) on why clean is omitted.
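The fail-fast validation described above can be sketched roughly as follows. This is a simplified, self-contained model, not the actual Hudi API: the pendingOps map and all method names here are illustrative stand-ins for the pending clustering/compaction/log-compaction operations read from the timeline. The idea is simply to filter pending operations down to those touching the partitions to be dropped, and reject the drop if any remain.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified sketch of the fail-fast check; not the actual Hudi API.
// Pending table service operations are modelled as instant -> partition entries.
class DropPartitionGuard {

  // Collect the instants of pending table service actions that touch any
  // of the partitions the user is trying to drop.
  static List<String> offendingInstants(Map<String, String> pendingOps,
                                        Set<String> partitionsToDrop) {
    return pendingOps.entrySet().stream()
        .filter(e -> partitionsToDrop.contains(e.getValue()))
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }

  // Fail fast: refuse the drop if any pending action overlaps.
  static void validateDropPartition(Map<String, String> pendingOps,
                                    Set<String> partitionsToDrop) {
    List<String> offending = offendingInstants(pendingOps, partitionsToDrop);
    if (!offending.isEmpty()) {
      throw new IllegalStateException(
          "Failed to drop partitions: pending table service instants " + offending);
    }
  }

  public static void main(String[] args) {
    Map<String, String> pending = Map.of(
        "001.compaction", "2023/01/01",
        "002.clustering", "2023/01/02");
    // Dropping a partition with a pending compaction fails fast.
    try {
      validateDropPartition(pending, Set.of("2023/01/01"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
    // Dropping an untouched partition succeeds.
    validateDropPartition(pending, Set.of("2023/01/03"));
    System.out.println("drop allowed");
  }
}
```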

Impact

Users will no longer be able to drop partition(s) if there are pending table service actions [clustering, compaction, log_compaction] on the partition to be dropped.

Risk level (write none, low, medium or high below)

If medium or high, explain what verification was done to mitigate the risks.
LOW

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@voonhous
Member Author

@TengHuo for visibility

Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
.filter(op -> partitions.contains(op.getRight().getPartitionPath()))
.forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));

Contributor


Not only compaction; shouldn't all pending table services be avoided?

Member Author


Yes, compaction and clustering should be avoided.

In line >133, the pending clustering groups are iterated over for validation.

Member


+1, for example, there are pending clustering instants for the partition to be deleted.

Member


@voonhous, are there any updates for the pending clustering instants?


@voonhous
Member Author

@danny0405 Could you please review this PR again? Thank you!


package org.apache.hudi.table.action.commit;

import java.util.ArrayList;
Member


Please move after line 42 and follow the import check style.

Member Author


Done!


package org.apache.hudi.table.action.commit;

import java.util.ArrayList;
Member


Please move after line 44 and follow the import check style.

Member Author


Done!

.filter(op -> partitions.contains(op.getRight().getPartitionPath()))
.forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));

fileSystemView.getFileGroupsInPendingClustering()
Member Author


@SteNicholas CMIIW, no additional commits were pushed for that as it is already evaluated in the original commit. Does this evaluation require any enhancement?

Member


@voonhous, could the pending compaction and clustering logic be combined?

Member Author


Sure! Will change it to improve readability.

The implementation here was copied over from org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy#getFileSlicesEligibleForClustering, which is why it is the way it is.

Member


@voonhous, I got the above point. IMO, this could improve both readability and performance, because getting the pending compaction and clustering instants from the FileSystemView costs some time.

Member Author


I took a look at the code again; I forgot to mention that:

getPendingCompactionOperations + getPendingLogCompactionOperations

and

getFileGroupsInPendingClustering

return different Stream types, which is why they were separated and not combined.

I'll add a comment to note this.

Not really sure how one can improve performance here, given that. Are there any other APIs that we can use to improve this?
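To illustrate the point about the differing Stream element types, here is a hedged sketch using simplified stand-in record types (the real Hudi APIs pair an instant with different payload types, which is why the two code paths were kept separate): mapping each stream to its instant before concatenating lets both feed the same collection.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Simplified stand-ins for the two different element types returned by the
// FileSystemView APIs; field order intentionally differs to mimic the mismatch.
record PendingCompactionOp(String instant, String partition) {}
record PendingClusteringGroup(String partition, String instant) {}

class PendingInstantCollector {
  static Set<String> collect(List<PendingCompactionOp> compactionOps,
                             List<PendingClusteringGroup> clusteringGroups,
                             Set<String> partitions) {
    Set<String> offending = new HashSet<>();
    // Map each stream to a common instant view before concatenating, so both
    // element types can share the same filtering and collection logic.
    Stream.concat(
            compactionOps.stream()
                .filter(op -> partitions.contains(op.partition()))
                .map(PendingCompactionOp::instant),
            clusteringGroups.stream()
                .filter(g -> partitions.contains(g.partition()))
                .map(PendingClusteringGroup::instant))
        .forEach(offending::add);
    return offending;
  }
}
```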

Member Author


Done

Member


@voonhous, you could introduce a new interface to get all pending table service action instants.

.forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));

if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
throw new HoodieDeletePartitionException("Failed to drop partitions. "
Member


If there is any pending compaction or clustering action, is it better to wait for the action to complete and then drop the partition?

Member Author


In such a case, the Spark-SQL session might get stuck waiting for extended periods of time if there are multiple compaction/clustering plans involving the partition to be dropped.

If there are 5 compaction/clustering jobs involving a partition, we will need to wait for all 5 pending jobs to finish before we can drop the partition.

Member

@SteNicholas SteNicholas Jan 30, 2023


@voonhous, IMO, from the user perspective, there are always compaction/clustering actions for inline table services in Flink jobs, and users can hardly control when to drop a partition. Therefore, this could wait for the completion of compaction/clustering and then drop the partition immediately. WDYT?

Member Author


I agree. I have given this a thought before. There are a few ways around this issue.

The list below is ranked in terms of difficulty to implement (easiest to hardest).

  1. Fail-fast, do not allow users to drop a partition if there are pending clustering/compaction.
  2. Spark-SQL console/session will wait for the pending clustering/compaction to finish before issuing a drop partition (This may lead to "resource-starvation" if we do not implement additional logic to block table service action plans from being created if there is a drop-partition that is being requested)
  3. We allow drop-partition to complete; for any pending clustering/compaction jobs, when performing a commit, we can flag these files to be replaced so they will not be read.

As such, I chose to implement the first option.

A "full" fix is definitely required; this PR addresses the immediate correctness problem by adding the check and making the limitation known.

Member

@SteNicholas SteNicholas Jan 30, 2023


@voonhous, IMO, users could check the .hoodie directory for any pending compaction/clustering commit in the timeline before dropping the partition themselves. The current implementation helps users skip this check step, right?

Member Author


Yeap!

  1. Users might not have permission to perform a list-file operation on the underlying filesystem (hdfs/gcs/s3)
  2. If the retain-commits configuration is set to a somewhat large value, navigating the results of a list-file operation might be difficult. Users could use grep to filter, but this may introduce human error
  3. We cannot expect users to check the commit timeline every time they want to perform a DDL on a Hudi table. Hudi should have reasonable safeguards in place to ensure correctness.

Member


@voonhous, we could introduce this validation in this pull request and create another ticket for waiting for the completion of pending table service. WDYT?

Member Author


IMHO, that would be most ideal!

There are many ways to implement the "wait for the completion of pending table service" fix, and it will definitely require more thought to fully implement. Will create a JIRA issue once this has been merged and link it here after.

Member


@voonhous, after the readability improvement above, I have no problem with this pull request.

Member Author


There is an existing GitHub issue for this:
#7663

I have also identified a few other correctness issues caused by the DROP-PARTITION DDL that may require some attention too, for example:
#7634

* For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
* This will cause downstream duplicate key errors when a map is being constructed.
*/
private void checkPreconditions() {
Member


Could this introduce a utility class into the util package of hudi-client-common? Then the Flink and Spark clients could both invoke this method.

@voonhous
Member Author

Had a chat with @SteNicholas and he brought up a very good point.

If a DropPartition DDL is issued whilst a Cleaning operation is in the midst of executing, it will cause the timeline to look something like this:

Clean0.requested -> Clean{P0->FG1}

DropPartition0.requested -> Drop{P0->FG1}

DropPartition0.inflight -> Drop{P0->FG1}

DropPartition0.completed -> Drop{P0->FG1}

Clean0.inflight  -> Clean{P0->FG1}

Clean0.completed  -> Clean{P0->FG1}

When the next CleanPlanner is invoked, it may include a FileGroup that may have been previously cleaned.

Clean1.requested -> Clean{P0->FG1}
...

As such, when creating Clean1, we should exclude files that may have already been cleaned in Clean0.

SteNicholas approved these changes Jan 30, 2023
@voonhous
Member Author

For the concerns raised in:

#7669 (comment)

The files that have been removed in the CLEAN-0 operation will not be listed again in CLEAN-1.

So, there will not be any FileNotFoundException to catch, i.e. no warning log lines that may raise unnecessary "alarms".

This is because:

  1. Clean will always ensure that every FileGroup will have at least 1 FileSlice left after cleaning. So, if there is only 1 FileSlice in the FileGroup, it will not be a candidate for cleaning
  2. DROP-PARTITION flags FileGroups to be removed. Hence, it is still up to the cleaner to identify the remaining files that need to be removed, i.e. FileSlices that have not been removed yet and are not candidates for removal by previous clean operations.
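Point 1 can be illustrated with a toy model (an assumed simplification for illustration, not Hudi's actual CleanPlanner): if a file group's slices are ordered oldest first and clean always retains at least the latest slice, then a file group with a single slice never yields any clean candidates.

```java
import java.util.List;

// Toy model of the retention rule: every file group keeps at least its
// latest file slice, so only the older slices are candidates for cleaning.
class CleanRetention {
  static List<String> cleanCandidates(List<String> slicesOldestFirst) {
    if (slicesOldestFirst.size() <= 1) {
      // A single-slice file group is never a candidate for cleaning.
      return List.of();
    }
    // Everything except the latest slice may be cleaned.
    return slicesOldestFirst.subList(0, slicesOldestFirst.size() - 1);
  }
}
```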

Member

@SteNicholas SteNicholas left a comment


LGTM.

@XuQianJin-Stars XuQianJin-Stars self-assigned this Jan 31, 2023
@YannByron
Contributor

@voonhous @SteNicholas @XuQianJin-Stars hey guys, sorry that I have a different thought. I think the drop-partition operation should be allowed to execute, and the table service action should re-check whether the touched file groups have been updated when it actually executes. IMO, operations on data take priority over table management operations.

@voonhous
Member Author

@YannByron Hmmm, are you envisioning option 3 as a solution for this issue that is described here? #7669 (comment)

i.e. Any new filegroup that is created from a filegroup that is flagged for deletion should also be flagged for deletion?

@SteNicholas
Member

SteNicholas commented Jan 31, 2023

@voonhous, I had a voice communication with @YannByron.

This pull request is just a temporary check for deleting partitions, not the "full" fix. The final, reasonable implementation should put the check in the table service itself (compaction/clustering). In other words, table services should not affect the execution of DDL such as deleting partitions. However, it is beneficial to temporarily add the check in this pull request. For example, user A creates a Flink task that writes Hudi table 1 and enables asynchronous clustering. If user B then wants to delete partitions, they can be notified that a partition has a corresponding pending table service; we can at least temporarily refuse to let user B delete the partition, to avoid affecting user A's Flink task.

To sum up, @XuQianJin-Stars could merge first, and create a ticket to optimize this check in table service.

@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@XuQianJin-Stars XuQianJin-Stars merged commit d857693 into apache:master Jan 31, 2023
@SteNicholas
Member

@voonhous, @YannByron, I have created ticket HUDI-5663 to improve the implementation of check on the table service.

@yihua yihua added the priority:blocker Production down; release blocker label Feb 5, 2023
nsivabalan pushed a commit to nsivabalan/hudi that referenced this pull request Mar 22, 2023
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
@voonhous voonhous deleted the HUDI-5553 branch April 28, 2023 07:34

Labels

priority:blocker Production down; release blocker


Development

Successfully merging this pull request may close these issues.

[SUPPORT] Consistent Hashing index type read inconsistency when performing an ALTER TABLE DROP PARTITION DDL

7 participants