HoodieFlinkClusteringJob.java
@@ -76,12 +76,10 @@ public HoodieFlinkClusteringJob(AsyncClusteringService service) {
   }
 
   public static void main(String[] args) throws Exception {
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
     FlinkClusteringConfig cfg = getFlinkClusteringConfig(args);
     Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
 
-    AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env);
+    AsyncClusteringService service = new AsyncClusteringService(cfg, conf);
 
     new HoodieFlinkClusteringJob(service).start(cfg.serviceMode);
   }
@@ -165,20 +163,14 @@ public static class AsyncClusteringService extends HoodieAsyncTableService {
      */
     private final HoodieFlinkTable<?> table;
 
-    /**
-     * Flink Execution Environment.
-     */
-    private final StreamExecutionEnvironment env;
-
     /**
      * Executor Service.
      */
    private final ExecutorService executor;
 
-    public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
+    public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf) throws Exception {
       this.cfg = cfg;
       this.conf = conf;
-      this.env = env;
       this.executor = Executors.newFixedThreadPool(1);
 
       // create metaClient
@@ -338,6 +330,8 @@ private void cluster() throws Exception {
       final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
       final RowType rowType = (RowType) rowDataType.getLogicalType();
 
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
       // setup configuration
       long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
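The net effect in this file: the StreamExecutionEnvironment is no longer created in main() and threaded through the service constructor; each cluster() run now obtains its own environment. A minimal sketch of that pattern, assuming the illustrative names LazyEnvExample and runOnce() (neither is part of this PR):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyEnvExample {

  // Stand-in for a scheduled run such as AsyncClusteringService#cluster():
  // the environment is obtained per run instead of being held as a service field.
  static void runOnce() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3).print(); // placeholder pipeline body
    env.execute("per-run pipeline");   // each call submits its own job graph
  }

  public static void main(String[] args) throws Exception {
    runOnce(); // first scheduled run
    runOnce(); // a later run gets a fresh environment rather than one captured in main()
  }
}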
HoodieFlinkCompactor.java
@@ -69,12 +69,10 @@ public HoodieFlinkCompactor(AsyncCompactionService service) {
   }
 
   public static void main(String[] args) throws Exception {
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
     FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
 
-    AsyncCompactionService service = new AsyncCompactionService(cfg, conf, env);
+    AsyncCompactionService service = new AsyncCompactionService(cfg, conf);
 
     new HoodieFlinkCompactor(service).start(cfg.serviceMode);
   }
@@ -157,20 +155,14 @@ public static class AsyncCompactionService extends HoodieAsyncTableService {
      */
     private final HoodieFlinkTable<?> table;
 
-    /**
-     * Flink Execution Environment.
-     */
-    private final StreamExecutionEnvironment env;
-
     /**
      * Executor Service.
      */
     private final ExecutorService executor;
 
-    public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
+    public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf) throws Exception {
       this.cfg = cfg;
       this.conf = conf;
-      this.env = env;
       this.executor = Executors.newFixedThreadPool(1);
 
       // create metaClient
@@ -304,6 +296,7 @@ private void compact() throws Exception {
       }
       table.getMetaClient().reloadActiveTimeline();
 
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.addSource(new CompactionPlanSourceFunction(compactionPlans))
           .name("compaction_source")
           .uid("uid_compaction_source")
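The compactor mirrors the clustering change: with the environment created inside compact(), the AsyncCompactionService constructor shrinks to (cfg, conf), and each scheduled compaction wires CompactionPlanSourceFunction into a freshly obtained environment rather than one captured at startup.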
ITTestHoodieFlinkClustering.java
@@ -208,14 +208,13 @@ public void testHoodieFlinkClusteringService() throws Exception {
     TimeUnit.SECONDS.sleep(3);
 
     // Make configuration and setAvroSchema.
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkClusteringConfig cfg = new FlinkClusteringConfig();
     cfg.path = tempFile.getAbsolutePath();
     cfg.minClusteringIntervalSeconds = 3;
     cfg.schedule = true;
     Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
 
-    HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env);
+    HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf);
     asyncClusteringService.start(null);
 
     // wait for the asynchronous commit to finish
ITTestHoodieFlinkCompactor.java
@@ -191,7 +191,6 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
     tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
 
     // Make configuration and setAvroSchema.
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
     cfg.minCompactionIntervalSeconds = 3;
@@ -200,7 +199,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
     conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
     conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), FlinkMiniCluster.DEFAULT_PARALLELISM);
 
-    HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
+    HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf);
     asyncCompactionService.start(null);
 
     // wait for the asynchronous commit to finish