@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* A utility class for helper functions when performing a delete partition operation.
*/
public class DeletePartitionUtils {

/**
* Check if there are any pending table service actions (requested + inflight) on a table affecting the partitions to
* be dropped.
* <p>
* This check prevents a drop-partition operation from proceeding if any of the affected partitions has a table
* service action in the pending stage. If the operation were allowed to proceed, a file group that is an input to a
* pending table service action could also become a candidate for replacement. When both the table service action
* and the drop-partition operation are committed, two commits would then replace the same file group.
* <p>
* For example, a timeline might have an execution order as follows:
* 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
* 001.replacecommit.requested, 001.replacecommit.inflight, 001.replacecommit (drop_partition replacing filegroup_1)
* 000.replacecommit.inflight (clustering is executed now)
* 000.replacecommit (clustering completed)
* In this execution order, 000.replacecommit and 001.replacecommit will both flag filegroup_1 as replaced,
* causing downstream duplicate-key errors when a map keyed by file group is constructed.
*
* @param table Table to perform validation on
* @param partitionsToDrop List of partitions to drop
*/
public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
// ensure that there are no pending (requested or inflight) clustering/compaction operations involving the partitions to drop
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();

// separating the iteration of pending compaction operations from clustering as they return different stream types
Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
.filter(op -> partitionsToDrop.contains(op.getRight().getPartitionPath()))
.forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));

fileSystemView.getFileGroupsInPendingClustering()
.filter(fgIdInstantPair -> partitionsToDrop.contains(fgIdInstantPair.getLeft().getPartitionPath()))
.forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));

if (!instantsOfOffendingPendingTableServiceAction.isEmpty()) {
throw new HoodieDeletePartitionException("Failed to drop partitions. "
+ "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+ "Instant(s) of offending pending table service action: "
+ instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
}
}

}
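For context, a minimal sketch of how an engine-specific delete-partition executor invokes this guard before doing any work; the executor body shown here is illustrative, and the real call sites are the Flink and Spark executors later in this diff:

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
  // Fail fast, before any files are touched: if a pending clustering/compaction
  // overlaps the partitions being dropped, HoodieDeletePartitionException propagates.
  DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
  // ... gather file ids from the deleting partitions and commit the replacecommit ...
}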
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestDeletePartitionUtils {

private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
private static final String HARDCODED_INSTANT_TIME = "0";

private final HoodieTable table = Mockito.mock(HoodieTable.class);

private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);

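/**
 * Generates all 2^3 = 8 combinations of the three boolean flags (a full truth table)
 * so that the parameterized test covers every mix of pending compaction, log compaction,
 * and clustering states. Column j of row i holds bit j of i.
 */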
public static Stream<Arguments> generateTruthValues() {
int noOfVariables = 3;
int noOfRows = 1 << noOfVariables;
Object[][] truthValues = new Object[noOfRows][noOfVariables];
for (int i = 0; i < noOfRows; i++) {
for (int j = noOfVariables - 1; j >= 0; j--) {
boolean out = (i / (1 << j)) % 2 != 0;
truthValues[i][j] = out;
}
}
return Stream.of(truthValues).map(Arguments::of);
}

@ParameterizedTest
@MethodSource("generateTruthValues")
public void testDeletePartitionUtils(
boolean hasPendingCompactionOperations,
boolean hasPendingLogCompactionOperations,
boolean hasFileGroupsInPendingClustering) {
System.out.printf("hasPendingCompactionOperations: %s, hasPendingLogCompactionOperations: %s, hasFileGroupsInPendingClustering: %s%n",
hasPendingCompactionOperations, hasPendingLogCompactionOperations, hasFileGroupsInPendingClustering);
Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
Mockito.when(fileSystemView.getPendingLogCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingLogCompactionOperations));
Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));

boolean shouldThrowException = hasPendingCompactionOperations || hasPendingLogCompactionOperations || hasFileGroupsInPendingClustering;

if (shouldThrowException) {
assertThrows(HoodieDeletePartitionException.class,
() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
} else {
assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
}
}

private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
return Stream.of(Pair.of(HARDCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
}

private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) {
return new CompactionOperation(
"fileId", getPartitionName(hasPendingJobInPartition), HARDCODED_INSTANT_TIME, Option.empty(),
new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
}

private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
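// the first argument `true` marks the instant as inflight, i.e. the replacecommit is still pending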
HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARDCODED_INSTANT_TIME);
return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
}

private static String getPartitionName(boolean hasPendingTableServiceAction) {
return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
}

}
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -62,6 +63,8 @@ public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);

try {
HoodieTimer timer = new HoodieTimer().startTimer();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
@@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.WriteOperationType;
@@ -59,6 +60,8 @@ public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);

try {
HoodieTimer timer = HoodieTimer.start();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
@@ -18,14 +18,17 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue

class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {

@@ -396,4 +399,128 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
}
}
}

test("Prevent a partition from being dropped if there are pending CLUSTERING jobs") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = 'cow',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)

// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())

checkAnswer(s"call show_clustering('$tableName')")(
Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
)

val partition = "ts=1002"
val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
}
}

test("Prevent a partition from being dropped if there are pending COMPACTs jobs") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
// Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.index.type = 'INMEMORY'
| )
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
// Create 5 deltacommits so the count reaches the default `hoodie.compact.inline.max.delta.commits` (5), allowing compaction to be scheduled
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)

// Generate the first compaction plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
assertTrue(client.scheduleCompactionAtInstant(firstScheduleInstant, HOption.empty()))

checkAnswer(s"call show_compaction('$tableName')")(
Seq(firstScheduleInstant, 5, HoodieInstant.State.REQUESTED.name())
)

val partition = "ts=1002"
val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
}
}

test("Prevent a partition from being dropped if there are pending LOG_COMPACT jobs") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
// Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.index.type = 'INMEMORY'
| )
| partitioned by(ts)
| location '$basePath'
| """.stripMargin)
// Create 5 deltacommits so the count reaches the default `hoodie.compact.inline.max.delta.commits` (5), allowing log compaction to be scheduled
// Write everything into the same FileGroup but into separate blocks
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)

// Generate the first log_compaction plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
assertTrue(client.scheduleLogCompactionAtInstant(firstScheduleInstant, HOption.empty()))

val partition = "ts=1000"
val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
}
}
}