diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index d8b0dc0ce1899..a55eaa890ea0b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -482,12 +482,6 @@ private FlinkOptions() { // Compaction Options // ------------------------------------------------------------------------ - public static final ConfigOption COMPACTION_MEMORY_FRACTION_PROP = ConfigOptions - .key(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP) - .doubleType() - .defaultValue(0.1) - .withDescription("Compaction memory fraction of Task Manager managed memory size, default 0.1."); - public static final ConfigOption COMPACTION_SCHEDULE_ENABLED = ConfigOptions .key("compaction.schedule.enabled") .booleanType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 41d3179988b52..4f3faadb92f01 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -109,10 +109,6 @@ public class FlinkCompactionConfig extends Configuration { description = "Min compaction interval of async compaction service, default 10 minutes") public Integer minCompactionIntervalSeconds = 600; - @Parameter(names = {"--compaction-memory-fraction-prop"}, - description = "Compaction memory fraction of Task Manager managed memory size, default 0.1") - public double compactionMemoryFractionProp = 0.1; - /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * The latter is more suitable for the table APIs. 
It reads all the properties @@ -133,7 +129,6 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCo conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo); conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); - conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp); // use synchronous compaction always conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 0892fc5c4caa2..1d7111f495c58 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -250,10 +250,6 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB") public Long compactionTargetIo = 512000L; - @Parameter(names = {"--compaction-memory-fraction-prop"}, - description = "Compaction memory fraction of Task Manager managed memory size, default 0.1") - public double compactionMemoryFractionProp = 0.1; - @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default") public Boolean cleanAsyncEnabled = true; @@ -388,7 +384,6 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds); conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo); - 
conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled); conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 8861b43882c2c..73009c30f76e1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,8 +18,6 @@ package org.apache.hudi.util; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; @@ -516,14 +514,6 @@ public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) { * Returns the max compaction memory in bytes with given conf. 
*/ public static long getMaxCompactionMemoryInBytes(Configuration conf) { - if (conf.contains(FlinkOptions.COMPACTION_MAX_MEMORY)) { - return conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024; - } - return (long)Math - .ceil(conf.getDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP) - * TaskExecutorProcessUtils.processSpecFromConfig( - TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( - conf, TaskManagerOptions.TOTAL_PROCESS_MEMORY)) - .getManagedMemorySize().getBytes()); + return conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024L * 1024; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index b3b76d0d7ba6b..a76e00816189a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -83,7 +83,6 @@ void beforeEach() throws IOException { this.conf = new Configuration(); this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath()); this.conf.setString(FlinkOptions.TABLE_NAME, "t1"); - this.conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); StreamerUtil.initTableIfNotExists(this.conf); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index c41040a56c115..8ee18a9601b2f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -64,7 +64,6 @@ public class TestHoodieTableSource { void beforeEach() throws Exception { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); - conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); TestData.writeData(TestData.DATA_SET_INSERT, conf); } @@ -123,7 +122,6 @@ void testGetTableAvroSchema
{ final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true); - conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); HoodieTableSource tableSource = new HoodieTableSource( TestConfigurations.TABLE_SCHEMA, diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 1e6029a2dd638..6fbbab81fa4a6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -73,7 +73,6 @@ void beforeEach(HoodieTableType tableType, Map options) throws I conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction - conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); options.forEach((key, value) -> conf.setString(key, value)); StreamerUtil.initTableIfNotExists(conf);