diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 55abeaaa56c2..0bbecbd26cc9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -280,6 +280,14 @@ private FlinkOptions() {
           + "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
           + "2) changelog mode is enabled, this option is a solution to keep data integrity");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = ConfigOptions
+      .key("read.streaming.skip_clustering")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to skip clustering instants for streaming read,\n"
+          + "to avoid reading duplicates");
+
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
       .key("read.start-commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 09f7054cd777..aa0f4e1906e1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -39,6 +39,7 @@
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.Configuration;
@@ -95,6 +96,8 @@ public class IncrementalInputSplits implements Serializable {
   private final Set<String> requiredPartitions;
   // skip compaction
   private final boolean skipCompaction;
+  // skip clustering
+  private final boolean skipClustering;
 
   private IncrementalInputSplits(
       Configuration conf,
@@ -102,13 +105,15 @@ private IncrementalInputSplits(
       RowType rowType,
       long maxCompactionMemoryInBytes,
       @Nullable Set<String> requiredPartitions,
-      boolean skipCompaction) {
+      boolean skipCompaction,
+      boolean skipClustering) {
     this.conf = conf;
     this.path = path;
     this.rowType = rowType;
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.requiredPartitions = requiredPartitions;
     this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
   }
 
   /**
@@ -446,7 +451,7 @@ private List<HoodieCommitMetadata> getArchivedMetadata(
       HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
       if (!archivedCompleteTimeline.empty()) {
         Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
-        return maySkipCompaction(instantStream)
+        return filterInstantsByCondition(instantStream, archivedTimeline)
             .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
       }
     }
@@ -466,7 +471,7 @@ private List<HoodieInstant> filterInstantsWithRange(
     HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
       // returns early for streaming mode
-      return maySkipCompaction(completedTimeline.getInstants())
+      return filterInstantsByCondition(completedTimeline.getInstants(), commitTimeline)
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     }
@@ -482,13 +487,20 @@ private List<HoodieInstant> filterInstantsWithRange(
       final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
       instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
     }
-    return maySkipCompaction(instantStream).collect(Collectors.toList());
+    return filterInstantsByCondition(instantStream, commitTimeline).collect(Collectors.toList());
   }
 
-  private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
-    return this.skipCompaction
-        ? instants.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
-        : instants;
+  /**
+   * Filters out the unnecessary instants by user specified condition.
+   *
+   * @param instants The instants to filter
+   * @param timeline The timeline
+   *
+   * @return the filtered instants
+   */
+  private Stream<HoodieInstant> filterInstantsByCondition(Stream<HoodieInstant> instants, HoodieTimeline timeline) {
+    return instants.filter(instant -> !this.skipCompaction || !instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
+        .filter(instant -> !this.skipClustering || !ClusteringUtil.isClusteringInstant(instant, timeline));
   }
 
   private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
@@ -544,6 +556,8 @@ public static class Builder {
     private Set<String> requiredPartitions;
     // skip compaction
     private boolean skipCompaction = false;
+    // skip clustering
+    private boolean skipClustering = true;
 
     public Builder() {
     }
@@ -578,10 +592,15 @@ public Builder skipCompaction(boolean skipCompaction) {
       return this;
     }
 
+    public Builder skipClustering(boolean skipClustering) {
+      this.skipClustering = skipClustering;
+      return this;
+    }
+
     public IncrementalInputSplits build() {
       return new IncrementalInputSplits(
           Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
-          this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
+          this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction, this.skipClustering);
     }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 2ad312241e1a..c8757bf10f37 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -119,6 +119,7 @@ public StreamReadMonitoringFunction(
         .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
         .requiredPartitions(requiredPartitionPaths)
         .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+        .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
         .build();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 580dbacc4d34..e6ce6d4800e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -19,16 +19,21 @@
 package org.apache.hudi.util;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -77,4 +82,18 @@ public static void rollbackClustering(HoodieFlinkTable table, HoodieFlinkWrit
       table.getMetaClient().reloadActiveTimeline();
     });
   }
+
+  /**
+   * Returns whether the given instant {@code instant} is with clustering operation.
+   */
+  public static boolean isClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      return TimelineUtils.getCommitMetadata(instant, timeline).getOperationType().equals(WriteOperationType.CLUSTER);
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
+    }
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 4a43789ce198..f21b120c7d45 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -323,6 +323,35 @@ void testStreamWriteReadSkippingCompaction() throws Exception {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }
 
+  @Test
+  void testStreamWriteReadSkippingClustering() throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source", 4);
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true)
+        .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
+        .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
+        .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
+        .option(FlinkOptions.CLUSTERING_TASKS, 1)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
+
+    streamTableEnv.getConfig().getConfiguration()
+        .setBoolean("table.dynamic-table-options.enabled", true);
+    final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
+    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+  }
+
   @Test
   void testStreamWriteWithCleaning() {
     // create filesystem table named source
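
A minimal usage sketch (not part of the patch), mirroring the integration test above: it registers a Hudi table for streaming read with the new option enabled so that clustering (replacecommit) instants are skipped on the read path. Only 'read.streaming.skip_clustering' comes from this change; the table schema, the path, and the remaining option values are illustrative assumptions.

// Illustrative sketch only; not part of this patch.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SkipClusteringReadExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Streaming read of a Hudi table; with 'read.streaming.skip_clustering' = 'true'
    // (also the default introduced by this patch) the source filters out clustering
    // instants so that file groups replaced by clustering are not emitted a second time.
    tableEnv.executeSql(
        "CREATE TABLE t1 (\n"
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi/t1',\n"            // hypothetical path
            + "  'table.type' = 'COPY_ON_WRITE',\n"
            + "  'read.streaming.enabled' = 'true',\n"
            + "  'read.streaming.skip_clustering' = 'true'\n"  // option added by this patch
            + ")");

    // Streaming query; runs until cancelled.
    tableEnv.executeSql("SELECT * FROM t1").print();
  }
}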