From f7c1e9665c61ac30016c6519ce32945cc27b2a79 Mon Sep 17 00:00:00 2001
From: yuezhang
Date: Thu, 26 Aug 2021 14:32:08 +0800
Subject: [PATCH 1/3] done

---
 .../java/org/apache/hudi/client/AbstractHoodieWriteClient.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index dfb2fc8fc1c22..9650ddaebfd93 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -434,10 +434,10 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      autoCleanOnCommit();
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
       archiveLog.archiveIfRequired(context);
-      autoCleanOnCommit();
       if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
         syncTableMetadata();
       }
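Why the one-line move above matters: with inline cleaning enabled, postCommit used to archive the
timeline before running the cleaner, so a replacecommit could leave the active timeline before the
cleaner had consulted it to locate the replaced file groups, and those files then stayed on storage
forever. The tests in the next two patches reproduce this with tight retention settings; below is a
minimal sketch of an equivalent write-client configuration (builder methods as found in 0.9-era
Hudi; the base path is hypothetical and not taken from the patch):

  import org.apache.hudi.common.model.HoodieCleaningPolicy;
  import org.apache.hudi.config.HoodieCompactionConfig;
  import org.apache.hudi.config.HoodieWriteConfig;

  public class CleanBeforeArchiveConfigSketch {
    public static HoodieWriteConfig sketchConfig() {
      // Retention tight enough that a single commit can trigger both cleaning
      // and archival, which is exactly when the call order in postCommit matters.
      return HoodieWriteConfig.newBuilder()
          .withPath("/tmp/hudi/test_table") // hypothetical base path
          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
              .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
              .retainCommits(1)         // hoodie.cleaner.commits.retained=1
              .archiveCommitsWith(2, 3) // hoodie.keep.min.commits=2, hoodie.keep.max.commits=3
              .withAutoClean(true)      // run the cleaner inline from postCommit
              .withAsyncClean(false)    // the tests exercise both true and false
              .build())
          .build();
    }
  }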
From 4a162abd6b22d7845d678548f4f3a811e8232b46 Mon Sep 17 00:00:00 2001
From: yuezhang
Date: Fri, 27 Aug 2021 15:19:20 +0800
Subject: [PATCH 2/3] add uts

---
 .../functional/TestHoodieDeltaStreamer.java | 83 +++++++++++++++++++
 1 file changed, 83 insertions(+)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index aab02da238dbf..d2764f73844a8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,20 +18,26 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieSparkSqlWriter;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -40,6 +46,7 @@
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
@@ -92,14 +99,17 @@
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -972,6 +982,79 @@ public void testInlineClustering() throws Exception {
     });
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
+    String tableBasePath = dfsBasePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
+
+    int totalRecords = 3000;
+
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.configs.add("hoodie.parquet.small.file.limit=0");
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      return true;
+    });
+
+    TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
+    Option<HoodieInstant> firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
+    assertTrue(firstReplaceHoodieInstant.isPresent());
+
+    Option<byte[]> firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
+    HoodieReplaceCommitMetadata firstReplaceMetadata = HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), HoodieReplaceCommitMetadata.class);
+    Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
+    String partitionName = null;
+    List replacedFileIDs = null;
+    ArrayList<String> replacedFilePaths = new ArrayList<>();
+    for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
+      partitionName = String.valueOf(entry.getKey());
+      replacedFileIDs = (List) entry.getValue();
+    }
+
+    assertNotNull(partitionName);
+    assertNotNull(replacedFileIDs);
+
+    Path partitionPath = new Path(meta.getBasePath(), partitionName);
+    RemoteIterator<LocatedFileStatus> hoodieFiles = meta.getFs().listFiles(partitionPath, true);
+    while (hoodieFiles.hasNext()) {
+      LocatedFileStatus f = hoodieFiles.next();
+      String file = f.getPath().toUri().toString();
+      if (!file.contains(".hoodie_partition_metadata")) {
+        for (Object replacedFileID : replacedFileIDs) {
+          if (file.contains(String.valueOf(replacedFileID))) {
+            replacedFilePaths.add(file);
+          }
+        }
+      }
+    }
+
+    List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
+
+    cfg.configs = configs;
+    cfg.continuousMode = false;
+    ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    assertFalse(replacedFilePaths.isEmpty());
+
+    for (String replacedFilePath : replacedFilePaths) {
+      assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
+    }
+  }
+
   private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit,
                                                String asyncCluster, String asyncClusterMaxCommit) {
     List<String> configs = new ArrayList<>();

From c5a338b609c436621b81c4e55e6bf378a218e8c6 Mon Sep 17 00:00:00 2001
From: yuezhang
Date: Fri, 27 Aug 2021 15:51:37 +0800
Subject: [PATCH 3/3] add more java docs

---
 .../functional/TestHoodieDeltaStreamer.java | 33 ++++++++++---------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index d2764f73844a8..ad52ce3b15f4b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -22,13 +22,11 @@
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.HoodieSparkSqlWriter;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -37,7 +35,6 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -46,7 +43,6 @@
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
@@ -99,11 +95,9 @@
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -989,12 +983,13 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
     int totalRecords = 3000;
 
+    // Step 1 : Prepare and insert data without archival and cleaner.
+    // Make sure that there are 6 commits, including 2 completed replacecommits.
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
-    cfg.configs.add("hoodie.parquet.small.file.limit=0");
+    cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
       return true;
@@ -1004,6 +999,7 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
     TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
     TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
 
+    // Step 2 : Get the first replacecommit and extract the corresponding replaced file IDs.
     HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
     HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
     Option<HoodieInstant> firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
@@ -1014,7 +1010,6 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
     Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
     String partitionName = null;
     List replacedFileIDs = null;
-    ArrayList<String> replacedFilePaths = new ArrayList<>();
     for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
       partitionName = String.valueOf(entry.getKey());
       replacedFileIDs = (List) entry.getValue();
@@ -1023,33 +1018,39 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
     assertNotNull(partitionName);
     assertNotNull(replacedFileIDs);
 
+    // Step 3 : Based on replacedFileIDs, get the corresponding complete file paths.
+    ArrayList<String> replacedFilePaths = new ArrayList<>();
     Path partitionPath = new Path(meta.getBasePath(), partitionName);
     RemoteIterator<LocatedFileStatus> hoodieFiles = meta.getFs().listFiles(partitionPath, true);
     while (hoodieFiles.hasNext()) {
       LocatedFileStatus f = hoodieFiles.next();
       String file = f.getPath().toUri().toString();
-      if (!file.contains(".hoodie_partition_metadata")) {
-        for (Object replacedFileID : replacedFileIDs) {
-          if (file.contains(String.valueOf(replacedFileID))) {
-            replacedFilePaths.add(file);
-          }
+      for (Object replacedFileID : replacedFileIDs) {
+        if (file.contains(String.valueOf(replacedFileID))) {
+          replacedFilePaths.add(file);
         }
       }
     }
+    assertFalse(replacedFilePaths.isEmpty());
+
+    // Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
     List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
     configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
     configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
     configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
     configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
     configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
-
     cfg.configs = configs;
     cfg.continuousMode = false;
     ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
-    assertFalse(replacedFilePaths.isEmpty());
 
+    // Step 5 : Make sure that firstReplaceHoodieInstant is archived.
+    long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
+    assertEquals(0, count);
+
+    // Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through the sync/async cleaner.
     for (String replacedFilePath : replacedFilePaths) {
       assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
     }
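For reference, the typed config keys used in the final test resolve to the following table
properties for the second, non-continuous DeltaStreamer run. Key names are from 0.9-era Hudi, and
hoodie.clean.automatic is assumed to come from the autoClean argument of the getAsyncServicesConfigs
helper (its body is not shown in this series):

  hoodie.clean.automatic=true
  hoodie.clean.async=<asyncClean>        (test parameter, runs with both true and false)
  hoodie.cleaner.policy=KEEP_LATEST_COMMITS
  hoodie.cleaner.commits.retained=1
  hoodie.keep.min.commits=2
  hoodie.keep.max.commits=3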