Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -37,11 +38,13 @@
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
Expand All @@ -62,6 +65,8 @@ public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
checkPreconditions();

try {
HoodieTimer timer = new HoodieTimer().startTimer();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
Expand Down Expand Up @@ -98,4 +103,43 @@ private List<String> getAllExistingFileIds(String partitionPath) {
// because new commit is not complete. it is safe to mark all existing file Ids as old files
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}

/**
* Check if there are any pending table service actions (requested + inflight) on a table affecting the partitions to
* be dropped.
* <p>
* This check is to prevent a drop-partition from proceeding should a partition have a table service action in
* the pending stage. If this is allowed to happen, the filegroup that is an input for a table service action, might
* also be a candidate for being replaced. As such, when the table service action and drop-partition commits are
* committed, there will be two commits replacing a single filegroup.
* <p>
* For example, a timeline might have an execution order as such:
* 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
* 001.replacecommit.requested, 001.replacecommit.inflight, 0001.replacecommit (drop_partition to replace filegroup_1)
* 000.replacecommit.inflight (clustering is executed now)
* 000.replacecommit (clustering completed)
* For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
* This will cause downstream duplicate key errors when a map is being constructed.
*/
private void checkPreconditions() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this introduce an untility class into util package of hudi-client-common? And then the Flink and Spark client could invoke this method.

List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
// ensure that there are no pending inflight clustering/compaction operations involving this partition
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();

// separating the iteration of pending compaction operations from clustering as they return different stream types
Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
.filter(op -> partitions.contains(op.getRight().getPartitionPath()))
.forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only compaction, all the table service in pending should be avoided right ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, compaction and clustering should be avoided.

In line >133, the pending clustering groups are iterated over for validation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, for example, there are pending clustering instants for the partition to be deleted.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, is there any updates for pending clustering instants?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileSystemView.getFileGroupsInPendingClustering()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SteNicholas CMIIW, no additional commits were pushed for that as it is already evaluated in the original commit. Does this evaluation require any enhancement?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, could the pending compaction and clustering logic combine?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Will change it to improve readability.

The implementation here was copied over from org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy#getFileSlicesEligibleForClustering, which is why it is the way it is.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, I got above point. IMO, this could improve the readability and performance because the operation of getting the pending compaction and clustering instants from FileSystemView consume a little performance and time.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look at the code again, I forgot to mention that:

getPendingCompactionOperations + getPendingLogCompactionOperations

and

getFileGroupsInPendingClustering

returns different Stream types, which is why they were separated and not combined.

I'll add a comment to note this.

Not really sure how one can improve performance for this given. Are there any other APIs that we can use to improve the performance of this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, you could introduce new interface to get all pending table service action instant.

.filter(fgIdInstantPair -> partitions.contains(fgIdInstantPair.getLeft().getPartitionPath()))
.forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));

if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
throw new HoodieDeletePartitionException("Failed to drop partitions. "

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is any pending compaction or clustering action, is it better to wait the action completion and then drop partition?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In such a case, the Spark-SQL session might get stuck waiting for extended periods of time if there are multiple compaction/clustering plans involving the partition to be dropped.

If there are 5 compaction/clustering jobs involving a partition, we will need to wait for all 5 pending jobs to finish before we can drop the partition.

@SteNicholas SteNicholas Jan 30, 2023

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, IMO, from user perspective, there are always compaction/clustering actions for inline table service in Flink jobs and users are hard to control when to drop partition. Therefore, this could wait for the completion of compaction/clustering and drop the partition immediately. WDYT?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I have given this a thought before. There are a few ways around this issue.

The list below is ranked in terms of difficulty to implement (easiest to hardest).

  1. Fail-fast, do not allow users to drop a partition if there are pending clustering/compaction.
  2. Spark-SQL console/session will wait for the pending clustering/compaction to finish before issuing a drop partition (This may lead to "resource-starvation" if we do not implement additional logic to block table service action plans from being created if there is a drop-partition that is being requested)
  3. We allow drop-partition to complete; for any pending clustering/compaction jobs, when performing a commit, we can flag these files to be replaced so they will not be read.

As such, i chose to implement the first option first.

A "full" fix is definitely required, this PR is to address the immediate problem of correctness through adding and making limitations known.

@SteNicholas SteNicholas Jan 30, 2023

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, IMO, users could check the .hoodie directory to check whether there is any pending compaction/clustering commit in the timeline before dropping the paritition by theirs. The current implementation helps users to skip this check step, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap!

  1. Users might not have permission perform a list-file operation on the underlying filesystem (hdfs/gcs/s3)
  2. If the commit retain-commits configuration is set to a somewhat large value, navigating the results of a list-file operation might be difficult. Users could use grep to perform filters, but this may introduce human error
  3. We cannot expect users to check the commit timeline everytime they want to perform a DDL on a Hudi table. Hudi should have reasonable safeguards in place to ensure correctness.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, we could introduce this validation in this pull request and create another ticket for waiting for the completion of pending table service. WDYT?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, that would be most ideal!

There are many ways around the waiting for the completion of pending table service fix and will definitely require more thought to fully implement. Will create a JIRA issue once this has been merged and will add the link the JIRA issue after.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voonhous, after improving above readability, I have no any problem for this pull request.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an existing GitHub issue for this:
#7663

I have also identified a few other correctness issues caused by the DROP-PARTITION DDL that may require some attention too, for example:
#7634

+ "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitions + ". "
+ "Instant(s) of offending pending table service action: "
+ instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -38,10 +39,13 @@
import org.apache.hadoop.fs.Path;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
Expand All @@ -59,6 +63,7 @@ public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
checkPreconditions();
try {
HoodieTimer timer = HoodieTimer.start();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
Expand Down Expand Up @@ -90,4 +95,43 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
}
}

/**
* Check if there are any pending table service actions (requested + inflight) on a table affecting the partitions to
* be dropped.
* <p>
* This check is to prevent a drop-partition from proceeding should a partition have a table service action in
* the pending stage. If this is allowed to happen, the filegroup that is an input for a table service action, might
* also be a candidate for being replaced. As such, when the table service action and drop-partition commits are
* committed, there will be two commits replacing a single filegroup.
* <p>
* For example, a timeline might have an execution order as such:
* 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
* 001.replacecommit.requested, 001.replacecommit.inflight, 0001.replacecommit (drop_partition to replace filegroup_1)
* 000.replacecommit.inflight (clustering is executed now)
* 000.replacecommit (clustering completed)
* For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
* This will cause downstream duplicate key errors when a map is being constructed.
*/
private void checkPreconditions() {
List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
// ensure that there are no pending inflight clustering/compaction operations involving this partition
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();

// separating the iteration of pending compaction operations from clustering as they return different stream types
Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
.filter(op -> partitions.contains(op.getRight().getPartitionPath()))
.forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));

fileSystemView.getFileGroupsInPendingClustering()
.filter(fgIdInstantPair -> partitions.contains(fgIdInstantPair.getLeft().getPartitionPath()))
.forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));

if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
throw new HoodieDeletePartitionException("Failed to drop partitions. "
+ "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitions + ". "
+ "Instant(s) of offending pending table service action: "
+ instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
Expand Down Expand Up @@ -396,4 +398,45 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
}
}
}

test("Prevent a partition from being dropped if there are pending table service actions") {
withTempDir { tmp =>
Seq("cow").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())

checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
)

val partition = "ts=1002"
val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
}
}
}

}