StreamWriteITCase.java
@@ -54,6 +54,7 @@

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -65,13 +66,13 @@
*/
public class StreamWriteITCase extends TestLogger {

private static final Map<String, String> EXPECTED = new HashMap<>();
private static final Map<String, List<String>> EXPECTED = new HashMap<>();

static {
EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
}

@TempDir
@@ -85,6 +86,7 @@ public void testWriteToHoodie() throws Exception {
execEnv.setParallelism(4);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Read from file source
RowType rowType =
@@ -137,7 +139,7 @@ public void testWriteToHoodie() throws Exception {
}
}

TestData.checkWrittenData(tempFile, EXPECTED);
TestData.checkWrittenFullData(tempFile, EXPECTED);
}

@Test
@@ -215,6 +217,6 @@ public void testWriteToHoodieLegacy() throws Exception {
}
}

TestData.checkWrittenData(tempFile, EXPECTED);
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
}
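For readability, here is a hedged sketch (not part of this PR) contrasting the two call shapes used above: the string-keyed checkWrittenData that the test previously used, and the list-keyed checkWrittenFullData it now calls (added in TestData below). The expected values are copied from the EXPECTED maps in this diff; the wrapper class, method, and package are illustrative assumptions only.

package org.apache.hudi.operator; // hypothetical package, for this sketch only

import org.apache.hudi.operator.utils.TestData;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hedged sketch: the expected-data shapes accepted by the two TestData check helpers. */
public class CheckHelpersSketch {

  public static void checkBothWays(File tempFile) throws Exception {
    // Old shape: one rendered string per partition path, as in the EXPECTED map replaced by this PR.
    Map<String, String> expectedStrings = new HashMap<>();
    expectedStrings.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
    expectedStrings.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
    expectedStrings.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
    expectedStrings.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
    TestData.checkWrittenData(tempFile, expectedStrings);

    // New shape (this PR): one row list per partition path; checkWrittenFullData asserts
    // size equality plus containsAll, so row order within a partition does not matter.
    Map<String, List<String>> expectedRows = new HashMap<>();
    expectedRows.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
    expectedRows.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
    expectedRows.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
    expectedRows.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
    TestData.checkWrittenFullData(tempFile, expectedRows);
  }
}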
TestData.java
@@ -18,7 +18,13 @@

package org.apache.hudi.operator.utils;

import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.RowData;
@@ -49,6 +55,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Data set for testing, also some utilities to check the results. */
public class TestData {
@@ -105,7 +112,7 @@ public class TestData {
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
* @param baseFile The file base to check, should be a directly
* @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
*/
public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
@@ -117,7 +124,7 @@ public static void checkWrittenData(File baseFile, Map<String, String> expected)
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
* @param baseFile The file base to check, should be a directly
* @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
* @param partitions The expected partition number
*/
@@ -149,6 +156,51 @@ public static void checkWrittenData(
}
}

/**
* Checks the source data are written as expected.
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
* @param basePath The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
*/
public static void checkWrittenFullData(
Contributor:

@lamber-ken It seems this mechanism looks more reasonable. So, a question: can this method fully replace checkWrittenData?

Member (Author):

No, because the tests in StreamWriteFunctionTest can use TestData#checkWrittenData to check the data for each checkpoint manually; see the sketch after this method.

    File basePath,
    Map<String, List<String>> expected) throws IOException {

  // 1. init flink table
  HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
  FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
  HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
  HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);

  // 2. check each partition data
  expected.forEach((partition, partitionDataSet) -> {

    List<String> readBuffer = new ArrayList<>();

    table.getFileSystemView().getAllFileGroups(partition)
        .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
          String path = baseFile.getPath();
          try {
            ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build();
            GenericRecord nextRecord = reader.read();
            while (nextRecord != null) {
              readBuffer.add(filterOutVariables(nextRecord));
              nextRecord = reader.read();
            }
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }));

    assertTrue(partitionDataSet.size() == readBuffer.size() && partitionDataSet.containsAll(readBuffer));

  });

}
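The screenshot referenced in the reply above is not reproduced in this diff, so the following is only a hedged sketch of the kind of per-checkpoint check described there. The wrapper class, package, batch split, and harness comments are assumptions; only TestData#checkWrittenData, the row format, and the partition names come from this diff.

package org.apache.hudi.operator; // hypothetical package, for this sketch only

import org.apache.hudi.operator.utils.TestData;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

/** Hedged sketch of per-checkpoint verification with the string-keyed helper. */
public class PerCheckpointCheckSketch {

  public static void checkPerCheckpoint(File tempFile) throws Exception {
    // ... feed rows id1..id4 into StreamWriteFunction and complete checkpoint 1 ...
    // (the real wiring lives in StreamWriteFunctionTest and is omitted here)

    // After checkpoint 1 only par1 and par2 should have been written.
    Map<String, String> expectedAfterCp1 = new HashMap<>();
    expectedAfterCp1.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
    expectedAfterCp1.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
    // Assumes the checkWrittenData overload documented above that also takes the expected partition count.
    TestData.checkWrittenData(tempFile, expectedAfterCp1, 2);

    // ... feed rows id5..id8, complete checkpoint 2, then check all four partitions the same way ...
  }
}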

/**
* Filter out the variables like file name.
*/