diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2eeb8f58b82a2..753ced46a79f0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -146,8 +146,8 @@ private static void setupConfOptions( conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); // hoodie key about options setupHoodieKeyOptions(conf, table); - // cleaning options - setupCleaningOptions(conf); + // compaction options + setupCompactionOptions(conf); // infer avro schema from physical DDL schema inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType()); } @@ -186,9 +186,9 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table } /** - * Sets up the cleaning options from the table definition. + * Sets up the compaction options from the table definition. */ - private static void setupCleaningOptions(Configuration conf) { + private static void setupCompactionOptions(Configuration conf) { int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS); int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS); if (commitsToRetain >= minCommitsToKeep) { @@ -199,6 +199,12 @@ private static void setupCleaningOptions(Configuration conf) { conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20); } + if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED) + && !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED) + && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) { + // if compaction schedule is on, tweak the target io to 500GB + conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L); + } } /**