diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 7f7ea560e8454..0a49f0f343af2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
import java.io.Serializable;
import java.util.Objects;
@@ -33,19 +34,15 @@ public abstract class InstantRange implements Serializable {
protected final String endInstant;
public InstantRange(String startInstant, String endInstant) {
- this.startInstant = Objects.requireNonNull(startInstant);
- this.endInstant = Objects.requireNonNull(endInstant);
+ this.startInstant = startInstant;
+ this.endInstant = endInstant;
}
- public static InstantRange getInstance(String startInstant, String endInstant, RangeType rangeType) {
- switch (rangeType) {
- case OPEN_CLOSE:
- return new OpenCloseRange(startInstant, endInstant);
- case CLOSE_CLOSE:
- return new CloseCloseRange(startInstant, endInstant);
- default:
- throw new AssertionError();
- }
+ /**
+ * Returns the builder.
+ */
+ public static Builder builder() {
+ return new Builder();
}
public String getStartInstant() {
@@ -65,14 +62,14 @@ public String getEndInstant() {
/**
* Represents a range type.
*/
- public enum RangeType {
+ public static enum RangeType {
OPEN_CLOSE, CLOSE_CLOSE
}
private static class OpenCloseRange extends InstantRange {
public OpenCloseRange(String startInstant, String endInstant) {
- super(startInstant, endInstant);
+ super(Objects.requireNonNull(startInstant), endInstant);
}
@Override
@@ -84,10 +81,31 @@ public boolean isInRange(String instant) {
}
}
+ private static class OpenCloseRangeNullableBoundary extends InstantRange {
+
+ public OpenCloseRangeNullableBoundary(String startInstant, String endInstant) {
+ super(startInstant, endInstant);
+ ValidationUtils.checkArgument(startInstant != null || endInstant != null,
+ "Start and end instants can not both be null");
+ }
+
+ @Override
+ public boolean isInRange(String instant) {
+ if (startInstant == null) {
+ return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
+ } else if (endInstant == null) {
+ return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant);
+ } else {
+ return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant)
+ && HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
+ }
+ }
+ }
+
private static class CloseCloseRange extends InstantRange {
public CloseCloseRange(String startInstant, String endInstant) {
- super(startInstant, endInstant);
+ super(Objects.requireNonNull(startInstant), endInstant);
}
@Override
@@ -98,4 +116,78 @@ public boolean isInRange(String instant) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
}
}
+
+ private static class CloseCloseRangeNullableBoundary extends InstantRange {
+
+ public CloseCloseRangeNullableBoundary(String startInstant, String endInstant) {
+ super(startInstant, endInstant);
+ ValidationUtils.checkArgument(startInstant != null || endInstant != null,
+ "Start and end instants can not both be null");
+ }
+
+ @Override
+ public boolean isInRange(String instant) {
+ if (startInstant == null) {
+ return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
+ } else if (endInstant == null) {
+ return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
+ } else {
+ return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant)
+ && HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * Builder for {@link InstantRange}.
+ */
+ public static class Builder {
+ private String startInstant;
+ private String endInstant;
+ private RangeType rangeType;
+ private boolean nullableBoundary = false;
+
+ private Builder() {
+ }
+
+ public Builder startInstant(String startInstant) {
+ this.startInstant = startInstant;
+ return this;
+ }
+
+ public Builder endInstant(String endInstant) {
+ this.endInstant = endInstant;
+ return this;
+ }
+
+ public Builder rangeType(RangeType rangeType) {
+ this.rangeType = rangeType;
+ return this;
+ }
+
+ public Builder nullableBoundary(boolean nullable) {
+ this.nullableBoundary = nullable;
+ return this;
+ }
+
+ public InstantRange build() {
+ ValidationUtils.checkState(this.rangeType != null, "Range type is required");
+ switch (rangeType) {
+ case OPEN_CLOSE:
+ return nullableBoundary
+ ? new OpenCloseRangeNullableBoundary(startInstant, endInstant)
+ : new OpenCloseRange(startInstant, endInstant);
+ case CLOSE_CLOSE:
+ return nullableBoundary
+ ? new CloseCloseRangeNullableBoundary(startInstant, endInstant)
+ : new CloseCloseRange(startInstant, endInstant);
+ default:
+ throw new AssertionError();
+ }
+ }
+ }
}
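Reviewer note: a minimal usage sketch of the reworked builder API (instant timestamps below are hypothetical). With nullableBoundary(true), a missing bound simply drops that side of the filter, while passing both bounds as null still fails the ValidationUtils check.

    InstantRange range = InstantRange.builder()
        .startInstant("20220301000000")                  // hypothetical instant timestamp
        .endInstant(null)                                // null end: only the lower bound applies
        .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
        .nullableBoundary(true)
        .build();
    boolean inRange = range.isInRange("20220302000000"); // true in this sketch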
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 64bd91f480e32..4ce77afc9f15c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -172,4 +172,12 @@ public static boolean isInsertOverwrite(Configuration conf) {
return conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
|| conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
}
+
+ /**
+ * Returns whether the read start commit is a specific commit timestamp.
+ */
+ public static boolean isSpecificStartCommit(Configuration conf) {
+ return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
+ && !conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
+ }
}
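A rough illustration of the new predicate (timestamps hypothetical): it returns true only for a concrete commit timestamp, not for the 'earliest' sentinel or an unset option.

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.READ_START_COMMIT, "20220301000000");
    OptionsResolver.isSpecificStartCommit(conf);  // true: concrete commit timestamp

    conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
    OptionsResolver.isSpecificStartCommit(conf);  // false: 'earliest' is not a specific commit

    conf.removeConfig(FlinkOptions.READ_START_COMMIT);
    OptionsResolver.isSpecificStartCommit(conf);  // false: option not set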
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index a90a0c056f63f..90c58687db28a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -82,6 +82,29 @@ public static void clean(String path) {
PROFILES.remove(path);
}
+ /**
+ * Returns all the incremental write file statuses with the given commits metadata.
+ *
+ * <p>Unlike {@link #getWritePathsOfInstants}, the files are not filtered by
+ * existence.
+ *
+ * @param basePath Table base path
+ * @param hadoopConf The hadoop conf
+ * @param metadataList The commits metadata
+ * @param tableType The table type
+ * @return the file status array
+ */
+ public static FileStatus[] getRawWritePathsOfInstants(
+ Path basePath,
+ Configuration hadoopConf,
+ List<HoodieCommitMetadata> metadataList,
+ HoodieTableType tableType) {
+ Map<String, FileStatus> uniqueIdToFileStatus = new HashMap<>();
+ metadataList.forEach(metadata ->
+ uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, hadoopConf, tableType)));
+ return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
+ }
+
/**
* Returns all the incremental write file statuses with the given commits metadata.
*
@@ -103,6 +126,25 @@ public static FileStatus[] getWritePathsOfInstants(
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
}
+ /**
+ * Returns the commit file status info with given metadata.
+ *
+ * @param basePath Table base path
+ * @param metadata The metadata
+ * @param hadoopConf The hadoop conf
+ * @param tableType The table type
+ * @return the commit file status info grouping by specific ID
+ */
+ private static Map<String, FileStatus> getFilesToReadOfInstant(
+ Path basePath,
+ HoodieCommitMetadata metadata,
+ Configuration hadoopConf,
+ HoodieTableType tableType) {
+ return getFilesToRead(hadoopConf, metadata, basePath.toString(), tableType).entrySet().stream()
+ .filter(entry -> StreamerUtil.isValidFile(entry.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
/**
* Returns the commit file status info with given metadata.
*
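The only difference from getWritePathsOfInstants is that the raw variant skips the existence filter. A sketch of how the incremental source below uses it to detect data files that were cleaned after the commit metadata was written (basePath, hadoopConf, metadataList and tableType assumed in scope):

    FileStatus[] raw = WriteProfiles.getRawWritePathsOfInstants(basePath, hadoopConf, metadataList, tableType);
    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
    // any path referenced by the metadata that no longer exists was cleaned or deleted,
    // which is the trigger for falling back to a full table scan
    boolean anyDeleted = Arrays.stream(raw)
        .anyMatch(status -> !StreamerUtil.fileExists(fs, status.getPath()));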
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index d4b23c3134875..0be2a5300f093 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -29,20 +30,24 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -117,7 +122,102 @@ public static Builder builder() {
public Result inputSplits(
HoodieTableMetaClient metaClient,
org.apache.hadoop.conf.Configuration hadoopConf) {
- return inputSplits(metaClient, hadoopConf, null);
+ HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ if (commitTimeline.empty()) {
+ LOG.warn("No splits found for the table under path " + path);
+ return Result.EMPTY;
+ }
+
+ final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
+ final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT);
+ final boolean startFromEarliest = FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit);
+ final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit);
+ final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit);
+ boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange;
+
+ // Step1: find out the files to read; try to resolve them from the commit metadata first,
+ // and fall back to a full table scan if any of the following conditions matches:
+ // 1. some files referenced in the metadata have been deleted;
+ // 2. the read starts from the earliest commit;
+ // 3. the start commit is archived;
+ // 4. the end commit is archived.
+ Set<String> readPartitions;
+ final FileStatus[] fileStatuses;
+ List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, null);
+ if (fullTableScan) {
+ // scans the partitions and files directly.
+ FileIndex fileIndex = getFileIndex();
+ readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading in user provided path.");
+ return Result.EMPTY;
+ }
+ fileStatuses = fileIndex.getFilesInPartitions();
+ } else {
+ if (instants.size() == 0) {
+ LOG.info("No new instant found for the table under path " + path + ", skip reading");
+ return Result.EMPTY;
+ }
+ String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+ List<HoodieCommitMetadata> metadataList = instants.stream()
+ .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+ readPartitions = getReadPartitions(metadataList);
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading in user provided path.");
+ return Result.EMPTY;
+ }
+ FileStatus[] files = WriteProfiles.getRawWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+ FileSystem fs = FSUtils.getFs(path.toString(), hadoopConf);
+ if (Arrays.stream(files).anyMatch(fileStatus -> !StreamerUtil.fileExists(fs, fileStatus.getPath()))) {
+ LOG.warn("Found deleted files in metadata, fall back to full table scan.");
+ // fallback to full table scan
+ fullTableScan = true;
+ // reading from the earliest, scans the partitions and files directly.
+ FileIndex fileIndex = getFileIndex();
+ readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading in user provided path.");
+ return Result.EMPTY;
+ }
+ fileStatuses = fileIndex.getFilesInPartitions();
+ } else {
+ fileStatuses = files;
+ }
+ }
+
+ if (fileStatuses.length == 0) {
+ LOG.warn("No files found for reading in user provided path.");
+ return Result.EMPTY;
+ }
+
+ // Step2: generates the instant range
+ // if the specified end commit is archived, still uses the specified timestamp,
+ // else uses the latest filtered instant time
+ // (would be the latest instant time if the specified end commit is greater than the latest instant time)
+ final String rangeEnd = endOutOfRange ? endCommit : instants.get(instants.size() - 1).getTimestamp();
+ // keep the same semantics as streaming read: by default, start from the latest commit
+ final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
+ final InstantRange instantRange;
+ if (!fullTableScan) {
+ instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+ } else if (startFromEarliest && endCommit == null) {
+ // short-cut for snapshot read
+ instantRange = null;
+ } else {
+ instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
+ }
+
+ // Step3: decides the read end commit
+ final String endInstant = fullTableScan
+ ? commitTimeline.lastInstant().get().getTimestamp()
+ : instants.get(instants.size() - 1).getTimestamp();
+
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+ fileStatuses, readPartitions, endInstant, instantRange);
+
+ return Result.instance(inputSplits, endInstant);
}
/**
@@ -146,18 +246,19 @@ public Result inputSplits(
if (issuedInstant != null) {
// the streaming reader may record the last issued instant, if the issued instant is present,
// the instant range should be: (issued instant, the latest instant].
- instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(),
- InstantRange.RangeType.OPEN_CLOSE);
+ instantRange = InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue.getTimestamp())
+ .rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
} else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
// first time consume and has a start commit
final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
? null
- : InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
+ : InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue.getTimestamp())
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
} else {
// first time consume and no start commit, consumes the latest incremental data set.
- instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
- InstantRange.RangeType.CLOSE_CLOSE);
+ instantRange = InstantRange.builder().startInstant(instantToIssue.getTimestamp()).endInstant(instantToIssue.getTimestamp())
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
}
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
@@ -166,18 +267,14 @@ public Result inputSplits(
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
- Set<String> writePartitions;
+ Set<String> readPartitions;
final FileStatus[] fileStatuses;
if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
- FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
- if (this.requiredPartitions != null) {
- // apply partition push down
- fileIndex.setPartitionPaths(this.requiredPartitions);
- }
- writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
- if (writePartitions.size() == 0) {
+ FileIndex fileIndex = getFileIndex();
+ readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
+ if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
@@ -198,13 +295,8 @@ public Result inputSplits(
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;
- writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
- // apply partition push down
- if (this.requiredPartitions != null) {
- writePartitions = writePartitions.stream()
- .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
- }
- if (writePartitions.size() == 0) {
+ readPartitions = getReadPartitions(metadataList);
+ if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
@@ -216,11 +308,24 @@ public Result inputSplits(
return Result.EMPTY;
}
- HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final String endInstant = instantToIssue.getTimestamp();
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+ fileStatuses, readPartitions, endInstant, instantRange);
+
+ return Result.instance(inputSplits, endInstant);
+ }
+
+ private List<MergeOnReadInputSplit> getInputSplits(
+ HoodieTableMetaClient metaClient,
+ HoodieTimeline commitTimeline,
+ FileStatus[] fileStatuses,
+ Set<String> readPartitions,
+ String endInstant,
+ InstantRange instantRange) {
+ final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
- List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
+ return readPartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
@@ -234,7 +339,32 @@ public Result inputSplits(
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
- return Result.instance(inputSplits, endInstant);
+ }
+
+ private FileIndex getFileIndex() {
+ FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
+ if (this.requiredPartitions != null) {
+ // apply partition push down
+ fileIndex.setPartitionPaths(this.requiredPartitions);
+ }
+ return fileIndex;
+ }
+
+ /**
+ * Returns the partitions to read with the given metadata list.
+ * The partitions are filtered by the pushed-down required partitions, if any.
+ *
+ * @param metadataList The metadata list
+ * @return the set of read partitions
+ */
+ private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
+ Set<String> partitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
+ // apply partition push down
+ if (this.requiredPartitions != null) {
+ return partitions.stream()
+ .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
+ }
+ return partitions;
}
/**
@@ -287,8 +417,7 @@ private List filterInstantsWithRange(
Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
- if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
- && !this.conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
+ if (OptionsResolver.isSpecificStartCommit(this.conf)) {
final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
instantStream = instantStream
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit));
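To summarize the bounded-read logic above, a hypothetical walk-through (made-up timestamps): instants 20220301..20220304 are archived, the active timeline spans 20220305..20220308, READ_START_COMMIT is 20220302 and no end commit is set. Since the start commit is before the active timeline, the source falls back to a full table scan but still applies a nullable-boundary instant range, so rows committed before 20220302 are filtered out:

    InstantRange instantRange = InstantRange.builder()
        .startInstant("20220302")  // the archived start commit is kept as the lower bound
        .endInstant("20220308")    // latest filtered instant in this example
        .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
        .nullableBoundary(true)
        .build();
    // the read end commit (Step3) resolves to the last active instant, 20220308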
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 0db960c32f56d..a8c5bd51ce931 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -514,4 +514,12 @@ public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolea
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
return schemaUtil.getTableAvroSchema(includeMetadataFields);
}
+
+ public static boolean fileExists(FileSystem fs, Path path) {
+ try {
+ return fs.exists(path);
+ } catch (IOException e) {
+ throw new HoodieException("Exception while checking file " + path + " existence", e);
+ }
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index a8104efb322f9..e8794b0d3b759 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -40,6 +40,7 @@
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
@@ -1120,6 +1121,39 @@ void testIncrementalRead(HoodieTableType tableType) throws Exception {
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
+ TableEnvironment tableEnv = batchTableEnv;
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_NAME, "t1");
+ conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+ conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, 3);
+ conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 4);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 2);
+ conf.setString("hoodie.commits.archival.batch", "1");
+
+ // write 10 batches of data set
+ for (int i = 0; i < 20; i += 2) {
+ List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+ TestData.writeData(dataset, conf);
+ }
+
+ String secondArchived = TestUtils.getNthArchivedInstant(tempFile.getAbsolutePath(), 1);
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.READ_START_COMMIT, secondArchived)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result, TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10,
+ 11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 8d2f3585cd942..3bd00634f72b7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -448,6 +450,104 @@ void testReadIncrementally(HoodieTableType tableType) throws Exception {
List<RowData> actual4 = readData(inputFormat4);
final List<RowData> expected4 = TestData.dataSetInsert(3, 4);
TestData.assertRowDataEquals(actual4, expected4);
+
+ // start and end commit: start commit out of range
+ conf.setString(FlinkOptions.READ_START_COMMIT, "000");
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat5 = this.tableSource.getInputFormat();
+ assertThat(inputFormat5, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual5 = readData(inputFormat5);
+ final List<RowData> expected5 = TestData.dataSetInsert(1, 2, 3, 4);
+ TestData.assertRowDataEquals(actual5, expected5);
+
+ // start and end commit: both are out of range
+ conf.setString(FlinkOptions.READ_START_COMMIT, "001");
+ conf.setString(FlinkOptions.READ_END_COMMIT, "002");
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat6 = this.tableSource.getInputFormat();
+ assertThat(inputFormat6, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual6 = readData(inputFormat6);
+ TestData.assertRowDataEquals(actual6, Collections.emptyList());
+ }
+
+ @Test
+ void testReadArchivedCommitsIncrementally() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
+ options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
+ options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
+ options.put("hoodie.commits.archival.batch", "1");
+ beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+ // write 10 batches of data set
+ for (int i = 0; i < 20; i += 2) {
+ List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+ TestData.writeData(dataset, conf);
+ }
+ // cleaning
+ HoodieFlinkWriteClient<?> writeClient = new HoodieFlinkWriteClient<>(
+ HoodieFlinkEngineContext.DEFAULT, StreamerUtil.getHoodieClientConfig(conf));
+ writeClient.clean();
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf));
+ List<String> commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ assertThat(commits.size(), is(4));
+
+ List<String> archivedCommits = metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants()
+ .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ assertThat(archivedCommits.size(), is(6));
+
+ // start and end commit: both are archived and cleaned
+ conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(0));
+ conf.setString(FlinkOptions.READ_END_COMMIT, archivedCommits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
+ assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual1 = readData(inputFormat1);
+ final List<RowData> expected1 = TestData.dataSetInsert(1, 2, 3, 4);
+ TestData.assertRowDataEquals(actual1, expected1);
+
+ // only the start commit: is archived and cleaned
+ conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
+ conf.removeConfig(FlinkOptions.READ_END_COMMIT);
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat2 = this.tableSource.getInputFormat();
+ assertThat(inputFormat2, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual2 = readData(inputFormat2);
+ final List<RowData> expected2 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10,
+ 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+ TestData.assertRowDataEquals(actual2, expected2);
+
+ // only the end commit: is archived and cleaned
+ conf.removeConfig(FlinkOptions.READ_START_COMMIT);
+ conf.setString(FlinkOptions.READ_END_COMMIT, archivedCommits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat3 = this.tableSource.getInputFormat();
+ assertThat(inputFormat3, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual3 = readData(inputFormat3);
+ final List<RowData> expected3 = TestData.dataSetInsert(3, 4);
+ TestData.assertRowDataEquals(actual3, expected3);
+
+ // start and end commit: start is archived and cleaned, end is active
+ conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(0));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
+ assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual4 = readData(inputFormat4);
+ final List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+ TestData.assertRowDataEquals(actual4, expected4);
}
@ParameterizedTest
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index d20c25866200f..4b3d87e387931 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -88,6 +88,14 @@ public static String getNthCompleteInstant(String basePath, int n, boolean isDel
.orElse(null);
}
+ @Nullable
+ public static String getNthArchivedInstant(String basePath, int n) {
+ final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build();
+ return metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants()
+ .nthInstant(n).map(HoodieInstant::getTimestamp).orElse(null);
+ }
+
public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
assertTrue(split.getLogPaths().isPresent());
final String logPath = split.getLogPaths().get().get(0);