diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index c81f2d6977954..0a6659cb1e7e1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; @@ -413,15 +412,33 @@ private Stream getCommitInstantsToArchive() throws IOException { // with logic above to avoid Stream.concat HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - Option oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline() - .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)) - .filter(s -> !s.isCompleted()) + // Get the oldest inflight instant and a completed commit before this inflight instant. + Option oldestPendingInstant = table.getActiveTimeline() + .getWriteTimeline() + .filter(instant -> !instant.isCompleted()) .firstInstant(); - Option oldestInflightCommitInstant = - table.getActiveTimeline() - .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) - .filterInflights().firstInstant(); + // Oldest commit to retain is the greatest completed commit, that is less than the oldest pending instant. 
+ // In some cases when inflight is the lowest commit then oldest commit to retain will be equal to oldest + // inflight commit. + Option oldestCommitToRetain; + if (oldestPendingInstant.isPresent()) { + Option completedCommitBeforeOldestPendingInstant = + Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants() + .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), + LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst()); + // Check if the completed instant is higher than the oldest inflight instant + // in that case update the oldestCommitToRetain to oldestInflight commit time. + if (!completedCommitBeforeOldestPendingInstant.isPresent() + || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(), + LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) { + oldestCommitToRetain = oldestPendingInstant; + } else { + oldestCommitToRetain = completedCommitBeforeOldestPendingInstant; + } + } else { + oldestCommitToRetain = Option.empty(); + } // NOTE: We cannot have any holes in the commit timeline. // We cannot archive any commits which are made after the first savepoint present, @@ -460,19 +477,12 @@ private Stream getCommitInstantsToArchive() throws IOException { return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp())); } }).filter(s -> { - // Ensure commits >= the oldest pending compaction/replace commit is retained - return oldestPendingCompactionAndReplaceInstant + // oldestCommitToRetain is the highest completed commit instant that is less than the oldest inflight instant. + // By filtering out any commit >= oldestCommitToRetain, we can ensure there are no gaps in the timeline + // when inflight commits are present. 
+ return oldestCommitToRetain .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) .orElse(true); - }).filter(s -> { - // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't - // get archived, i.e, instants after the oldestInflight are retained on the timeline - if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) { - return oldestInflightCommitInstant.map(instant -> - compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) - .orElse(true); - } - return true; }).filter(s -> oldestInstantToRetainForCompaction.map(instantToRetain -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp())) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 73bfacfd9d7d6..b049c60b9e76e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -90,6 +90,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -913,19 +914,21 @@ public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata) if (i != 7) { assertEquals(originalCommits, commitsAfterArchival); } else { - // on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight. - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); + // on 7th commit, archival will kick in, but cannot archive any commit, + // since 1st deltacommit is the greatest completed commit before an oldest inflight commit. 
+ assertEquals(originalCommits.size() - commitsAfterArchival.size(), 0); + } } else { if (i != 7) { assertEquals(originalCommits, commitsAfterArchival); } else { - // on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight. - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); + // on 7th commit, archival will kick in, but cannot archive any commit, + // since 1st deltacommit is the greatest completed commit before the oldest inflight commit. + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 0); for (int j = 1; j <= 7; j++) { if (j == 1) { - // first commit should be archived - assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + // first commit should not be archived + assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); } else if (j == 2) { // 2nd compaction should not be archived assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j))); @@ -1418,6 +1421,115 @@ public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enable } } + /** + * Test archival functionality when there are inflight files. + * Archive should hold on to the greatest completed commit that is less than the oldest inflight commit. + * @throws Exception + */ + @Test + public void testGetCommitInstantsToArchiveDuringInflightCommits() throws Exception { + HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 3, 4, 2); + + Set<String> expectedInstants = new HashSet<>(); + // Create 3 completed commits. + for (int i = 0; i < 3; i++) { + String instantTime = "100" + i; + HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf()); + expectedInstants.add(instantTime); + } + // Create an inflight file. 
+ String replaceInstant = "1003"; + HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, replaceInstant, wrapperFs.getConf()); + expectedInstants.add(replaceInstant); + // Create 3 more instants + for (int i = 4; i < 7; i++) { + String instantTime = "100" + i; + HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf()); + expectedInstants.add(instantTime); + } + // Create another inflight commit + HoodieTestDataGenerator.createRequestedCommitFile(basePath, "1007", wrapperFs.getConf()); + HoodieTestDataGenerator.createPendingCommitFile(basePath, "1007", wrapperFs.getConf()); + expectedInstants.add("1007"); + // Create 6 more instants + for (int i = 0; i < 6; i++) { + String instantTime = "101" + i; + HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf()); + expectedInstants.add(instantTime); + } + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getWriteTimeline(); + + // Check the count of instants. + assertEquals(expectedInstants.size(), timeline.countInstants(), "Loaded 14 commits and the count should match"); + + // Run archival + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + boolean result = archiver.archiveIfRequired(context); + expectedInstants.remove("1000"); + expectedInstants.remove("1001"); + assertTrue(result); + timeline = metaClient.reloadActiveTimeline().getWriteTimeline(); + + // Check the count of instants after archival; it should have 2 fewer instants + // because the 1003 replacecommit's inflight will block archival. 
+ assertEquals(12, timeline.countInstants(), "After archival only first 2 commits should be archived"); + assertEquals(expectedInstants.size(), timeline.countInstants(), "After archival only first 2 commits should be archived"); + + HoodieTimeline finalTimeline = timeline; + assertEquals(12, expectedInstants.stream().filter(instant -> finalTimeline.containsInstant(instant)).count()); + assertEquals("1002", timeline.getInstantsAsStream().findFirst().get().getTimestamp()); + + // Delete replacecommit requested instant. + Path replaceCommitRequestedPath = new Path( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedReplaceFileName(replaceInstant)); + metaClient.getFs().delete(replaceCommitRequestedPath); + metaClient.reloadActiveTimeline(); + + // Run archival + assertTrue(archiver.archiveIfRequired(context)); + timeline = metaClient.reloadActiveTimeline().getWriteTimeline(); + expectedInstants.removeAll(Arrays.asList("1002", "1003", "1004", "1005")); + + // Check the count of instants after archival; it should have 4 fewer instants. + // This time the 1007 inflight commit will block archival. + assertEquals(8, timeline.countInstants(), "After the second archival only 8 instants should remain in the timeline"); + assertEquals(expectedInstants.size(), timeline.countInstants(), "After the second archival only 8 instants should remain in the timeline"); + HoodieTimeline refreshedTimeline = timeline; + assertEquals(8, expectedInstants.stream().filter(instant -> refreshedTimeline.containsInstant(instant)).count()); + assertEquals("1006", timeline.getInstantsAsStream().findFirst().get().getTimestamp()); + } + + /** + * If replacecommit inflight is the oldest commit in the timeline or for that matter any inflight commit is present + * then the archival is blocked from there. This method tests this scenario. 
+ */ + @Test + public void testWithOldestReplaceCommit() throws Exception { + HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 2, 3, 2); + + HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "1001", wrapperFs.getConf()); + HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "1001", wrapperFs.getConf()); + // Create 8 completed commits. + for (int i = 2; i < 10; i++) { + String instantTime = "100" + i; + HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf()); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + + HoodieTimeline timeline = metaClient.reloadActiveTimeline(); + assertEquals(9, timeline.countInstants(), "Loaded 9 commits and the count should match"); + boolean result = archiver.archiveIfRequired(context); + assertTrue(result); + timeline = metaClient.reloadActiveTimeline(); + assertEquals(9, timeline.countInstants(), + "Since we have a pending replacecommit at 1001, we should never archive any commit after 1001"); + assertEquals("1001", timeline.getInstantsAsStream().findFirst().get().getTimestamp()); + } + @Test public void testArchivalAndCompactionInMetadataTable() throws Exception { init(HoodieTableType.COPY_ON_WRITE); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index fae8362a74469..dc6de394bf32d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -256,7 +256,12 @@ public static Option getOldestInstantToRetainForClustering( retainLowerBound = earliestInstantToRetain.getTimestamp(); } else { // no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS clean policy, - // retain first instant after clean instant + // retain first instant 
after clean instant. + // For KEEP_LATEST_FILE_VERSIONS cleaner policy, file versions are only maintained for active file groups + // not for replaced file groups. So, last clean instant can be considered as a lower bound, since + // the cleaner would have removed all the file groups until then. But there is a catch to this logic, + // while cleaner is running if there is a pending replacecommit then those files are not cleaned. + // TODO: This case has to be handled. HUDI-6352 retainLowerBound = cleanInstant.getTimestamp(); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 96b22b14990f2..80fb90f6fe34f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -493,6 +493,18 @@ public GenericRecord generateRecordForShortTripSchema(String rowKey, String ride return rec; } + public static void createRequestedCommitFile(String basePath, String instantTime, Configuration configuration) throws IOException { + Path pendingRequestedFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedCommitFileName(instantTime)); + createEmptyFile(basePath, pendingRequestedFile, configuration); + } + + public static void createPendingCommitFile(String basePath, String instantTime, Configuration configuration) throws IOException { + Path pendingCommitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeInflightCommitFileName(instantTime)); + createEmptyFile(basePath, pendingCommitFile, configuration); + } + public static void createCommitFile(String basePath, String instantTime, Configuration configuration) { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); createCommitFile(basePath, 
instantTime, configuration, commitMetadata); @@ -534,6 +546,20 @@ private static void createMetadataFile(String f, String basePath, Configuration } } + public static void createReplaceCommitRequestedFile(String basePath, String instantTime, Configuration configuration) + throws IOException { + Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedReplaceFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + + public static void createReplaceCommitInflightFile(String basePath, String instantTime, Configuration configuration) + throws IOException { + Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeInflightReplaceFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + private static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) { Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime), HoodieTimeline.makeRequestedReplaceFileName(instantTime)) @@ -558,6 +584,13 @@ private static void createEmptyFile(String basePath, Path filePath, Configuratio os.close(); } + public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration) + throws IOException { + Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedCompactionFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant, Configuration configuration) throws IOException { Path commitFile = diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 32690f12096d1..ab1a7a4551cbe 
100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -694,6 +694,41 @@ public void testSnapshotPreCommitValidate() throws IOException { ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files committed at 100", files, "100", 10); } + /** + * Test scenario where inflight commit is between completed commits. + */ + @Test + public void testSnapshotPreCommitValidateWithInflights() throws IOException { + // Create commit and data files with commit 000 + File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 5, "000"); + createCommitFile(basePath, "000", "2016/05/01"); + + // create inflight commit add more files with same file_id. + InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId1", 5, "100"); + FileCreateUtils.createInflightCommit(basePath.toString(), "100"); + + // Create another commit without datafiles. + createCommitFile(basePath, "200", "2016/05/01"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + // Now, the original data files with commit time 000 should be returned. + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(5, files.length, "Snapshot read must return all files in partition"); + ensureFilesInCommit("Should return base files from commit 000, inflight data files with " + + "greater timestamp should be filtered", files, "000", 5); + + // Create data files with same file_id for commit 200. + InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId1", 5, "200"); + + // This time data files from commit time 200 will be returned. 
+ files = inputFormat.listStatus(jobConf); + assertEquals(5, files.length, "Snapshot read must return all files in partition"); + ensureFilesInCommit("Only completed commits files should be returned.", + files, "200", 5); + } + private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit, int totalExpected) throws IOException { int actualCount = 0;