diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 786023efa335f..c8f6e2a789392 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,7 +29,6 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -272,39 +271,24 @@ private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
         metaClient.getActiveTimeline().getCommitsTimeline()
             .filterCompletedInstants(), fileStatuses);
-    List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
     String latestCommit = fsView.getLastInstant().get().getTimestamp();
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
-    if (latestFiles.size() > 0) {
-      Map<HoodieBaseFile, List<String>> fileGroup =
-          HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(hadoopConf, latestFiles);
-      return fileGroup.entrySet().stream().map(kv -> {
-        HoodieBaseFile baseFile = kv.getKey();
-        Option<List<String>> logPaths = kv.getValue().size() == 0
-            ? Option.empty()
-            : Option.of(kv.getValue());
-        return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-            baseFile.getPath(), logPaths, latestCommit,
-            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-      }).collect(Collectors.toList());
-    } else {
-      // all the files are logs
-      return Arrays.stream(paths).map(partitionPath -> {
-        String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
-        return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  null, logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-            }).collect(Collectors.toList()); })
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
-    }
+    // generates one input split for each file group
+    return Arrays.stream(paths).map(partitionPath -> {
+      String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
+      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+          .map(fileSlice -> {
+            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+                .sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString())
+                .collect(Collectors.toList()));
+            return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+          }).collect(Collectors.toList()); })
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   public InputFormat<RowData, ?> getInputFormat() {
@@ -431,11 +415,12 @@ public Configuration getConf() {
   }
 
   /**
-   * Reload the active timeline view.
+   * Reset the state of the table source.
    */
   @VisibleForTesting
-  public void reloadActiveTimeline() {
+  public void reset() {
     this.metaClient.reloadActiveTimeline();
+    this.requiredPartitions = null;
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 0ce46980d4959..c8885b043c189 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -92,7 +92,7 @@ void testRead(HoodieTableType tableType) throws Exception {
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -133,8 +133,12 @@ void testReadBaseAndLogFiles() throws Exception {
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
+    // write another commit using logs into separate partitions
+    // so that these file groups contain only log files
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -143,6 +147,10 @@ void testReadBaseAndLogFiles() throws Exception {
     expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
         + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
         + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+        + "id12,Monica,27,1970-01-01T00:00:00.009,par5, "
+        + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, "
+        + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, "
+        + "id15,Ross,29,1970-01-01T00:00:00.012,par6, "
        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
         + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
         + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index fae0765018e24..50ecf543e70ec 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -114,6 +114,17 @@ public class TestData {
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_SEPARATE_PARTITION = Arrays.asList(
+      insertRow(StringData.fromString("id12"), StringData.fromString("Monica"), 27,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id13"), StringData.fromString("Phoebe"), 31,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id14"), StringData.fromString("Rachel"), 52,
+          TimestampData.fromEpochMillis(11), StringData.fromString("par6")),
+      insertRow(StringData.fromString("id15"), StringData.fromString("Ross"), 29,
+          TimestampData.fromEpochMillis(12), StringData.fromString("par6"))
+  );
+
   public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
   static {
     IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(