diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index 1942b1ce29e71..b451c36418647 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -76,12 +76,10 @@ public HoodieFlinkClusteringJob(AsyncClusteringService service) { } public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - FlinkClusteringConfig cfg = getFlinkClusteringConfig(args); Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); - AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env); + AsyncClusteringService service = new AsyncClusteringService(cfg, conf); new HoodieFlinkClusteringJob(service).start(cfg.serviceMode); } @@ -165,20 +163,14 @@ public static class AsyncClusteringService extends HoodieAsyncTableService { */ private final HoodieFlinkTable table; - /** - * Flink Execution Environment. - */ - private final StreamExecutionEnvironment env; - /** * Executor Service. */ private final ExecutorService executor; - public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { + public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf) throws Exception { this.cfg = cfg; this.conf = conf; - this.env = env; this.executor = Executors.newFixedThreadPool(1); // create metaClient @@ -338,6 +330,8 @@ private void cluster() throws Exception { final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // setup configuration long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 1475a493c1a38..ea1fbdcc5dfcf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -69,12 +69,10 @@ public HoodieFlinkCompactor(AsyncCompactionService service) { } public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - FlinkCompactionConfig cfg = getFlinkCompactionConfig(args); Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); - AsyncCompactionService service = new AsyncCompactionService(cfg, conf, env); + AsyncCompactionService service = new AsyncCompactionService(cfg, conf); new HoodieFlinkCompactor(service).start(cfg.serviceMode); } @@ -157,20 +155,14 @@ public static class AsyncCompactionService extends HoodieAsyncTableService { */ private final HoodieFlinkTable table; - /** - * Flink Execution Environment. - */ - private final StreamExecutionEnvironment env; - /** * Executor Service. */ private final ExecutorService executor; - public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { + public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf) throws Exception { this.cfg = cfg; this.conf = conf; - this.env = env; this.executor = Executors.newFixedThreadPool(1); // create metaClient @@ -304,6 +296,7 @@ private void compact() throws Exception { } table.getMetaClient().reloadActiveTimeline(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new CompactionPlanSourceFunction(compactionPlans)) .name("compaction_source") .uid("uid_compaction_source") diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index f2273e40a26db..29f280e61208b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -208,14 +208,13 @@ public void testHoodieFlinkClusteringService() throws Exception { TimeUnit.SECONDS.sleep(3); // Make configuration and setAvroSchema. - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkClusteringConfig cfg = new FlinkClusteringConfig(); cfg.path = tempFile.getAbsolutePath(); cfg.minClusteringIntervalSeconds = 3; cfg.schedule = true; Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); - HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env); + HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf); asyncClusteringService.start(null); // wait for the asynchronous commit to finish diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index 6157b5e901130..0ad78890aad37 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -191,7 +191,6 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await(); // Make configuration and setAvroSchema. - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkCompactionConfig cfg = new FlinkCompactionConfig(); cfg.path = tempFile.getAbsolutePath(); cfg.minCompactionIntervalSeconds = 3; @@ -200,7 +199,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), FlinkMiniCluster.DEFAULT_PARALLELISM); - HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env); + HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf); asyncCompactionService.start(null); // wait for the asynchronous commit to finish