diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 79a813567b961..60ea9542579bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -537,7 +537,8 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri
    */
   protected void mayBeCleanAndArchive(HoodieTable table) {
     autoCleanOnCommit();
-    autoArchiveOnCommit(table);
+    // reload the table so that the timeline reflects the clean commit
+    autoArchiveOnCommit(createTable(config, hadoopConf));
   }
 
   protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
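The hunk above is the core of the fix: autoCleanOnCommit() may add a clean instant to the timeline, but the HoodieTable handle passed into mayBeCleanAndArchive was created before that clean ran, so the archiver would plan against a stale timeline snapshot that is missing the clean commit. A minimal sketch of the patched method with illustrative comments added (the c8.clean instant name is only an example, echoing the timeline walk-through in the test changes below):

    protected void mayBeCleanAndArchive(HoodieTable table) {
      autoCleanOnCommit();  // may write a clean instant (say c8.clean) to the timeline
      // 'table' still holds the pre-clean timeline snapshot; archiving off it would
      // compute the archival boundary without seeing c8.clean, so reload first.
      autoArchiveOnCommit(createTable(config, hadoopConf));
    }

Because archival now observes the clean commit, it can archive one more instant per round, which is what shifts the expected commit counts in the tests below.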
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
index 9b3f5a9c38bb7..80ce82ca88eae 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -56,14 +56,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
 
       // collect active commits for table
       val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
-      assertResult(5) {
+      assertResult(4) {
         commits.length
       }
 
       // collect archived commits for table
       val endTs = commits(0).get(0).toString
       val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
-      assertResult(2) {
+      assertResult(3) {
         archivedCommits.length
       }
     }
@@ -106,14 +106,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
 
       // collect active commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
-      assertResult(5) {
+      assertResult(4) {
         commits.length
       }
 
       // collect archived commits for table
       val endTs = commits(0).get(0).toString
       val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
-      assertResult(2) {
+      assertResult(3) {
         archivedCommits.length
       }
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 739b75ae5f778..febb9b15164d5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1109,15 +1109,31 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieR
     TestHelpers.addRecordMerger(recordType, configs);
     cfg.configs = configs;
     cfg.continuousMode = false;
 
+    // timeline as of now; no cleaner or archival has kicked in:
+    //   c1, c2, rc3, c4, c5, rc6
-    for (int i = 0; i < 2; i++) {
-      ds = new HoodieDeltaStreamer(cfg, jsc);
-      ds.sync();
-    }
-
-    // Step 5 : FirstReplaceHoodieInstant is retained for clean.
+    ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    // after 1 round of sync, the timeline will be as follows
+    // just before clean:
+    //   c1, c2, rc3, c4, c5, rc6, c7
+    // after clean:
+    //   c1, c2, rc3, c4, c5, rc6, c7, c8.clean (earliest commit to retain is c7)
+    // after archival (retain 4 commits):
+    //   c4, c5, rc6, c7, c8.clean
+
+    // the old code had 2 sync() calls; book-keeping the sequence here for reference.
+    // after the 2nd round of sync
+    // just before clean:
+    //   c4, c5, rc6, c7, c8.clean, c9
+    // after clean:
+    //   c4, c5, rc6, c7, c8.clean, c9, c10.clean (earliest commit to retain is c9)
+    // after archival:
+    //   c5, rc6, c7, c8.clean, c9, c10.clean
+
+    // Step 5 : FirstReplaceHoodieInstant should not be retained.
     long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
-    assertEquals(1, count);
+    assertEquals(0, count);
 
     // Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
     for (String replacedFilePath : replacedFilePaths) {
@@ -2387,7 +2403,7 @@ private void insertInTable(String tableBasePath, int count, WriteOperationType o
     if (cfg.configs == null) {
       cfg.configs = new ArrayList<>();
     }
-    cfg.configs.add("hoodie.cleaner.commits.retained=3");
+    cfg.configs.add("hoodie.cleaner.commits.retained=2");
     cfg.configs.add("hoodie.keep.min.commits=4");
     cfg.configs.add("hoodie.keep.max.commits=5");
     cfg.configs.add("hoodie.test.source.generate.inserts=true");
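Taken together, the timeline walk-through above explains the flipped assertions in TestCommitsProcedure: one instant moves from the active to the archived timeline, so 5 active + 2 archived commits become 4 + 3 (the total is unchanged). A hedged sketch, not part of the patch, of how the two counts could be sanity-checked against a test table; HoodieTableMetaClient and the timeline accessors are Hudi's public APIs, while basePath and hadoopConf are assumed to point at the table under test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    // 'basePath' points at the test table; 'hadoopConf' is the test's Hadoop config.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();
    long active = metaClient.getActiveTimeline()
        .getCommitsTimeline().filterCompletedInstants().countInstants();
    long archived = metaClient.getArchivedTimeline()
        .getCommitsTimeline().filterCompletedInstants().countInstants();
    // With the reload in place, archival observes the clean commit, so one more
    // instant is archived per round: active drops 5 -> 4, archived grows 2 -> 3.

The tightened hoodie.cleaner.commits.retained=2 in the last hunk pairs with the single sync() round: per the timeline comments above, the active timeline after one round is c4, c5, rc6, c7, c8.clean, so the first replace instant (rc3) is no longer on the active timeline, hence assertEquals(0, count).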