From 75ef37eb146b3be501729fa952777cf8439acdd0 Mon Sep 17 00:00:00 2001 From: aliceyyan Date: Fri, 6 May 2022 16:50:56 +0800 Subject: [PATCH 1/5] HUDI-4044 When reading data from flink-hudi to external storage, the result is incorrect --- .../hudi/source/IncrementalInputSplits.java | 2 +- .../apache/hudi/table/HoodieTableSource.java | 6 ++-- .../format/mor/MergeOnReadInputSplit.java | 34 +++++++++++++++++-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 02e0e253cf577..6e40fae016393 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -226,7 +226,7 @@ public Result inputSplits( String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, endInstant, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange,fileSlice.getFileId()); }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index d00eb3e3ec700..fff116a0fc37d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.HoodieLogFile; @@ -181,7 +182,8 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv) OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) - .transform("split_reader", typeInfo, factory) + .keyBy((KeySelector) mos -> String.valueOf(mos.getFileId())) + .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); } else { @@ -316,7 +318,7 @@ private List buildFileIndex() { .map(logFile -> logFile.getPath().toString()) .collect(Collectors.toList())); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null,fileSlice.getFileId()); }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index 156622c303519..5faf50d5f4044 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -18,13 +18,11 @@ package org.apache.hudi.table.format.mor; +import org.apache.flink.core.io.InputSplit; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.Option; -import org.apache.flink.core.io.InputSplit; - import javax.annotation.Nullable; - import java.util.List; /** @@ -43,6 +41,8 @@ public class MergeOnReadInputSplit implements InputSplit { private final long maxCompactionMemoryInBytes; private final String mergeType; private final Option instantRange; + private String fileId; + // for streaming reader to record the consumed offset, // which is the start of next round reading. @@ -67,6 +67,34 @@ public MergeOnReadInputSplit( this.instantRange = Option.ofNullable(instantRange); } + public MergeOnReadInputSplit( + int splitNum, + @Nullable String basePath, + Option> logPaths, + String latestCommit, + String tablePath, + long maxCompactionMemoryInBytes, + String mergeType, + @Nullable InstantRange instantRange,String fileId ) { + this.splitNum = splitNum; + this.basePath = Option.ofNullable(basePath); + this.logPaths = logPaths; + this.latestCommit = latestCommit; + this.tablePath = tablePath; + this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; + this.mergeType = mergeType; + this.instantRange = Option.ofNullable(instantRange); + this.fileId = fileId; + } + + public String getFileId() { + return fileId; + } + + public void setFileId(String fileId) { + this.fileId = fileId; + } + public Option getBasePath() { return basePath; } From f88f4a68fefa4a774768ad2671bb94ab785ca93e Mon Sep 17 00:00:00 2001 From: aliceyyan Date: Mon, 9 May 2022 14:19:29 +0800 Subject: [PATCH 2/5] HUDI-4044 When reading data from flink-hudi to external storage, the result is incorrect --- .../hudi/source/IncrementalInputSplits.java | 2 +- .../apache/hudi/table/HoodieTableSource.java | 4 ++-- .../format/mor/MergeOnReadInputSplit.java | 22 +++---------------- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 6e40fae016393..94eeefcd36df3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -226,7 +226,7 @@ public Result inputSplits( String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, endInstant, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange,fileSlice.getFileId()); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId()); }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index fff116a0fc37d..335addcaccf71 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -182,7 +182,7 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv) OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) - .keyBy((KeySelector) mos -> String.valueOf(mos.getFileId())) + .keyBy((KeySelector) mos -> mos.getFileId()) .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); @@ -318,7 +318,7 @@ private List buildFileIndex() { .map(logFile -> logFile.getPath().toString()) .collect(Collectors.toList())); return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null,fileSlice.getFileId()); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId()); }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index 5faf50d5f4044..3b28ebaeb84f6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -48,24 +48,7 @@ public class MergeOnReadInputSplit implements InputSplit { // which is the start of next round reading. private long consumed = NUM_NO_CONSUMPTION; - public MergeOnReadInputSplit( - int splitNum, - @Nullable String basePath, - Option> logPaths, - String latestCommit, - String tablePath, - long maxCompactionMemoryInBytes, - String mergeType, - @Nullable InstantRange instantRange) { - this.splitNum = splitNum; - this.basePath = Option.ofNullable(basePath); - this.logPaths = logPaths; - this.latestCommit = latestCommit; - this.tablePath = tablePath; - this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; - this.mergeType = mergeType; - this.instantRange = Option.ofNullable(instantRange); - } + public MergeOnReadInputSplit( int splitNum, @@ -75,7 +58,8 @@ public MergeOnReadInputSplit( String tablePath, long maxCompactionMemoryInBytes, String mergeType, - @Nullable InstantRange instantRange,String fileId ) { + @Nullable InstantRange instantRange, + String fileId) { this.splitNum = splitNum; this.basePath = Option.ofNullable(basePath); this.logPaths = logPaths; From 6b98aa1d5c1cd6cee2652062d957b4bb451fadb2 Mon Sep 17 00:00:00 2001 From: aliceyyan Date: Mon, 9 May 2022 15:52:24 +0800 Subject: [PATCH 3/5] HUDI-4044 When reading data from flink-hudi to external storage, the result is incorrect --- .../apache/hudi/table/HoodieTableSource.java | 4 ++-- .../format/mor/MergeOnReadInputSplit.java | 22 ++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 335addcaccf71..0a9639e74dc67 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -182,8 +182,8 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv) OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) - .keyBy((KeySelector) mos -> mos.getFileId()) - .transform("split_reader", typeInfo, factory) + .keyBy(inputSplit -> inputSplit.getFileId()) + .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); } else { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index 3b28ebaeb84f6..df793ce21df51 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -18,11 +18,13 @@ package org.apache.hudi.table.format.mor; -import org.apache.flink.core.io.InputSplit; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.Option; +import org.apache.flink.core.io.InputSplit; + import javax.annotation.Nullable; + import java.util.List; /** @@ -51,15 +53,15 @@ public class MergeOnReadInputSplit implements InputSplit { public MergeOnReadInputSplit( - int splitNum, - @Nullable String basePath, - Option> logPaths, - String latestCommit, - String tablePath, - long maxCompactionMemoryInBytes, - String mergeType, - @Nullable InstantRange instantRange, - String fileId) { + int splitNum, + @Nullable String basePath, + Option> logPaths, + String latestCommit, + String tablePath, + long maxCompactionMemoryInBytes, + String mergeType, + @Nullable InstantRange instantRange, + String fileId) { this.splitNum = splitNum; this.basePath = Option.ofNullable(basePath); this.logPaths = logPaths; From 0045f11a810f5853e8d4607167216254fb5b4b1d Mon Sep 17 00:00:00 2001 From: aliceyyan Date: Mon, 9 May 2022 15:54:04 +0800 Subject: [PATCH 4/5] HUDI-4044 When reading data from flink-hudi to external storage, the result is incorrect --- .../src/main/java/org/apache/hudi/table/HoodieTableSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 0a9639e74dc67..da4abf0a96e60 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -18,7 +18,6 @@ package org.apache.hudi.table; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.HoodieLogFile; From 0cfbdfcb5fa8bbeb06c256e6efec59424f11b8d4 Mon Sep 17 00:00:00 2001 From: aliceyyan Date: Mon, 9 May 2022 15:56:59 +0800 Subject: [PATCH 5/5] HUDI-4044 When reading data from flink-hudi to external storage, the result is incorrect --- .../apache/hudi/table/format/mor/MergeOnReadInputSplit.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index df793ce21df51..cde646e41f035 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -45,13 +45,10 @@ public class MergeOnReadInputSplit implements InputSplit { private final Option instantRange; private String fileId; - // for streaming reader to record the consumed offset, // which is the start of next round reading. private long consumed = NUM_NO_CONSUMPTION; - - public MergeOnReadInputSplit( int splitNum, @Nullable String basePath,