diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 884e3a99cf4de..d9f3badf1fb8c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -447,6 +447,28 @@ void testStreamWriteWithCleaning() {
         "some commits should be cleaned");
   }
 
+  @Test
+  void testBatchWriteWithCleaning() {
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
+        .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+    execInsertSql(batchTableEnv, insertInto);
+    execInsertSql(batchTableEnv, insertInto);
+    execInsertSql(batchTableEnv, insertInto);
+    Configuration defaultConf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Map<String, String> options1 = new HashMap<>(defaultConf.toMap());
+    options1.put(FlinkOptions.TABLE_NAME.key(), "t1");
+    Configuration conf = Configuration.fromMap(options1);
+    HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
+    assertTrue(timeline.filterCompletedInstants()
+            .getInstants().stream().anyMatch(instant -> instant.getAction().equals("clean")),
+        "some commits should be cleaned");
+  }
+
   @Test
   void testStreamReadWithDeletes() throws Exception {
     // create filesystem table named source
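
Note: the added test asserts on the bare action string "clean". As a minimal sketch (not part of the patch), the same check could be factored into a helper that uses the HoodieTimeline.CLEAN_ACTION constant; the helper name countCleanInstants is hypothetical, and it assumes the same Flink Configuration built in the test body above.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.util.StreamerUtil;

// Hypothetical helper: counts completed clean instants on the active timeline,
// using HoodieTimeline.CLEAN_ACTION instead of the bare "clean" literal.
private static long countCleanInstants(Configuration conf) {
  HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
  return timeline.filterCompletedInstants().getInstants().stream()
      .filter(instant -> HoodieTimeline.CLEAN_ACTION.equals(instant.getAction()))
      .count();
}

// Possible usage in the test body, replacing the inline anyMatch assertion:
//   assertTrue(countCleanInstants(conf) > 0, "some commits should be cleaned");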