diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 02e0e253cf577..94eeefcd36df3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -226,7 +226,7 @@ public Result inputSplits(
                   .collect(Collectors.toList()));
               String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
               return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, endInstant,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
+                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
             }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d00eb3e3ec700..da4abf0a96e60 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -181,6 +181,7 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
               .setParallelism(1)
+              .keyBy(inputSplit -> inputSplit.getFileId())
               .transform("split_reader", typeInfo, factory)
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
@@ -316,7 +317,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
                 .map(logFile -> logFile.getPath().toString())
                 .collect(Collectors.toList()));
             return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
-                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId());
           }).collect(Collectors.toList()))
           .flatMap(Collection::stream)
           .collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 156622c303519..cde646e41f035 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -43,6 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final long maxCompactionMemoryInBytes;
   private final String mergeType;
   private final Option<InstantRange> instantRange;
+  private String fileId;
 
   // for streaming reader to record the consumed offset,
   // which is the start of next round reading.
@@ -56,7 +57,8 @@ public MergeOnReadInputSplit(
       String tablePath,
       long maxCompactionMemoryInBytes,
       String mergeType,
-      @Nullable InstantRange instantRange) {
+      @Nullable InstantRange instantRange,
+      String fileId) {
     this.splitNum = splitNum;
     this.basePath = Option.ofNullable(basePath);
     this.logPaths = logPaths;
@@ -65,6 +67,15 @@ public MergeOnReadInputSplit(
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.mergeType = mergeType;
     this.instantRange = Option.ofNullable(instantRange);
+    this.fileId = fileId;
+  }
+
+  public String getFileId() {
+    return fileId;
+  }
+
+  public void setFileId(String fileId) {
+    this.fileId = fileId;
   }
 
   public Option<String> getBasePath() {
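
Note on how the new field is used (not part of the patch): the keyBy(inputSplit -> inputSplit.getFileId()) inserted between split_monitor and split_reader hash-partitions the split stream on the file group id, so every split belonging to one file group is routed to the same split_reader subtask and consumed in the order the monitor emits it. Below is a minimal standalone sketch of that routing pattern; the Split class is a hypothetical stand-in for MergeOnReadInputSplit, and the element values are illustrative.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileIdKeyBySketch {

  // Hypothetical stand-in for MergeOnReadInputSplit; only fileId matters here.
  public static class Split {
    public String fileId;
    public int splitNum;

    public Split() {
      // public no-arg constructor keeps this a Flink POJO
    }

    public Split(String fileId, int splitNum) {
      this.fileId = fileId;
      this.splitNum = splitNum;
    }

    public String getFileId() {
      return fileId;
    }

    @Override
    public String toString() {
      return "Split{" + fileId + "#" + splitNum + "}";
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Three splits across two file groups; in the real source these would be
    // emitted by the split_monitor operator.
    DataStream<Split> splits = env.fromElements(
        new Split("fg-1", 0),
        new Split("fg-2", 1),
        new Split("fg-1", 2));

    // Hash-partition on fileId: both fg-1 splits land on the same downstream
    // subtask, mirroring the keyBy added in HoodieTableSource.
    splits.keyBy(Split::getFileId)
        .print()
        .setParallelism(2);

    env.execute("fileId-keyBy-sketch");
  }
}

One design consequence worth noting: keyBy replaces the default rebalancing between the monitor and the readers, so how evenly the READ_TASKS reader subtasks are loaded now depends on how the file group ids happen to hash, presumably the trade-off accepted here in exchange for per-file-group ordering.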