diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 62baac4cc6b2c..259c2e40cd477 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -303,8 +303,8 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
     }
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-        metaClient.getActiveTimeline().getCommitsTimeline()
-            .filterCompletedInstants(), fileStatuses);
+        // file-slice after pending compaction-requested instant-time is also considered valid
+        metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), fileStatuses);
     String latestCommit = fsView.getLastInstant().get().getTimestamp();
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index e6424a1abb751..6fbbab81fa4a6 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -463,6 +463,32 @@ void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
     TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT);
   }
 
+  /**
+   * Test reading file groups with compaction plan scheduled and delta logs.
+   * File-slice after pending compaction-requested instant-time should also be considered valid.
+   */
+  @Test
+  void testReadMORWithCompactionPlanScheduled() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    // compact for each commit
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write three commits
+    for (int i = 0; i < 6; i += 2) {
+      List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+      TestData.writeData(dataset, conf);
+    }
+
+    InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
+    assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> actual = readData(inputFormat1);
+    final List<RowData> expected = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
+    TestData.assertRowDataEquals(actual, expected);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 11ce0a66ad330..a747672827169 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -237,7 +237,7 @@ public static List<Pair<Option<HoodieBaseFile>, List<HoodieLogFile>>> groupLogsByBaseFile(
       try {
         // Both commit and delta-commits are included - pick the latest completed one
         Option<HoodieInstant> latestCompletedInstant =
-            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+            metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
         Stream<FileSlice> latestFileSlices = latestCompletedInstant
             .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 1d140304cb4c9..a1d857c948c17 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -151,8 +151,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       // Load files from the global paths if it has defined to be compatible with the original mode
       val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
       val fsView = new HoodieTableFileSystemView(metaClient,
-        metaClient.getActiveTimeline.getCommitsTimeline
-          .filterCompletedInstants, inMemoryFileIndex.allFiles().toArray)
+        // file-slice after pending compaction-requested instant-time is also considered valid
+        metaClient.getCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants,
+        inMemoryFileIndex.allFiles().toArray)
       val partitionPaths = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)