diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 55806bf1e023b..767a16f0c0353 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -516,7 +516,7 @@ public void testCreateNewInstantTime() throws Exception { } executorService.shutdown(); - assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS)); // required to catch exceptions for (Future f : futures) { f.get(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index f2c0500f9555c..aa31d859bbc4c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -38,6 +38,31 @@ protected void setUp(Configuration conf) { conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); } + @Test + public void testIndexStateBootstrapWithMultiFilesInOneSlice() throws Exception { + // open the function and ingest data + preparePipeline(conf) + .consume(TestData.filterOddRows(TestData.DATA_SET_INSERT)) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .consume(TestData.filterEvenRows(TestData.DATA_SET_INSERT)) + .checkpoint(2) + .assertNextEvent() + .checkpointComplete(2) + .checkWrittenData(EXPECTED1, 4) + // write another commit but does not complete it + .consume(TestData.filterEvenRows(TestData.DATA_SET_INSERT)) + .checkpoint(3) + .assertNextEvent() + .end(); + + // reset the config option + conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); + validateIndexLoaded(); + } + @Test public void testIndexStateBootstrapWithCompactionScheduled() throws Exception { // sets up the delta commits as 1 to generate a new compaction plan. diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index c31c2bbadae25..d03e1b2eb5276 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -64,6 +64,7 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -304,6 +305,24 @@ public static List dataSetInsert(int... ids) { return inserts; } + public static List filterOddRows(List rows) { + return filterRowsByIndexPredicate(rows, i -> i % 2 != 0); + } + + public static List filterEvenRows(List rows) { + return filterRowsByIndexPredicate(rows, i -> i % 2 == 0); + } + + private static List filterRowsByIndexPredicate(List rows, Predicate predicate) { + List filtered = new ArrayList<>(); + for (int i = 0; i < rows.size(); i++) { + if (predicate.test(i)) { + filtered.add(rows.get(i)); + } + } + return filtered; + } + private static Integer toIdSafely(Object id) { if (id == null) { return -1;