From 83ff30cf39090a6cdb12af089c5ea6f93838af8e Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 29 Mar 2022 18:37:00 +0530 Subject: [PATCH 1/8] [HUDI-3743] Support DELETE_PARTITION for metadata table Minor checkstyle fix --- .../metadata/HoodieTableMetadataWriter.java | 9 +++ .../FlinkHoodieBackedTableMetadataWriter.java | 6 ++ .../SparkHoodieBackedTableMetadataWriter.java | 16 ++++ .../functional/TestHoodieBackedMetadata.java | 73 +++++++++++++++++++ 4 files changed, 104 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 777c785e263db..3ccc3dcd8a0e3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -94,4 +94,13 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * @param instantTime instant time of the commit. */ void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); + + /** + * Drop the given metadata indexes. This path reuses DELETE_PARTITION operation. + * + * @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline + * @param indexesToDrop - list of {@link MetadataPartitionType} to drop + * @throws IOException + */ + void dropIndex(String instantTime, List indexesToDrop); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 4d64e58342597..8226ddc29198c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hadoop.conf.Configuration; @@ -161,4 +162,9 @@ protected void commit(String instantTime, Map m.updateSizeMetrics(metadataMetaClient, metadata)); } + + @Override + public void dropIndex(String instantTime, List indexesToDrop) { + throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet."); + } } \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index d0173f984a2f0..568ab95aea988 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -25,8 +25,11 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import 
org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -43,6 +46,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter { @@ -177,4 +181,16 @@ protected void commit(String instantTime, Map m.updateSizeMetrics(metadataMetaClient, metadata)); } + + @Override + public void dropIndex(String instantTime, List indexesToDrop) { + List partitionsToDrop = indexesToDrop.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); + LOG.warn("Deleting Metadata Table partitions: " + partitionsToDrop); + + try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) { + String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ); + writeClient.startCommitWithTime(instantTime, actionType); + writeClient.deletePartitions(partitionsToDrop, instantTime); + } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 299fabed7026a..72f798de0e781 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -446,6 +446,79 @@ private void testTableOperationsForMetaIndexImpl(final HoodieWriteConfig writeCo testTableOperationsImpl(engineContext, writeConfig); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException { + initPath(); + HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) + .withIndexConfig(HoodieIndexConfig.newBuilder() + .bloomIndexBucketizedChecking(false) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withMetadataIndexBloomFilter(true) + .withMetadataIndexBloomFilterFileGroups(4) + .withMetadataIndexColumnStats(true) + .withMetadataIndexBloomFilterFileGroups(2) + .withMetadataIndexForAllColumns(true) + .build()) + .build(); + init(tableType, writeConfig); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + // Write 1 (Bulk insert) + String newCommitTime = "0000001"; + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + + // Write 2 (upserts) + newCommitTime = "0000002"; + client.startCommitWithTime(newCommitTime); + validateMetadata(client); + + records = dataGen.generateInserts(newCommitTime, 10); + writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + + // metadata writer to 
delete column_stats partition + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + metadataWriter.dropIndex("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); + + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false); + // partition should still be physically present + assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size()); + assertTrue(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())); + + Option completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant(); + assertTrue(completedReplaceInstant.isPresent()); + assertEquals("0000003", completedReplaceInstant.get().getTimestamp()); + + final Map metadataEnabledPartitionTypes = new HashMap<>(); + metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e)); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + metadataTablePartitions.forEach(partition -> { + List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); + if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) { + // there should not be any file slice in column_stats partition + assertTrue(latestSlices.isEmpty()); + } else { + assertFalse(latestSlices.isEmpty()); + assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() + <= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest base file per file group"); + assertTrue(latestSlices.size() + <= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest file slice per file group"); + } + }); + } + } + /** * Tests that virtual key configs are honored in base files after compaction in metadata table. * From e179a1346b5b070a79e268bf6d5faaf380316401 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 29 Mar 2022 22:49:20 +0530 Subject: [PATCH 2/8] Rename API to dropPartitions --- .../org/apache/hudi/metadata/HoodieTableMetadataWriter.java | 6 +++--- .../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java | 2 +- .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java | 4 ++-- .../hudi/client/functional/TestHoodieBackedMetadata.java | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 3ccc3dcd8a0e3..9e71e9ce719a1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -96,11 +96,11 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); /** - * Drop the given metadata indexes. This path reuses DELETE_PARTITION operation. + * Drop the given metadata partitions. This path reuses DELETE_PARTITION operation. 
* * @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline - * @param indexesToDrop - list of {@link MetadataPartitionType} to drop + * @param partitions - list of {@link MetadataPartitionType} to drop * @throws IOException */ - void dropIndex(String instantTime, List indexesToDrop); + void dropPartitions(String instantTime, List partitions); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 8226ddc29198c..56ac3aad7e89b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -164,7 +164,7 @@ protected void commit(String instantTime, Map indexesToDrop) { + public void dropPartitions(String instantTime, List partitions) { throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet."); } } \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 568ab95aea988..3a1e469568d64 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -183,8 +183,8 @@ protected void commit(String instantTime, Map indexesToDrop) { - List partitionsToDrop = indexesToDrop.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); + public void dropPartitions(String instantTime, List partitions) { + List partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); LOG.warn("Deleting Metadata Table partitions: " + partitionsToDrop); try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 72f798de0e781..5cdcb7c700b89 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -488,7 +488,7 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I // metadata writer to delete column_stats partition HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); - metadataWriter.dropIndex("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); + metadataWriter.dropPartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, 
false); From 0c5306176fa873ef2dc2b0544f14a8820c1a9941 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 31 Mar 2022 02:35:01 +0530 Subject: [PATCH 3/8] Address review comments --- .../org/apache/hudi/metadata/HoodieTableMetadataWriter.java | 2 +- .../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java | 2 +- .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java | 4 ++-- .../hudi/client/functional/TestHoodieBackedMetadata.java | 3 +-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 9e71e9ce719a1..0f2311ad751c6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -102,5 +102,5 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * @param partitions - list of {@link MetadataPartitionType} to drop * @throws IOException */ - void dropPartitions(String instantTime, List partitions); + void deletePartitions(String instantTime, List partitions); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 56ac3aad7e89b..76774e9618d79 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -164,7 +164,7 @@ protected void commit(String instantTime, Map partitions) { + public void deletePartitions(String instantTime, List partitions) { throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet."); } } \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 3a1e469568d64..7d94b2d4f53f1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -183,9 +183,9 @@ protected void commit(String instantTime, Map partitions) { + public void deletePartitions(String instantTime, List partitions) { List partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); - LOG.warn("Deleting Metadata Table partitions: " + partitionsToDrop); + LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop); try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) { String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 5cdcb7c700b89..2f50ed9576c3f 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -483,12 +483,11 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I records = dataGen.generateInserts(newCommitTime, 10); writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - validateMetadata(client); // metadata writer to delete column_stats partition HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); - metadataWriter.dropPartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); + metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false); From 9d7d1fd7c8d23d1b743c4df75ec8890d7f3819f7 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 31 Mar 2022 14:00:45 +0530 Subject: [PATCH 4/8] Rebase and change test to check partition deletion --- .../metadata/HoodieTableMetadataWriter.java | 4 +-- .../action/clean/CleanActionExecutor.java | 7 +++-- .../hudi/table/action/clean/CleanPlanner.java | 3 +++ .../functional/TestHoodieBackedMetadata.java | 27 ++++++++----------- .../metadata/HoodieTableMetadataUtil.java | 4 +-- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 --- 6 files changed, 20 insertions(+), 28 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 0f2311ad751c6..3cb99137410b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; -import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -96,11 +95,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); /** - * Drop the given metadata partitions. This path reuses DELETE_PARTITION operation. + * Deletes the given metadata partitions. This path reuses DELETE_PARTITION operation. 
* * @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline * @param partitions - list of {@link MetadataPartitionType} to drop - * @throws IOException */ void deletePartitions(String instantTime, List partitions); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 31d26f133e07f..f2d6411ea0324 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -72,7 +72,7 @@ public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config this.skipLocking = skipLocking; } - static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { + private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { Path deletePath = new Path(deletePathStr); LOG.debug("Working on delete path :" + deletePath); try { @@ -88,7 +88,7 @@ static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throw } } - static Stream> deleteFilesFunc(Iterator> cleanFileInfo, HoodieTable table) { + private static Stream> deleteFilesFunc(Iterator> cleanFileInfo, HoodieTable table) { Map partitionCleanStatMap = new HashMap<>(); FileSystem fs = table.getMetaClient().getFs(); @@ -138,8 +138,6 @@ List clean(HoodieEngineContext context, HoodieCleanerPlan clean .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))); - List partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>(); - Stream> partitionCleanStats = context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition, iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism); @@ -147,6 +145,7 @@ List clean(HoodieEngineContext context, HoodieCleanerPlan clean Map partitionCleanStatsMap = partitionCleanStats .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + List partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>(); partitionsToBeDeleted.forEach(entry -> { try { deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 3eb0c97e5ea39..24cdd433348d6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -278,6 +278,9 @@ private Pair> getFilesToCleanKeepingLatestCommits(S * retain 10 commits, and commit batch time is 30 mins, then you have 5 hrs of lookback) *

* This policy is the default. + * + * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted, + * and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted. */ private Pair> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 2f50ed9576c3f..295a293bf6af0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -450,23 +450,18 @@ private void testTableOperationsForMetaIndexImpl(final HoodieWriteConfig writeCo @EnumSource(HoodieTableType.class) public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException { initPath(); - HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) - .withIndexConfig(HoodieIndexConfig.newBuilder() - .bloomIndexBucketizedChecking(false) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(true) - .withMetadataIndexBloomFilter(true) - .withMetadataIndexBloomFilterFileGroups(4) - .withMetadataIndexColumnStats(true) - .withMetadataIndexBloomFilterFileGroups(2) - .withMetadataIndexForAllColumns(true) - .build()) + int maxCommits = 1; + HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()) .build(); - init(tableType, writeConfig); + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) { // Write 1 (Bulk insert) String newCommitTime = "0000001"; List records = dataGen.generateInserts(newCommitTime, 20); @@ -491,9 +486,9 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false); - // partition should still be physically present + // partition should be physically deleted assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size()); - assertTrue(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())); + assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())); Option completedReplaceInstant = 
metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant(); assertTrue(completedReplaceInstant.isPresent()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index bb4aaca8a478a..36c58ca3871c7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -195,7 +195,7 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo List records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size()); // Add record bearing added partitions list - ArrayList partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); + List partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded)); @@ -371,7 +371,7 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCl records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true)); } LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); + + ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size()); return records; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 01ca8e92854ad..033b2618a127c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -45,11 +45,8 @@ import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.table.BulkInsertPartitioner import org.apache.log4j.LogManager -import org.apache.spark.SPARK_VERSION import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.sql.types.StructType import org.apache.spark.sql._ import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.StructType From 8a189f17a5180474d9f58ffc54b737100ebc2b53 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 31 Mar 2022 19:58:38 +0530 Subject: [PATCH 5/8] Disable show partitions check --- .../hudi/TestAlterTableDropPartition.scala | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 4eec95abab389..e5d8fe80af045 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -120,12 +120,15 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) + // TODO (HUDI-3135): These validations are 
failing. Due to lazy deletion, + // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. + // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - if (urlencode) { + /*if (urlencode) { checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02"))) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - } + }*/ } } } @@ -171,12 +174,15 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) + // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, + // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. + // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - if (urlencode) { + /*if (urlencode) { checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02"))) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - } + }*/ } } } @@ -211,8 +217,11 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02")) + // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, + // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. + // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) + // checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) } Seq(false, true).foreach { hiveStyle => @@ -256,12 +265,15 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { Seq(2, "l4", "v1", "2021", "10", "02") ) + // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, + // cleaner will delete the partition when it kicks in, however the replacecommit gets written in the timeline. + // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - if (hiveStyle) { + /*if (hiveStyle) { checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02")) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - } + }*/ } } } @@ -305,12 +317,15 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { assertResult(false)(existsPath( s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01")) + // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, + // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. + // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. 
// show partitions - if (hiveStyle) { + /*if (hiveStyle) { checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02")) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - } + }*/ } } } From 40c9bb022ed7cc41858ff6b800df9b27424b6087 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Fri, 1 Apr 2022 00:40:25 +0800 Subject: [PATCH 6/8] fix show partitions check --- .../hudi/table/action/clean/CleanPlanner.java | 39 +++++---- .../hudi/metadata/HoodieMetadataPayload.java | 17 ++++ .../metadata/HoodieTableMetadataUtil.java | 81 +++++++++++-------- .../hudi/TestAlterTableDropPartition.scala | 33 +++----- 4 files changed, 97 insertions(+), 73 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 24cdd433348d6..d02fddb016f02 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -18,10 +18,22 @@ package org.apache.hudi.table.action.clean; +import java.io.IOException; +import java.io.Serializable; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.CleanFileInfo; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; @@ -45,24 +57,11 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieSavepointException; +import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.IOException; -import java.io.Serializable; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - /** * Cleaner is responsible for garbage collecting older files in a given partition path. Such that *

@@ -206,7 +205,15 @@ private List getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata */ private List getPartitionPathsForFullCleaning() { // Go to brute force mode of scanning all partitions - return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath()); + try { + // Because the partition of BaseTableMetadata has been deleted, + // all partition information can only be obtained from FileSystemBackedTableMetadata. + FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context, + context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning()); + return fsBackedTableMetadata.getAllPartitionPaths(); + } catch (IOException e) { + return Collections.emptyList(); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 4b9a185d2962a..83c25162845c0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -239,6 +239,23 @@ public static HoodieRecord createPartitionListRecord(List return new HoodieAvroRecord<>(key, payload); } + /** + * Create and return a {@code HoodieMetadataPayload} to save list of partitions. + * + * @param partitionsAdded The list of added partitions + * @param partitionsDeleted The list of deleted partitions + */ + public static HoodieRecord createPartitionListRecord(List partitionsAdded, List partitionsDeleted) { + Map fileInfo = new HashMap<>(); + partitionsAdded.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); + partitionsDeleted.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, true))); + + HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, + fileInfo); + return new HoodieAvroRecord<>(key, payload); + } + /** * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition. 
* diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 36c58ca3871c7..f37c3fcb9861b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -18,6 +18,36 @@ package org.apache.hudi.metadata; +import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; +import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; +import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -38,7 +68,9 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; @@ -61,38 +93,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import javax.annotation.Nonnull; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; -import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; -import static 
org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT; -import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; -import static org.apache.hudi.common.util.ValidationUtils.checkArgument; -import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; -import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; - /** * A utility to convert timeline information to metadata table records. */ @@ -197,7 +197,10 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo // Add record bearing added partitions list List partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); - records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded)); + // Add record bearing deleted partitions list + List partitionsDeleted = getPartitionsDeleted(commitMetadata); + + records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted)); // Update files listing records for each individual partition List> updatedPartitionFilesRecords = @@ -247,6 +250,18 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo return records; } + private static ArrayList getPartitionsDeleted(HoodieCommitMetadata commitMetadata) { + if (commitMetadata instanceof HoodieReplaceCommitMetadata + && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) { + Map> partitionToReplaceFileIds = + ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds(); + if (!partitionToReplaceFileIds.isEmpty()) { + return new ArrayList<>(partitionToReplaceFileIds.keySet()); + } + } + return new ArrayList<>(); + } + /** * Convert commit action metadata to bloom filter records. * diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index e5d8fe80af045..4eec95abab389 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -120,15 +120,12 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) - // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, - // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. - // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. 
// show partitions - /*if (urlencode) { + if (urlencode) { checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02"))) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - }*/ + } } } } @@ -174,15 +171,12 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) - // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, - // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. - // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - /*if (urlencode) { + if (urlencode) { checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02"))) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - }*/ + } } } } @@ -217,11 +211,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02")) - // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, - // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. - // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - // checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) + checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) } Seq(false, true).foreach { hiveStyle => @@ -265,15 +256,12 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { Seq(2, "l4", "v1", "2021", "10", "02") ) - // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, - // cleaner will delete the partition when it kicks in, however the replacecommit gets written in the timeline. - // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. // show partitions - /*if (hiveStyle) { + if (hiveStyle) { checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02")) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - }*/ + } } } } @@ -317,15 +305,12 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { assertResult(false)(existsPath( s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01")) - // TODO (HUDI-3135): These validations are failing. Due to lazy deletion, - // cleaner will delete the partition when it kicks in, however the replacecommit get written in the timeline. - // We should check why FSUtils#getAllPartitionPaths does not exclude replaced filegroups. 
// show partitions - /*if (hiveStyle) { + if (hiveStyle) { checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02")) } else { checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) - }*/ + } } } } From fd030c4e79192f66846c9e0cc4223e17098d4dcf Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 31 Mar 2022 22:47:57 +0530 Subject: [PATCH 7/8] Rearracnge imports so that diff is clean --- .../hudi/table/action/clean/CleanPlanner.java | 31 ++++---- .../hudi/metadata/HoodieMetadataPayload.java | 15 ++-- .../metadata/HoodieTableMetadataUtil.java | 74 ++++++++++--------- .../hudi/TestAlterTableDropPartition.scala | 4 +- 4 files changed, 65 insertions(+), 59 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index d02fddb016f02..79eef43b3c00a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -18,19 +18,6 @@ package org.apache.hudi.table.action.clean; -import java.io.IOException; -import java.io.Serializable; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -59,9 +46,24 @@ import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; +import java.io.Serializable; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * Cleaner is responsible for garbage collecting older files in a given partition path. Such that *

@@ -201,7 +203,6 @@ private List getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata /** * Scan and list all partitions for cleaning. * @return all partitions paths for the dataset. - * @throws IOException */ private List getPartitionPathsForFullCleaning() { // Go to brute force mode of scanning all partitions @@ -476,7 +477,7 @@ public Option getEarliestCommitToRetain() { /** * Determine if file slice needed to be preserved for pending compaction. - * + * * @param fileSlice File Slice * @return true if file slice needs to be preserved, false otherwise. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 83c25162845c0..0f4599724b700 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -18,13 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataFileInfo; @@ -42,6 +35,14 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index f37c3fcb9861b..9e3eca3eb50a2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -18,41 +18,6 @@ package org.apache.hudi.metadata; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; -import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE; -import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT; -import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; -import static org.apache.hudi.common.util.ValidationUtils.checkArgument; -import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; -import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; -import java.io.IOException; -import 
java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.annotation.Nonnull; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -90,9 +55,48 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; +import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; +import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; + /** * A utility to convert timeline information to metadata table records. 
*/ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 4eec95abab389..fdff6928a215f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -222,7 +222,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { val tablePath = s"${tmp.getCanonicalPath}/$tableName" import spark.implicits._ - val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02")) + val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02")) .toDF("id", "name", "ts", "year", "month", "day") df.write.format("hudi") @@ -273,7 +273,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { val tablePath = s"${tmp.getCanonicalPath}/$tableName" import spark.implicits._ - val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02")) + val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02")) .toDF("id", "name", "ts", "year", "month", "day") df.write.format("hudi") From 6c89a72ace5325339b2d653e7a1814078fa9bf6e Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 1 Apr 2022 01:38:53 +0530 Subject: [PATCH 8/8] Remove redundant mdt validation Rebase --- .../org/apache/hudi/metadata/HoodieTableMetadataWriter.java | 1 + .../hudi/client/functional/TestHoodieBackedMetadata.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 3cb99137410b4..83fe186727b32 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; +import java.io.IOException; import java.io.Serializable; import java.util.List; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 295a293bf6af0..f60de7dfdd526 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1827,7 +1827,6 @@ public void testDeletePartitions() throws Exception { newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); client.startCommitWithTime(newCommitTime); client.deletePartitions(singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH), newCommitTime); - validateMetadata(client); // add 1 more commit newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); @@ -1842,7 +1841,7 @@ public void testDeletePartitions() throws Exception { writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - // trigger clean which will actually triggger deletion 
of the partition + // trigger clean which will actually trigger deletion of the partition newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); HoodieCleanMetadata cleanMetadata = client.clean(newCommitTime); validateMetadata(client);
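
Reviewer note on the overall flow: across the series, dropping a metadata index boils down to reusing the data-table DELETE_PARTITION operation against the metadata table itself — the drop is recorded as a replacecommit on the metadata timeline, and the replaced file groups are removed physically only when the cleaner runs. The following is a condensed, self-contained sketch of that Spark path assembled from the patches above; it is illustrative only, not additional code proposed here. The class name MetadataPartitionDeleteSketch and the explicit engineContext/metadataWriteConfig parameters are stand-ins for state that the real SparkHoodieBackedTableMetadataWriter holds internally.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.MetadataPartitionType;

/**
 * Illustrative sketch only: condenses the Spark path added in this series
 * (SparkHoodieBackedTableMetadataWriter#deletePartitions). The metadata table's
 * engine context and write config are passed in explicitly here, whereas the real
 * writer manages them internally.
 */
public class MetadataPartitionDeleteSketch {

  public static void deletePartitions(HoodieEngineContext engineContext,
                                      HoodieWriteConfig metadataWriteConfig,
                                      String instantTime,
                                      List<MetadataPartitionType> partitions) {
    // Each metadata partition type maps to a physical partition of the metadata table
    // (e.g. files, column_stats, bloom_filters).
    List<String> partitionsToDrop = partitions.stream()
        .map(MetadataPartitionType::getPartitionPath)
        .collect(Collectors.toList());

    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
      // DELETE_PARTITION is recorded as a replacecommit; the metadata table is always MERGE_ON_READ.
      String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
      writeClient.startCommitWithTime(instantTime, actionType);
      // Marks all file groups in the given partitions as replaced; the cleaner deletes them lazily.
      writeClient.deletePartitions(partitionsToDrop, instantTime);
    }
  }
}

Callers drive it exactly as the new test does, e.g. metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)), and then expect a completed replacecommit at that instant on the metadata timeline.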