diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 4499df87e5791..acb4af61110fa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -100,4 +100,14 @@ public static boolean isDeltaTimeCompaction(Configuration conf) { public static boolean isPartitionedTable(Configuration conf) { return FilePathUtils.extractPartitionKeys(conf).length > 0; } + + /** + * Returns whether the source should emit changelog. + * + * @return true if the source is read as streaming with changelog mode enabled + */ + public static boolean emitChangelog(Configuration conf) { + return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) + && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index f0dbffd4732fe..62baac4cc6b2c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieROTablePathFilter; import org.apache.hudi.source.FileIndex; @@ -196,11 +197,9 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv) @Override public ChangelogMode getChangelogMode() { - return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED) - ? ChangelogModes.FULL - // when all the changes are persisted or read as batch, - // use INSERT mode. - : ChangelogMode.insertOnly(); + // when read as streaming and changelog mode is enabled, emit as FULL mode; + // when all the changes are compacted or read as batch, emit as INSERT mode. + return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly(); } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 2bf5bd58edb1f..95c4bd4a5431b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; @@ -179,8 +180,7 @@ public void open(MergeOnReadInputSplit split) throws IOException { } } else if (!split.getBasePath().isPresent()) { // log files only - if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) - && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) { + if (OptionsResolver.emitChangelog(conf)) { this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); } else { this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));