Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeletePartitionUtils {

/**
* Check if there are any pending table service actions (requested + inflight) on a table affecting the partitions to
* be dropped.
* <p>
* This check is to prevent a drop-partition from proceeding should a partition have a table service action in
* the pending stage. If this is allowed to happen, the filegroup that is an input for a table service action, might
* also be a candidate for being replaced. As such, when the table service action and drop-partition commits are
* committed, there will be two commits replacing a single filegroup.
* <p>
* For example, a timeline might have an execution order as such:
* 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
* 001.replacecommit.requested, 001.replacecommit.inflight, 0001.replacecommit (drop_partition to replace filegroup_1)
* 000.replacecommit.inflight (clustering is executed now)
* 000.replacecommit (clustering completed)
* For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
* This will cause downstream duplicate key errors when a map is being constructed.
*
* @param table Table to perform validation on
* @param partitionsToDrop List of partitions to drop
*/
public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
// ensure that there are no pending inflight clustering/compaction operations involving this partition
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();

// separating the iteration of pending compaction operations from clustering as they return different stream types
Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
.filter(op -> partitionsToDrop.contains(op.getRight().getPartitionPath()))
.forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));

fileSystemView.getFileGroupsInPendingClustering()
.filter(fgIdInstantPair -> partitionsToDrop.contains(fgIdInstantPair.getLeft().getPartitionPath()))
.forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));

if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
throw new HoodieDeletePartitionException("Failed to drop partitions. "
+ "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+ "Instant(s) of offending pending table service action: "
+ instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestDeletePartitionUtils {

private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
private static final String HARCODED_INSTANT_TIME = "0";

private final HoodieTable table = Mockito.mock(HoodieTable.class);

private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);

public static Stream<Arguments> generateTruthValues() {
int NUMBER_OF_VARIABLES = 3;
int NUMBER_OF_ROWS = 1 << NUMBER_OF_VARIABLES;
Object[][] truthValues = new Object[NUMBER_OF_ROWS][NUMBER_OF_VARIABLES];
for(int i = 0; i < NUMBER_OF_ROWS; i++) {
for(int j = NUMBER_OF_VARIABLES - 1, k = 0; j >= 0; j--, k++) {
boolean out = (i / (int) Math.pow(2, j)) % 2 != 0;
truthValues[i][j] = out;
}
}
return Stream.of(truthValues).map(Arguments::of);
}

@ParameterizedTest
@MethodSource("generateTruthValues")
public void testDeletePartitionUtils(
boolean hasPendingCompactionOperations,
boolean hasPendingLogCompactionOperations,
boolean hasFileGroupsInPendingClustering) {
System.out.printf("hasPendingCompactionOperations: %s, hasPendingLogCompactionOperations: %s, hasFileGroupsInPendingClustering: %s%n",
hasPendingCompactionOperations, hasPendingLogCompactionOperations, hasFileGroupsInPendingClustering);
Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
Mockito.when(fileSystemView.getPendingLogCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingLogCompactionOperations));
Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));

boolean shouldThrowException = hasPendingCompactionOperations || hasPendingLogCompactionOperations || hasFileGroupsInPendingClustering;

if (shouldThrowException) {
assertThrows(HoodieDeletePartitionException.class,
() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
} else {
assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
}
}

private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
return Stream.of(Pair.of(HARCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
}

private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) {
return new CompactionOperation(
"fileId", getPartitionName(hasPendingJobInPartition), HARCODED_INSTANT_TIME, Option.empty(),
new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
}

private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARCODED_INSTANT_TIME);
return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
}

private static String getPartitionName(boolean hasPendingTableServiceAction) {
return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
Expand Down Expand Up @@ -62,6 +63,8 @@ public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);

try {
HoodieTimer timer = new HoodieTimer().startTimer();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.WriteOperationType;
Expand Down Expand Up @@ -59,6 +60,8 @@ public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);

try {
HoodieTimer timer = HoodieTimer.start();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
Expand Down Expand Up @@ -396,4 +398,45 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
}
}
}

test("Prevent a partition from being dropped if there are pending table service actions") {
withTempDir { tmp =>
Seq("cow").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())

checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
)

val partition = "ts=1002"
val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
}
}
}

}