diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
new file mode 100644
index 0000000000000..92c2065457e0a
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A utility class of helper functions for the delete-partition operation.
+ */
+public class DeletePartitionUtils {
+
+  /**
+   * Checks if there are any pending table service actions (requested + inflight) on a table affecting the partitions
+   * to be dropped.
+   * <p>
+   * This check prevents a drop-partition operation from proceeding while a partition has a table service action in
+   * the pending stage. If this were allowed to happen, a file group that is an input to the pending table service
+   * action might also be a candidate for replacement. As such, once both the table service action and the
+   * drop-partition operation are committed, two commits would replace a single file group.
+   * <p>
+   * For example, a timeline might have an execution order as such:
+   * 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
+   * 001.replacecommit.requested, 001.replacecommit.inflight, 001.replacecommit (drop_partition to replace filegroup_1)
+   * 000.replacecommit.inflight (clustering is executed now)
+   * 000.replacecommit (clustering completed)
+   * For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be
+   * replaced. This will cause downstream duplicate-key errors when a map is being constructed.
+   *
+   * @param table Table to perform validation on
+   * @param partitionsToDrop List of partitions to drop
+   */
+  public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
+    List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
+    // ensure that there are no pending inflight clustering/compaction operations involving this partition
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+
+    // separating the iteration of pending compaction operations from clustering as they return different stream types
+    Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
+        .filter(op -> partitionsToDrop.contains(op.getRight().getPartitionPath()))
+        .forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));
+
+    fileSystemView.getFileGroupsInPendingClustering()
+        .filter(fgIdInstantPair -> partitionsToDrop.contains(fgIdInstantPair.getLeft().getPartitionPath()))
+        .forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));
+
+    if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
+      throw new HoodieDeletePartitionException("Failed to drop partitions. "
+          + "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+          + "Instant(s) of offending pending table service action: "
+          + instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
+    }
+  }
+
+}
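Note on the "duplicate key" failure mode the javadoc above describes: it surfaces when a file-group-to-replacing-instant map is built from completed replace commits. A minimal, self-contained sketch (hypothetical names, not part of this patch) of why two commits replacing the same file group blow up such a map; `Collectors.toMap` without a merge function throws on duplicate keys:

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DuplicateReplaceSketch {
  public static void main(String[] args) {
    // Two completed replace commits both claim filegroup_1, as in the javadoc timeline:
    Stream<String[]> replacedFileGroups = Stream.of(
        new String[] {"filegroup_1", "000.replacecommit"},   // clustering
        new String[] {"filegroup_1", "001.replacecommit"});  // drop-partition
    // Building a fileGroupId -> replacingInstant map now fails:
    Map<String, String> replacedBy = replacedFileGroups.collect(
        Collectors.toMap(e -> e[0], e -> e[1]));
    // throws java.lang.IllegalStateException: Duplicate key ... (filegroup_1)
  }
}
```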
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
new file mode 100644
index 0000000000000..3a1632737efa9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDeletePartitionUtils {
+
+  private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
+  private static final String HARDCODED_INSTANT_TIME = "0";
+
+  private final HoodieTable table = Mockito.mock(HoodieTable.class);
+
+  private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);
+
+  public static Stream<Arguments> generateTruthValues() {
+    int noOfVariables = 3;
+    int noOfRows = 1 << noOfVariables;
+    Object[][] truthValues = new Object[noOfRows][noOfVariables];
+    for (int i = 0; i < noOfRows; i++) {
+      for (int j = noOfVariables - 1; j >= 0; j--) {
+        boolean out = (i / (1 << j)) % 2 != 0;
+        truthValues[i][j] = out;
+      }
+    }
+    return Stream.of(truthValues).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("generateTruthValues")
+  public void testDeletePartitionUtils(
+      boolean hasPendingCompactionOperations,
+      boolean hasPendingLogCompactionOperations,
+      boolean hasFileGroupsInPendingClustering) {
+    System.out.printf("hasPendingCompactionOperations: %s, hasPendingLogCompactionOperations: %s, hasFileGroupsInPendingClustering: %s%n",
+        hasPendingCompactionOperations, hasPendingLogCompactionOperations, hasFileGroupsInPendingClustering);
+    Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
+    Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
+    Mockito.when(fileSystemView.getPendingLogCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingLogCompactionOperations));
+    Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));
+
+    boolean shouldThrowException = hasPendingCompactionOperations || hasPendingLogCompactionOperations || hasFileGroupsInPendingClustering;
+
+    if (shouldThrowException) {
+      assertThrows(HoodieDeletePartitionException.class,
+          () -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+              Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    } else {
+      assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+          Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    }
+  }
+
+  private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
+    return Stream.of(Pair.of(HARDCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
+  }
+
+  private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) {
+    return new CompactionOperation(
+        "fileId", getPartitionName(hasPendingJobInPartition), HARDCODED_INSTANT_TIME, Option.empty(),
+        new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
+  }
+
+  private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
+    HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
+    HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARDCODED_INSTANT_TIME);
+    return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
+  }
+
+  private static String getPartitionName(boolean hasPendingTableServiceAction) {
+    return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
+  }
+
+}
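Aside on the test's `generateTruthValues` helper: it enumerates all 2^3 input combinations by treating the row index as a 3-bit counter. An equivalent standalone sketch (hypothetical class name, not part of this patch) that prints the same eight rows:

```java
import java.util.Arrays;

public class TruthTableSketch {
  public static void main(String[] args) {
    int noOfVariables = 3;
    for (int i = 0; i < (1 << noOfVariables); i++) {
      boolean[] row = new boolean[noOfVariables];
      for (int j = 0; j < noOfVariables; j++) {
        // bit j of i; the same predicate as (i / (1 << j)) % 2 != 0 in the test above
        row[j] = ((i >> j) & 1) != 0;
      }
      System.out.println(Arrays.toString(row));
    }
  }
}
```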
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
index a301ba228e4f6..3f19534d08cdb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -62,6 +63,8 @@ public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
     try {
       HoodieTimer timer = new HoodieTimer().startTimer();
       context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index 49134d604d2d0..b45a691fbad83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -59,6 +60,8 @@ public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
     try {
       HoodieTimer timer = HoodieTimer.start();
       context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
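Both executors apply the same guard as the first statement of `execute()`, so a drop fails fast, before any replace plan is built. A hedged sketch of that pattern for some other engine-specific executor (the class below is hypothetical; `table` and `partitions` mirror the fields the real executors already hold):

```java
import java.util.List;

import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.table.HoodieTable;

abstract class GuardedDeletePartitionExecutor<R> {
  protected final HoodieTable table;
  protected final List<String> partitions;

  GuardedDeletePartitionExecutor(HoodieTable table, List<String> partitions) {
    this.table = table;
    this.partitions = partitions;
  }

  public final R execute() {
    // Throws HoodieDeletePartitionException if any pending compaction, log compaction,
    // or clustering touches one of the partitions being dropped.
    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
    return doExecute();
  }

  // Engine-specific delete-partition planning and commit logic.
  protected abstract R doExecute();
}
```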
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 15b14ec77f522..02c558fc2f3cc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -18,14 +18,17 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.spark.sql.SaveMode
 
 import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.assertTrue
 
 class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
@@ -396,4 +399,128 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Prevent a partition from being dropped if there are pending CLUSTERING jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Generate the first clustering plan
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
+
+      checkAnswer(s"call show_clustering('$tableName')")(
+        Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
+      )
+
+      val partition = "ts=1002"
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending COMPACTION jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Generate the first compaction plan
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      assertTrue(client.scheduleCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+      checkAnswer(s"call show_compaction('$tableName')")(
+        Seq(firstScheduleInstant, 5, HoodieInstant.State.REQUESTED.name())
+      )
+
+      val partition = "ts=1002"
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending LOG_COMPACTION jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits to ensure that it is > default `hoodie.compact.inline.max.delta.commits`
+      // Write everything into the same FileGroup but into separate blocks
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Generate the first log_compaction plan
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      assertTrue(client.scheduleLogCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+      val partition = "ts=1000"
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
 }
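Follow-up sketch on how a writer unblocks itself after hitting the new exception: complete (or roll back) the pending table service, then retry the drop. `scheduleClusteringAtInstant` and `cluster(instant, shouldComplete)` are existing write-client methods; the helper class and its wiring below are hypothetical:

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.spark.sql.SparkSession;

class UnblockDropSketch {
  static void clusterThenDrop(SparkRDDWriteClient<?> client, SparkSession spark,
                              String tableName, String partition) {
    String instant = HoodieActiveTimeline.createNewInstantTime();
    if (client.scheduleClusteringAtInstant(instant, Option.empty())) {
      client.cluster(instant, true); // execute and commit the pending clustering
    }
    // With no pending table service left on the partition, the guard passes:
    spark.sql(String.format("ALTER TABLE %s DROP PARTITION (%s)", tableName, partition));
  }
}
```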