diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index d5ca307a00cd6..dee3edfc578c7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -439,6 +439,11 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
         Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
         // sync Hive synchronously if it is enabled in batch mode.
         syncHive();
+        // schedules the compaction plan in batch execution mode
+        if (tableState.scheduleCompaction) {
+          // if async compaction is on, schedule the compaction
+          CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, true);
+        }
       }
     }
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index d5e718883b86c..efaf4bfdc75b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -47,7 +48,7 @@
  * <p>It should be singleton to avoid conflicts.
  */
 public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPlanEvent>
-    implements OneInputStreamOperator<Object, CompactionPlanEvent> {
+    implements OneInputStreamOperator<Object, CompactionPlanEvent>, BoundedOneInput {
 
   /**
    * Config options.
@@ -141,4 +142,10 @@ private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) th
   public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
     this.output = output;
   }
+
+  @Override
+  public void endInput() throws Exception {
+    // Called when the input data ends, only used in batch mode.
+    notifyCheckpointComplete(-1);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index d579027911a58..5af86867d86d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -95,6 +95,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
       if (OptionsResolver.needsAsyncCompaction(conf)) {
+        // use synchronous compaction for bounded source.
+        if (context.isBounded()) {
+          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+        }
         return Pipelines.compact(conf, pipeline);
       } else {
         return Pipelines.clean(conf, pipeline);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 5c763e055fce5..bbe9a16a100bf 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -42,6 +42,7 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -288,6 +289,23 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
     TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
   }
 
+  @Test
+  public void testCompactionInBatchExecutionMode() throws Exception {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+    tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
+  }
+
   private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
     boolean scheduled = false;
     // judge whether there are any compaction operations.