diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 0984296ee54ef..5ff7a59cd2588 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -579,6 +579,14 @@ private FlinkOptions() { .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + "This also directly translates into how much you can incrementally pull on this table, default 30"); + public static final ConfigOption CLEAN_RETAIN_HOURS = ConfigOptions + .key("clean.retain_hours") + .intType() + .defaultValue(24)// default 24 hours + .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as" + + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group," + + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned."); + public static final ConfigOption CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions .key("clean.retain_file_versions") .intType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 3a69e5d080903..3ba1c6230ffd3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -272,6 +272,12 @@ public class FlinkStreamerConfig extends Configuration { + "This also directly translates into how much you can incrementally pull on this table, default 10") public Integer cleanRetainCommits = 10; + @Parameter(names = {"--clean-retain-hours"}, + description = "Number of hours for which commits need to be retained. This config provides a more flexible option as" + + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group," + + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned. default 24") + public Integer cleanRetainHours = 24; + @Parameter(names = {"--clean-retain-file-versions"}, description = "Number of file versions to retain. Each file group will be retained for this number of version. default 5") public Integer cleanRetainFileVersions = 5; @@ -405,6 +411,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled); conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy); conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits); + conf.setInteger(FlinkOptions.CLEAN_RETAIN_HOURS, config.cleanRetainHours); conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, config.cleanRetainFileVersions); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits); conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4e819ecd7b93a..5fb5162725b97 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -187,6 +187,7 @@ public static HoodieWriteConfig getHoodieClientConfig( .withCleanConfig(HoodieCleanConfig.newBuilder() .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) + .cleanerNumHoursRetained(conf.getInteger(FlinkOptions.CLEAN_RETAIN_HOURS)) .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS)) // override and hardcode to 20, // actually Flink cleaning is always with parallelism 1 now