diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 3a2e6152d79a5..c736821ce1c0a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -197,12 +197,18 @@ private FlinkOptions() { .withDescription("Check interval for streaming read of SECOND, default 1 minute"); public static final String START_COMMIT_EARLIEST = "earliest"; - public static final ConfigOption READ_STREAMING_START_COMMIT = ConfigOptions - .key("read.streaming.start-commit") + public static final ConfigOption READ_START_COMMIT = ConfigOptions + .key("read.start-commit") .stringType() .noDefaultValue() - .withDescription("Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', " - + "by default reading from the latest instant"); + .withDescription("Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss', " + + "by default reading from the latest instant for streaming read"); + + public static final ConfigOption READ_END_COMMIT = ConfigOptions + .key("read.end-commit") + .stringType() + .noDefaultValue() + .withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'"); // ------------------------------------------------------------------------ // Write Options diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index be02fc404a6f4..f1abf4b756220 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import javax.annotation.Nullable; + import java.io.File; import java.util.ArrayList; import java.util.Arrays; @@ 
-36,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; /** * A file index which supports listing files efficiently through metadata table. @@ -137,11 +140,29 @@ public void reset() { this.partitionPaths = null; } + // ------------------------------------------------------------------------- + // Getter/Setter + // ------------------------------------------------------------------------- + + /** + * Sets up explicit partition paths for pruning. + */ + public void setPartitionPaths(@Nullable Set partitionPaths) { + if (partitionPaths != null) { + this.partitionPaths = new ArrayList<>(partitionPaths); + } + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- - private List getOrBuildPartitionPaths() { + /** + * Returns all the relative partition paths. + * + *

The partition paths are cached once invoked. + */ + public List getOrBuildPartitionPaths() { if (this.partitionPaths != null) { return this.partitionPaths; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java new file mode 100644 index 0000000000000..7d319203d1fa3 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.partitioner.profile.WriteProfiles; +import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; + +/** + * Utilities to generate incremental input splits {@link MergeOnReadInputSplit}. + * The input splits are used for streaming and incremental read. + * + *

<p>How to generate the input splits: + *

    + * <ol>
 + *   <li>first fetch all the commit metadata for the incremental instants;</li>
 + *   <li>resolve the incremental commit file paths;</li>
 + *   <li>filter the full file paths by required partitions;</li>
 + *   <li>use the file paths from #step 3 as the back-up of the filesystem view.</li>
 + * </ol>
 + *
+ */ +public class IncrementalInputSplits { + private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class); + private final Configuration conf; + private final Path path; + private final long maxCompactionMemoryInBytes; + // for partition pruning + private final Set requiredPartitions; + + private IncrementalInputSplits( + Configuration conf, + Path path, + long maxCompactionMemoryInBytes, + @Nullable Set requiredPartitions) { + this.conf = conf; + this.path = path; + this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; + this.requiredPartitions = requiredPartitions; + } + + /** + * Returns the builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Returns the incremental input splits. + * + * @param metaClient The meta client + * @param hadoopConf The hadoop configuration + * @return The list of incremental input splits or empty if there are no new instants + */ + public Result inputSplits( + HoodieTableMetaClient metaClient, + org.apache.hadoop.conf.Configuration hadoopConf) { + return inputSplits(metaClient, hadoopConf, null); + } + + /** + * Returns the incremental input splits. 
+ * + * @param metaClient The meta client + * @param hadoopConf The hadoop configuration + * @param issuedInstant The last issued instant, only valid in streaming read + * @return The list of incremental input splits or empty if there are no new instants + */ + public Result inputSplits( + HoodieTableMetaClient metaClient, + org.apache.hadoop.conf.Configuration hadoopConf, + String issuedInstant) { + metaClient.reloadActiveTimeline(); + HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(); + if (commitTimeline.empty()) { + LOG.warn("No splits found for the table under path " + path); + return Result.EMPTY; + } + List instants = filterInstantsWithRange(commitTimeline, issuedInstant); + // get the latest instant that satisfies condition + final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1); + final InstantRange instantRange; + if (instantToIssue != null) { + if (issuedInstant != null) { + // the streaming reader may record the last issued instant, if the issued instant is present, + // the instant range should be: (issued instant, the latest instant]. + instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(), + InstantRange.RangeType.OPEN_CLOSE); + } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) { + // first time consume and has a start commit + final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT); + instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST) + ? null + : InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE); + } else { + // first time consume and no start commit, consumes the latest incremental data set. 
+ instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(), + InstantRange.RangeType.CLOSE_CLOSE); + } + } else { + LOG.info("No new instant found for the table under path " + path + ", skip reading"); + return Result.EMPTY; + } + + String tableName = conf.getString(FlinkOptions.TABLE_NAME); + List activeMetadataList = instants.stream() + .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); + List archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName); + if (archivedMetadataList.size() > 0) { + LOG.warn("" + + "--------------------------------------------------------------------------------\n" + + "---------- caution: the reader has fall behind too much from the writer,\n" + + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n" + + "--------------------------------------------------------------------------------"); + } + List metadataList = archivedMetadataList.size() > 0 + ? 
mergeList(activeMetadataList, archivedMetadataList) + : activeMetadataList; + + Set writePartitions = getWritePartitionPaths(metadataList); + // apply partition push down + if (this.requiredPartitions != null) { + writePartitions = writePartitions.stream() + .filter(this.requiredPartitions::contains).collect(Collectors.toSet()); + } + FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList); + if (fileStatuses.length == 0) { + LOG.warn("No files found for reading in user provided path."); + return Result.EMPTY; + } + + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses); + final String endInstant = instantToIssue.getTimestamp(); + final AtomicInteger cnt = new AtomicInteger(0); + final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); + List inputSplits = writePartitions.stream() + .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant) + .map(fileSlice -> { + Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()) + .collect(Collectors.toList())); + String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), + basePath, logPaths, endInstant, + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); + }).collect(Collectors.toList())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + return Result.instance(inputSplits, endInstant); + } + + /** + * Returns the archived metadata in case the reader consumes untimely or it wants + * to read from the earliest. + * + *

Note: should improve it with metadata table when the metadata table is stable enough. + * + * @param metaClient The meta client + * @param instantRange The instant range to filter the timeline instants + * @param commitTimeline The commit timeline + * @param tableName The table name + * @return the list of archived metadata, or empty if there is no need to read the archived timeline + */ + private List getArchivedMetadata( + HoodieTableMetaClient metaClient, + InstantRange instantRange, + HoodieTimeline commitTimeline, + String tableName) { + if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) { + // read the archived metadata if: + // 1. the start commit is 'earliest'; + // 2. the start instant is archived. + HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); + HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); + if (!archivedCompleteTimeline.empty()) { + final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp(); + Stream instantStream = archivedCompleteTimeline.getInstants(); + if (instantRange != null) { + archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs); + instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant())); + } else { + final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp(); + archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); + } + return instantStream + .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList()); + } + } + return Collections.emptyList(); + } + + /** + * Returns the instants with a given issuedInstant to start from. 
+ * + * @param commitTimeline The completed commits timeline + * @param issuedInstant The last issued instant that has already been delivered to downstream + * @return the filtered hoodie instants + */ + private List filterInstantsWithRange( + HoodieTimeline commitTimeline, + final String issuedInstant) { + HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants(); + if (issuedInstant != null) { + // returns early for streaming mode + return completedTimeline.getInstants() + .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant)) + .collect(Collectors.toList()); + } + + Stream instantStream = completedTimeline.getInstants(); + + if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() + && !this.conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) { + final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT); + instantStream = instantStream + .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit)); + } + if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) { + final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT); + instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit)); + } + return instantStream.collect(Collectors.toList()); + } + + /** + * Returns all the incremental write partition paths as a set with the given commits metadata. 
+ * + * @param metadataList The commits metadata + * @return the partition path set + */ + private Set getWritePartitionPaths(List metadataList) { + return metadataList.stream() + .map(HoodieCommitMetadata::getWritePartitionPaths) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + private static List mergeList(List list1, List list2) { + List merged = new ArrayList<>(list1); + merged.addAll(list2); + return merged; + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * Represents a result of calling {@link #inputSplits}. + */ + public static class Result { + private final List inputSplits; // input splits + private final String endInstant; // end instant to consume to + + public static final Result EMPTY = instance(Collections.emptyList(), ""); + + public boolean isEmpty() { + return this.inputSplits.size() == 0; + } + + public List getInputSplits() { + return this.inputSplits; + } + + public String getEndInstant() { + return this.endInstant; + } + + private Result(List inputSplits, String endInstant) { + this.inputSplits = inputSplits; + this.endInstant = endInstant; + } + + public static Result instance(List inputSplits, String endInstant) { + return new Result(inputSplits, endInstant); + } + } + + /** + * Builder for {@link IncrementalInputSplits}. 
+ */ + public static class Builder { + private Configuration conf; + private Path path; + private long maxCompactionMemoryInBytes; + // for partition pruning + private Set requiredPartitions; + + public Builder() { + } + + public Builder conf(Configuration conf) { + this.conf = conf; + return this; + } + + public Builder path(Path path) { + this.path = path; + return this; + } + + public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) { + this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; + return this; + } + + public Builder requiredPartitions(@Nullable Set requiredPartitions) { + this.requiredPartitions = requiredPartitions; + return this; + } + + public IncrementalInputSplits build() { + return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), + this.maxCompactionMemoryInBytes, this.requiredPartitions); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index c5610d2f55221..bfd745288ec32 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -18,19 +18,9 @@ package org.apache.hudi.source; -import org.apache.hudi.common.model.BaseFile; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.InstantRange; -import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import 
org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.partitioner.profile.WriteProfiles; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; @@ -45,24 +35,15 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; /** * This is the single (non-parallel) monitoring task which takes a {@link MergeOnReadInputSplit} @@ -112,21 +93,21 @@ public class StreamReadMonitoringFunction private HoodieTableMetaClient metaClient; - private final long maxCompactionMemoryInBytes; - - // for partition pruning - private final Set requiredPartitionPaths; + private final IncrementalInputSplits incrementalInputSplits; public StreamReadMonitoringFunction( Configuration conf, Path path, long maxCompactionMemoryInBytes, - Set requiredPartitionPaths) { + @Nullable Set requiredPartitionPaths) { this.conf = conf; this.path = path; this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL); - this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; - this.requiredPartitionPaths = requiredPartitionPaths; + this.incrementalInputSplits = IncrementalInputSplits.builder() + .conf(conf) + .path(path) + .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes) 
+ .requiredPartitions(requiredPartitionPaths).build(); } @Override @@ -208,98 +189,23 @@ public void monitorDirAndForwardSplits(SourceContext cont // table does not exist return; } - metaClient.reloadActiveTimeline(); - HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(); - if (commitTimeline.empty()) { - LOG.warn("No splits found for the table under path " + path); - return; - } - List instants = filterInstantsWithStart(commitTimeline, this.issuedInstant); - // get the latest instant that satisfies condition - final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1); - final InstantRange instantRange; - if (instantToIssue != null) { - if (this.issuedInstant != null) { - // had already consumed an instant - instantRange = InstantRange.getInstance(this.issuedInstant, instantToIssue.getTimestamp(), - InstantRange.RangeType.OPEN_CLOSE); - } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) { - // first time consume and has a start commit - final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT); - instantRange = specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST) - ? null - : InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE); - } else { - // first time consume and no start commit, consumes the latest incremental data set. - instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(), - InstantRange.RangeType.CLOSE_CLOSE); - } - } else { - LOG.info("No new instant found for the table under path " + path + ", skip reading"); - return; - } - // generate input split: - // 1. first fetch all the commit metadata for the incremental instants; - // 2. filter the relative partition paths - // 3. filter the full file paths - // 4. 
use the file paths from #step 3 as the back-up of the filesystem view - - String tableName = conf.getString(FlinkOptions.TABLE_NAME); - List activeMetadataList = instants.stream() - .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); - List archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName); - if (archivedMetadataList.size() > 0) { - LOG.warn("" - + "--------------------------------------------------------------------------------\n" - + "---------- caution: the reader has fall behind too much from the writer,\n" - + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n" - + "--------------------------------------------------------------------------------"); - } - List metadataList = archivedMetadataList.size() > 0 - ? mergeList(activeMetadataList, archivedMetadataList) - : activeMetadataList; - - Set writePartitions = getWritePartitionPaths(metadataList); - // apply partition push down - if (this.requiredPartitionPaths.size() > 0) { - writePartitions = writePartitions.stream() - .filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet()); - } - FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList); - if (fileStatuses.length == 0) { - LOG.warn("No files found for reading in user provided path."); + IncrementalInputSplits.Result result = + incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant); + if (result.isEmpty()) { + // no new instants, returns early return; } - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses); - final String commitToIssue = instantToIssue.getTimestamp(); - final AtomicInteger cnt = new AtomicInteger(0); - final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); - List inputSplits = writePartitions.stream() - .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue) - .map(fileSlice -> { - Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()) - .collect(Collectors.toList())); - String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); - return new MergeOnReadInputSplit(cnt.getAndAdd(1), - basePath, logPaths, commitToIssue, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); - }).collect(Collectors.toList())) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - - for (MergeOnReadInputSplit split : inputSplits) { + for (MergeOnReadInputSplit split : result.getInputSplits()) { context.collect(split); } // update the issues instant time - this.issuedInstant = commitToIssue; + this.issuedInstant = result.getEndInstant(); LOG.info("" - + "------------------------------------------------------------\n" - + "---------- consumed to instant: {}\n" - + "------------------------------------------------------------", - commitToIssue); + + "------------------------------------------------------------\n" + + "---------- consumed to instant: {}\n" + + "------------------------------------------------------------", + this.issuedInstant); } @Override @@ -343,87 +249,4 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { this.instantState.add(this.issuedInstant); } } - - /** - * Returns the archived metadata in case the reader consumes untimely or it wants - * to read from the earliest. - * - *

Note: should improve it with metadata table when the metadata table is stable enough. - * - * @param instantRange The instant range to filter the timeline instants - * @param commitTimeline The commit timeline - * @param tableName The table name - * @return the list of archived metadata, or empty if there is no need to read the archived timeline - */ - private List getArchivedMetadata( - InstantRange instantRange, - HoodieTimeline commitTimeline, - String tableName) { - if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) { - // read the archived metadata if: - // 1. the start commit is 'earliest'; - // 2. the start instant is archived. - HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); - HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); - if (!archivedCompleteTimeline.empty()) { - final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp(); - Stream instantStream = archivedCompleteTimeline.getInstants(); - if (instantRange != null) { - archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs); - instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant())); - } else { - final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp(); - archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); - } - return instantStream - .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList()); - } - } - return Collections.emptyList(); - } - - /** - * Returns the instants with a given issuedInstant to start from. 
- * - * @param commitTimeline The completed commits timeline - * @param issuedInstant The last issued instant that has already been delivered to downstream - * @return the filtered hoodie instants - */ - private List filterInstantsWithStart( - HoodieTimeline commitTimeline, - final String issuedInstant) { - HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants(); - if (issuedInstant != null) { - return completedTimeline.getInstants() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant)) - .collect(Collectors.toList()); - } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent() - && !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) { - String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT); - return completedTimeline.getInstants() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit)) - .collect(Collectors.toList()); - } else { - return completedTimeline.getInstants().collect(Collectors.toList()); - } - } - - /** - * Returns all the incremental write partition paths as a set with the given commits metadata. 
- * - * @param metadataList The commits metadata - * @return the partition path set - */ - private Set getWritePartitionPaths(List metadataList) { - return metadataList.stream() - .map(HoodieCommitMetadata::getWritePartitionPaths) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } - - private static List mergeList(List list1, List list2) { - List merged = new ArrayList<>(list1); - merged.addAll(list2); - return merged; - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index cf1cbd58f8adc..a2d0960770e9c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -155,6 +155,8 @@ private static void setupConfOptions( setupCompactionOptions(conf); // hive options setupHiveOptions(conf); + // read options + setupReadOptions(conf); // infer avro schema from physical DDL schema inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType()); } @@ -270,6 +272,16 @@ private static void setupHiveOptions(Configuration conf) { } } + /** + * Sets up the read options from the table definition. + */ + private static void setupReadOptions(Configuration conf) { + if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING) + && (conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) { + conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL); + } + } + /** * Inferences the deserialization Avro schema from the table schema (e.g. 
the DDL) * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 43743fc64319c..0494143a1a01b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -19,7 +19,6 @@ package org.apache.hudi.table; import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; @@ -31,6 +30,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieROTablePathFilter; import org.apache.hudi.source.FileIndex; +import org.apache.hudi.source.IncrementalInputSplits; import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.source.StreamReadOperator; import org.apache.hudi.table.format.FilePathUtils; @@ -108,6 +108,9 @@ public class HoodieTableSource implements private static final int NO_LIMIT_CONSTANT = -1; + private static final InputFormat EMPTY_INPUT_FORMAT = + new CollectionInputFormat<>(Collections.emptyList(), null); + private final transient org.apache.hadoop.conf.Configuration hadoopConf; private final transient HoodieTableMetaClient metaClient; private final long maxCompactionMemoryInBytes; @@ -220,7 +223,7 @@ public String asSummaryString() { public Result applyFilters(List filters) { this.filters = new ArrayList<>(filters); // refuse all the filters now - return Result.of(Collections.emptyList(), new ArrayList<>(filters)); + return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters)); } @Override @@ -256,8 +259,8 @@ private DataType getProducedDataType() { DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]); return 
DataTypes.ROW(Arrays.stream(this.requiredPos) - .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i])) - .toArray(DataTypes.Field[]::new)) + .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i])) + .toArray(DataTypes.Field[]::new)) .bridgedTo(RowData.class); } @@ -268,16 +271,21 @@ private List> getOrFetchPartitions() { return requiredPartitions; } + @Nullable private Set getRequiredPartitionPaths() { if (this.requiredPartitions == null) { - return Collections.emptySet(); + // returns null for non partition pruning + return null; } return FilePathUtils.toRelativePartitionPaths(this.partitionKeys, this.requiredPartitions, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)); } - private List buildFileIndex(Path[] paths) { - if (paths.length == 0) { + private List buildFileIndex() { + Set requiredPartitionPaths = getRequiredPartitionPaths(); + fileIndex.setPartitionPaths(requiredPartitionPaths); + List relPartitionPaths = fileIndex.getOrBuildPartitionPaths(); + if (relPartitionPaths.size() == 0) { return Collections.emptyList(); } FileStatus[] fileStatuses = fileIndex.getFilesInPartitions(); @@ -292,19 +300,17 @@ private List buildFileIndex(Path[] paths) { final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); final AtomicInteger cnt = new AtomicInteger(0); // generates one input split for each file group - return Arrays.stream(paths).map(partitionPath -> { - String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath); - return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit) - .map(fileSlice -> { - String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); - Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()) - .collect(Collectors.toList())); - return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, - 
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); - }).collect(Collectors.toList()); - }) + return relPartitionPaths.stream() + .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit) + .map(fileSlice -> { + String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); + Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()) + .collect(Collectors.toList())); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); + }).collect(Collectors.toList())) .flatMap(Collection::stream) .collect(Collectors.toList()); } @@ -319,16 +325,6 @@ private List buildFileIndex(Path[] paths) { } private InputFormat getBatchInputFormat() { - // When this table has no partition, just return an empty source. - if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) { - return new CollectionInputFormat<>(Collections.emptyList(), null); - } - - final Path[] paths = getReadPaths(); - if (paths.length == 0) { - return new CollectionInputFormat<>(Collections.emptyList(), null); - } - final Schema tableAvroSchema = getTableAvroSchema(); final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); @@ -340,62 +336,37 @@ private List buildFileIndex(Path[] paths) { final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)); switch (tableType) { case MERGE_ON_READ: - final List inputSplits = buildFileIndex(paths); + final List inputSplits = buildFileIndex(); if (inputSplits.size() == 0) { // When there is no input splits, just return an empty source. 
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead"); - return new CollectionInputFormat<>(Collections.emptyList(), null); + return EMPTY_INPUT_FORMAT; } - final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( - rowType, - requiredRowType, - tableAvroSchema.toString(), - AvroSchemaConverter.convertToSchema(requiredRowType).toString(), - inputSplits, - conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); - return MergeOnReadInputFormat.builder() - .config(this.conf) - .paths(FilePathUtils.toFlinkPaths(paths)) - .tableState(hoodieTableState) - // use the explicit fields data type because the AvroSchemaConverter - // is not very stable. - .fieldTypes(rowDataType.getChildren()) - .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) - .limit(this.limit) - .emitDelete(false) - .build(); + return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, + rowDataType, inputSplits, false); case COPY_ON_WRITE: - FileInputFormat format = new CopyOnWriteInputFormat( - FilePathUtils.toFlinkPaths(paths), - this.schema.getColumnNames().toArray(new String[0]), - this.schema.getColumnDataTypes().toArray(new DataType[0]), - this.requiredPos, - this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), - this.limit == NO_LIMIT_CONSTANT ? 
Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value - getParquetConf(this.conf, this.hadoopConf), - this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) - ); - format.setFilesFilter(new LatestFileFilter(this.hadoopConf)); - return format; + return baseFileOnlyInputFormat(); default: throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE)); } case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED: - FileInputFormat format = new CopyOnWriteInputFormat( - FilePathUtils.toFlinkPaths(paths), - this.schema.getColumnNames().toArray(new String[0]), - this.schema.getColumnDataTypes().toArray(new DataType[0]), - this.requiredPos, - "default", - this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value - getParquetConf(this.conf, this.hadoopConf), - this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) - ); - format.setFilesFilter(new LatestFileFilter(this.hadoopConf)); - return format; + return baseFileOnlyInputFormat(); + case FlinkOptions.QUERY_TYPE_INCREMENTAL: + IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder() + .conf(conf).path(FilePathUtils.toFlinkPath(path)) + .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes) + .requiredPartitions(getRequiredPartitionPaths()).build(); + final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf); + if (result.isEmpty()) { + // When there is no input splits, just return an empty source. 
+ LOG.warn("No input splits generate for incremental read, returns empty collection instead"); + return new CollectionInputFormat<>(Collections.emptyList(), null); + } + return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, + rowDataType, result.getInputSplits(), false); default: - String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType, - FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED); + String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType, + FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL); throw new HoodieException(errMsg); } } @@ -408,56 +379,62 @@ private List buildFileIndex(Path[] paths) { final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType(); final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE); - org.apache.flink.core.fs.Path[] paths = new org.apache.flink.core.fs.Path[0]; if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) { final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)); - switch (tableType) { - case MERGE_ON_READ: - final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( - rowType, - requiredRowType, - tableAvroSchema.toString(), - AvroSchemaConverter.convertToSchema(requiredRowType).toString(), - Collections.emptyList(), - conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); - return MergeOnReadInputFormat.builder() - .config(this.conf) - .paths(paths) - .tableState(hoodieTableState) - // use the explicit fields data type because the AvroSchemaConverter - // is not very stable. 
- .fieldTypes(rowDataType.getChildren()) - .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) - .limit(this.limit) - .emitDelete(true) - .build(); - case COPY_ON_WRITE: - final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState( - rowType, - requiredRowType, - tableAvroSchema.toString(), - AvroSchemaConverter.convertToSchema(requiredRowType).toString(), - Collections.emptyList(), - conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); - return MergeOnReadInputFormat.builder() - .config(this.conf) - .paths(paths) - .tableState(hoodieTableState2) - // use the explicit fields data type because the AvroSchemaConverter - // is not very stable. - .fieldTypes(rowDataType.getChildren()) - .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) - .limit(this.limit) - .build(); - default: - throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE)); - } + boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ; + return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, + rowDataType, Collections.emptyList(), emitDelete); } String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType, FlinkOptions.QUERY_TYPE_SNAPSHOT); throw new HoodieException(errMsg); } + private MergeOnReadInputFormat mergeOnReadInputFormat( + RowType rowType, + RowType requiredRowType, + Schema tableAvroSchema, + DataType rowDataType, + List inputSplits, + boolean emitDelete) { + final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( + rowType, + requiredRowType, + tableAvroSchema.toString(), + AvroSchemaConverter.convertToSchema(requiredRowType).toString(), + inputSplits, + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + return MergeOnReadInputFormat.builder() + .config(this.conf) + .tableState(hoodieTableState) + // use the explicit fields' data type because the AvroSchemaConverter + // is not very stable. 
+ .fieldTypes(rowDataType.getChildren()) + .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) + .limit(this.limit) + .emitDelete(emitDelete) + .build(); + } + + private InputFormat baseFileOnlyInputFormat() { + final Path[] paths = getReadPaths(); + if (paths.length == 0) { + return EMPTY_INPUT_FORMAT; + } + FileInputFormat format = new CopyOnWriteInputFormat( + FilePathUtils.toFlinkPaths(paths), + this.schema.getColumnNames().toArray(new String[0]), + this.schema.getColumnDataTypes().toArray(new DataType[0]), + this.requiredPos, + this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), + this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value + getParquetConf(this.conf, this.hadoopConf), + this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) + ); + format.setFilesFilter(new LatestFileFilter(this.hadoopConf)); + return format; + } + private Schema inferSchemaFromDdl() { Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType()); return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 2042b96739ef7..e3a8eee9292db 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -45,7 +45,6 @@ import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -85,8 +84,6 @@ public class 
MergeOnReadInputFormat private transient org.apache.hadoop.conf.Configuration hadoopConf; - private Path[] paths; - private final MergeOnReadTableState tableState; /** @@ -134,14 +131,12 @@ public class MergeOnReadInputFormat private MergeOnReadInputFormat( Configuration conf, - Path[] paths, MergeOnReadTableState tableState, List fieldTypes, String defaultPartName, long limit, boolean emitDelete) { this.conf = conf; - this.paths = paths; this.tableState = tableState; this.fieldNames = tableState.getRowType().getFieldNames(); this.fieldTypes = fieldTypes; @@ -165,7 +160,7 @@ public void open(MergeOnReadInputSplit split) throws IOException { this.currentReadCount = 0L; this.hadoopConf = StreamerUtil.getHadoopConf(); if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { - if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { + if (split.getInstantRange() != null) { // base file only with commit time filtering this.iterator = new BaseFileOnlyFilteringIterator( split.getInstantRange(), @@ -212,16 +207,8 @@ public void open(MergeOnReadInputSplit split) throws IOException { @Override public void configure(Configuration configuration) { - if (this.paths.length == 0) { - // file path was not specified yet. Try to set it from the parameters. - String filePath = configuration.getString(FlinkOptions.PATH, null); - if (filePath == null) { - throw new IllegalArgumentException("File path was not specified in input format or configuration."); - } else { - this.paths = new Path[] {new Path(filePath)}; - } - } - // may supports nested files in the future. + // no operation + // may support nested files in the future. 
} @Override @@ -750,7 +737,6 @@ private Option mergeRowWithLog( */ public static class Builder { private Configuration conf; - private Path[] paths; private MergeOnReadTableState tableState; private List fieldTypes; private String defaultPartName; @@ -762,11 +748,6 @@ public Builder config(Configuration conf) { return this; } - public Builder paths(Path[] paths) { - this.paths = paths; - return this; - } - public Builder tableState(MergeOnReadTableState tableState) { this.tableState = tableState; return this; @@ -793,8 +774,8 @@ public Builder emitDelete(boolean emitDelete) { } public MergeOnReadInputFormat build() { - return new MergeOnReadInputFormat(conf, paths, tableState, - fieldTypes, defaultPartName, limit, emitDelete); + return new MergeOnReadInputFormat(conf, tableState, fieldTypes, + defaultPartName, limit, emitDelete); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java index f229f2de8a8f2..334df5961314d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java @@ -88,4 +88,18 @@ void testFileListingUsingMetadataNonPartitionedTable() throws Exception { assertThat(fileStatuses.length, is(1)); assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFileListingEmptyTable(boolean enableMetadata) { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata); + FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf); + List partitionKeys = Collections.singletonList("partition"); + List> partitions = fileIndex.getPartitions(partitionKeys, "default", false); + assertThat(partitions.size(), is(0)); + + FileStatus[] fileStatuses = 
fileIndex.getFilesInPartitions(); + assertThat(fileStatuses.length, is(0)); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java index d13f68319d9b4..3687e9d7cee4d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java @@ -144,7 +144,7 @@ public void testConsumeFromSpecifiedCommit() throws Exception { TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); - conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit); + conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { harness.setup(); @@ -175,7 +175,7 @@ public void testConsumeFromEarliestCommit() throws Exception { TestData.writeData(TestData.DATA_SET_INSERT, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); - conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); + conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { harness.setup(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 233e6fa7eb04a..911c68511ccee 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.table.format.mor.MergeOnReadTableState; @@ -45,7 +44,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -263,10 +261,8 @@ private OneInputStreamOperatorTestHarness create AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), Collections.emptyList(), new String[0]); - Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, hadoopConf, partitionKeys); MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder() .config(conf) - .paths(FilePathUtils.toFlinkPaths(paths)) .tableState(hoodieTableState) .fieldTypes(rowDataType.getChildren()) .defaultPartName("default").limit(1000L) diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 9d0bcabac6aaa..db7111b1f795f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -113,7 +113,7 @@ void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.READ_STREAMING_START_COMMIT, 
firstCommit) + .option(FlinkOptions.READ_START_COMMIT, firstCommit) .end(); streamTableEnv.executeSql(hoodieTableDDL); List rows = execSelectSql(streamTableEnv, "select * from t1", 10); @@ -186,7 +186,7 @@ void testStreamReadAppendData(HoodieTableType tableType) throws Exception { .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit) + .option(FlinkOptions.READ_START_COMMIT, specifiedCommit) .end(); streamTableEnv.executeSql(createHoodieTable2); List rows = execSelectSql(streamTableEnv, "select * from t2", 10); @@ -289,7 +289,7 @@ void testStreamReadWithDeletes() throws Exception { .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2) - .option(FlinkOptions.READ_STREAMING_START_COMMIT, latestCommit) + .option(FlinkOptions.READ_START_COMMIT, latestCommit) .option(FlinkOptions.CHANGELOG_ENABLED, true) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -343,7 +343,7 @@ void testStreamReadMorTableWithCompactionPlan() throws Exception { .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) .option(FlinkOptions.READ_AS_STREAMING, true) - .option(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST) + .option(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST) .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2) // close the async compaction .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false) @@ -879,6 +879,33 @@ void testWriteReadDecimals() { assertRowsEquals(result1, "[+I[1.23, 12345678.12, 12345.12, 123456789.123450000000000000]]"); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testIncrementalRead(HoodieTableType tableType) throws Exception { + 
TableEnvironment tableEnv = batchTableEnv; + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); + + // write 3 batches of data set + TestData.writeData(TestData.dataSetInsert(1, 2), conf); + TestData.writeData(TestData.dataSetInsert(3, 4), conf); + TestData.writeData(TestData.dataSetInsert(5, 6), conf); + + String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.READ_START_COMMIT, latestCommit) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result, TestData.dataSetInsert(5, 6)); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 1572dd446950f..bbbb49d4277c6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -231,6 +231,32 @@ void testSetupCleaningOptionsForSource() { assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45)); } + @Test + void testSetupReadOptionsForSource() { + // definition with simple primary key and partition path + ResolvedSchema schema1 = SchemaBuilder.instance() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + // set up new 
read options + this.conf.setString(FlinkOptions.READ_END_COMMIT, "123"); + + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); + final Configuration conf1 = tableSource1.getConf(); + assertThat(conf1.getString(FlinkOptions.QUERY_TYPE), is(FlinkOptions.QUERY_TYPE_INCREMENTAL)); + + this.conf.removeConfig(FlinkOptions.READ_END_COMMIT); + this.conf.setString(FlinkOptions.READ_START_COMMIT, "123"); + final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2); + final Configuration conf2 = tableSource2.getConf(); + assertThat(conf2.getString(FlinkOptions.QUERY_TYPE), is(FlinkOptions.QUERY_TYPE_INCREMENTAL)); + } + @Test void testInferAvroSchemaForSink() { // infer the schema if not specified diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index d50a716cf741c..8ee18a9601b2f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -19,7 +19,6 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -31,6 +30,7 @@ import org.apache.flink.table.data.RowData; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.ThrowingSupplier; import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -46,9 +46,9 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; /** * Test cases for HoodieTableSource. @@ -112,9 +112,9 @@ void testGetInputFormat() throws Exception { inputFormat = tableSource.getInputFormat(); assertThat(inputFormat, is(instanceOf(MergeOnReadInputFormat.class))); conf.setString(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL); - assertThrows(HoodieException.class, - () -> tableSource.getInputFormat(), - "Invalid query type : 'incremental'. Only 'snapshot' is supported now"); + assertDoesNotThrow( + (ThrowingSupplier>) tableSource::getInputFormat, + "Query type: 'incremental' should be supported"); } @Test diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index f83b2d991c1ea..d4692059ced57 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -19,6 +19,8 @@ package org.apache.hudi.table.format; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieTableSource; import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; @@ -44,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.instanceOf; import static 
org.hamcrest.CoreMatchers.is; @@ -71,12 +74,7 @@ void beforeEach(HoodieTableType tableType, Map options) throws I options.forEach((key, value) -> conf.setString(key, value)); StreamerUtil.initTableIfNotExists(conf); - this.tableSource = new HoodieTableSource( - TestConfigurations.TABLE_SCHEMA, - new Path(tempFile.getAbsolutePath()), - Collections.singletonList("partition"), - "default", - conf); + this.tableSource = getTableSource(conf); } @ParameterizedTest @@ -385,10 +383,81 @@ void testReadChangesUnMergedMOR() throws Exception { assertThat(actual, is(expected)); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testReadIncrementally(HoodieTableType tableType) throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL); + beforeEach(tableType, options); + + // write another commit to read again + for (int i = 0; i < 6; i += 2) { + List dataset = TestData.dataSetInsert(i + 1, i + 2); + TestData.writeData(dataset, conf); + } + + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath()); + List commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants() + .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + + assertThat(commits.size(), is(3)); + + // only the start commit + conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1)); + this.tableSource = getTableSource(conf); + InputFormat inputFormat1 = this.tableSource.getInputFormat(); + assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class)); + + List actual1 = readData(inputFormat1); + final List expected1 = TestData.dataSetInsert(3, 4, 5, 6); + TestData.assertRowDataEquals(actual1, expected1); + + // only the start commit: earliest + conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); + this.tableSource = getTableSource(conf); + InputFormat inputFormat2 = this.tableSource.getInputFormat(); + 
assertThat(inputFormat2, instanceOf(MergeOnReadInputFormat.class)); + + List actual2 = readData(inputFormat2); + final List expected2 = TestData.dataSetInsert(1, 2, 3, 4, 5, 6); + TestData.assertRowDataEquals(actual2, expected2); + + // start and end commit: [start commit, end commit] + conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0)); + conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1)); + this.tableSource = getTableSource(conf); + InputFormat inputFormat3 = this.tableSource.getInputFormat(); + assertThat(inputFormat3, instanceOf(MergeOnReadInputFormat.class)); + + List actual3 = readData(inputFormat3); + final List expected3 = TestData.dataSetInsert(1, 2, 3, 4); + TestData.assertRowDataEquals(actual3, expected3); + + // only the end commit: point in time query + conf.removeConfig(FlinkOptions.READ_START_COMMIT); + conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1)); + this.tableSource = getTableSource(conf); + InputFormat inputFormat4 = this.tableSource.getInputFormat(); + assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class)); + + List actual4 = readData(inputFormat4); + final List expected4 = TestData.dataSetInsert(3, 4); + TestData.assertRowDataEquals(actual4, expected4); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- + private HoodieTableSource getTableSource(Configuration conf) { + return new HoodieTableSource( + TestConfigurations.TABLE_SCHEMA, + new Path(tempFile.getAbsolutePath()), + Collections.singletonList("partition"), + "default", + conf); + } + @SuppressWarnings("unchecked, rawtypes") private static List readData(InputFormat inputFormat) throws IOException { InputSplit[] inputSplits = inputFormat.createInputSplits(1); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 
3e0afc25a0dbc..b0f7b5f0866b0 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -284,6 +284,14 @@ public class TestData { TimestampData.fromEpochMillis(2), StringData.fromString("par1")) ); + public static List dataSetInsert(int... ids) { + List inserts = new ArrayList<>(); + Arrays.stream(ids).forEach(i -> inserts.add( + insertRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(i), StringData.fromString("par1")))); + return inserts; + } + private static Integer toIdSafely(Object id) { if (id == null) { return -1; @@ -424,7 +432,7 @@ public static void assertRowDataEquals(List rows, String expected) { */ public static void assertRowDataEquals(List rows, List expected) { String rowsString = rowDataToString(rows); - assertThat(rowDataToString(expected), is(rowsString)); + assertThat(rowsString, is(rowDataToString(expected))); } /** diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java index 4e9ad5123e820..3719705d6fb8d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -28,7 +28,6 @@ import org.apache.flink.core.fs.Path; import java.io.File; -import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -58,6 +57,6 @@ public static String getSplitPartitionPath(MergeOnReadInputSplit split) { public static StreamReadMonitoringFunction getMonitorFunc(Configuration conf) { final String basePath = conf.getString(FlinkOptions.PATH); - return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 1024L, Collections.emptySet()); + return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 1024L, null); } }