-
Notifications
You must be signed in to change notification settings - Fork 2.5k
HUDI-4044 When reading data from flink-hudi to external storage, the … #5516
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
75ef37e
f88f4a6
6b98aa1
0045f11
0cfbdfc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| package org.apache.hudi.table; | ||
|
|
||
| import org.apache.flink.api.java.functions.KeySelector; | ||
| import org.apache.hudi.avro.HoodieAvroUtils; | ||
| import org.apache.hudi.common.model.BaseFile; | ||
| import org.apache.hudi.common.model.HoodieLogFile; | ||
|
|
@@ -181,7 +182,8 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) | |
| OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); | ||
| SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) | ||
| .setParallelism(1) | ||
| .transform("split_reader", typeInfo, factory) | ||
| .keyBy((KeySelector<MergeOnReadInputSplit, String>) mos -> mos.getFileId()) | ||
| .transform("split_reader", typeInfo, factory) | ||
|
||
| .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); | ||
| return new DataStreamSource<>(source); | ||
| } else { | ||
|
|
@@ -316,7 +318,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() { | |
| .map(logFile -> logFile.getPath().toString()) | ||
| .collect(Collectors.toList())); | ||
| return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, | ||
| metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); | ||
| metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId()); | ||
| }).collect(Collectors.toList())) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| .flatMap(Collection::stream) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,13 +18,11 @@ | |
|
|
||
| package org.apache.hudi.table.format.mor; | ||
|
|
||
| import org.apache.flink.core.io.InputSplit; | ||
| import org.apache.hudi.common.table.log.InstantRange; | ||
| import org.apache.hudi.common.util.Option; | ||
|
|
||
| import org.apache.flink.core.io.InputSplit; | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The changes for the import is not necessary. |
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| /** | ||
|
|
@@ -43,20 +41,25 @@ public class MergeOnReadInputSplit implements InputSplit { | |
| private final long maxCompactionMemoryInBytes; | ||
| private final String mergeType; | ||
| private final Option<InstantRange> instantRange; | ||
| private String fileId; | ||
|
|
||
|
|
||
| // for streaming reader to record the consumed offset, | ||
| // which is the start of next round reading. | ||
| private long consumed = NUM_NO_CONSUMPTION; | ||
|
|
||
|
|
||
|
|
||
| public MergeOnReadInputSplit( | ||
| int splitNum, | ||
| @Nullable String basePath, | ||
| Option<List<String>> logPaths, | ||
| String latestCommit, | ||
| String tablePath, | ||
| long maxCompactionMemoryInBytes, | ||
| String mergeType, | ||
| @Nullable InstantRange instantRange) { | ||
| int splitNum, | ||
| @Nullable String basePath, | ||
| Option<List<String>> logPaths, | ||
| String latestCommit, | ||
|
||
| String tablePath, | ||
| long maxCompactionMemoryInBytes, | ||
| String mergeType, | ||
| @Nullable InstantRange instantRange, | ||
| String fileId) { | ||
| this.splitNum = splitNum; | ||
| this.basePath = Option.ofNullable(basePath); | ||
| this.logPaths = logPaths; | ||
|
|
@@ -65,6 +68,15 @@ public MergeOnReadInputSplit( | |
| this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; | ||
| this.mergeType = mergeType; | ||
| this.instantRange = Option.ofNullable(instantRange); | ||
| this.fileId = fileId; | ||
| } | ||
|
|
||
| public String getFileId() { | ||
| return fileId; | ||
| } | ||
|
|
||
| public void setFileId(String fileId) { | ||
| this.fileId = fileId; | ||
| } | ||
|
|
||
| public Option<String> getBasePath() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instantRange,fileSlice.getFileId()->instantRange, fileSlice.getFileId()