diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index f745a3c89fcb9..be8ec36cbe503 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -54,6 +54,7 @@
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,13 +66,13 @@
  */
 public class StreamWriteITCase extends TestLogger {
 
-  private static final Map<String, String> EXPECTED = new HashMap<>();
+  private static final Map<String, List<String>> EXPECTED = new HashMap<>();
 
   static {
-    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
-    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
-    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
-    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+    EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
+    EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
+    EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
+    EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
   }
 
   @TempDir
@@ -85,6 +86,7 @@ public void testWriteToHoodie() throws Exception {
     execEnv.setParallelism(4);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
     // Read from file source
     RowType rowType =
@@ -137,7 +139,7 @@ public void testWriteToHoodie() throws Exception {
       }
     }
 
-    TestData.checkWrittenData(tempFile, EXPECTED);
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
   @Test
@@ -215,6 +217,6 @@ public void testWriteToHoodieLegacy() throws Exception {
       }
     }
 
-    TestData.checkWrittenData(tempFile, EXPECTED);
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index b4c24ef695311..5b4131c993090 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -18,7 +18,13 @@
 
 package org.apache.hudi.operator.utils;
 
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.table.data.RowData;
@@ -49,6 +55,7 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Data set for testing, also some utilities to check the results. */
 public class TestData {
@@ -105,7 +112,7 @@ public class TestData {
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param baseFile The file base to check, should be a directly
+   * @param baseFile The file base to check, should be a directory
    * @param expected The expected results mapping, the key should be the partition path
    */
   public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
@@ -117,7 +124,7 @@ public static void checkWrittenData(File baseFile, Map<String, String> expected)
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param baseFile The file base to check, should be a directly
+   * @param baseFile The file base to check, should be a directory
    * @param expected The expected results mapping, the key should be the partition path
    * @param partitions The expected partition number
    */
@@ -149,6 +156,51 @@ public static void checkWrittenData(
     }
   }
 
+  /**
+   * Checks the source data are written as expected.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param basePath The file base to check, should be a directory
+   * @param expected The expected results mapping, the key should be the partition path
+   */
+  public static void checkWrittenFullData(
+      File basePath,
+      Map<String, List<String>> expected) throws IOException {
+
+    // 1. init flink table
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+    FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
+    HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
+
+    // 2. check each partition data
+    expected.forEach((partition, partitionDataSet) -> {
+
+      List<String> readBuffer = new ArrayList<>();
+
+      table.getFileSystemView().getAllFileGroups(partition)
+          .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
+            String path = baseFile.getPath();
+            try {
+              ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build();
+              GenericRecord nextRecord = reader.read();
+              while (nextRecord != null) {
+                readBuffer.add(filterOutVariables(nextRecord));
+                nextRecord = reader.read();
+              }
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }));
+
+      assertTrue(partitionDataSet.size() == readBuffer.size() && partitionDataSet.containsAll(readBuffer));
+
+    });
+
+  }
+
   /**
    * Filter out the variables like file name.
    */