diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 4f3faadb92f0..651385955657 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.compact; +import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.configuration.FlinkOptions; import com.beust.jcommander.Parameter; @@ -109,6 +110,9 @@ public class FlinkCompactionConfig extends Configuration { description = "Min compaction interval of async compaction service, default 10 minutes") public Integer minCompactionIntervalSeconds = 600; + @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false) + public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue(); + /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * The latter is more suitable for the table APIs. It reads all the properties @@ -132,6 +136,8 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCo // use synchronous compaction always conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule); + // Map memory + conf.setString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), config.spillableMapPath); return conf; }