diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 36cd89add4dee..6b7a7d29cc436 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -158,6 +158,15 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
    */
  public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);
 
+  /**
+   * Deletes all data in the given partitions.
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param partitions {@link List} of partitions to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
+
   /**
    * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
    *

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 3c4d7fb95a766..d0cb8dec788c9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -84,6 +84,11 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
     return new FlinkDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
     return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 7f658898baa71..ddc995ab14de8 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -84,6 +84,11 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
     throw new HoodieNotSupportedException("Delete is not supported yet");
   }
 
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    throw new HoodieNotSupportedException("Delete partitions is not supported yet");
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 18f5309eee0b1..f7e7690bb163f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -245,6 +245,13 @@ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime)
     return postWrite(result, instantTime, table);
   }
 
+  public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
+    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
+    setOperationType(WriteOperationType.DELETE_PARTITION);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+  }
+
   @Override
   protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result,
                                            String instantTime,
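For reference (not part of the patch itself): a minimal usage sketch of the new client-level API, mirroring the deletePartitionWithCommit test helper added further down in this diff. The write-config builder and client construction are ordinary bootstrapping, imports are omitted, and basePath, jsc and the partition values are placeholder assumptions, not something this change defines.

    // Assumes an existing Hudi table at basePath and an active JavaSparkContext jsc.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    SparkRDDWriteClient client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), cfg);

    // Partition-level deletes go through a replacecommit rather than a regular commit.
    String instantTime = "20210101000000";
    client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
    HoodieWriteResult result = client.deletePartitions(Arrays.asList("2020/01/01", "2020/01/02"), instantTime);

    // File groups that were logically replaced (i.e. deleted), keyed by partition path.
    Map<String, List<String>> replaced = result.getPartitionToReplaceFileIds();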
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 71085a232122f..357b5cea888bd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -50,6 +50,7 @@
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
@@ -108,6 +109,11 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> delete(HoodieEngineContext cont
     return new SparkDeleteCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    return new SparkDeletePartitionCommitActionExecutor(context, config, this, instantTime, partitions).execute();
+  }
+
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
new file mode 100644
index 0000000000000..ea1ef51a4d40a
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends SparkInsertOverwriteCommitActionExecutor<T> {
+
+  private List<String> partitions;
+  public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, HoodieTable table,
+                                                  String instantTime, List<String> partitions) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
+    this.partitions = partitions;
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
+    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+
+    result.setWriteStatuses(jsc.emptyRDD());
+    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+    this.commitOnAutoCommit(result);
+    return result;
+  }
+}
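For reference (not part of the patch itself): the executor above implements the partition delete as a metadata-only replacecommit — it records every latest file group of the requested partitions under partitionToReplaceFileIds and writes no new data (the write-status RDD is empty); the underlying files stay on storage until table services such as cleaning remove them. Conceptually, the Spark computation reduces to the following single-JVM sketch (illustration only; table, FileSlice and Collectors are assumed from the executor's own context and imports):

    // Marks every latest file slice in each requested partition as replaced.
    private Map<String, List<String>> replacedFileIdsFor(List<String> partitions) {
      return partitions.stream().distinct().collect(Collectors.toMap(
          partitionPath -> partitionPath,
          partitionPath -> table.getSliceView().getLatestFileSlices(partitionPath)
              .map(FileSlice::getFileId).distinct().collect(Collectors.toList())));
    }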
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 1e3822016a765..c5d3c7684c860 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -77,7 +77,7 @@ protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteS
         .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
   }
 
-  private List<String> getAllExistingFileIds(String partitionPath) {
+  protected List<String> getAllExistingFileIds(String partitionPath) {
     // because new commit is not complete. it is safe to mark all existing file Ids as old files
     return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index e349657b7e44e..c0145158f3b71 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -36,7 +36,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
@@ -47,11 +46,6 @@ public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context
     super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
-  protected List<String> getAllExistingFileIds(String partitionPath) {
-    return table.getSliceView().getLatestFileSlices(partitionPath)
-        .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
-  }
-
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
     Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index d9a396d0f64b5..d1d0e21e039b0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1021,7 +1021,7 @@ public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Except
    * Test scenario of writing similar number file groups in partition.
    */
   @Test
-  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
+  public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
     verifyInsertOverwritePartitionHandling(3000, 3000);
   }
 
@@ -1061,6 +1061,109 @@ private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
     return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
   }
 
+  /**
+   * Test scenario of writing fewer file groups for the first partition than the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
+    verifyDeletePartitionsHandling(1000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing a similar number of file groups in each partition.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
+    verifyDeletePartitionsHandling(3000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing more file groups for the first partition than the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
+    verifyDeletePartitionsHandling(3000, 1000, 1000);
+  }
+
+  private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) {
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
+    verifyRecordsWritten(commitTime1, inserts1, statuses);
+    return batchBuckets;
+  }
+
+  private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
+    client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
+    Set<String> deletePartitionReplaceFileIds =
+        writeResult.getPartitionToReplaceFileIds().entrySet()
+            .stream().flatMap(entry -> entry.getValue().stream()).collect(Collectors.toSet());
+    return deletePartitionReplaceFileIds;
+  }
+
+  /**
+   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for the first partition.
+   * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for the second partition.
+   * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for the third partition.
+   * 4) Delete the first partition and check the result.
+   * 5) Delete the second and third partitions and check the result.
+   *
+   */
+  private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator();
+
+    // Do Inserts for DEFAULT_FIRST_PARTITION_PATH
+    String commitTime1 = "001";
+    Set<String> batch1Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);
+
+    // Do Inserts for DEFAULT_SECOND_PARTITION_PATH
+    String commitTime2 = "002";
+    Set<String> batch2Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);
+
+    // Do Inserts for DEFAULT_THIRD_PARTITION_PATH
+    String commitTime3 = "003";
+    Set<String> batch3Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);
+
+    // delete DEFAULT_FIRST_PARTITION_PATH
+    String commitTime4 = "004";
+    Set<String> deletePartitionReplaceFileIds1 =
+        deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
+    List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
+    assertTrue(baseFiles.size() > 0);
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
+    assertTrue(baseFiles.size() > 0);
+
+    // delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
+    String commitTime5 = "005";
+    Set<String> deletePartitionReplaceFileIds2 =
+        deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH));
+    Set<String> expectedFileId = new HashSet<>();
+    expectedFileId.addAll(batch2Buckets);
+    expectedFileId.addAll(batch3Buckets);
+    assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
+
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
+        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
+        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+  }
+
   /**
    * Verify data in parquet files matches expected records and commit time.
    */
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 307e0686756a8..c91b51b88667e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -147,6 +147,22 @@ public static long countRecordsSince(JavaSparkContext jsc, String basePath, SQLC
     }
   }
 
+  public static List<HoodieBaseFile> getLatestBaseFiles(String basePath, FileSystem fs,
+                                                        String... paths) {
+    List<HoodieBaseFile> latestFiles = new ArrayList<>();
+    try {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+      for (String path : paths) {
+        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+        latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList()));
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Error reading latest base files for the hoodie table", e);
+    }
+    return latestFiles;
+  }
+
   /**
    * Reads the paths under the a hoodie table out as a DataFrame.
    */
@@ -154,14 +170,9 @@ public static Dataset<Row> read(JavaSparkContext jsc, String basePath, SQLContex
                                   String... paths) {
     List<String> filteredPaths = new ArrayList<>();
     try {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
-      for (String path : paths) {
-        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
-            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
-        List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
-        for (HoodieBaseFile file : latestFiles) {
-          filteredPaths.add(file.getPath());
-        }
+      List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, fs, paths);
+      for (HoodieBaseFile file : latestFiles) {
+        filteredPaths.add(file.getPath());
       }
       return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
     } catch (Exception e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 39f0f62ca8e44..f237156360847 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -42,6 +42,8 @@ public enum WriteOperationType {
   INSERT_OVERWRITE("insert_overwrite"),
   // cluster
   CLUSTER("cluster"),
+  // delete partition
+  DELETE_PARTITION("delete_partition"),
   // insert overwrite with dynamic partitioning
   INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
   // used for old version
@@ -74,6 +76,8 @@ public static WriteOperationType fromValue(String value) {
         return DELETE;
       case "insert_overwrite":
         return INSERT_OVERWRITE;
+      case "delete_partition":
+        return DELETE_PARTITION;
       case "insert_overwrite_table":
         return INSERT_OVERWRITE_TABLE;
       case "cluster":
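For reference (not part of the patch itself): DELETE_PARTITION reuses the replacecommit machinery of INSERT_OVERWRITE, replacing the existing file groups of the target partitions with nothing rather than with freshly written ones, and the new constant resolves from its string form through the fromValue() switch above. A trivial sanity sketch, using only the API shown in this hunk:

    // "delete_partition" maps back to the new enum constant via the case added above.
    WriteOperationType op = WriteOperationType.fromValue("delete_partition");
    // op == WriteOperationType.DELETE_PARTITION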